diff --git a/app/body_fat_percentage_chart.png b/app/body_fat_percentage_chart.png new file mode 100644 index 0000000..7e24de3 Binary files /dev/null and b/app/body_fat_percentage_chart.png differ diff --git a/app/main.py b/app/main.py index 76ed68e..1251ec6 100644 --- a/app/main.py +++ b/app/main.py @@ -13,10 +13,13 @@ from pathlib import Path from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse +from fastapi.staticfiles import StaticFiles from jinja2 import Environment, FileSystemLoader from pydantic import BaseModel -from starlette.middleware.sessions import SessionMiddleware from services.report_generator import ReportGeneratorService +from starlette.middleware.base import BaseHTTPMiddleware +from starlette.middleware.sessions import SessionMiddleware +from starlette.requests import Request as StarletteRequest app = FastAPI( title="Medical Report Generation API", @@ -25,7 +28,32 @@ app = FastAPI( ) # Add session middleware -app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-secret-key-change-in-production")) +app.add_middleware( + SessionMiddleware, + secret_key=os.getenv("SECRET_KEY", "your-secret-key-change-in-production"), +) + + +# Add security headers middleware to allow external scripts +class SecurityHeadersMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request: StarletteRequest, call_next): + response = await call_next(request) + # Allow external scripts and styles (for Tailwind CDN) + # Only add CSP for HTML responses + content_type = response.headers.get("content-type", "").lower() + if "text/html" in content_type: + response.headers["Content-Security-Policy"] = ( + "default-src 'self'; script-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; img-src 'self' data: https:;" + ) + return response + + +app.add_middleware(SecurityHeadersMiddleware) + +# Mount static files (if static directory exists) +static_dir = Path("static") +if static_dir.exists(): + app.mount("/static", StaticFiles(directory="static"), name="static") # Setup templates jinja_env = Environment(loader=FileSystemLoader("app/templates")) @@ -59,13 +87,15 @@ def render_template(template_name: str, context: dict) -> HTMLResponse: """Helper function to render Jinja2 templates""" template = jinja_env.get_template(template_name) html_content = template.render(**context) - return HTMLResponse(content=html_content) + return HTMLResponse(content=html_content, media_type="text/html") @app.get("/", response_class=HTMLResponse) async def root(request: Request): """Root endpoint - Upload form page""" - return render_template("upload.html", {"request": request, "session": request.session}) + return render_template( + "upload.html", {"request": request, "session": request.session} + ) @app.post("/upload") @@ -77,56 +107,73 @@ async def upload_files( height: str = Form(...), weight: str = Form(...), gender: str = Form(...), + fat_percentage: float = Form(...), focus: str = Form(default="Endurance"), session_id: str = Form(default="default"), spirometry_pdf: UploadFile = File(...), pnoe_csv: UploadFile = File(...), - seca_excel: UploadFile = File(...), + oxygenation_csv: UploadFile = File(None), ): """Handle file upload and generate report""" # Validate file types if not spirometry_pdf.filename.endswith(".pdf"): - return render_template("upload.html", { - "request": request, - "session": request.session, - "error": "Spirometry file must be a PDF" - }) - + return render_template( + "upload.html", + { + "request": request, + "session": request.session, + "error": "Spirometry file must be a PDF", + }, + ) + if not pnoe_csv.filename.endswith(".csv"): - return render_template("upload.html", { - "request": request, - "session": request.session, - "error": "Pnoe file must be a CSV" - }) - - if not seca_excel.filename.endswith((".xlsx", ".xls")): - return render_template("upload.html", { - "request": request, - "session": request.session, - "error": "SECA file must be an Excel file (.xlsx or .xls)" - }) - + return render_template( + "upload.html", + { + "request": request, + "session": request.session, + "error": "Pnoe file must be a CSV", + }, + ) + + # Validate oxygenation CSV if provided + if oxygenation_csv and oxygenation_csv.filename: + if not oxygenation_csv.filename.endswith(".csv"): + return render_template( + "upload.html", + { + "request": request, + "session": request.session, + "error": "Oxygenation file must be a CSV", + }, + ) + # Create session-specific temp directory session_uuid = str(uuid.uuid4()) session_temp_dir = TEMP_DIR / session_uuid session_temp_dir.mkdir(exist_ok=True, parents=True) - + # Save uploaded files spirometry_path = session_temp_dir / f"spirometry_{spirometry_pdf.filename}" pnoe_path = session_temp_dir / f"pnoe_{pnoe_csv.filename}" - seca_path = session_temp_dir / f"seca_{seca_excel.filename}" - + oxygenation_path = None + try: # Write files with open(spirometry_path, "wb") as f: shutil.copyfileobj(spirometry_pdf.file, f) - + with open(pnoe_path, "wb") as f: shutil.copyfileobj(pnoe_csv.file, f) - - with open(seca_path, "wb") as f: - shutil.copyfileobj(seca_excel.file, f) - + + # Save oxygenation CSV if provided + if oxygenation_csv and oxygenation_csv.filename: + oxygenation_path = ( + session_temp_dir / f"oxygenation_{oxygenation_csv.filename}" + ) + with open(oxygenation_path, "wb") as f: + shutil.copyfileobj(oxygenation_csv.file, f) + # Prepare patient information patient_name = f"{first_name} {last_name}" patient_info = { @@ -137,76 +184,100 @@ async def upload_files( "height": height, "weight": weight, "gender": gender, + "fat_percentage": fat_percentage, "focus": focus, "session_id": session_id, } - + # Generate report + oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None result = await report_service.generate_report( spirometry_pdf_path=str(spirometry_path), pnoe_csv_path=str(pnoe_path), - seca_excel_path=str(seca_path), patient_info=patient_info, + oxygenation_csv_path=oxygenation_csv_path, ) - + # Store in session request.session["patient_info"] = patient_info request.session["temp_dir"] = str(session_temp_dir) request.session["report_path"] = result["report_path"] request.session["graphs_generated"] = result["graphs_generated"] request.session["analysis_data"] = result["analysis_data"] - + # Extract spirometry CSV path (it's saved in data_dir by the service) - from services.spirometry_table_extractor import extract_spirometry_table_from_pdf - from services.context_generator import ContextGenerator from pathlib import Path as PathLib - + + from services.context_generator import ContextGenerator + from services.spirometry_table_extractor import ( + extract_spirometry_table_from_pdf, + ) + # The spirometry CSV is extracted during report generation # We need to find it or extract it again data_dir = PathLib("data") - spirometry_csv_path = data_dir / f"spirometry_{Path(spirometry_pdf.filename).stem}.csv" - + spirometry_csv_path = ( + data_dir / f"spirometry_{Path(spirometry_pdf.filename).stem}.csv" + ) + # If it doesn't exist, extract it if not spirometry_csv_path.exists(): spirometry_csv_path = extract_spirometry_table_from_pdf( str(spirometry_path), output_dir=str(data_dir) ) spirometry_csv_path = PathLib(spirometry_csv_path) - + # Get calculated metrics for display and editing context_gen = ContextGenerator() context_gen.load_data( str(pnoe_path), str(spirometry_csv_path), - str(seca_path) + None, # No SECA file needed anymore ) - context_gen.extract_patient_info(last_name) # Extract patient info + # Set patient info manually since we're not reading from SECA + weight_kg = float(weight.replace("lbs", "").replace("kg", "").strip()) + if "lbs" in weight.lower(): + weight_kg = weight_kg / 2.20462 # Convert lbs to kg + + context_gen.patient_info = { + "name": first_name, + "last_name": last_name, + "age": age, + "weight": weight_kg, + "fat_percentage": fat_percentage, + "gender": gender, + } spirometry_metrics = context_gen.calculate_spirometry_metrics() pnoe_metrics = context_gen.calculate_pnoe_metrics() - + # Store metrics in session request.session["metrics"] = { "spirometry": spirometry_metrics, "pnoe": pnoe_metrics, } request.session["spirometry_csv_path"] = str(spirometry_csv_path) - + return RedirectResponse(url="/preview", status_code=303) - + except Exception as e: import traceback + error_details = traceback.format_exc() print(f"ERROR: {error_details}") - return render_template("upload.html", { - "request": request, - "session": request.session, - "error": f"Error generating report: {str(e)}" - }) + return render_template( + "upload.html", + { + "request": request, + "session": request.session, + "error": f"Error generating report: {str(e)}", + }, + ) finally: # Close file handles spirometry_pdf.file.close() pnoe_csv.file.close() - seca_excel.file.close() + if oxygenation_csv and oxygenation_csv.filename: + oxygenation_csv.file.close() @app.get("/preview", response_class=HTMLResponse) @@ -214,7 +285,9 @@ async def preview(request: Request): """Preview generated report""" if not request.session.get("report_path"): return RedirectResponse(url="/", status_code=303) - return render_template("preview.html", {"request": request, "session": request.session}) + return render_template( + "preview.html", {"request": request, "session": request.session} + ) @app.get("/graphs/{filename}") @@ -231,7 +304,9 @@ async def edit_form(request: Request): """Display edit metrics form""" if not request.session.get("metrics"): return RedirectResponse(url="/", status_code=303) - return render_template("edit.html", {"request": request, "session": request.session}) + return render_template( + "edit.html", {"request": request, "session": request.session} + ) @app.post("/edit") @@ -239,16 +314,13 @@ async def edit_metrics(request: Request): """Handle metric edits and regenerate report""" if not request.session.get("temp_dir") or not request.session.get("patient_info"): return RedirectResponse(url="/", status_code=303) - + # Get form data form_data = await request.form() - + # Build metric overrides - metric_overrides = { - "pnoe": {}, - "spirometry": {} - } - + metric_overrides = {"pnoe": {}, "spirometry": {}} + # Pnoe overrides if form_data.get("vo2_max"): metric_overrides["pnoe"]["vo2_max"] = float(form_data["vo2_max"]) @@ -262,28 +334,36 @@ async def edit_metrics(request: Request): metric_overrides["pnoe"]["fat_max_value"] = float(form_data["fat_max_value"]) if form_data.get("fat_max_hr"): metric_overrides["pnoe"]["fat_max_hr"] = float(form_data["fat_max_hr"]) - + # VT1 and VT2 overrides - if form_data.get("vt1_hr") or form_data.get("vt1_speed") or form_data.get("vt1_time"): + if ( + form_data.get("vt1_hr") + or form_data.get("vt1_speed") + or form_data.get("vt1_time") + ): metric_overrides["pnoe"]["vt1"] = { "HeartRate": float(form_data.get("vt1_hr", 0)), "Speed": float(form_data.get("vt1_speed", 0)), - "Time": float(form_data.get("vt1_time", 0)) + "Time": float(form_data.get("vt1_time", 0)), } - - if form_data.get("vt2_hr") or form_data.get("vt2_speed") or form_data.get("vt2_time"): + + if ( + form_data.get("vt2_hr") + or form_data.get("vt2_speed") + or form_data.get("vt2_time") + ): metric_overrides["pnoe"]["vt2"] = { "HeartRate": float(form_data.get("vt2_hr", 0)), "Speed": float(form_data.get("vt2_speed", 0)), - "Time": float(form_data.get("vt2_time", 0)) + "Time": float(form_data.get("vt2_time", 0)), } - + # Heart rate zones for i in range(1, 6): zone_key = f"zone{i}_bpm" if form_data.get(zone_key): metric_overrides["pnoe"][zone_key] = form_data[zone_key] - + # Spirometry overrides if form_data.get("fvc_best"): metric_overrides["spirometry"]["fvc_best"] = float(form_data["fvc_best"]) @@ -294,88 +374,120 @@ async def edit_metrics(request: Request): if form_data.get("fev1_pred"): metric_overrides["spirometry"]["fev1_pred"] = float(form_data["fev1_pred"]) if form_data.get("fev1_fvc_pct_best"): - metric_overrides["spirometry"]["fev1_fvc_pct_best"] = float(form_data["fev1_fvc_pct_best"]) + metric_overrides["spirometry"]["fev1_fvc_pct_best"] = float( + form_data["fev1_fvc_pct_best"] + ) if form_data.get("fev1_fvc_pct_pred"): - metric_overrides["spirometry"]["fev1_fvc_pct_pred"] = float(form_data["fev1_fvc_pct_pred"]) - + metric_overrides["spirometry"]["fev1_fvc_pct_pred"] = float( + form_data["fev1_fvc_pct_pred"] + ) + try: # Get file paths from session temp_dir = Path(request.session["temp_dir"]) patient_info = request.session["patient_info"] - + # Find files in temp directory spirometry_path = None pnoe_path = None - seca_path = None - + oxygenation_path = None + for file_path in temp_dir.iterdir(): if file_path.name.startswith("spirometry_"): spirometry_path = file_path elif file_path.name.startswith("pnoe_"): pnoe_path = file_path - elif file_path.name.startswith("seca_"): - seca_path = file_path - - if not all([spirometry_path, pnoe_path, seca_path]): - raise ValueError("Could not find all uploaded files") - + elif file_path.name.startswith("oxygenation_"): + oxygenation_path = file_path + + if not all([spirometry_path, pnoe_path]): + raise ValueError("Could not find all required uploaded files") + # Regenerate report with overrides + oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None result = await report_service.generate_report( spirometry_pdf_path=str(spirometry_path), pnoe_csv_path=str(pnoe_path), - seca_excel_path=str(seca_path), patient_info=patient_info, - metric_overrides=metric_overrides if (metric_overrides["pnoe"] or metric_overrides["spirometry"]) else None, + metric_overrides=metric_overrides + if (metric_overrides["pnoe"] or metric_overrides["spirometry"]) + else None, + oxygenation_csv_path=oxygenation_csv_path, ) - + # Update session with new report request.session["report_path"] = result["report_path"] request.session["graphs_generated"] = result["graphs_generated"] request.session["analysis_data"] = result["analysis_data"] - + # Recalculate metrics with overrides from services.context_generator import ContextGenerator + context_gen = ContextGenerator() spirometry_csv_path = request.session.get("spirometry_csv_path", "") if not spirometry_csv_path or not Path(spirometry_csv_path).exists(): - from services.spirometry_table_extractor import extract_spirometry_table_from_pdf from pathlib import Path as PathLib + + from services.spirometry_table_extractor import ( + extract_spirometry_table_from_pdf, + ) + data_dir = PathLib("data") spirometry_csv_path = extract_spirometry_table_from_pdf( str(spirometry_path), output_dir=str(data_dir) ) spirometry_csv_path = str(PathLib(spirometry_csv_path)) - + context_gen.load_data( str(pnoe_path), spirometry_csv_path, - str(seca_path) + None, # No SECA file ) + # Set patient info manually + weight_str = patient_info.get("weight", "0") + weight_kg = float(weight_str.replace("lbs", "").replace("kg", "").strip()) + if "lbs" in weight_str.lower(): + weight_kg = weight_kg / 2.20462 # Convert lbs to kg + + context_gen.patient_info = { + "name": patient_info.get("first_name", ""), + "last_name": patient_info.get("last_name", ""), + "age": patient_info.get("age", 25), + "weight": weight_kg, + "fat_percentage": patient_info.get("fat_percentage", 0), + "gender": patient_info.get("gender", "female"), + } context_gen.extract_patient_info(patient_info.get("last_name", "")) - + spirometry_overrides = metric_overrides.get("spirometry", {}) pnoe_overrides = metric_overrides.get("pnoe", {}) - spirometry_metrics = context_gen.calculate_spirometry_metrics(spirometry_overrides) + spirometry_metrics = context_gen.calculate_spirometry_metrics( + spirometry_overrides + ) pnoe_metrics = context_gen.calculate_pnoe_metrics(pnoe_overrides) - + # Update metrics in session request.session["metrics"] = { "spirometry": spirometry_metrics, "pnoe": pnoe_metrics, } request.session["spirometry_csv_path"] = spirometry_csv_path - + return RedirectResponse(url="/preview", status_code=303) - + except Exception as e: import traceback + error_details = traceback.format_exc() print(f"ERROR: {error_details}") - return render_template("edit.html", { - "request": request, - "session": request.session, - "error": f"Error regenerating report: {str(e)}" - }) + return render_template( + "edit.html", + { + "request": request, + "session": request.session, + "error": f"Error regenerating report: {str(e)}", + }, + ) @app.get("/health") diff --git a/app/services/__pycache__/context_generator.cpython-312.pyc b/app/services/__pycache__/context_generator.cpython-312.pyc index 5576322..e3208d4 100644 Binary files a/app/services/__pycache__/context_generator.cpython-312.pyc and b/app/services/__pycache__/context_generator.cpython-312.pyc differ diff --git a/app/services/__pycache__/graph_generator.cpython-312.pyc b/app/services/__pycache__/graph_generator.cpython-312.pyc index 9c84413..cce5a2c 100644 Binary files a/app/services/__pycache__/graph_generator.cpython-312.pyc and b/app/services/__pycache__/graph_generator.cpython-312.pyc differ diff --git a/app/services/__pycache__/report_generator.cpython-312.pyc b/app/services/__pycache__/report_generator.cpython-312.pyc index e8c5764..bfff0c2 100644 Binary files a/app/services/__pycache__/report_generator.cpython-312.pyc and b/app/services/__pycache__/report_generator.cpython-312.pyc differ diff --git a/app/services/context_generator.py b/app/services/context_generator.py index afbd0b4..4c3f563 100644 --- a/app/services/context_generator.py +++ b/app/services/context_generator.py @@ -6,7 +6,7 @@ of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data. """ from datetime import datetime -from typing import Dict, List, Optional, Tuple +from typing import Dict, Optional, Tuple import pandas as pd @@ -24,12 +24,15 @@ class ContextGenerator: self, pnoe_path: str, spirometry_path: str, - seca_path: str, + seca_path: Optional[str] = None, ): """Load all required datasets""" self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";") self.spirometry_df = pd.read_csv(spirometry_path) - self.seca_df = pd.read_excel(seca_path) + if seca_path: + self.seca_df = pd.read_excel(seca_path) + else: + self.seca_df = None self._preprocess_pnoe_data() def _preprocess_pnoe_data(self): @@ -75,7 +78,7 @@ class ContextGenerator: ) def extract_patient_info(self, patient_name: str) -> Dict: - """Extract patient information from SECA dataset""" + """Extract patient information from SECA dataset or use provided patient_info""" if self.seca_df is not None: patient_data = self.seca_df[ self.seca_df["LastName"].str.contains( @@ -99,49 +102,73 @@ class ContextGenerator: "fat_mass_lbs": weight_kg * fat_pct / 100 * 2.20462, "lean_mass_lbs": weight_kg * (1 - fat_pct / 100) * 2.20462, } + # If patient_info is already set (from manual input), calculate fat_mass and lean_mass + elif "weight" in self.patient_info and "fat_percentage" in self.patient_info: + weight_kg = self.patient_info["weight"] + fat_pct = self.patient_info["fat_percentage"] + self.patient_info["fat_mass_lbs"] = weight_kg * fat_pct / 100 * 2.20462 + self.patient_info["lean_mass_lbs"] = ( + weight_kg * (1 - fat_pct / 100) * 2.20462 + ) return self.patient_info - def calculate_spirometry_metrics(self, metric_overrides: Optional[Dict] = None) -> Dict: + def calculate_spirometry_metrics( + self, metric_overrides: Optional[Dict] = None + ) -> Dict: """Calculate spirometry-related metrics""" if metric_overrides is None: metric_overrides = {} - + metrics = {} for param in ["FVC", "FEV1", "FEV1/FVC%"]: param_key = param.lower().replace("/", "_").replace("%", "_pct") - + if f"{param_key}_best" in metric_overrides: - metrics[f"{param_key}_best"] = float(metric_overrides[f"{param_key}_best"]) + metrics[f"{param_key}_best"] = float( + metric_overrides[f"{param_key}_best"] + ) else: row = self.spirometry_df.loc[ self.spirometry_df["Parameters"].str.strip() == param ] if not row.empty: - metrics[f"{param_key}_best"] = row["Best"].values[0] - + value = row["Best"].values[0] + if pd.notna(value): + try: + metrics[f"{param_key}_best"] = float(value) + except (ValueError, TypeError): + pass # Skip if conversion fails + if f"{param_key}_pred" in metric_overrides: - metrics[f"{param_key}_pred"] = float(metric_overrides[f"{param_key}_pred"]) + metrics[f"{param_key}_pred"] = float( + metric_overrides[f"{param_key}_pred"] + ) else: row = self.spirometry_df.loc[ self.spirometry_df["Parameters"].str.strip() == param ] if not row.empty: - metrics[f"{param_key}_pred"] = row["%Pred."].values[0] + value = row["%Pred."].values[0] + if pd.notna(value): + try: + metrics[f"{param_key}_pred"] = float(value) + except (ValueError, TypeError): + pass # Skip if conversion fails return metrics def calculate_pnoe_metrics(self, metric_overrides: Optional[Dict] = None) -> Dict: """Calculate all Pnoe-derived metrics""" if metric_overrides is None: metric_overrides = {} - + metrics = {} - + # VO2 Max metrics if "vo2_max" in metric_overrides: metrics["vo2_max"] = float(metric_overrides["vo2_max"]) else: metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max() - + if "vo2_max_per_kg" in metric_overrides: metrics["vo2_max_per_kg"] = float(metric_overrides["vo2_max_per_kg"]) else: @@ -184,7 +211,7 @@ class ContextGenerator: else: vt1, _ = self._detect_thresholds() metrics["vt1"] = vt1 - + if "vt2" in metric_overrides: metrics["vt2"] = metric_overrides["vt2"] else: @@ -200,9 +227,11 @@ class ContextGenerator: else: fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() fat_max_row = self.pnoe_df.loc[fat_max_idx] - zones = self._calculate_hr_zones(metrics["vt1"], metrics["vt2"], fat_max_row) + zones = self._calculate_hr_zones( + metrics["vt1"], metrics["vt2"], fat_max_row + ) metrics.update(zones) - + return metrics def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]: @@ -261,95 +290,463 @@ class ContextGenerator: zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm" return zones + def _calculate_vo2_drop_points(self, pnoe_metrics: Dict) -> Dict: + """Calculate VO2 Pulse and VO2 Breath drop points""" + # Calculate slope of VO2 Pulse + vo2_pulse_slope = self.pnoe_df["VO2 Pulse_smoothed"].diff() + window = max(1, len(self.pnoe_df) // 3) # Ensure window is at least 1 + vo2_pulse_slope_smoothed = vo2_pulse_slope.rolling(window=window, min_periods=1).mean() + + # Find where VO2 Pulse begins to drop (slope becomes negative) + mask_pulse = vo2_pulse_slope_smoothed <= 0 + drop_indices_pulse = mask_pulse[mask_pulse].index + + vo2_pulse_drop_bpm = None + vo2_pulse_drop_zone = None + if len(drop_indices_pulse) > 0: + drop_idx = drop_indices_pulse[0] + drop_row = self.pnoe_df.loc[drop_idx] + vo2_pulse_drop_bpm = int(drop_row["HR(bpm)_smoothed"]) + # Determine zone based on HR zones + if pnoe_metrics.get("zone1_bpm") and vo2_pulse_drop_bpm: + zones = [pnoe_metrics.get(f"zone{i}_bpm", "") for i in range(1, 6)] + for i, zone_str in enumerate(zones, 1): + if zone_str: + zone_clean = zone_str.replace("bpm", "").strip() + if "-" in zone_clean: + parts = zone_clean.split("-") + if len(parts) == 2: + try: + start, end = int(parts[0]), int(parts[1].replace("+", "")) + if start <= vo2_pulse_drop_bpm <= end: + vo2_pulse_drop_zone = f"Zone {i}" + break + except ValueError: + pass + elif "+" in zone_clean: + # Zone 5 format: "180+bpm" + try: + start = int(zone_clean.replace("+", "")) + if vo2_pulse_drop_bpm >= start: + vo2_pulse_drop_zone = f"Zone {i}" + break + except ValueError: + pass + + # Calculate slope of VO2 Breath + vo2_breath_slope = self.pnoe_df["VO2 Breath_smoothed"].diff() + vo2_breath_slope_smoothed = vo2_breath_slope.rolling(window=window, min_periods=1).mean() + + # Find where VO2 Breath begins to drop + mask_breath = vo2_breath_slope_smoothed <= 0 + drop_indices_breath = mask_breath[mask_breath].index + + vo2_breath_drop_bpm = None + vo2_breath_drop_zone = None + if len(drop_indices_breath) > 0: + drop_idx = drop_indices_breath[0] + drop_row = self.pnoe_df.loc[drop_idx] + vo2_breath_drop_bpm = int(drop_row["HR(bpm)_smoothed"]) + # Determine zone + if pnoe_metrics.get("zone1_bpm") and vo2_breath_drop_bpm: + zones = [pnoe_metrics.get(f"zone{i}_bpm", "") for i in range(1, 6)] + for i, zone_str in enumerate(zones, 1): + if zone_str: + zone_clean = zone_str.replace("bpm", "").strip() + if "-" in zone_clean: + parts = zone_clean.split("-") + if len(parts) == 2: + try: + start, end = int(parts[0]), int(parts[1].replace("+", "")) + if start <= vo2_breath_drop_bpm <= end: + vo2_breath_drop_zone = f"Zone {i}" + break + except ValueError: + pass + elif "+" in zone_clean: + # Zone 5 format: "180+bpm" + try: + start = int(zone_clean.replace("+", "")) + if vo2_breath_drop_bpm >= start: + vo2_breath_drop_zone = f"Zone {i}" + break + except ValueError: + pass + + return { + "vo2_pulse_drop_bpm": vo2_pulse_drop_bpm or 180, + "vo2_pulse_drop_zone": vo2_pulse_drop_zone or "Zone 4", + "vo2_breath_drop_bpm": vo2_breath_drop_bpm or 173, + "vo2_breath_drop_zone": vo2_breath_drop_zone or "Zone 3", + } + + def _calculate_fat_metabolism_metrics(self, pnoe_metrics: Dict) -> Dict: + """Calculate fat metabolism metrics for page 11""" + fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() + fat_max_row = self.pnoe_df.loc[fat_max_idx] + + fat_max_value = pnoe_metrics.get("fat_max_value", 0) + fat_max_hr = pnoe_metrics.get("fat_max_hr", 0) + max_hr = 220 - self.patient_info["age"] + fat_max_heart_rate_pct = (fat_max_hr / max_hr * 100) if max_hr > 0 else 0 + + # Find carbs and fat crossover point + crossover_idx = None + for idx in self.pnoe_df.index: + if self.pnoe_df.loc[idx, "CHO_smoothed"] > self.pnoe_df.loc[idx, "FAT_smoothed"]: + crossover_idx = idx + break + + crossover_bpm = None + crossover_heart_rate_pct = None + if crossover_idx is not None: + crossover_row = self.pnoe_df.loc[crossover_idx] + crossover_bpm = int(crossover_row["HR(bpm)_smoothed"]) + crossover_heart_rate_pct = (crossover_bpm / max_hr * 100) if max_hr > 0 else 0 + + # Get speed and incline at fat max + fat_max_speed = fat_max_row.get("Speed", 0) + fat_max_incline = fat_max_row.get("Incline", 2.0) if "Incline" in fat_max_row else 2.0 + + return { + "fat_max_value": f"{fat_max_value:.2f}Kcals/min", + "fat_max_heart_rate": f"{fat_max_heart_rate_pct:.0f}% of Max Heart Rate", + "fat_max_bpm": f"{int(fat_max_hr)} bpm", + "fat_max_optimal": "*Optimal 10-12Kcals/minute", + "crossover_bpm": f"{crossover_bpm or 100}bpm", + "crossover_heart_rate": f"{crossover_heart_rate_pct or 51:.0f}% of Max Heart Rate", + "fat_metabolism_note": f"{crossover_bpm or 100}bpm at a speed of {fat_max_speed:.1f}mph and incline of {fat_max_incline:.0f}%", + } + + def _calculate_recovery_metrics(self) -> Dict: + """Calculate recovery metrics for page 11""" + # Find peak exercise point (max HR) + peak_idx = self.pnoe_df["HR(bpm)_smoothed"].idxmax() + peak_hr = self.pnoe_df.loc[peak_idx, "HR(bpm)_smoothed"] + peak_time = self.pnoe_df.loc[peak_idx, "T(sec)"] + + # Find recovery phase (after peak) + recovery_df = self.pnoe_df[self.pnoe_df["T(sec)"] > peak_time].copy() + + if len(recovery_df) == 0: + return { + "cardiac_recovery_time": "(1 minute)", + "cardiac_recovery_percentage": "33%", + "metabolic_recovery_time": "(2 minute)", + "metabolic_recovery_percentage": "65%", + "breath_recovery_time": "(2.5 minute)", + "breath_recovery_percentage": "76%", + } + + # Cardiac recovery (1 minute) + one_min_time = peak_time + 60 + one_min_row = recovery_df[recovery_df["T(sec)"] <= one_min_time] + if len(one_min_row) > 0: + one_min_hr = one_min_row.iloc[-1]["HR(bpm)_smoothed"] + cardiac_recovery_pct = ((peak_hr - one_min_hr) / peak_hr * 100) if peak_hr > 0 else 0 + else: + cardiac_recovery_pct = 33 + + # Metabolic recovery (2 minutes) - using VCO2 + two_min_time = peak_time + 120 + peak_vco2 = self.pnoe_df.loc[peak_idx, "VCO2(ml/min)_smoothed"] + two_min_row = recovery_df[recovery_df["T(sec)"] <= two_min_time] + if len(two_min_row) > 0: + two_min_vco2 = two_min_row.iloc[-1]["VCO2(ml/min)_smoothed"] + metabolic_recovery_pct = ((peak_vco2 - two_min_vco2) / peak_vco2 * 100) if peak_vco2 > 0 else 0 + else: + metabolic_recovery_pct = 65 + + # Breath frequency recovery (2.5 minutes) + two_five_min_time = peak_time + 150 + peak_bf = self.pnoe_df.loc[peak_idx, "BF(bpm)_smoothed"] + two_five_min_row = recovery_df[recovery_df["T(sec)"] <= two_five_min_time] + if len(two_five_min_row) > 0: + two_five_min_bf = two_five_min_row.iloc[-1]["BF(bpm)_smoothed"] + breath_recovery_pct = ((peak_bf - two_five_min_bf) / peak_bf * 100) if peak_bf > 0 else 0 + else: + breath_recovery_pct = 76 + + return { + "cardiac_recovery_time": "(1 minute)", + "cardiac_recovery_percentage": f"{int(cardiac_recovery_pct)}%", + "metabolic_recovery_time": "(2 minute)", + "metabolic_recovery_percentage": f"{int(metabolic_recovery_pct)}%", + "breath_recovery_time": "(2.5 minute)", + "breath_recovery_percentage": f"{int(breath_recovery_pct)}%", + } + + def _calculate_resting_heart_rate_metrics(self) -> Dict: + """Calculate resting heart rate metrics for page 11""" + # Get resting HR from beginning of test + rest_phase = self.pnoe_df.head(30) # First 30 seconds + resting_hr = rest_phase["HR(bpm)_smoothed"].mean() + + age = self.patient_info.get("age", 30) + gender = self.patient_info.get("gender", "female").lower() + + # Determine age range + if 26 <= age <= 35: + age_range = "26-35" + elif 36 <= age <= 45: + age_range = "36-45" + elif 46 <= age <= 55: + age_range = "46-55" + else: + age_range = "26-35" # Default + + # HR ranges based on gender and age (simplified) + if gender == "female": + hr_ranges = { + "poor": "82bpm +", + "below_avg": "75-81bpm", + "average": "71-74bpm", + "above_avg": "66-70bpm", + "good": "62-65bpm", + "excellent": "55-61bpm", + "athlete": "44-54bpm", + } + else: # male + hr_ranges = { + "poor": "82bpm +", + "below_avg": "75-81bpm", + "average": "71-74bpm", + "above_avg": "66-70bpm", + "good": "62-65bpm", + "excellent": "55-61bpm", + "athlete": "44-54bpm", + } + + return { + "resting_heart_rate": f"{int(resting_hr)}bpm", + "hr_age_range": age_range, + "hr_poor": hr_ranges["poor"], + "hr_below_avg": hr_ranges["below_avg"], + "hr_average": hr_ranges["average"], + "hr_above_avg": hr_ranges["above_avg"], + "hr_good": hr_ranges["good"], + "hr_excellent": hr_ranges["excellent"], + "hr_athlete": hr_ranges["athlete"], + } + + def calculate_rmr_and_fuel_source(self) -> Dict: + """Calculate RMR and fuel source from pnoe data""" + metrics = {} + + # Calculate RMR from resting phase (MET <= 1.1) + if "MET" in self.pnoe_df.columns and "EE(kcal/day)" in self.pnoe_df.columns: + rest_phase = self.pnoe_df[self.pnoe_df["MET"] <= 1.1] + if not rest_phase.empty: + rmr = rest_phase["EE(kcal/day)"].mean() + metrics["rmr_kcal"] = float(rmr) + else: + # Fallback: use minimum EE(kcal/min) * 1440 (minutes per day) + if "EE(kcal/min)" in self.pnoe_df.columns: + min_ee = self.pnoe_df["EE(kcal/min)"].min() + metrics["rmr_kcal"] = float(min_ee * 1440) + else: + metrics["rmr_kcal"] = 1500.0 # Default fallback + else: + # Fallback: estimate from weight (simplified) + weight_kg = self.patient_info.get("weight", 70) + gender = self.patient_info.get("gender", "female").lower() + + # Simplified RMR estimation: 22 kcal/kg/day for men, 20 for women + if gender == "male": + rmr = weight_kg * 22 + else: + rmr = weight_kg * 20 + metrics["rmr_kcal"] = float(rmr) + + # Calculate fuel source from resting phase (RER == 0.9 or closest) + if "RER" in self.pnoe_df.columns and "FAT(%)" in self.pnoe_df.columns: + # Find rest phase with RER closest to 0.9 + rest_phase = ( + self.pnoe_df[self.pnoe_df["MET"] <= 1.1].copy() + if "MET" in self.pnoe_df.columns + else self.pnoe_df.copy() + ) + if not rest_phase.empty: + # Find row with RER closest to 0.9 + if "RER" in rest_phase.columns: + rest_phase["RER_diff"] = abs(rest_phase["RER"] - 0.9) + closest_idx = rest_phase["RER_diff"].idxmin() + fat_pct = rest_phase.loc[closest_idx, "FAT(%)"] + metrics["rest_fat_percentage"] = float(fat_pct) + else: + # Use mean FAT(%) from rest phase + metrics["rest_fat_percentage"] = float(rest_phase["FAT(%)"].mean()) + else: + # Fallback: use overall mean + metrics["rest_fat_percentage"] = float(self.pnoe_df["FAT(%)"].mean()) + else: + # Fallback: use a default value + metrics["rest_fat_percentage"] = 75.0 + + # Calculate caloric values for page 5 + rmr = metrics["rmr_kcal"] + neat = rmr * 0.25 # NEAT is typically 20-30% of RMR + weight_loss_rate = 1.0 # 1 lb per week + weight_loss_calories = 500.0 # 500 kcal deficit per day for 1 lb/week + total_calories = rmr + neat - weight_loss_calories + + metrics["resting_calories"] = int(rmr) + metrics["neat_calories"] = int(neat) + metrics["weight_loss_calories"] = int(weight_loss_calories) + metrics["weight_loss_rate"] = weight_loss_rate + metrics["total_calories"] = int(total_calories) + + return metrics + def generate_all_contexts( - self, patient_name: str, graphs: Dict[str, str], metric_overrides: Optional[Dict] = None - ) -> List[Dict]: - """Main method to generate all page contexts""" + self, + patient_name: str, + graphs: Dict[str, str], + metric_overrides: Optional[Dict] = None, + ) -> Dict[str, Dict]: + """Main method to generate all page contexts + + Returns: + Dictionary with keys 'page_1', 'page_2', etc., each containing context data for that page + """ if metric_overrides is None: metric_overrides = {} - + self.extract_patient_info(patient_name) - + # Extract metric overrides for spirometry and pnoe spirometry_overrides = metric_overrides.get("spirometry", {}) pnoe_overrides = metric_overrides.get("pnoe", {}) - + spirometry_metrics = self.calculate_spirometry_metrics(spirometry_overrides) pnoe_metrics = self.calculate_pnoe_metrics(pnoe_overrides) + rmr_metrics = self.calculate_rmr_and_fuel_source() - contexts = [] - contexts.append( - { - "name": self.patient_info["name"], - "surname": self.patient_info["last_name"], - "date": datetime.now().strftime("%B %d, %Y"), - } - ) - contexts.append( - { + contexts = {} + + # Page 1 + contexts["page_1"] = { + "name": self.patient_info["name"], + "surname": self.patient_info["last_name"], + "date": datetime.now().strftime("%B %d, %Y"), + } + + # Page 2 + contexts["page_2"] = { + "patient_name": self.patient_info["name"], + "test_date": datetime.now().strftime("%B %d, %Y"), + } + + # Pages 3, 6 (pages 4 and 5 are handled separately) + for i in [0, 3]: # Skip indices 1 and 2 which are pages 4 and 5 + contexts[f"page_{i + 3}"] = { "patient_name": self.patient_info["name"], - "test_date": datetime.now().strftime("%B %d, %Y"), + "page_number": i + 3, } - ) - for i in range(4): - contexts.append( - {"patient_name": self.patient_info["name"], "page_number": i + 3} - ) + # Page 4 - Nutrition Guidelines with Body Composition + contexts["page_4"] = { + "patient_name": self.patient_info["name"], + "page_number": 4, + "fat_percentage": f"{self.patient_info['fat_percentage']:.1f}", + "body_composition_chart": graphs.get("body_composition", ""), + "body_fat_chart": graphs.get("body_fat_percent", ""), # Alias for template + "body_fat_percent_chart": graphs.get( + "body_fat_percent", "" + ), # Keep for consistency + } + # Page 5 - Resting Metabolic Rate Assessment + contexts["page_5"] = { + "patient_name": self.patient_info["name"], + "page_number": 5, + "metabolism_chart": graphs.get("metabolism_chart", ""), + "fuel_source_chart": graphs.get("fuel_source_chart", ""), + "resting_calories": rmr_metrics.get("resting_calories", 1500), + "neat_calories": rmr_metrics.get("neat_calories", 375), + "weight_loss_calories": rmr_metrics.get("weight_loss_calories", 500), + "weight_loss_rate": rmr_metrics.get("weight_loss_rate", 1.0), + "total_calories": rmr_metrics.get("total_calories", 1375), + } + + # Calculate FEV1 percentage for page 7 fev1_percentage = 0 if spirometry_metrics.get("fvc_best"): fev1_percentage = ( pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"] ) * 100 - contexts.append( - { - "peak_vt": f"{pnoe_metrics['peak_vt']:.2f}", - "peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}", - "fev1_percentage": f"{fev1_percentage:.1f}", - "lung_analysis_chart": graphs.get("spirometry_chart", ""), - "respiratory_analysis_chart": graphs.get("respiratory", ""), - } - ) - contexts.append( - { - "vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}", - "age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}", - "zone1_bpm": pnoe_metrics.get("zone1_bpm", ""), - "zone2_bpm": pnoe_metrics.get("zone2_bpm", ""), - "zone3_bpm": pnoe_metrics.get("zone3_bpm", ""), - "zone4_bpm": pnoe_metrics.get("zone4_bpm", ""), - "zone5_bpm": pnoe_metrics.get("zone5_bpm", ""), - "vo2_pulse_chart": graphs.get("vo2_pulse", ""), - } - ) - contexts.append( - { - "fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}", - "fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}", - "fuel_utilization_chart": graphs.get("fuel_utilization", ""), - "fat_metabolism_chart": graphs.get("fat_metabolism", ""), - } - ) - contexts.append( - { - "fat_percentage": f"{self.patient_info['fat_percentage']:.1f}", - "fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}", - "lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}", - "body_composition_chart": graphs.get("body_composition", ""), - "body_fat_percent_chart": graphs.get("body_fat_percent", ""), - } - ) + # Page 7 + contexts["page_7"] = { + "peak_vt": f"{pnoe_metrics['peak_vt']:.2f}", + "peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}", + "fev1_percentage": f"{fev1_percentage:.1f}", + "lung_analysis_chart": graphs.get("spirometry_chart", ""), + "respiratory_analysis_chart": graphs.get("respiratory", ""), + } - for i in range(9): - contexts.append( - { - "patient_name": self.patient_info["name"], - "page_number": i + 11, - "vo2_breath_chart": graphs.get("vo2_breath", ""), - "recovery_chart": graphs.get("recovery", ""), - } - ) + # Page 8 + contexts["page_8"] = { + "vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}", + "age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}", + "zone1_bpm": pnoe_metrics.get("zone1_bpm", ""), + "zone2_bpm": pnoe_metrics.get("zone2_bpm", ""), + "zone3_bpm": pnoe_metrics.get("zone3_bpm", ""), + "zone4_bpm": pnoe_metrics.get("zone4_bpm", ""), + "zone5_bpm": pnoe_metrics.get("zone5_bpm", ""), + "vo2_pulse_chart": graphs.get("vo2_pulse", ""), + } + + # Page 9 + contexts["page_9"] = { + "fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}", + "fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}", + "fuel_utilization_chart": graphs.get("fuel_utilization", ""), + "fat_metabolism_chart": graphs.get("fat_metabolism", ""), + } + + # Page 10 - VO2 Pulse and VO2 Breath + vo2_drop_metrics = self._calculate_vo2_drop_points(pnoe_metrics) + contexts["page_10"] = { + "vo2_pulse_chart": graphs.get("vo2_pulse", ""), + "vo2_breath_chart": graphs.get("vo2_breath", ""), + "vo2_pulse_drop_bpm": f"{vo2_drop_metrics['vo2_pulse_drop_bpm']} bpm", + "vo2_pulse_drop_zone": vo2_drop_metrics["vo2_pulse_drop_zone"], + "vo2_breath_drop_bpm": f"{vo2_drop_metrics['vo2_breath_drop_bpm']} bpm", + "vo2_breath_drop_zone": vo2_drop_metrics["vo2_breath_drop_zone"], + } + + # Page 11 - Fat Metabolism and Recovery + fat_metabolism_metrics = self._calculate_fat_metabolism_metrics(pnoe_metrics) + recovery_metrics = self._calculate_recovery_metrics() + resting_hr_metrics = self._calculate_resting_heart_rate_metrics() + + contexts["page_11"] = { + "fat_metabolism_chart": graphs.get("fat_metabolism", ""), + "recovery_chart": graphs.get("recovery", ""), + **fat_metabolism_metrics, + **recovery_metrics, + **resting_hr_metrics, + } + + # Pages 12-17 + for i in range(6): + contexts[f"page_{i + 12}"] = { + "patient_name": self.patient_info["name"], + "page_number": i + 12, + } + + # Page 18 - Glossary with Body Fat Percentage Chart + contexts["page_18"] = { + "patient_name": self.patient_info["name"], + "page_number": 18, + "body_fat_percentage_chart": graphs.get("body_fat_percent", ""), + } + + # Page 19 + contexts["page_19"] = { + "patient_name": self.patient_info["name"], + "page_number": 19, + } return contexts diff --git a/app/services/graph_generator.py b/app/services/graph_generator.py index 82e3454..d1f9620 100644 --- a/app/services/graph_generator.py +++ b/app/services/graph_generator.py @@ -584,6 +584,105 @@ class GraphGenerator: return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + def generate_tsi_chart( + self, oxygenation_df: pd.DataFrame, save_as_base64: bool = True + ) -> str: + """ + Generate TSI (Tissue Saturation Index) chart with trend lines per stage. + + Args: + oxygenation_df: DataFrame with Time, TSI, and TSI-second columns + save_as_base64: If True, return base64 string, else return file path + + Returns: + Base64 string or file path + """ + from numpy.polynomial.polynomial import Polynomial + + plt.figure(figsize=(12, 5.5)) + + # Plot TSI (Left Leg) + plt.plot( + oxygenation_df["Time"], + oxygenation_df["TSI"], + label="TSI (Left Leg)", + color="steelblue", + linewidth=2, + ) + + # Plot TSI2 (Right Leg) + plt.plot( + oxygenation_df["Time"], + oxygenation_df["TSI-second"], + label="TSI2 (Right Leg)", + color="orange", + linewidth=2, + ) + + # Define time intervals for stages (adjust these based on your test protocol) + max_time = oxygenation_df["Time"].max() + intervals = [ + (0, 250), + (250, 500), + (500, 750), + (750, 1000), + (1000, 1250), + (1250, 1500), + (1500, max_time), + ] + + # Calculate and plot trend lines for each interval + for start_time, end_time in intervals: + # Filter data for this interval + mask_interval = (oxygenation_df["Time"] >= start_time) & ( + oxygenation_df["Time"] <= end_time + ) + + # TSI (Left Leg) trend for this interval + mask_left = mask_interval & ~oxygenation_df["TSI"].isna() + if mask_left.sum() > 1: # Need at least 2 points for a line + x_left = oxygenation_df.loc[mask_left, "Time"] + y_left = oxygenation_df.loc[mask_left, "TSI"] + coefs_left = Polynomial.fit(x_left, y_left, 1).convert().coef + trend_left = coefs_left[0] + coefs_left[1] * x_left + plt.plot( + x_left, + trend_left, + color="black", + linestyle="--", + linewidth=2, + alpha=0.8, + ) + + # TSI-second (Right Leg) trend for this interval + mask_right = mask_interval & ~oxygenation_df["TSI-second"].isna() + if mask_right.sum() > 1: # Need at least 2 points for a line + x_right = oxygenation_df.loc[mask_right, "Time"] + y_right = oxygenation_df.loc[mask_right, "TSI-second"] + coefs_right = Polynomial.fit(x_right, y_right, 1).convert().coef + trend_right = coefs_right[0] + coefs_right[1] * x_right + plt.plot( + x_right, + trend_right, + color="black", + linestyle="--", + linewidth=2, + alpha=0.8, + ) + + plt.xlabel("Time (s)") + plt.ylabel("TSI (%)") + plt.title("TSI (Left) and TSI2 (Right) with Black Slope Lines per Stage") + plt.legend(fontsize=10, loc="upper right") + plt.grid(alpha=0.25) + plt.tight_layout() + + chart_path = self.charts_dir / "tsi_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=160) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + def generate_body_composition_chart( self, fat_mass_lbs: float, lean_mass_lbs: float, save_as_base64: bool = True ) -> str: @@ -678,25 +777,52 @@ class GraphGenerator: else: age_group = "20-39" # Default - demographic = f"{age_group}\n({gender[0].upper()})" + gender_abbrev = "M" if gender.lower() == "male" else "F" + demographic = f"{age_group}\n({gender_abbrev})" - # Define segments based on gender (female example) + # Define segments based on gender and age group if gender.lower() == "female": - segments = [ - ("#F8A8A8", 0, 15), # Muted Red: 0% to 15% - ("#FFEECC", 15, 5), # Pale Yellow: 15% to 20% - ("#D0F0C0", 20, 15), # Pale Green: 20% to 35% - ("#FFEECC", 35, 5), # Pale Yellow: 35% to 40% - ("#F8A8A8", 40, 10), # Muted Red: 40% to 50% - ] + if age_group == "20-39": + segments = [ + ("#F8A8A8", 0, 15), # Bad: 0-15% + ("#FFEECC", 15, 5), # Okay: 15-20% + ("#D0F0C0", 20, 15), # Good: 20-35% + ("#FFEECC", 35, 5), # Okay: 35-40% + ("#F8A8A8", 40, 10), # Bad: 40-50% + ] + else: # 40-59 and 60-79 have same ranges for females + segments = [ + ("#F8A8A8", 0, 20), # Bad: 0-20% + ("#FFEECC", 20, 5), # Okay: 20-25% + ("#D0F0C0", 25, 10), # Good: 25-35% + ("#FFEECC", 35, 5), # Okay: 35-40% + ("#F8A8A8", 40, 10), # Bad: 40-50% + ] else: # male - segments = [ - ("#F8A8A8", 0, 5), # Muted Red: 0% to 5% - ("#FFEECC", 5, 5), # Pale Yellow: 5% to 10% - ("#D0F0C0", 10, 10), # Pale Green: 10% to 20% - ("#FFEECC", 20, 5), # Pale Yellow: 20% to 25% - ("#F8A8A8", 25, 25), # Muted Red: 25% to 50% - ] + if age_group == "20-39": + segments = [ + ("#F8A8A8", 0, 5), # Bad: 0-5% + ("#FFEECC", 5, 5), # Okay: 5-10% + ("#D0F0C0", 10, 10), # Good: 10-20% + ("#FFEECC", 20, 5), # Okay: 20-25% + ("#F8A8A8", 25, 25), # Bad: 25-50% + ] + elif age_group == "40-59": + segments = [ + ("#F8A8A8", 0, 5), # Bad: 0-5% + ("#FFEECC", 5, 5), # Okay: 5-10% + ("#D0F0C0", 10, 10), # Good: 10-20% + ("#FFEECC", 20, 10), # Okay: 20-30% + ("#F8A8A8", 30, 20), # Bad: 30-50% + ] + else: # 60-79 + segments = [ + ("#F8A8A8", 0, 5), # Bad: 0-5% + ("#FFEECC", 5, 5), # Okay: 5-10% + ("#D0F0C0", 10, 15), # Good: 10-25% + ("#FFEECC", 25, 5), # Okay: 25-30% + ("#F8A8A8", 30, 20), # Bad: 30-50% + ] fig, ax = plt.subplots(figsize=(10, 2)) @@ -779,10 +905,40 @@ class GraphGenerator: Returns: Base64 string or file path """ - # Coerce numeric columns - for col in ["Best", "LLN", "Pred.", "%Pred.", "ZScore"]: - if col in spirometry_df.columns: - spirometry_df[col] = pd.to_numeric(spirometry_df[col], errors="coerce") + # Coerce numeric columns - handle various column name formats + # Map standard column names to possible variations + column_aliases = { + "Best": ["Best", "best", "BEST"], + "LLN": ["LLN", "lln"], + "Pred.": ["Pred.", "Pred", "pred", "Predicted", "predicted"], + "%Pred.": [ + "%Pred.", + "%Pred", + "%pred", + "% Pred.", + "% Pred", + "Pred %", + "Pred%", + ], + "ZScore": ["ZScore", "Z-Score", "z-score", "Zscore", "zscore", "Z Score"], + } + + # Find and normalize column names + column_mapping = {} + for target_col, possible_names in column_aliases.items(): + for col_name in possible_names: + if col_name in spirometry_df.columns: + column_mapping[target_col] = col_name + # Convert to numeric + spirometry_df[col_name] = pd.to_numeric( + spirometry_df[col_name], errors="coerce" + ) + break + + # If standard columns don't exist, create aliases + for target_col, source_col in column_mapping.items(): + if target_col not in spirometry_df.columns and source_col != target_col: + spirometry_df[target_col] = spirometry_df[source_col] # Select rows of interest rows_map = { @@ -793,20 +949,49 @@ class GraphGenerator: records = [] for label, param in rows_map.items(): + # Try exact match first row = spirometry_df.loc[spirometry_df["Parameters"].str.strip() == param] if row.empty: + # Try case-insensitive match + row = spirometry_df.loc[ + spirometry_df["Parameters"].str.strip().str.upper() == param.upper() + ] + if row.empty: + # Try matching without % sign + if "%" in param: + param_no_pct = param.replace("%", "") + row = spirometry_df.loc[ + spirometry_df["Parameters"].str.strip() == param_no_pct + ] + if row.empty: + print(f"Warning: Could not find parameter '{param}' in spirometry data") + print(f"Available parameters: {spirometry_df['Parameters'].tolist()}") continue row = row.iloc[0] + # Get values with fallbacks for column name variations + best_val = row.get("Best", row.get("best", pd.NA)) + pct_val = row.get( + "%Pred.", row.get("%Pred", row.get("Pred %", row.get("Pred%", pd.NA))) + ) + z_val = row.get("ZScore", row.get("Z-Score", row.get("Zscore", pd.NA))) + records.append( { "label": label, "param": param, - "best": row["Best"], - "pct": row["%Pred."], - "z": row["ZScore"], + "best": best_val, + "pct": pct_val, + "z": z_val, } ) + # Validate we have exactly 3 records + if len(records) != 3: + raise ValueError( + f"Expected 3 spirometry parameters (FVC, FEV1, FEV1/FVC%), " + f"but found {len(records)}. Found: {[r['param'] for r in records]}" + ) + # Figure setup fig, axes = plt.subplots( nrows=3, @@ -936,3 +1121,187 @@ class GraphGenerator: plt.close() return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_metabolism_chart( + self, rmr_kcal: float, save_as_base64: bool = True + ) -> str: + """ + Generate metabolism chart (Slow vs Fast Metabolism). + + Args: + rmr_kcal: Resting metabolic rate in kcal/day + save_as_base64: If True, return base64 string, else return file path + + Returns: + Base64 string or file path + """ + from matplotlib.patches import FancyBboxPatch + + fig, ax = plt.subplots(figsize=(10, 2.5)) + + # Chart data and positions + categories = ["Very Slow", "Slow", "Average", "Fast", "Very Fast"] + positions = [1500, 3000, 4500, 6000, 7500] + indicator_pos = rmr_kcal + highlight_end = rmr_kcal + + # Main Bar (Background) + main_bar = FancyBboxPatch( + (0, 0.4), + 9000, + 0.2, + boxstyle="round,pad=0,rounding_size=0.1", + ec="none", + fc="#E0E0E0", + ) + ax.add_patch(main_bar) + + # Highlighted Bar + highlight_bar = FancyBboxPatch( + (0, 0.4), + highlight_end, + 0.2, + boxstyle="round,pad=0,rounding_size=0.1", + ec="none", + fc="#B2FFC8", + ) + ax.add_patch(highlight_bar) + + # Text and Labels + ax.text( + highlight_end / 2, + 0.5, + f"{rmr_kcal:.0f}kCals", + ha="center", + va="center", + color="#006400", + fontsize=14, + weight="bold", + ) + + # Indicator Triangle + ax.plot(indicator_pos, 0.65, "v", markersize=15, color="#606060", clip_on=False) + + # Ticks and Labels + for pos, label in zip(positions, categories): + ax.text( + pos, 0.15, label, ha="center", va="center", fontsize=12, color="#333333" + ) + ax.plot([pos, pos], [0.35, 0.39], color="grey", lw=5) + + # Chart Styling + ax.set_title("Slow vs Fast Metabolism", fontsize=18, weight="bold", loc="left") + ax.set_xlim(0, 9000) + ax.set_ylim(0, 1) + ax.axis("off") + + plt.tight_layout() + + chart_path = self.charts_dir / "metabolism_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_fuel_source_chart( + self, fat_percentage: float, save_as_base64: bool = True + ) -> str: + """ + Generate fuel source chart (Fats vs Carbs). + + Args: + fat_percentage: Fat percentage at rest + save_as_base64: If True, return base64 string, else return file path + + Returns: + Base64 string or file path + """ + from matplotlib.patches import FancyBboxPatch + + fig, ax = plt.subplots(figsize=(10, 2.5)) + + carb_percentage = 100 - fat_percentage + optimal_point = 75 + + # Main Bars (Fats and Carbs) + # Fats bar (yellow) + fats_bar = FancyBboxPatch( + (0, 0.4), + fat_percentage, + 0.2, + boxstyle="round,pad=0,rounding_size=0.1", + ec="none", + fc="#FEEAAB", + ) + ax.add_patch(fats_bar) + + # Carbs bar (blue) - starts where the fats bar ends + carbs_bar = FancyBboxPatch( + (fat_percentage, 0.4), + carb_percentage, + 0.2, + boxstyle="round,pad=0,rounding_size=0.1", + ec="none", + fc="#A7F5FF", + ) + ax.add_patch(carbs_bar) + + # Text and Labels + ax.text( + fat_percentage / 2, + 0.5, + f"Fats\n{fat_percentage:.1f}%", + ha="center", + va="center", + color="#333333", + fontsize=12, + weight="bold", + ) + ax.text( + fat_percentage + carb_percentage / 2, + 0.5, + f"Carbs\n{carb_percentage:.1f}%", + ha="center", + va="center", + color="#333333", + fontsize=12, + weight="bold", + ) + + # Add 'Optimal' label + ax.text(optimal_point, 0.75, "Optimal", ha="center", va="center", fontsize=12) + + # Indicator Triangle + ax.plot( + fat_percentage, 0.65, "v", markersize=15, color="#606060", clip_on=False + ) + + # Ticks and Labels + positions = [0, 25, 50, 75, 100] + for pos in positions: + ax.text( + pos, + 0.15, + str(pos), + ha="center", + va="center", + fontsize=12, + color="#333333", + ) + ax.plot([pos, pos], [0.35, 0.39], color="grey", lw=5) + + # Add a special tick for the 'Optimal' point + ax.plot([optimal_point, optimal_point], [0.6, 0.7], color="black", lw=2) + + # Chart Styling + ax.set_title("Fuel Source", fontsize=18, weight="bold", loc="left") + ax.set_ylim(0, 1) + ax.axis("off") + + plt.tight_layout() + + chart_path = self.charts_dir / "fuel_source_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) diff --git a/app/services/report_generator.py b/app/services/report_generator.py index fc8d4a6..56f37c5 100644 --- a/app/services/report_generator.py +++ b/app/services/report_generator.py @@ -151,7 +151,7 @@ class ReportGeneratorService: } def generate_html( - self, patient_info: Dict[str, Any], context_list: List[Dict[str, Any]] + self, patient_info: Dict[str, Any], contexts: Dict[str, Dict[str, Any]] ) -> str: """ Generate HTML content for the report. @@ -159,7 +159,7 @@ class ReportGeneratorService: Args: patient_info: Dictionary containing patient information (patient_name, age, height, weight, focus) - context_list: List of context dictionaries for each page + contexts: Dictionary with keys 'page_1', 'page_2', etc., each containing context data Returns: Complete HTML document as string @@ -175,6 +175,9 @@ class ReportGeneratorService: "focus": patient_info.get("focus", "Endurance"), } + # Get total number of pages + num_pages = len(contexts) + # Footer context footer_context = [ { @@ -183,7 +186,7 @@ class ReportGeneratorService: "social": "@ishplabs", "page_number": i + 1, } - for i in range(len(context_list)) + for i in range(num_pages) ] # Render header @@ -195,11 +198,13 @@ class ReportGeneratorService: for context in footer_context ] - # Render pages - for i, context in enumerate(context_list): - template = self.env.get_template(f"page_{i + 1}.html").render(context) + # Render pages - iterate through pages in order + for i in range(1, num_pages + 1): + page_key = f"page_{i}" + context = contexts.get(page_key, {}) + template = self.env.get_template(f"page_{i}.html").render(context) - if (i + 1) > 2: + if i > 2: full_html = f"""
@@ -209,7 +214,7 @@ class ReportGeneratorService: {template}
- {footer_html_list[i]} + {footer_html_list[i - 1]}
""" @@ -284,10 +289,10 @@ class ReportGeneratorService: self, spirometry_pdf_path: str, pnoe_csv_path: str, - seca_excel_path: str, patient_info: Dict[str, Any], output_filename: str = None, metric_overrides: Optional[Dict[str, Any]] = None, + oxygenation_csv_path: Optional[str] = None, ) -> Dict[str, Any]: """ Generate complete medical report from uploaded files. @@ -325,69 +330,165 @@ class ReportGeneratorService: graphs_generated = self.generate_graphs(df) # Create graph dictionary with base64 encoded images + import base64 + graphs_dict = {} for graph in graphs_generated: # Read the graph file and convert to base64 graph_path = Path(graph["path"]) if graph_path.exists(): - import base64 - with open(graph_path, "rb") as f: graphs_dict[graph["name"]] = base64.b64encode(f.read()).decode( "utf-8" ) # Also generate body composition charts - # Extract patient data for these charts - patient_name = patient_info.get("patient_name", "").split()[-1] # Get last name + # Use patient info directly (no SECA file needed) + fat_pct = patient_info.get("fat_percentage", 0) + age = patient_info.get("age", 25) + gender = patient_info.get("gender", "female").lower() - # Load SECA data to get body composition info - seca_df = pd.read_excel(seca_excel_path) - patient_data = seca_df[ - seca_df["LastName"].str.contains(patient_name, case=False, na=False) - ] + # Convert weight to kg if needed + weight_str = str(patient_info.get("weight", "0")) + # Extract numeric value and unit + weight_str_clean = ( + weight_str.replace("lbs", "").replace("kg", "").replace(" ", "").strip() + ) + try: + weight_value = float(weight_str_clean) + except ValueError: + print(f"Warning: Could not parse weight '{weight_str}', using default 0") + weight_value = 0.0 - if not patient_data.empty: - row = patient_data.iloc[0] - weight_kg = float(row.get("Weight", 0)) - fat_pct = float(row.get("Adult_FMP", 0)) - age = int(row.get("Age", patient_info.get("age", 25))) - gender = row.get("Gender", "female").lower() + # Convert to kg if weight is in lbs + if "lbs" in weight_str.lower(): + weight_kg = weight_value / 2.20462 # Convert lbs to kg + else: + weight_kg = weight_value # Already in kg or assume kg if no unit specified - fat_mass_lbs = weight_kg * fat_pct / 100 * 2.20462 - lean_mass_lbs = weight_kg * (1 - fat_pct / 100) * 2.20462 + # Calculate fat and lean mass in pounds + fat_mass_lbs = weight_kg * fat_pct / 100 * 2.20462 + lean_mass_lbs = weight_kg * (1 - fat_pct / 100) * 2.20462 - # Generate body composition chart - body_comp_b64 = self.graph_generator.generate_body_composition_chart( - fat_mass_lbs, lean_mass_lbs, save_as_base64=True + # Generate body composition chart (save as file first, then convert to base64) + try: + body_comp_path = self.graph_generator.generate_body_composition_chart( + fat_mass_lbs, lean_mass_lbs, save_as_base64=False ) - graphs_dict["body_composition"] = body_comp_b64 - - # Generate body fat percent chart - body_fat_b64 = self.graph_generator.generate_body_fat_percent_chart( - fat_pct, age, gender, save_as_base64=True + graphs_generated.append( + {"name": "body_composition", "path": str(body_comp_path)} ) - graphs_dict["body_fat_percent"] = body_fat_b64 + # Convert to base64 for graphs_dict + with open(body_comp_path, "rb") as f: + graphs_dict["body_composition"] = base64.b64encode(f.read()).decode( + "utf-8" + ) + except Exception as e: + print(f"Warning: Could not generate body composition chart: {e}") + graphs_dict["body_composition"] = "" + + # Generate body fat percent chart (save as file first, then convert to base64) + try: + body_fat_path = self.graph_generator.generate_body_fat_percent_chart( + fat_pct, age, gender, save_as_base64=False + ) + graphs_generated.append( + {"name": "body_fat_percent", "path": str(body_fat_path)} + ) + # Convert to base64 for graphs_dict + with open(body_fat_path, "rb") as f: + graphs_dict["body_fat_percent"] = base64.b64encode(f.read()).decode( + "utf-8" + ) + except Exception as e: + print(f"Warning: Could not generate body fat percent chart: {e}") + graphs_dict["body_fat_percent"] = "" # Generate spirometry chart print("Step 4: Generating spirometry chart...") try: spirometry_df = pd.read_csv(spirometry_csv_path) print(f"Spirometry data loaded: {len(spirometry_df)} rows") + print(f"Spirometry columns: {spirometry_df.columns.tolist()}") + if "Parameters" in spirometry_df.columns: + print(f"Available parameters: {spirometry_df['Parameters'].tolist()}") spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart( spirometry_df, save_as_base64=True ) graphs_dict["spirometry_chart"] = spirometry_chart_b64 + print("Spirometry chart generated successfully") except Exception as e: + import traceback + error_details = traceback.format_exc() print(f"Warning: Could not generate spirometry chart: {e}") + print(f"Error details: {error_details}") graphs_dict["spirometry_chart"] = "" + # Generate TSI chart if oxygenation CSV is provided + if oxygenation_csv_path: + print("Step 4.5: Generating TSI chart...") + try: + oxygenation_df = pd.read_csv(oxygenation_csv_path) + tsi_chart_b64 = self.graph_generator.generate_tsi_chart( + oxygenation_df, save_as_base64=True + ) + graphs_dict["tsi_chart"] = tsi_chart_b64 + except Exception as e: + print(f"Warning: Could not generate TSI chart: {e}") + graphs_dict["tsi_chart"] = "" + + # Generate metabolism and fuel source charts for page 5 + print("Step 4.6: Generating metabolism and fuel source charts...") + try: + # Calculate RMR and fuel source from pnoe data + from services.context_generator import ContextGenerator + + temp_context_gen = ContextGenerator() + temp_context_gen.load_data(pnoe_csv_path, str(spirometry_csv_path), None) + temp_context_gen.patient_info = { + "name": patient_info.get("first_name", ""), + "last_name": patient_info.get("last_name", ""), + "age": patient_info.get("age", 25), + "weight": weight_kg, + "fat_percentage": fat_pct, + "gender": gender, + } + rmr_metrics = temp_context_gen.calculate_rmr_and_fuel_source() + + # Generate metabolism chart + metabolism_chart_b64 = self.graph_generator.generate_metabolism_chart( + rmr_metrics["rmr_kcal"], save_as_base64=True + ) + graphs_dict["metabolism_chart"] = metabolism_chart_b64 + + # Generate fuel source chart + fuel_source_chart_b64 = self.graph_generator.generate_fuel_source_chart( + rmr_metrics["rest_fat_percentage"], save_as_base64=True + ) + graphs_dict["fuel_source_chart"] = fuel_source_chart_b64 + except Exception as e: + print(f"Warning: Could not generate metabolism/fuel source charts: {e}") + graphs_dict["metabolism_chart"] = "" + graphs_dict["fuel_source_chart"] = "" + # Step 5: Generate context for all pages print("Step 5: Generating page contexts...") + patient_name = patient_info.get("patient_name", "") self.context_generator.load_data( - pnoe_csv_path, str(spirometry_csv_path), seca_excel_path + pnoe_csv_path, + str(spirometry_csv_path), + None, # No SECA file ) - context_list = self.context_generator.generate_all_contexts( + # Set patient info manually + self.context_generator.patient_info = { + "name": patient_info.get("first_name", ""), + "last_name": patient_info.get("last_name", ""), + "age": patient_info.get("age", 25), + "weight": weight_kg, + "fat_percentage": fat_pct, + "gender": gender, + } + contexts = self.context_generator.generate_all_contexts( patient_name, graphs_dict, metric_overrides=metric_overrides ) @@ -396,7 +497,7 @@ class ReportGeneratorService: analysis_data["graphs_count"] = len(graphs_generated) # Step 6: Generate HTML - html_content = self.generate_html(patient_info, context_list) + html_content = self.generate_html(patient_info, contexts) # Step 7: Generate PDF if output_filename is None: diff --git a/app/templates/base.html b/app/templates/base.html index be3035b..e853860 100644 --- a/app/templates/base.html +++ b/app/templates/base.html @@ -39,3 +39,4 @@ + diff --git a/app/templates/upload.html b/app/templates/upload.html index eb07777..6f18a5a 100644 --- a/app/templates/upload.html +++ b/app/templates/upload.html @@ -84,9 +84,16 @@ class="mt-1 block w-full text-sm text-gray-500 file:mr-4 file:py-2 file:px-4 file:rounded-md file:border-0 file:text-sm file:font-semibold file:bg-indigo-50 file:text-indigo-700 hover:file:bg-indigo-100">
- - Body Fat Percentage (%) + +
+
+ + +

Upload NIRS muscle oxygen CSV file to generate TSI graph

diff --git a/notebooks/graphs.ipynb b/notebooks/graphs.ipynb index 3d3d4a2..6fdef69 100644 --- a/notebooks/graphs.ipynb +++ b/notebooks/graphs.ipynb @@ -72,7 +72,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": null, "id": "99116a35", "metadata": {}, "outputs": [], @@ -237,7 +237,7 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": null, "id": "470e871e", "metadata": {}, "outputs": [ @@ -290,7 +290,14 @@ " # --- Chart data and positions ---\n", " categories = ['Very Slow', 'Slow', 'Average', 'Fast', 'Very Fast']\n", " positions = [1500, 3000, 4500, 6000, 7500]\n", - " kcal_value = 1386\n", + " # Step 1: Filter resting phase (usually lowest VO2 or MET values)\n", + " rest_phase = df[df['MET'] <= 1.1] # assuming <1.1 MET means rest\n", + "\n", + " # Step 2: Compute resting metabolic rate\n", + " rmr = rest_phase['EE(kcal/day)'].mean()\n", + "\n", + " print(f\"Estimated RMR from data: {rmr:.0f} kcal/day\")\n", + " kcal_value = rmr\n", " # Position the indicator and highlight based on the kcal value\n", " # For this example, we'll place it in the 'Very Slow' section.\n", " indicator_pos = kcal_value\n", @@ -349,7 +356,13 @@ " fig, ax = plt.subplots(figsize=(10, 2.5))\n", "\n", " # --- Chart data and positions ---\n", - " fat_percentage = 33\n", + " rest_phase = df[df['RER'] == 0.9] # filter rest data\n", + " fat_rest = rest_phase['FAT(%)'].mean()\n", + " carb_rest = rest_phase['CARBS(%)'].mean()\n", + "\n", + " print(f\"Resting phase fuel mix: Fats {fat_rest:.1f}%, Carbs {carb_rest:.1f}%\")\n", + "\n", + " fat_percentage = fat_rest\n", " carb_percentage = 100 - fat_percentage\n", " optimal_point = 75\n", "\n", diff --git a/notebooks/page_context_gen.ipynb b/notebooks/page_context_gen.ipynb index fdd6db7..d89e919 100644 --- a/notebooks/page_context_gen.ipynb +++ b/notebooks/page_context_gen.ipynb @@ -18,13 +18,12 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": null, "id": "da5ac3c1", "metadata": {}, "outputs": [], "source": [ "pnoe_df = pd.read_csv('data/pnoe_data.csv', delimiter=';')\n", - "patients_info = pd.read_excel('data/patients_data.xlsx')\n", "spirometry_df = pd.read_csv('data/spirometry_data.csv')" ] }, @@ -254,7 +253,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": null, "id": "2fa8ff13", "metadata": {}, "outputs": [ @@ -270,15 +269,10 @@ } ], "source": [ - "def body_composition_chart(first_name='Keirstyn', last_name='Moran'):\n", + "def body_composition_chart(fat_percentage=22.4, weight_kg=70):\n", "\n", " \n", " #=========================== Body Composition Donut Chart ========================#\n", - " patient_data = patients_info[(patients_info['FirstName'].str.contains(first_name, case=False, na=False)) & \n", - " (patients_info['LastName'].str.contains(last_name, case=False, na=False))]\n", - "# Get the fat mass percentage for Keirstyn\n", - " fat_percentage = patient_data['Adult_FMP'].iloc[0]\n", - " weight_kg = patient_data['Weight'].iloc[0]\n", " lean_percentage = 100 - fat_percentage\n", "\n", "# Create donut chart\n",