0090b7002c
- Deleted the old body fat percentage chart image. - Updated report generation to load the new body fat percentage master chart for improved accuracy and consistency. - Refactored context generation to reference the new chart in the report structure.
537 lines
20 KiB
Python
537 lines
20 KiB
Python
"""
Report Generator Service

This service handles the generation of medical reports from uploaded files.
It processes data, generates graphs, and creates PDF reports.
"""
|
|
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import pandas as pd
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from playwright.async_api import async_playwright
|
|
from services.context_generator import ContextGenerator
|
|
from services.graph_generator import GraphGenerator
|
|
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
|
|
|
|
|
|
class ReportGeneratorService:
    """Service for generating medical performance reports.

    The service ties together the steps implemented below: extracting the
    spirometry table from a PDF, processing the Pnoe CSV export, rendering
    charts, building per-page template contexts, rendering the pages with
    Jinja2 and printing the resulting HTML to PDF with Playwright/Chromium.
    """

    # Conversion factor used for every kg <-> lbs calculation in this class.
    LBS_PER_KG = 2.20462

    # Static body fat percentage master chart embedded in the report
    # (page 18). The path is relative to the process working directory.
    BODY_FAT_MASTER_CHART_PATH = Path("app/body_fat_percentage_master_chart.png")

    def __init__(
        self,
        template_dir: str = "app/report_gen",
        graphs_dir: str = "graphs",
        reports_dir: str = "reports",
        data_dir: str = "data",
    ):
        """
        Initialize the report generator service.

        Args:
            template_dir: Directory containing Jinja2 templates
            graphs_dir: Directory to save generated graphs
            reports_dir: Directory to save generated reports
            data_dir: Directory to store extracted/processed data
        """
        self.template_dir = template_dir
        self.graphs_dir = Path(graphs_dir)
        self.reports_dir = Path(reports_dir)
        self.data_dir = Path(data_dir)
        self.graph_generator = GraphGenerator(charts_dir=str(self.graphs_dir))
        self.context_generator = ContextGenerator()
        # NOTE(review): autoescape is left at the Jinja2 default (off), so
        # values from patient_info reach the templates unescaped - confirm
        # that the templates escape user-supplied fields where needed.
        self.env = Environment(loader=FileSystemLoader(template_dir))

        # Ensure directories exist. parents=True lets nested locations such
        # as "output/graphs" be created in one go instead of raising.
        for directory in (self.graphs_dir, self.reports_dir, self.data_dir):
            directory.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _parse_weight_kg(weight: Any) -> float:
        """Parse a weight such as "154 lbs", "70 KG" or 70 into kilograms.

        Unit matching is case-insensitive and accepts "lb"/"lbs" and
        "kg"/"kgs". A value without a unit is taken to be kilograms. An
        unparseable value logs a warning and yields 0.0.
        """
        original_text = str(weight)
        text = original_text.lower()
        is_pounds = "lb" in text
        # Longer unit spellings first so "lbs" is not left behind as "s".
        for unit in ("lbs", "lb", "kgs", "kg"):
            text = text.replace(unit, "")
        text = text.replace(" ", "").strip()
        try:
            value = float(text)
        except ValueError:
            print(
                f"Warning: Could not parse weight '{original_text}', using default 0"
            )
            value = 0.0
        if is_pounds:
            return value / ReportGeneratorService.LBS_PER_KG
        return value

    @staticmethod
    def _encode_file_base64(path: Any) -> str:
        """Return the content of the file at ``path`` as a base64 string."""
        import base64

        with open(path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    @staticmethod
    def _strip_path_separators(text: str) -> str:
        """Replace path separators so the text is safe inside a file name."""
        return text.replace("/", "_").replace("\\", "_")

    def process_pnoe_data(self, pnoe_csv_path: str) -> pd.DataFrame:
        """
        Load and process Pnoe CSV data.

        Args:
            pnoe_csv_path: Path to Pnoe CSV file (semicolon-delimited)

        Returns:
            Processed DataFrame with derived columns ("VO2 Pulse",
            "VO2 Breath", "CHO", "FAT") and "<column>_smoothed" columns

        Raises:
            KeyError: If a column needed for the derived values is missing
        """
        # Load data
        df = pd.read_csv(pnoe_csv_path, delimiter=";")

        # Convert numeric columns; columns that cannot be converted are kept
        # unchanged.
        for col in df.columns:
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass  # Keep as-is if not numeric

        # Calculate derived columns
        df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
        df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"]
        df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
        df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100

        # Smooth columns with a trailing rolling mean; min_periods=1 keeps
        # the first rows instead of turning them into NaN.
        window_size = 10
        columns_to_smooth = [
            "VO2(ml/min)",
            "VCO2(ml/min)",
            "HR(bpm)",
            "VT(l)",
            "BF(bpm)",
            "VE(l/min)",
            "VO2 Pulse",
            "VO2 Breath",
            "CHO",
            "FAT",
        ]

        for col in columns_to_smooth:
            if col in df.columns:
                df[f"{col}_smoothed"] = (
                    df[col].rolling(window=window_size, min_periods=1).mean()
                )

        return df

    def generate_graphs(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """
        Generate all required graphs from processed data.

        A chart that fails to render is reported with a warning and skipped
        so the remaining charts are still produced.

        Args:
            df: Processed DataFrame with smoothed columns

        Returns:
            List of dictionaries containing graph names and paths
        """
        graphs_generated = []

        # List of graphs to generate
        graph_methods = [
            ("respiratory", self.graph_generator.generate_respiratory_chart),
            ("fuel_utilization", self.graph_generator.generate_fuel_utilization_chart),
            ("vo2_pulse", self.graph_generator.generate_vo2_pulse_chart),
            ("vo2_breath", self.graph_generator.generate_vo2_breath_chart),
            ("fat_metabolism", self.graph_generator.generate_fat_metabolism_chart),
            ("recovery", self.graph_generator.generate_recovery_chart),
        ]

        for name, method in graph_methods:
            try:
                path = method(df, save_as_base64=False)
                graphs_generated.append({"name": name, "path": str(path)})
            except Exception as e:
                # Deliberate best-effort: one broken chart must not abort the
                # whole report.
                print(f"Warning: Could not generate {name} chart: {e}")

        return graphs_generated

    def calculate_analysis_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculate basic analysis metrics from processed data.

        Args:
            df: Processed DataFrame with smoothed columns

        Returns:
            Dictionary with "vo2_max", "peak_vt" and "max_hr": the maximum of
            the matching smoothed column, or 0 when that column is absent
        """
        metric_columns = {
            "vo2_max": "VO2(ml/min)_smoothed",
            "peak_vt": "VT(l)_smoothed",
            "max_hr": "HR(bpm)_smoothed",
        }
        return {
            metric: float(df[column].max()) if column in df.columns else 0
            for metric, column in metric_columns.items()
        }

    def generate_html(
        self, patient_info: Dict[str, Any], contexts: Dict[str, Dict[str, Any]]
    ) -> str:
        """
        Generate HTML content for the report.

        Page ``i`` is rendered from the template ``page_{i}.html`` with the
        context stored under ``page_{i}``. Pages 1 and 2 are emitted as
        rendered; every later page is wrapped with the shared header and its
        numbered footer.

        Args:
            patient_info: Dictionary containing patient information
                (patient_name, age, height, weight, focus)
            contexts: Dictionary with keys 'page_1', 'page_2', etc., each
                containing context data

        Returns:
            Complete HTML document as string
        """
        html_pages = []

        # Header context
        header_context = {
            "patient_name": patient_info.get("patient_name", ""),
            "age": patient_info.get("age", ""),
            "height": patient_info.get("height", ""),
            "weight": patient_info.get("weight", ""),
            "focus": patient_info.get("focus", "Endurance"),
        }

        # Get total number of pages
        num_pages = len(contexts)

        # Footer context (one per page, numbered from 1)
        footer_context = [
            {
                "contact_email": "info@ishplabs.com",
                "website": "www.ishplabs.com",
                "social": "@ishplabs",
                "page_number": i + 1,
            }
            for i in range(num_pages)
        ]

        # Render header
        header_html = self.env.get_template("header.html").render(header_context)

        # Render footers
        footer_template = self.env.get_template("footer.html")
        footer_html_list = [
            footer_template.render(context) for context in footer_context
        ]

        # Render pages - iterate through pages in order
        for i in range(1, num_pages + 1):
            page_key = f"page_{i}"
            context = contexts.get(page_key, {})
            template = self.env.get_template(f"page_{i}.html").render(context)

            if i > 2:
                full_html = f"""
                <div class="page flex flex-col justify-between">
                    <div>
                        {header_html}
                    </div>
                    <main class="flex-grow p-4">
                        {template}
                    </main>
                    <div class="border-t text-center text-sm text-gray-600">
                        {footer_html_list[i - 1]}
                    </div>
                </div>
                """
                html_pages.append(full_html)
            else:
                html_pages.append(template)

        # Combine with page breaks
        final_html = "<div class='page-break'></div>".join(html_pages)

        # Wrap in full HTML document
        html_doc = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="utf-8">
            <link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
            <style>
                html, body {{
                    height: 100%;
                    margin: 0;
                    padding: 0;
                }}
                .page-break {{ page-break-after: always; }}
                .page {{
                    height: 100vh;
                    min-height: 100vh;
                    display: flex;
                    flex-direction: column;
                }}
                .page main {{
                    flex: 1;
                    overflow: hidden;
                }}
                * {{
                    margin: 0;
                    padding: 0;
                    box-sizing: border-box;
                }}
                img {{
                    max-height: 300px;
                }}
                .chart-large {{
                    max-height: 500px !important;
                }}
            </style>
        </head>
        <body class="m-0 p-0">
            {final_html}
        </body>
        </html>
        """

        return html_doc

    async def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
        """
        Convert HTML content to PDF file.

        Args:
            html_content: HTML content as string
            pdf_path: Path where PDF should be saved
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            try:
                page = await browser.new_page()
                await page.set_content(html_content)
                await page.pdf(path=pdf_path, format="A4", print_background=True)
            finally:
                # Close the browser even when rendering fails.
                await browser.close()

    async def generate_report(
        self,
        spirometry_pdf_path: str,
        pnoe_csv_path: str,
        patient_info: Dict[str, Any],
        output_filename: Optional[str] = None,
        metric_overrides: Optional[Dict[str, Any]] = None,
        oxygenation_csv_path: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Generate complete medical report from uploaded files.

        This follows the complete workflow:
        1. Extract spirometry data from PDF
        2. Process the Pnoe data
        3. Generate all graphs (including body composition, spirometry,
           optional TSI, metabolism and fuel source charts)
        4. Generate context for each page
        5. Calculate analysis metrics
        6. Generate final HTML and PDF report

        Args:
            spirometry_pdf_path: Path to Spirometry PDF file
            pnoe_csv_path: Path to Pnoe CSV file
            patient_info: Dictionary containing patient information. Keys
                read here: patient_name, first_name, last_name, age, gender,
                weight (number or string with optional "kg"/"lbs" unit),
                fat_percentage, session_id
            output_filename: Optional custom output filename; when omitted
                it is derived from the patient name and session id
            metric_overrides: Optional overrides forwarded to the context
                generator's generate_all_contexts
            oxygenation_csv_path: Optional path to an oxygenation CSV; when
                given, a TSI chart is generated from it

        Returns:
            Dictionary containing report path, graphs generated, and analysis data
        """
        # Step 1: Extract spirometry table from PDF
        print("Step 1: Extracting spirometry data from PDF...")
        spirometry_csv_path = extract_spirometry_table_from_pdf(
            spirometry_pdf_path, output_dir=str(self.data_dir)
        )
        print(f"Spirometry data saved to: {spirometry_csv_path}")

        # Step 2: Process Pnoe data
        print("Step 2: Processing Pnoe data...")
        df = self.process_pnoe_data(pnoe_csv_path)

        # Step 3: Generate all graphs
        print("Step 3: Generating graphs...")
        graphs_generated = self.generate_graphs(df)

        # Create graph dictionary with base64 encoded images
        graphs_dict = {}
        for graph in graphs_generated:
            # Read the graph file and convert to base64
            graph_path = Path(graph["path"])
            if graph_path.exists():
                graphs_dict[graph["name"]] = self._encode_file_base64(graph_path)

        # Also generate body composition charts
        # Use patient info directly (no SECA file needed)
        fat_pct = patient_info.get("fat_percentage", 0)
        age = patient_info.get("age", 25)
        gender = patient_info.get("gender", "female").lower()

        # Weight may arrive as a number or as text with a unit; normalise to kg.
        weight_kg = self._parse_weight_kg(patient_info.get("weight", "0"))

        # Calculate fat and lean mass in pounds
        fat_mass_lbs = weight_kg * fat_pct / 100 * self.LBS_PER_KG
        lean_mass_lbs = weight_kg * (1 - fat_pct / 100) * self.LBS_PER_KG

        # Generate body composition chart (save as file first, then convert to base64)
        try:
            body_comp_path = self.graph_generator.generate_body_composition_chart(
                fat_mass_lbs, lean_mass_lbs, save_as_base64=False
            )
            graphs_generated.append(
                {"name": "body_composition", "path": str(body_comp_path)}
            )
            # Convert to base64 for graphs_dict
            graphs_dict["body_composition"] = self._encode_file_base64(
                body_comp_path
            )
        except Exception as e:
            print(f"Warning: Could not generate body composition chart: {e}")
            graphs_dict["body_composition"] = ""

        # Generate body fat percent chart (save as file first, then convert to base64)
        try:
            body_fat_path = self.graph_generator.generate_body_fat_percent_chart(
                fat_pct, age, gender, save_as_base64=False
            )
            graphs_generated.append(
                {"name": "body_fat_percent", "path": str(body_fat_path)}
            )
            # Convert to base64 for graphs_dict
            graphs_dict["body_fat_percent"] = self._encode_file_base64(
                body_fat_path
            )
        except Exception as e:
            print(f"Warning: Could not generate body fat percent chart: {e}")
            graphs_dict["body_fat_percent"] = ""

        # Load static body fat percentage master chart for page 18
        master_chart_path = self.BODY_FAT_MASTER_CHART_PATH
        if master_chart_path.exists():
            try:
                graphs_dict["body_fat_percentage_master_chart"] = (
                    self._encode_file_base64(master_chart_path)
                )
            except Exception as e:
                print(f"Warning: Could not load body fat percentage master chart: {e}")
                graphs_dict["body_fat_percentage_master_chart"] = ""
        else:
            print(
                f"Warning: Body fat percentage master chart not found at {master_chart_path}"
            )
            graphs_dict["body_fat_percentage_master_chart"] = ""

        # Generate spirometry chart
        print("Step 4: Generating spirometry chart...")
        try:
            spirometry_df = pd.read_csv(spirometry_csv_path)
            print(f"Spirometry data loaded: {len(spirometry_df)} rows")
            print(f"Spirometry columns: {spirometry_df.columns.tolist()}")
            if "Parameters" in spirometry_df.columns:
                print(f"Available parameters: {spirometry_df['Parameters'].tolist()}")
            spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
                spirometry_df, save_as_base64=True
            )
            graphs_dict["spirometry_chart"] = spirometry_chart_b64
            print("Spirometry chart generated successfully")
        except Exception as e:
            import traceback

            error_details = traceback.format_exc()
            print(f"Warning: Could not generate spirometry chart: {e}")
            print(f"Error details: {error_details}")
            graphs_dict["spirometry_chart"] = ""

        # Generate TSI chart if oxygenation CSV is provided
        if oxygenation_csv_path:
            print("Step 4.5: Generating TSI chart...")
            try:
                oxygenation_df = pd.read_csv(oxygenation_csv_path)
                tsi_chart_b64 = self.graph_generator.generate_tsi_chart(
                    oxygenation_df, save_as_base64=True
                )
                graphs_dict["tsi_chart"] = tsi_chart_b64
            except Exception as e:
                print(f"Warning: Could not generate TSI chart: {e}")
                graphs_dict["tsi_chart"] = ""

        # Patient info in the shape the context generators are given; built
        # once and copied so both generators receive the same values.
        generator_patient_info = {
            "name": patient_info.get("first_name", ""),
            "last_name": patient_info.get("last_name", ""),
            "age": patient_info.get("age", 25),
            "weight": weight_kg,
            "fat_percentage": fat_pct,
            "gender": gender,
        }

        # Generate metabolism and fuel source charts for page 5
        print("Step 4.6: Generating metabolism and fuel source charts...")
        try:
            # Calculate RMR and fuel source from pnoe data using a throwaway
            # generator so self.context_generator is not touched if this fails.
            temp_context_gen = ContextGenerator()
            temp_context_gen.load_data(pnoe_csv_path, str(spirometry_csv_path), None)
            temp_context_gen.patient_info = dict(generator_patient_info)
            rmr_metrics = temp_context_gen.calculate_rmr_and_fuel_source()

            # Generate metabolism chart
            metabolism_chart_b64 = self.graph_generator.generate_metabolism_chart(
                rmr_metrics["rmr_kcal"], save_as_base64=True
            )
            graphs_dict["metabolism_chart"] = metabolism_chart_b64

            # Generate fuel source chart
            fuel_source_chart_b64 = self.graph_generator.generate_fuel_source_chart(
                rmr_metrics["rest_fat_percentage"], save_as_base64=True
            )
            graphs_dict["fuel_source_chart"] = fuel_source_chart_b64
        except Exception as e:
            print(f"Warning: Could not generate metabolism/fuel source charts: {e}")
            graphs_dict["metabolism_chart"] = ""
            graphs_dict["fuel_source_chart"] = ""

        # Step 5: Generate context for all pages
        print("Step 5: Generating page contexts...")
        patient_name = patient_info.get("patient_name", "")
        self.context_generator.load_data(
            pnoe_csv_path,
            str(spirometry_csv_path),
            None,  # No SECA file
        )
        # Set patient info manually
        self.context_generator.patient_info = dict(generator_patient_info)
        contexts = self.context_generator.generate_all_contexts(
            patient_name, graphs_dict, metric_overrides=metric_overrides
        )

        # Step 6: Calculate analysis metrics
        analysis_data = self.calculate_analysis_metrics(df)
        analysis_data["graphs_count"] = len(graphs_generated)

        # Step 7: Generate HTML
        html_content = self.generate_html(patient_info, contexts)

        # Step 8: Generate PDF
        if output_filename is None:
            patient_name_full = patient_info.get("patient_name", "Unknown")
            session_id = patient_info.get("session_id", "default")
            # Path separators are neutralised so a name or session id cannot
            # move the report outside reports_dir.
            safe_name = self._strip_path_separators(
                patient_name_full.replace(" ", "_")
            )
            safe_session = self._strip_path_separators(str(session_id))
            output_filename = f"report_{safe_name}_{safe_session}.pdf"

        report_path = self.reports_dir / output_filename
        print(f"Generating PDF report at {report_path}")
        await self.html_to_pdf(html_content, str(report_path))

        return {
            "report_path": str(report_path),
            "graphs_generated": graphs_generated,
            "analysis_data": analysis_data,
        }
|