feat: Enhance medical report generation with new features and improved data handling

- Added body fat percentage input and optional muscle oxygenation CSV upload in the upload form.
- Implemented TSI chart generation based on muscle oxygenation data.
- Updated report generation to include metabolism and fuel source charts.
- Refactored context generation to eliminate reliance on SECA data, using patient info directly instead.
- Improved error handling and logging for graph generation processes.
- Enhanced HTML templates for better user experience and functionality.
This commit is contained in:
bolade
2025-11-18 16:57:39 +01:00
parent 83f50882e2
commit 7e985c497e
12 changed files with 1256 additions and 262 deletions
Binary file not shown.

After

Width:  |  Height:  |  Size: 91 KiB

+211 -99
View File
@@ -13,10 +13,13 @@ from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from jinja2 import Environment, FileSystemLoader
from pydantic import BaseModel
from starlette.middleware.sessions import SessionMiddleware
from services.report_generator import ReportGeneratorService
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request as StarletteRequest
app = FastAPI(
title="Medical Report Generation API",
@@ -25,7 +28,32 @@ app = FastAPI(
)
# Add session middleware
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-secret-key-change-in-production"))
app.add_middleware(
SessionMiddleware,
secret_key=os.getenv("SECRET_KEY", "your-secret-key-change-in-production"),
)
# Add security headers middleware to allow external scripts
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: StarletteRequest, call_next):
response = await call_next(request)
# Allow external scripts and styles (for Tailwind CDN)
# Only add CSP for HTML responses
content_type = response.headers.get("content-type", "").lower()
if "text/html" in content_type:
response.headers["Content-Security-Policy"] = (
"default-src 'self'; script-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; img-src 'self' data: https:;"
)
return response
app.add_middleware(SecurityHeadersMiddleware)
# Mount static files (if static directory exists)
static_dir = Path("static")
if static_dir.exists():
app.mount("/static", StaticFiles(directory="static"), name="static")
# Setup templates
jinja_env = Environment(loader=FileSystemLoader("app/templates"))
@@ -59,13 +87,15 @@ def render_template(template_name: str, context: dict) -> HTMLResponse:
"""Helper function to render Jinja2 templates"""
template = jinja_env.get_template(template_name)
html_content = template.render(**context)
return HTMLResponse(content=html_content)
return HTMLResponse(content=html_content, media_type="text/html")
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Root endpoint - Upload form page"""
return render_template("upload.html", {"request": request, "session": request.session})
return render_template(
"upload.html", {"request": request, "session": request.session}
)
@app.post("/upload")
@@ -77,56 +107,73 @@ async def upload_files(
height: str = Form(...),
weight: str = Form(...),
gender: str = Form(...),
fat_percentage: float = Form(...),
focus: str = Form(default="Endurance"),
session_id: str = Form(default="default"),
spirometry_pdf: UploadFile = File(...),
pnoe_csv: UploadFile = File(...),
seca_excel: UploadFile = File(...),
oxygenation_csv: UploadFile = File(None),
):
"""Handle file upload and generate report"""
# Validate file types
if not spirometry_pdf.filename.endswith(".pdf"):
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": "Spirometry file must be a PDF"
})
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": "Spirometry file must be a PDF",
},
)
if not pnoe_csv.filename.endswith(".csv"):
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": "Pnoe file must be a CSV"
})
if not seca_excel.filename.endswith((".xlsx", ".xls")):
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": "SECA file must be an Excel file (.xlsx or .xls)"
})
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": "Pnoe file must be a CSV",
},
)
# Validate oxygenation CSV if provided
if oxygenation_csv and oxygenation_csv.filename:
if not oxygenation_csv.filename.endswith(".csv"):
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": "Oxygenation file must be a CSV",
},
)
# Create session-specific temp directory
session_uuid = str(uuid.uuid4())
session_temp_dir = TEMP_DIR / session_uuid
session_temp_dir.mkdir(exist_ok=True, parents=True)
# Save uploaded files
spirometry_path = session_temp_dir / f"spirometry_{spirometry_pdf.filename}"
pnoe_path = session_temp_dir / f"pnoe_{pnoe_csv.filename}"
seca_path = session_temp_dir / f"seca_{seca_excel.filename}"
oxygenation_path = None
try:
# Write files
with open(spirometry_path, "wb") as f:
shutil.copyfileobj(spirometry_pdf.file, f)
with open(pnoe_path, "wb") as f:
shutil.copyfileobj(pnoe_csv.file, f)
with open(seca_path, "wb") as f:
shutil.copyfileobj(seca_excel.file, f)
# Save oxygenation CSV if provided
if oxygenation_csv and oxygenation_csv.filename:
oxygenation_path = (
session_temp_dir / f"oxygenation_{oxygenation_csv.filename}"
)
with open(oxygenation_path, "wb") as f:
shutil.copyfileobj(oxygenation_csv.file, f)
# Prepare patient information
patient_name = f"{first_name} {last_name}"
patient_info = {
@@ -137,76 +184,100 @@ async def upload_files(
"height": height,
"weight": weight,
"gender": gender,
"fat_percentage": fat_percentage,
"focus": focus,
"session_id": session_id,
}
# Generate report
oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path),
patient_info=patient_info,
oxygenation_csv_path=oxygenation_csv_path,
)
# Store in session
request.session["patient_info"] = patient_info
request.session["temp_dir"] = str(session_temp_dir)
request.session["report_path"] = result["report_path"]
request.session["graphs_generated"] = result["graphs_generated"]
request.session["analysis_data"] = result["analysis_data"]
# Extract spirometry CSV path (it's saved in data_dir by the service)
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
from services.context_generator import ContextGenerator
from pathlib import Path as PathLib
from services.context_generator import ContextGenerator
from services.spirometry_table_extractor import (
extract_spirometry_table_from_pdf,
)
# The spirometry CSV is extracted during report generation
# We need to find it or extract it again
data_dir = PathLib("data")
spirometry_csv_path = data_dir / f"spirometry_{Path(spirometry_pdf.filename).stem}.csv"
spirometry_csv_path = (
data_dir / f"spirometry_{Path(spirometry_pdf.filename).stem}.csv"
)
# If it doesn't exist, extract it
if not spirometry_csv_path.exists():
spirometry_csv_path = extract_spirometry_table_from_pdf(
str(spirometry_path), output_dir=str(data_dir)
)
spirometry_csv_path = PathLib(spirometry_csv_path)
# Get calculated metrics for display and editing
context_gen = ContextGenerator()
context_gen.load_data(
str(pnoe_path),
str(spirometry_csv_path),
str(seca_path)
None, # No SECA file needed anymore
)
context_gen.extract_patient_info(last_name) # Extract patient info
# Set patient info manually since we're not reading from SECA
weight_kg = float(weight.replace("lbs", "").replace("kg", "").strip())
if "lbs" in weight.lower():
weight_kg = weight_kg / 2.20462 # Convert lbs to kg
context_gen.patient_info = {
"name": first_name,
"last_name": last_name,
"age": age,
"weight": weight_kg,
"fat_percentage": fat_percentage,
"gender": gender,
}
spirometry_metrics = context_gen.calculate_spirometry_metrics()
pnoe_metrics = context_gen.calculate_pnoe_metrics()
# Store metrics in session
request.session["metrics"] = {
"spirometry": spirometry_metrics,
"pnoe": pnoe_metrics,
}
request.session["spirometry_csv_path"] = str(spirometry_csv_path)
return RedirectResponse(url="/preview", status_code=303)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}")
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": f"Error generating report: {str(e)}"
})
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": f"Error generating report: {str(e)}",
},
)
finally:
# Close file handles
spirometry_pdf.file.close()
pnoe_csv.file.close()
seca_excel.file.close()
if oxygenation_csv and oxygenation_csv.filename:
oxygenation_csv.file.close()
@app.get("/preview", response_class=HTMLResponse)
@@ -214,7 +285,9 @@ async def preview(request: Request):
"""Preview generated report"""
if not request.session.get("report_path"):
return RedirectResponse(url="/", status_code=303)
return render_template("preview.html", {"request": request, "session": request.session})
return render_template(
"preview.html", {"request": request, "session": request.session}
)
@app.get("/graphs/{filename}")
@@ -231,7 +304,9 @@ async def edit_form(request: Request):
"""Display edit metrics form"""
if not request.session.get("metrics"):
return RedirectResponse(url="/", status_code=303)
return render_template("edit.html", {"request": request, "session": request.session})
return render_template(
"edit.html", {"request": request, "session": request.session}
)
@app.post("/edit")
@@ -239,16 +314,13 @@ async def edit_metrics(request: Request):
"""Handle metric edits and regenerate report"""
if not request.session.get("temp_dir") or not request.session.get("patient_info"):
return RedirectResponse(url="/", status_code=303)
# Get form data
form_data = await request.form()
# Build metric overrides
metric_overrides = {
"pnoe": {},
"spirometry": {}
}
metric_overrides = {"pnoe": {}, "spirometry": {}}
# Pnoe overrides
if form_data.get("vo2_max"):
metric_overrides["pnoe"]["vo2_max"] = float(form_data["vo2_max"])
@@ -262,28 +334,36 @@ async def edit_metrics(request: Request):
metric_overrides["pnoe"]["fat_max_value"] = float(form_data["fat_max_value"])
if form_data.get("fat_max_hr"):
metric_overrides["pnoe"]["fat_max_hr"] = float(form_data["fat_max_hr"])
# VT1 and VT2 overrides
if form_data.get("vt1_hr") or form_data.get("vt1_speed") or form_data.get("vt1_time"):
if (
form_data.get("vt1_hr")
or form_data.get("vt1_speed")
or form_data.get("vt1_time")
):
metric_overrides["pnoe"]["vt1"] = {
"HeartRate": float(form_data.get("vt1_hr", 0)),
"Speed": float(form_data.get("vt1_speed", 0)),
"Time": float(form_data.get("vt1_time", 0))
"Time": float(form_data.get("vt1_time", 0)),
}
if form_data.get("vt2_hr") or form_data.get("vt2_speed") or form_data.get("vt2_time"):
if (
form_data.get("vt2_hr")
or form_data.get("vt2_speed")
or form_data.get("vt2_time")
):
metric_overrides["pnoe"]["vt2"] = {
"HeartRate": float(form_data.get("vt2_hr", 0)),
"Speed": float(form_data.get("vt2_speed", 0)),
"Time": float(form_data.get("vt2_time", 0))
"Time": float(form_data.get("vt2_time", 0)),
}
# Heart rate zones
for i in range(1, 6):
zone_key = f"zone{i}_bpm"
if form_data.get(zone_key):
metric_overrides["pnoe"][zone_key] = form_data[zone_key]
# Spirometry overrides
if form_data.get("fvc_best"):
metric_overrides["spirometry"]["fvc_best"] = float(form_data["fvc_best"])
@@ -294,88 +374,120 @@ async def edit_metrics(request: Request):
if form_data.get("fev1_pred"):
metric_overrides["spirometry"]["fev1_pred"] = float(form_data["fev1_pred"])
if form_data.get("fev1_fvc_pct_best"):
metric_overrides["spirometry"]["fev1_fvc_pct_best"] = float(form_data["fev1_fvc_pct_best"])
metric_overrides["spirometry"]["fev1_fvc_pct_best"] = float(
form_data["fev1_fvc_pct_best"]
)
if form_data.get("fev1_fvc_pct_pred"):
metric_overrides["spirometry"]["fev1_fvc_pct_pred"] = float(form_data["fev1_fvc_pct_pred"])
metric_overrides["spirometry"]["fev1_fvc_pct_pred"] = float(
form_data["fev1_fvc_pct_pred"]
)
try:
# Get file paths from session
temp_dir = Path(request.session["temp_dir"])
patient_info = request.session["patient_info"]
# Find files in temp directory
spirometry_path = None
pnoe_path = None
seca_path = None
oxygenation_path = None
for file_path in temp_dir.iterdir():
if file_path.name.startswith("spirometry_"):
spirometry_path = file_path
elif file_path.name.startswith("pnoe_"):
pnoe_path = file_path
elif file_path.name.startswith("seca_"):
seca_path = file_path
if not all([spirometry_path, pnoe_path, seca_path]):
raise ValueError("Could not find all uploaded files")
elif file_path.name.startswith("oxygenation_"):
oxygenation_path = file_path
if not all([spirometry_path, pnoe_path]):
raise ValueError("Could not find all required uploaded files")
# Regenerate report with overrides
oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path),
patient_info=patient_info,
metric_overrides=metric_overrides if (metric_overrides["pnoe"] or metric_overrides["spirometry"]) else None,
metric_overrides=metric_overrides
if (metric_overrides["pnoe"] or metric_overrides["spirometry"])
else None,
oxygenation_csv_path=oxygenation_csv_path,
)
# Update session with new report
request.session["report_path"] = result["report_path"]
request.session["graphs_generated"] = result["graphs_generated"]
request.session["analysis_data"] = result["analysis_data"]
# Recalculate metrics with overrides
from services.context_generator import ContextGenerator
context_gen = ContextGenerator()
spirometry_csv_path = request.session.get("spirometry_csv_path", "")
if not spirometry_csv_path or not Path(spirometry_csv_path).exists():
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
from pathlib import Path as PathLib
from services.spirometry_table_extractor import (
extract_spirometry_table_from_pdf,
)
data_dir = PathLib("data")
spirometry_csv_path = extract_spirometry_table_from_pdf(
str(spirometry_path), output_dir=str(data_dir)
)
spirometry_csv_path = str(PathLib(spirometry_csv_path))
context_gen.load_data(
str(pnoe_path),
spirometry_csv_path,
str(seca_path)
None, # No SECA file
)
# Set patient info manually
weight_str = patient_info.get("weight", "0")
weight_kg = float(weight_str.replace("lbs", "").replace("kg", "").strip())
if "lbs" in weight_str.lower():
weight_kg = weight_kg / 2.20462 # Convert lbs to kg
context_gen.patient_info = {
"name": patient_info.get("first_name", ""),
"last_name": patient_info.get("last_name", ""),
"age": patient_info.get("age", 25),
"weight": weight_kg,
"fat_percentage": patient_info.get("fat_percentage", 0),
"gender": patient_info.get("gender", "female"),
}
context_gen.extract_patient_info(patient_info.get("last_name", ""))
spirometry_overrides = metric_overrides.get("spirometry", {})
pnoe_overrides = metric_overrides.get("pnoe", {})
spirometry_metrics = context_gen.calculate_spirometry_metrics(spirometry_overrides)
spirometry_metrics = context_gen.calculate_spirometry_metrics(
spirometry_overrides
)
pnoe_metrics = context_gen.calculate_pnoe_metrics(pnoe_overrides)
# Update metrics in session
request.session["metrics"] = {
"spirometry": spirometry_metrics,
"pnoe": pnoe_metrics,
}
request.session["spirometry_csv_path"] = spirometry_csv_path
return RedirectResponse(url="/preview", status_code=303)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}")
return render_template("edit.html", {
"request": request,
"session": request.session,
"error": f"Error regenerating report: {str(e)}"
})
return render_template(
"edit.html",
{
"request": request,
"session": request.session,
"error": f"Error regenerating report: {str(e)}",
},
)
@app.get("/health")
+484 -87
View File
@@ -6,7 +6,7 @@ of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data.
"""
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from typing import Dict, Optional, Tuple
import pandas as pd
@@ -24,12 +24,15 @@ class ContextGenerator:
self,
pnoe_path: str,
spirometry_path: str,
seca_path: str,
seca_path: Optional[str] = None,
):
"""Load all required datasets"""
self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
self.spirometry_df = pd.read_csv(spirometry_path)
self.seca_df = pd.read_excel(seca_path)
if seca_path:
self.seca_df = pd.read_excel(seca_path)
else:
self.seca_df = None
self._preprocess_pnoe_data()
def _preprocess_pnoe_data(self):
@@ -75,7 +78,7 @@ class ContextGenerator:
)
def extract_patient_info(self, patient_name: str) -> Dict:
"""Extract patient information from SECA dataset"""
"""Extract patient information from SECA dataset or use provided patient_info"""
if self.seca_df is not None:
patient_data = self.seca_df[
self.seca_df["LastName"].str.contains(
@@ -99,49 +102,73 @@ class ContextGenerator:
"fat_mass_lbs": weight_kg * fat_pct / 100 * 2.20462,
"lean_mass_lbs": weight_kg * (1 - fat_pct / 100) * 2.20462,
}
# If patient_info is already set (from manual input), calculate fat_mass and lean_mass
elif "weight" in self.patient_info and "fat_percentage" in self.patient_info:
weight_kg = self.patient_info["weight"]
fat_pct = self.patient_info["fat_percentage"]
self.patient_info["fat_mass_lbs"] = weight_kg * fat_pct / 100 * 2.20462
self.patient_info["lean_mass_lbs"] = (
weight_kg * (1 - fat_pct / 100) * 2.20462
)
return self.patient_info
def calculate_spirometry_metrics(self, metric_overrides: Optional[Dict] = None) -> Dict:
def calculate_spirometry_metrics(
self, metric_overrides: Optional[Dict] = None
) -> Dict:
"""Calculate spirometry-related metrics"""
if metric_overrides is None:
metric_overrides = {}
metrics = {}
for param in ["FVC", "FEV1", "FEV1/FVC%"]:
param_key = param.lower().replace("/", "_").replace("%", "_pct")
if f"{param_key}_best" in metric_overrides:
metrics[f"{param_key}_best"] = float(metric_overrides[f"{param_key}_best"])
metrics[f"{param_key}_best"] = float(
metric_overrides[f"{param_key}_best"]
)
else:
row = self.spirometry_df.loc[
self.spirometry_df["Parameters"].str.strip() == param
]
if not row.empty:
metrics[f"{param_key}_best"] = row["Best"].values[0]
value = row["Best"].values[0]
if pd.notna(value):
try:
metrics[f"{param_key}_best"] = float(value)
except (ValueError, TypeError):
pass # Skip if conversion fails
if f"{param_key}_pred" in metric_overrides:
metrics[f"{param_key}_pred"] = float(metric_overrides[f"{param_key}_pred"])
metrics[f"{param_key}_pred"] = float(
metric_overrides[f"{param_key}_pred"]
)
else:
row = self.spirometry_df.loc[
self.spirometry_df["Parameters"].str.strip() == param
]
if not row.empty:
metrics[f"{param_key}_pred"] = row["%Pred."].values[0]
value = row["%Pred."].values[0]
if pd.notna(value):
try:
metrics[f"{param_key}_pred"] = float(value)
except (ValueError, TypeError):
pass # Skip if conversion fails
return metrics
def calculate_pnoe_metrics(self, metric_overrides: Optional[Dict] = None) -> Dict:
"""Calculate all Pnoe-derived metrics"""
if metric_overrides is None:
metric_overrides = {}
metrics = {}
# VO2 Max metrics
if "vo2_max" in metric_overrides:
metrics["vo2_max"] = float(metric_overrides["vo2_max"])
else:
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
if "vo2_max_per_kg" in metric_overrides:
metrics["vo2_max_per_kg"] = float(metric_overrides["vo2_max_per_kg"])
else:
@@ -184,7 +211,7 @@ class ContextGenerator:
else:
vt1, _ = self._detect_thresholds()
metrics["vt1"] = vt1
if "vt2" in metric_overrides:
metrics["vt2"] = metric_overrides["vt2"]
else:
@@ -200,9 +227,11 @@ class ContextGenerator:
else:
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx]
zones = self._calculate_hr_zones(metrics["vt1"], metrics["vt2"], fat_max_row)
zones = self._calculate_hr_zones(
metrics["vt1"], metrics["vt2"], fat_max_row
)
metrics.update(zones)
return metrics
def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
@@ -261,95 +290,463 @@ class ContextGenerator:
zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm"
return zones
def _calculate_vo2_drop_points(self, pnoe_metrics: Dict) -> Dict:
"""Calculate VO2 Pulse and VO2 Breath drop points"""
# Calculate slope of VO2 Pulse
vo2_pulse_slope = self.pnoe_df["VO2 Pulse_smoothed"].diff()
window = max(1, len(self.pnoe_df) // 3) # Ensure window is at least 1
vo2_pulse_slope_smoothed = vo2_pulse_slope.rolling(window=window, min_periods=1).mean()
# Find where VO2 Pulse begins to drop (slope becomes negative)
mask_pulse = vo2_pulse_slope_smoothed <= 0
drop_indices_pulse = mask_pulse[mask_pulse].index
vo2_pulse_drop_bpm = None
vo2_pulse_drop_zone = None
if len(drop_indices_pulse) > 0:
drop_idx = drop_indices_pulse[0]
drop_row = self.pnoe_df.loc[drop_idx]
vo2_pulse_drop_bpm = int(drop_row["HR(bpm)_smoothed"])
# Determine zone based on HR zones
if pnoe_metrics.get("zone1_bpm") and vo2_pulse_drop_bpm:
zones = [pnoe_metrics.get(f"zone{i}_bpm", "") for i in range(1, 6)]
for i, zone_str in enumerate(zones, 1):
if zone_str:
zone_clean = zone_str.replace("bpm", "").strip()
if "-" in zone_clean:
parts = zone_clean.split("-")
if len(parts) == 2:
try:
start, end = int(parts[0]), int(parts[1].replace("+", ""))
if start <= vo2_pulse_drop_bpm <= end:
vo2_pulse_drop_zone = f"Zone {i}"
break
except ValueError:
pass
elif "+" in zone_clean:
# Zone 5 format: "180+bpm"
try:
start = int(zone_clean.replace("+", ""))
if vo2_pulse_drop_bpm >= start:
vo2_pulse_drop_zone = f"Zone {i}"
break
except ValueError:
pass
# Calculate slope of VO2 Breath
vo2_breath_slope = self.pnoe_df["VO2 Breath_smoothed"].diff()
vo2_breath_slope_smoothed = vo2_breath_slope.rolling(window=window, min_periods=1).mean()
# Find where VO2 Breath begins to drop
mask_breath = vo2_breath_slope_smoothed <= 0
drop_indices_breath = mask_breath[mask_breath].index
vo2_breath_drop_bpm = None
vo2_breath_drop_zone = None
if len(drop_indices_breath) > 0:
drop_idx = drop_indices_breath[0]
drop_row = self.pnoe_df.loc[drop_idx]
vo2_breath_drop_bpm = int(drop_row["HR(bpm)_smoothed"])
# Determine zone
if pnoe_metrics.get("zone1_bpm") and vo2_breath_drop_bpm:
zones = [pnoe_metrics.get(f"zone{i}_bpm", "") for i in range(1, 6)]
for i, zone_str in enumerate(zones, 1):
if zone_str:
zone_clean = zone_str.replace("bpm", "").strip()
if "-" in zone_clean:
parts = zone_clean.split("-")
if len(parts) == 2:
try:
start, end = int(parts[0]), int(parts[1].replace("+", ""))
if start <= vo2_breath_drop_bpm <= end:
vo2_breath_drop_zone = f"Zone {i}"
break
except ValueError:
pass
elif "+" in zone_clean:
# Zone 5 format: "180+bpm"
try:
start = int(zone_clean.replace("+", ""))
if vo2_breath_drop_bpm >= start:
vo2_breath_drop_zone = f"Zone {i}"
break
except ValueError:
pass
return {
"vo2_pulse_drop_bpm": vo2_pulse_drop_bpm or 180,
"vo2_pulse_drop_zone": vo2_pulse_drop_zone or "Zone 4",
"vo2_breath_drop_bpm": vo2_breath_drop_bpm or 173,
"vo2_breath_drop_zone": vo2_breath_drop_zone or "Zone 3",
}
def _calculate_fat_metabolism_metrics(self, pnoe_metrics: Dict) -> Dict:
"""Calculate fat metabolism metrics for page 11"""
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx]
fat_max_value = pnoe_metrics.get("fat_max_value", 0)
fat_max_hr = pnoe_metrics.get("fat_max_hr", 0)
max_hr = 220 - self.patient_info["age"]
fat_max_heart_rate_pct = (fat_max_hr / max_hr * 100) if max_hr > 0 else 0
# Find carbs and fat crossover point
crossover_idx = None
for idx in self.pnoe_df.index:
if self.pnoe_df.loc[idx, "CHO_smoothed"] > self.pnoe_df.loc[idx, "FAT_smoothed"]:
crossover_idx = idx
break
crossover_bpm = None
crossover_heart_rate_pct = None
if crossover_idx is not None:
crossover_row = self.pnoe_df.loc[crossover_idx]
crossover_bpm = int(crossover_row["HR(bpm)_smoothed"])
crossover_heart_rate_pct = (crossover_bpm / max_hr * 100) if max_hr > 0 else 0
# Get speed and incline at fat max
fat_max_speed = fat_max_row.get("Speed", 0)
fat_max_incline = fat_max_row.get("Incline", 2.0) if "Incline" in fat_max_row else 2.0
return {
"fat_max_value": f"{fat_max_value:.2f}Kcals/min",
"fat_max_heart_rate": f"{fat_max_heart_rate_pct:.0f}% of Max Heart Rate",
"fat_max_bpm": f"{int(fat_max_hr)} bpm",
"fat_max_optimal": "*Optimal 10-12Kcals/minute",
"crossover_bpm": f"{crossover_bpm or 100}bpm",
"crossover_heart_rate": f"{crossover_heart_rate_pct or 51:.0f}% of Max Heart Rate",
"fat_metabolism_note": f"{crossover_bpm or 100}bpm at a speed of {fat_max_speed:.1f}mph and incline of {fat_max_incline:.0f}%",
}
def _calculate_recovery_metrics(self) -> Dict:
"""Calculate recovery metrics for page 11"""
# Find peak exercise point (max HR)
peak_idx = self.pnoe_df["HR(bpm)_smoothed"].idxmax()
peak_hr = self.pnoe_df.loc[peak_idx, "HR(bpm)_smoothed"]
peak_time = self.pnoe_df.loc[peak_idx, "T(sec)"]
# Find recovery phase (after peak)
recovery_df = self.pnoe_df[self.pnoe_df["T(sec)"] > peak_time].copy()
if len(recovery_df) == 0:
return {
"cardiac_recovery_time": "(1 minute)",
"cardiac_recovery_percentage": "33%",
"metabolic_recovery_time": "(2 minute)",
"metabolic_recovery_percentage": "65%",
"breath_recovery_time": "(2.5 minute)",
"breath_recovery_percentage": "76%",
}
# Cardiac recovery (1 minute)
one_min_time = peak_time + 60
one_min_row = recovery_df[recovery_df["T(sec)"] <= one_min_time]
if len(one_min_row) > 0:
one_min_hr = one_min_row.iloc[-1]["HR(bpm)_smoothed"]
cardiac_recovery_pct = ((peak_hr - one_min_hr) / peak_hr * 100) if peak_hr > 0 else 0
else:
cardiac_recovery_pct = 33
# Metabolic recovery (2 minutes) - using VCO2
two_min_time = peak_time + 120
peak_vco2 = self.pnoe_df.loc[peak_idx, "VCO2(ml/min)_smoothed"]
two_min_row = recovery_df[recovery_df["T(sec)"] <= two_min_time]
if len(two_min_row) > 0:
two_min_vco2 = two_min_row.iloc[-1]["VCO2(ml/min)_smoothed"]
metabolic_recovery_pct = ((peak_vco2 - two_min_vco2) / peak_vco2 * 100) if peak_vco2 > 0 else 0
else:
metabolic_recovery_pct = 65
# Breath frequency recovery (2.5 minutes)
two_five_min_time = peak_time + 150
peak_bf = self.pnoe_df.loc[peak_idx, "BF(bpm)_smoothed"]
two_five_min_row = recovery_df[recovery_df["T(sec)"] <= two_five_min_time]
if len(two_five_min_row) > 0:
two_five_min_bf = two_five_min_row.iloc[-1]["BF(bpm)_smoothed"]
breath_recovery_pct = ((peak_bf - two_five_min_bf) / peak_bf * 100) if peak_bf > 0 else 0
else:
breath_recovery_pct = 76
return {
"cardiac_recovery_time": "(1 minute)",
"cardiac_recovery_percentage": f"{int(cardiac_recovery_pct)}%",
"metabolic_recovery_time": "(2 minute)",
"metabolic_recovery_percentage": f"{int(metabolic_recovery_pct)}%",
"breath_recovery_time": "(2.5 minute)",
"breath_recovery_percentage": f"{int(breath_recovery_pct)}%",
}
def _calculate_resting_heart_rate_metrics(self) -> Dict:
"""Calculate resting heart rate metrics for page 11"""
# Get resting HR from beginning of test
rest_phase = self.pnoe_df.head(30) # First 30 seconds
resting_hr = rest_phase["HR(bpm)_smoothed"].mean()
age = self.patient_info.get("age", 30)
gender = self.patient_info.get("gender", "female").lower()
# Determine age range
if 26 <= age <= 35:
age_range = "26-35"
elif 36 <= age <= 45:
age_range = "36-45"
elif 46 <= age <= 55:
age_range = "46-55"
else:
age_range = "26-35" # Default
# HR ranges based on gender and age (simplified)
if gender == "female":
hr_ranges = {
"poor": "82bpm +",
"below_avg": "75-81bpm",
"average": "71-74bpm",
"above_avg": "66-70bpm",
"good": "62-65bpm",
"excellent": "55-61bpm",
"athlete": "44-54bpm",
}
else: # male
hr_ranges = {
"poor": "82bpm +",
"below_avg": "75-81bpm",
"average": "71-74bpm",
"above_avg": "66-70bpm",
"good": "62-65bpm",
"excellent": "55-61bpm",
"athlete": "44-54bpm",
}
return {
"resting_heart_rate": f"{int(resting_hr)}bpm",
"hr_age_range": age_range,
"hr_poor": hr_ranges["poor"],
"hr_below_avg": hr_ranges["below_avg"],
"hr_average": hr_ranges["average"],
"hr_above_avg": hr_ranges["above_avg"],
"hr_good": hr_ranges["good"],
"hr_excellent": hr_ranges["excellent"],
"hr_athlete": hr_ranges["athlete"],
}
def calculate_rmr_and_fuel_source(self) -> Dict:
"""Calculate RMR and fuel source from pnoe data"""
metrics = {}
# Calculate RMR from resting phase (MET <= 1.1)
if "MET" in self.pnoe_df.columns and "EE(kcal/day)" in self.pnoe_df.columns:
rest_phase = self.pnoe_df[self.pnoe_df["MET"] <= 1.1]
if not rest_phase.empty:
rmr = rest_phase["EE(kcal/day)"].mean()
metrics["rmr_kcal"] = float(rmr)
else:
# Fallback: use minimum EE(kcal/min) * 1440 (minutes per day)
if "EE(kcal/min)" in self.pnoe_df.columns:
min_ee = self.pnoe_df["EE(kcal/min)"].min()
metrics["rmr_kcal"] = float(min_ee * 1440)
else:
metrics["rmr_kcal"] = 1500.0 # Default fallback
else:
# Fallback: estimate from weight (simplified)
weight_kg = self.patient_info.get("weight", 70)
gender = self.patient_info.get("gender", "female").lower()
# Simplified RMR estimation: 22 kcal/kg/day for men, 20 for women
if gender == "male":
rmr = weight_kg * 22
else:
rmr = weight_kg * 20
metrics["rmr_kcal"] = float(rmr)
# Calculate fuel source from resting phase (RER == 0.9 or closest)
if "RER" in self.pnoe_df.columns and "FAT(%)" in self.pnoe_df.columns:
# Find rest phase with RER closest to 0.9
rest_phase = (
self.pnoe_df[self.pnoe_df["MET"] <= 1.1].copy()
if "MET" in self.pnoe_df.columns
else self.pnoe_df.copy()
)
if not rest_phase.empty:
# Find row with RER closest to 0.9
if "RER" in rest_phase.columns:
rest_phase["RER_diff"] = abs(rest_phase["RER"] - 0.9)
closest_idx = rest_phase["RER_diff"].idxmin()
fat_pct = rest_phase.loc[closest_idx, "FAT(%)"]
metrics["rest_fat_percentage"] = float(fat_pct)
else:
# Use mean FAT(%) from rest phase
metrics["rest_fat_percentage"] = float(rest_phase["FAT(%)"].mean())
else:
# Fallback: use overall mean
metrics["rest_fat_percentage"] = float(self.pnoe_df["FAT(%)"].mean())
else:
# Fallback: use a default value
metrics["rest_fat_percentage"] = 75.0
# Calculate caloric values for page 5
rmr = metrics["rmr_kcal"]
neat = rmr * 0.25 # NEAT is typically 20-30% of RMR
weight_loss_rate = 1.0 # 1 lb per week
weight_loss_calories = 500.0 # 500 kcal deficit per day for 1 lb/week
total_calories = rmr + neat - weight_loss_calories
metrics["resting_calories"] = int(rmr)
metrics["neat_calories"] = int(neat)
metrics["weight_loss_calories"] = int(weight_loss_calories)
metrics["weight_loss_rate"] = weight_loss_rate
metrics["total_calories"] = int(total_calories)
return metrics
def generate_all_contexts(
self, patient_name: str, graphs: Dict[str, str], metric_overrides: Optional[Dict] = None
) -> List[Dict]:
"""Main method to generate all page contexts"""
self,
patient_name: str,
graphs: Dict[str, str],
metric_overrides: Optional[Dict] = None,
) -> Dict[str, Dict]:
"""Main method to generate all page contexts
Returns:
Dictionary with keys 'page_1', 'page_2', etc., each containing context data for that page
"""
if metric_overrides is None:
metric_overrides = {}
self.extract_patient_info(patient_name)
# Extract metric overrides for spirometry and pnoe
spirometry_overrides = metric_overrides.get("spirometry", {})
pnoe_overrides = metric_overrides.get("pnoe", {})
spirometry_metrics = self.calculate_spirometry_metrics(spirometry_overrides)
pnoe_metrics = self.calculate_pnoe_metrics(pnoe_overrides)
rmr_metrics = self.calculate_rmr_and_fuel_source()
contexts = []
contexts.append(
{
"name": self.patient_info["name"],
"surname": self.patient_info["last_name"],
"date": datetime.now().strftime("%B %d, %Y"),
}
)
contexts.append(
{
contexts = {}
# Page 1
contexts["page_1"] = {
"name": self.patient_info["name"],
"surname": self.patient_info["last_name"],
"date": datetime.now().strftime("%B %d, %Y"),
}
# Page 2
contexts["page_2"] = {
"patient_name": self.patient_info["name"],
"test_date": datetime.now().strftime("%B %d, %Y"),
}
# Pages 3, 6 (pages 4 and 5 are handled separately)
for i in [0, 3]: # Skip indices 1 and 2 which are pages 4 and 5
contexts[f"page_{i + 3}"] = {
"patient_name": self.patient_info["name"],
"test_date": datetime.now().strftime("%B %d, %Y"),
"page_number": i + 3,
}
)
for i in range(4):
contexts.append(
{"patient_name": self.patient_info["name"], "page_number": i + 3}
)
# Page 4 - Nutrition Guidelines with Body Composition
contexts["page_4"] = {
"patient_name": self.patient_info["name"],
"page_number": 4,
"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}",
"body_composition_chart": graphs.get("body_composition", ""),
"body_fat_chart": graphs.get("body_fat_percent", ""), # Alias for template
"body_fat_percent_chart": graphs.get(
"body_fat_percent", ""
), # Keep for consistency
}
# Page 5 - Resting Metabolic Rate Assessment
contexts["page_5"] = {
"patient_name": self.patient_info["name"],
"page_number": 5,
"metabolism_chart": graphs.get("metabolism_chart", ""),
"fuel_source_chart": graphs.get("fuel_source_chart", ""),
"resting_calories": rmr_metrics.get("resting_calories", 1500),
"neat_calories": rmr_metrics.get("neat_calories", 375),
"weight_loss_calories": rmr_metrics.get("weight_loss_calories", 500),
"weight_loss_rate": rmr_metrics.get("weight_loss_rate", 1.0),
"total_calories": rmr_metrics.get("total_calories", 1375),
}
# Calculate FEV1 percentage for page 7
fev1_percentage = 0
if spirometry_metrics.get("fvc_best"):
fev1_percentage = (
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
) * 100
contexts.append(
{
"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}",
"peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}",
"fev1_percentage": f"{fev1_percentage:.1f}",
"lung_analysis_chart": graphs.get("spirometry_chart", ""),
"respiratory_analysis_chart": graphs.get("respiratory", ""),
}
)
contexts.append(
{
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
"zone1_bpm": pnoe_metrics.get("zone1_bpm", ""),
"zone2_bpm": pnoe_metrics.get("zone2_bpm", ""),
"zone3_bpm": pnoe_metrics.get("zone3_bpm", ""),
"zone4_bpm": pnoe_metrics.get("zone4_bpm", ""),
"zone5_bpm": pnoe_metrics.get("zone5_bpm", ""),
"vo2_pulse_chart": graphs.get("vo2_pulse", ""),
}
)
contexts.append(
{
"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}",
"fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}",
"fuel_utilization_chart": graphs.get("fuel_utilization", ""),
"fat_metabolism_chart": graphs.get("fat_metabolism", ""),
}
)
contexts.append(
{
"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}",
"fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}",
"lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}",
"body_composition_chart": graphs.get("body_composition", ""),
"body_fat_percent_chart": graphs.get("body_fat_percent", ""),
}
)
# Page 7
contexts["page_7"] = {
"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}",
"peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}",
"fev1_percentage": f"{fev1_percentage:.1f}",
"lung_analysis_chart": graphs.get("spirometry_chart", ""),
"respiratory_analysis_chart": graphs.get("respiratory", ""),
}
for i in range(9):
contexts.append(
{
"patient_name": self.patient_info["name"],
"page_number": i + 11,
"vo2_breath_chart": graphs.get("vo2_breath", ""),
"recovery_chart": graphs.get("recovery", ""),
}
)
# Page 8
contexts["page_8"] = {
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
"zone1_bpm": pnoe_metrics.get("zone1_bpm", ""),
"zone2_bpm": pnoe_metrics.get("zone2_bpm", ""),
"zone3_bpm": pnoe_metrics.get("zone3_bpm", ""),
"zone4_bpm": pnoe_metrics.get("zone4_bpm", ""),
"zone5_bpm": pnoe_metrics.get("zone5_bpm", ""),
"vo2_pulse_chart": graphs.get("vo2_pulse", ""),
}
# Page 9
contexts["page_9"] = {
"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}",
"fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}",
"fuel_utilization_chart": graphs.get("fuel_utilization", ""),
"fat_metabolism_chart": graphs.get("fat_metabolism", ""),
}
# Page 10 - VO2 Pulse and VO2 Breath
vo2_drop_metrics = self._calculate_vo2_drop_points(pnoe_metrics)
contexts["page_10"] = {
"vo2_pulse_chart": graphs.get("vo2_pulse", ""),
"vo2_breath_chart": graphs.get("vo2_breath", ""),
"vo2_pulse_drop_bpm": f"{vo2_drop_metrics['vo2_pulse_drop_bpm']} bpm",
"vo2_pulse_drop_zone": vo2_drop_metrics["vo2_pulse_drop_zone"],
"vo2_breath_drop_bpm": f"{vo2_drop_metrics['vo2_breath_drop_bpm']} bpm",
"vo2_breath_drop_zone": vo2_drop_metrics["vo2_breath_drop_zone"],
}
# Page 11 - Fat Metabolism and Recovery
fat_metabolism_metrics = self._calculate_fat_metabolism_metrics(pnoe_metrics)
recovery_metrics = self._calculate_recovery_metrics()
resting_hr_metrics = self._calculate_resting_heart_rate_metrics()
contexts["page_11"] = {
"fat_metabolism_chart": graphs.get("fat_metabolism", ""),
"recovery_chart": graphs.get("recovery", ""),
**fat_metabolism_metrics,
**recovery_metrics,
**resting_hr_metrics,
}
# Pages 12-17
for i in range(6):
contexts[f"page_{i + 12}"] = {
"patient_name": self.patient_info["name"],
"page_number": i + 12,
}
# Page 18 - Glossary with Body Fat Percentage Chart
contexts["page_18"] = {
"patient_name": self.patient_info["name"],
"page_number": 18,
"body_fat_percentage_chart": graphs.get("body_fat_percent", ""),
}
# Page 19
contexts["page_19"] = {
"patient_name": self.patient_info["name"],
"page_number": 19,
}
return contexts
+392 -23
View File
@@ -584,6 +584,105 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_tsi_chart(
self, oxygenation_df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate TSI (Tissue Saturation Index) chart with trend lines per stage.
Args:
oxygenation_df: DataFrame with Time, TSI, and TSI-second columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
from numpy.polynomial.polynomial import Polynomial
plt.figure(figsize=(12, 5.5))
# Plot TSI (Left Leg)
plt.plot(
oxygenation_df["Time"],
oxygenation_df["TSI"],
label="TSI (Left Leg)",
color="steelblue",
linewidth=2,
)
# Plot TSI2 (Right Leg)
plt.plot(
oxygenation_df["Time"],
oxygenation_df["TSI-second"],
label="TSI2 (Right Leg)",
color="orange",
linewidth=2,
)
# Define time intervals for stages (adjust these based on your test protocol)
max_time = oxygenation_df["Time"].max()
intervals = [
(0, 250),
(250, 500),
(500, 750),
(750, 1000),
(1000, 1250),
(1250, 1500),
(1500, max_time),
]
# Calculate and plot trend lines for each interval
for start_time, end_time in intervals:
# Filter data for this interval
mask_interval = (oxygenation_df["Time"] >= start_time) & (
oxygenation_df["Time"] <= end_time
)
# TSI (Left Leg) trend for this interval
mask_left = mask_interval & ~oxygenation_df["TSI"].isna()
if mask_left.sum() > 1: # Need at least 2 points for a line
x_left = oxygenation_df.loc[mask_left, "Time"]
y_left = oxygenation_df.loc[mask_left, "TSI"]
coefs_left = Polynomial.fit(x_left, y_left, 1).convert().coef
trend_left = coefs_left[0] + coefs_left[1] * x_left
plt.plot(
x_left,
trend_left,
color="black",
linestyle="--",
linewidth=2,
alpha=0.8,
)
# TSI-second (Right Leg) trend for this interval
mask_right = mask_interval & ~oxygenation_df["TSI-second"].isna()
if mask_right.sum() > 1: # Need at least 2 points for a line
x_right = oxygenation_df.loc[mask_right, "Time"]
y_right = oxygenation_df.loc[mask_right, "TSI-second"]
coefs_right = Polynomial.fit(x_right, y_right, 1).convert().coef
trend_right = coefs_right[0] + coefs_right[1] * x_right
plt.plot(
x_right,
trend_right,
color="black",
linestyle="--",
linewidth=2,
alpha=0.8,
)
plt.xlabel("Time (s)")
plt.ylabel("TSI (%)")
plt.title("TSI (Left) and TSI2 (Right) with Black Slope Lines per Stage")
plt.legend(fontsize=10, loc="upper right")
plt.grid(alpha=0.25)
plt.tight_layout()
chart_path = self.charts_dir / "tsi_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=160)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_body_composition_chart(
self, fat_mass_lbs: float, lean_mass_lbs: float, save_as_base64: bool = True
) -> str:
@@ -678,25 +777,52 @@ class GraphGenerator:
else:
age_group = "20-39" # Default
demographic = f"{age_group}\n({gender[0].upper()})"
gender_abbrev = "M" if gender.lower() == "male" else "F"
demographic = f"{age_group}\n({gender_abbrev})"
# Define segments based on gender (female example)
# Define segments based on gender and age group
if gender.lower() == "female":
segments = [
("#F8A8A8", 0, 15), # Muted Red: 0% to 15%
("#FFEECC", 15, 5), # Pale Yellow: 15% to 20%
("#D0F0C0", 20, 15), # Pale Green: 20% to 35%
("#FFEECC", 35, 5), # Pale Yellow: 35% to 40%
("#F8A8A8", 40, 10), # Muted Red: 40% to 50%
]
if age_group == "20-39":
segments = [
("#F8A8A8", 0, 15), # Bad: 0-15%
("#FFEECC", 15, 5), # Okay: 15-20%
("#D0F0C0", 20, 15), # Good: 20-35%
("#FFEECC", 35, 5), # Okay: 35-40%
("#F8A8A8", 40, 10), # Bad: 40-50%
]
else: # 40-59 and 60-79 have same ranges for females
segments = [
("#F8A8A8", 0, 20), # Bad: 0-20%
("#FFEECC", 20, 5), # Okay: 20-25%
("#D0F0C0", 25, 10), # Good: 25-35%
("#FFEECC", 35, 5), # Okay: 35-40%
("#F8A8A8", 40, 10), # Bad: 40-50%
]
else: # male
segments = [
("#F8A8A8", 0, 5), # Muted Red: 0% to 5%
("#FFEECC", 5, 5), # Pale Yellow: 5% to 10%
("#D0F0C0", 10, 10), # Pale Green: 10% to 20%
("#FFEECC", 20, 5), # Pale Yellow: 20% to 25%
("#F8A8A8", 25, 25), # Muted Red: 25% to 50%
]
if age_group == "20-39":
segments = [
("#F8A8A8", 0, 5), # Bad: 0-5%
("#FFEECC", 5, 5), # Okay: 5-10%
("#D0F0C0", 10, 10), # Good: 10-20%
("#FFEECC", 20, 5), # Okay: 20-25%
("#F8A8A8", 25, 25), # Bad: 25-50%
]
elif age_group == "40-59":
segments = [
("#F8A8A8", 0, 5), # Bad: 0-5%
("#FFEECC", 5, 5), # Okay: 5-10%
("#D0F0C0", 10, 10), # Good: 10-20%
("#FFEECC", 20, 10), # Okay: 20-30%
("#F8A8A8", 30, 20), # Bad: 30-50%
]
else: # 60-79
segments = [
("#F8A8A8", 0, 5), # Bad: 0-5%
("#FFEECC", 5, 5), # Okay: 5-10%
("#D0F0C0", 10, 15), # Good: 10-25%
("#FFEECC", 25, 5), # Okay: 25-30%
("#F8A8A8", 30, 20), # Bad: 30-50%
]
fig, ax = plt.subplots(figsize=(10, 2))
@@ -779,10 +905,40 @@ class GraphGenerator:
Returns:
Base64 string or file path
"""
# Coerce numeric columns
for col in ["Best", "LLN", "Pred.", "%Pred.", "ZScore"]:
if col in spirometry_df.columns:
spirometry_df[col] = pd.to_numeric(spirometry_df[col], errors="coerce")
# Coerce numeric columns - handle various column name formats
# Map standard column names to possible variations
column_aliases = {
"Best": ["Best", "best", "BEST"],
"LLN": ["LLN", "lln"],
"Pred.": ["Pred.", "Pred", "pred", "Predicted", "predicted"],
"%Pred.": [
"%Pred.",
"%Pred",
"%pred",
"% Pred.",
"% Pred",
"Pred %",
"Pred%",
],
"ZScore": ["ZScore", "Z-Score", "z-score", "Zscore", "zscore", "Z Score"],
}
# Find and normalize column names
column_mapping = {}
for target_col, possible_names in column_aliases.items():
for col_name in possible_names:
if col_name in spirometry_df.columns:
column_mapping[target_col] = col_name
# Convert to numeric
spirometry_df[col_name] = pd.to_numeric(
spirometry_df[col_name], errors="coerce"
)
break
# If standard columns don't exist, create aliases
for target_col, source_col in column_mapping.items():
if target_col not in spirometry_df.columns and source_col != target_col:
spirometry_df[target_col] = spirometry_df[source_col]
# Select rows of interest
rows_map = {
@@ -793,20 +949,49 @@ class GraphGenerator:
records = []
for label, param in rows_map.items():
# Try exact match first
row = spirometry_df.loc[spirometry_df["Parameters"].str.strip() == param]
if row.empty:
# Try case-insensitive match
row = spirometry_df.loc[
spirometry_df["Parameters"].str.strip().str.upper() == param.upper()
]
if row.empty:
# Try matching without % sign
if "%" in param:
param_no_pct = param.replace("%", "")
row = spirometry_df.loc[
spirometry_df["Parameters"].str.strip() == param_no_pct
]
if row.empty:
print(f"Warning: Could not find parameter '{param}' in spirometry data")
print(f"Available parameters: {spirometry_df['Parameters'].tolist()}")
continue
row = row.iloc[0]
# Get values with fallbacks for column name variations
best_val = row.get("Best", row.get("best", pd.NA))
pct_val = row.get(
"%Pred.", row.get("%Pred", row.get("Pred %", row.get("Pred%", pd.NA)))
)
z_val = row.get("ZScore", row.get("Z-Score", row.get("Zscore", pd.NA)))
records.append(
{
"label": label,
"param": param,
"best": row["Best"],
"pct": row["%Pred."],
"z": row["ZScore"],
"best": best_val,
"pct": pct_val,
"z": z_val,
}
)
# Validate we have exactly 3 records
if len(records) != 3:
raise ValueError(
f"Expected 3 spirometry parameters (FVC, FEV1, FEV1/FVC%), "
f"but found {len(records)}. Found: {[r['param'] for r in records]}"
)
# Figure setup
fig, axes = plt.subplots(
nrows=3,
@@ -936,3 +1121,187 @@ class GraphGenerator:
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_metabolism_chart(
self, rmr_kcal: float, save_as_base64: bool = True
) -> str:
"""
Generate metabolism chart (Slow vs Fast Metabolism).
Args:
rmr_kcal: Resting metabolic rate in kcal/day
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
from matplotlib.patches import FancyBboxPatch
fig, ax = plt.subplots(figsize=(10, 2.5))
# Chart data and positions
categories = ["Very Slow", "Slow", "Average", "Fast", "Very Fast"]
positions = [1500, 3000, 4500, 6000, 7500]
indicator_pos = rmr_kcal
highlight_end = rmr_kcal
# Main Bar (Background)
main_bar = FancyBboxPatch(
(0, 0.4),
9000,
0.2,
boxstyle="round,pad=0,rounding_size=0.1",
ec="none",
fc="#E0E0E0",
)
ax.add_patch(main_bar)
# Highlighted Bar
highlight_bar = FancyBboxPatch(
(0, 0.4),
highlight_end,
0.2,
boxstyle="round,pad=0,rounding_size=0.1",
ec="none",
fc="#B2FFC8",
)
ax.add_patch(highlight_bar)
# Text and Labels
ax.text(
highlight_end / 2,
0.5,
f"{rmr_kcal:.0f}kCals",
ha="center",
va="center",
color="#006400",
fontsize=14,
weight="bold",
)
# Indicator Triangle
ax.plot(indicator_pos, 0.65, "v", markersize=15, color="#606060", clip_on=False)
# Ticks and Labels
for pos, label in zip(positions, categories):
ax.text(
pos, 0.15, label, ha="center", va="center", fontsize=12, color="#333333"
)
ax.plot([pos, pos], [0.35, 0.39], color="grey", lw=5)
# Chart Styling
ax.set_title("Slow vs Fast Metabolism", fontsize=18, weight="bold", loc="left")
ax.set_xlim(0, 9000)
ax.set_ylim(0, 1)
ax.axis("off")
plt.tight_layout()
chart_path = self.charts_dir / "metabolism_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fuel_source_chart(
self, fat_percentage: float, save_as_base64: bool = True
) -> str:
"""
Generate fuel source chart (Fats vs Carbs).
Args:
fat_percentage: Fat percentage at rest
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
from matplotlib.patches import FancyBboxPatch
fig, ax = plt.subplots(figsize=(10, 2.5))
carb_percentage = 100 - fat_percentage
optimal_point = 75
# Main Bars (Fats and Carbs)
# Fats bar (yellow)
fats_bar = FancyBboxPatch(
(0, 0.4),
fat_percentage,
0.2,
boxstyle="round,pad=0,rounding_size=0.1",
ec="none",
fc="#FEEAAB",
)
ax.add_patch(fats_bar)
# Carbs bar (blue) - starts where the fats bar ends
carbs_bar = FancyBboxPatch(
(fat_percentage, 0.4),
carb_percentage,
0.2,
boxstyle="round,pad=0,rounding_size=0.1",
ec="none",
fc="#A7F5FF",
)
ax.add_patch(carbs_bar)
# Text and Labels
ax.text(
fat_percentage / 2,
0.5,
f"Fats\n{fat_percentage:.1f}%",
ha="center",
va="center",
color="#333333",
fontsize=12,
weight="bold",
)
ax.text(
fat_percentage + carb_percentage / 2,
0.5,
f"Carbs\n{carb_percentage:.1f}%",
ha="center",
va="center",
color="#333333",
fontsize=12,
weight="bold",
)
# Add 'Optimal' label
ax.text(optimal_point, 0.75, "Optimal", ha="center", va="center", fontsize=12)
# Indicator Triangle
ax.plot(
fat_percentage, 0.65, "v", markersize=15, color="#606060", clip_on=False
)
# Ticks and Labels
positions = [0, 25, 50, 75, 100]
for pos in positions:
ax.text(
pos,
0.15,
str(pos),
ha="center",
va="center",
fontsize=12,
color="#333333",
)
ax.plot([pos, pos], [0.35, 0.39], color="grey", lw=5)
# Add a special tick for the 'Optimal' point
ax.plot([optimal_point, optimal_point], [0.6, 0.7], color="black", lw=2)
# Chart Styling
ax.set_title("Fuel Source", fontsize=18, weight="bold", loc="left")
ax.set_ylim(0, 1)
ax.axis("off")
plt.tight_layout()
chart_path = self.charts_dir / "fuel_source_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
+139 -38
View File
@@ -151,7 +151,7 @@ class ReportGeneratorService:
}
def generate_html(
self, patient_info: Dict[str, Any], context_list: List[Dict[str, Any]]
self, patient_info: Dict[str, Any], contexts: Dict[str, Dict[str, Any]]
) -> str:
"""
Generate HTML content for the report.
@@ -159,7 +159,7 @@ class ReportGeneratorService:
Args:
patient_info: Dictionary containing patient information
(patient_name, age, height, weight, focus)
context_list: List of context dictionaries for each page
contexts: Dictionary with keys 'page_1', 'page_2', etc., each containing context data
Returns:
Complete HTML document as string
@@ -175,6 +175,9 @@ class ReportGeneratorService:
"focus": patient_info.get("focus", "Endurance"),
}
# Get total number of pages
num_pages = len(contexts)
# Footer context
footer_context = [
{
@@ -183,7 +186,7 @@ class ReportGeneratorService:
"social": "@ishplabs",
"page_number": i + 1,
}
for i in range(len(context_list))
for i in range(num_pages)
]
# Render header
@@ -195,11 +198,13 @@ class ReportGeneratorService:
for context in footer_context
]
# Render pages
for i, context in enumerate(context_list):
template = self.env.get_template(f"page_{i + 1}.html").render(context)
# Render pages - iterate through pages in order
for i in range(1, num_pages + 1):
page_key = f"page_{i}"
context = contexts.get(page_key, {})
template = self.env.get_template(f"page_{i}.html").render(context)
if (i + 1) > 2:
if i > 2:
full_html = f"""
<div class="page flex flex-col justify-between">
<div>
@@ -209,7 +214,7 @@ class ReportGeneratorService:
{template}
</main>
<div class="border-t text-center text-sm text-gray-600">
{footer_html_list[i]}
{footer_html_list[i - 1]}
</div>
</div>
"""
@@ -284,10 +289,10 @@ class ReportGeneratorService:
self,
spirometry_pdf_path: str,
pnoe_csv_path: str,
seca_excel_path: str,
patient_info: Dict[str, Any],
output_filename: str = None,
metric_overrides: Optional[Dict[str, Any]] = None,
oxygenation_csv_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
Generate complete medical report from uploaded files.
@@ -325,69 +330,165 @@ class ReportGeneratorService:
graphs_generated = self.generate_graphs(df)
# Create graph dictionary with base64 encoded images
import base64
graphs_dict = {}
for graph in graphs_generated:
# Read the graph file and convert to base64
graph_path = Path(graph["path"])
if graph_path.exists():
import base64
with open(graph_path, "rb") as f:
graphs_dict[graph["name"]] = base64.b64encode(f.read()).decode(
"utf-8"
)
# Also generate body composition charts
# Extract patient data for these charts
patient_name = patient_info.get("patient_name", "").split()[-1] # Get last name
# Use patient info directly (no SECA file needed)
fat_pct = patient_info.get("fat_percentage", 0)
age = patient_info.get("age", 25)
gender = patient_info.get("gender", "female").lower()
# Load SECA data to get body composition info
seca_df = pd.read_excel(seca_excel_path)
patient_data = seca_df[
seca_df["LastName"].str.contains(patient_name, case=False, na=False)
]
# Convert weight to kg if needed
weight_str = str(patient_info.get("weight", "0"))
# Extract numeric value and unit
weight_str_clean = (
weight_str.replace("lbs", "").replace("kg", "").replace(" ", "").strip()
)
try:
weight_value = float(weight_str_clean)
except ValueError:
print(f"Warning: Could not parse weight '{weight_str}', using default 0")
weight_value = 0.0
if not patient_data.empty:
row = patient_data.iloc[0]
weight_kg = float(row.get("Weight", 0))
fat_pct = float(row.get("Adult_FMP", 0))
age = int(row.get("Age", patient_info.get("age", 25)))
gender = row.get("Gender", "female").lower()
# Convert to kg if weight is in lbs
if "lbs" in weight_str.lower():
weight_kg = weight_value / 2.20462 # Convert lbs to kg
else:
weight_kg = weight_value # Already in kg or assume kg if no unit specified
fat_mass_lbs = weight_kg * fat_pct / 100 * 2.20462
lean_mass_lbs = weight_kg * (1 - fat_pct / 100) * 2.20462
# Calculate fat and lean mass in pounds
fat_mass_lbs = weight_kg * fat_pct / 100 * 2.20462
lean_mass_lbs = weight_kg * (1 - fat_pct / 100) * 2.20462
# Generate body composition chart
body_comp_b64 = self.graph_generator.generate_body_composition_chart(
fat_mass_lbs, lean_mass_lbs, save_as_base64=True
# Generate body composition chart (save as file first, then convert to base64)
try:
body_comp_path = self.graph_generator.generate_body_composition_chart(
fat_mass_lbs, lean_mass_lbs, save_as_base64=False
)
graphs_dict["body_composition"] = body_comp_b64
# Generate body fat percent chart
body_fat_b64 = self.graph_generator.generate_body_fat_percent_chart(
fat_pct, age, gender, save_as_base64=True
graphs_generated.append(
{"name": "body_composition", "path": str(body_comp_path)}
)
graphs_dict["body_fat_percent"] = body_fat_b64
# Convert to base64 for graphs_dict
with open(body_comp_path, "rb") as f:
graphs_dict["body_composition"] = base64.b64encode(f.read()).decode(
"utf-8"
)
except Exception as e:
print(f"Warning: Could not generate body composition chart: {e}")
graphs_dict["body_composition"] = ""
# Generate body fat percent chart (save as file first, then convert to base64)
try:
body_fat_path = self.graph_generator.generate_body_fat_percent_chart(
fat_pct, age, gender, save_as_base64=False
)
graphs_generated.append(
{"name": "body_fat_percent", "path": str(body_fat_path)}
)
# Convert to base64 for graphs_dict
with open(body_fat_path, "rb") as f:
graphs_dict["body_fat_percent"] = base64.b64encode(f.read()).decode(
"utf-8"
)
except Exception as e:
print(f"Warning: Could not generate body fat percent chart: {e}")
graphs_dict["body_fat_percent"] = ""
# Generate spirometry chart
print("Step 4: Generating spirometry chart...")
try:
spirometry_df = pd.read_csv(spirometry_csv_path)
print(f"Spirometry data loaded: {len(spirometry_df)} rows")
print(f"Spirometry columns: {spirometry_df.columns.tolist()}")
if "Parameters" in spirometry_df.columns:
print(f"Available parameters: {spirometry_df['Parameters'].tolist()}")
spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
spirometry_df, save_as_base64=True
)
graphs_dict["spirometry_chart"] = spirometry_chart_b64
print("Spirometry chart generated successfully")
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"Warning: Could not generate spirometry chart: {e}")
print(f"Error details: {error_details}")
graphs_dict["spirometry_chart"] = ""
# Generate TSI chart if oxygenation CSV is provided
if oxygenation_csv_path:
print("Step 4.5: Generating TSI chart...")
try:
oxygenation_df = pd.read_csv(oxygenation_csv_path)
tsi_chart_b64 = self.graph_generator.generate_tsi_chart(
oxygenation_df, save_as_base64=True
)
graphs_dict["tsi_chart"] = tsi_chart_b64
except Exception as e:
print(f"Warning: Could not generate TSI chart: {e}")
graphs_dict["tsi_chart"] = ""
# Generate metabolism and fuel source charts for page 5
print("Step 4.6: Generating metabolism and fuel source charts...")
try:
# Calculate RMR and fuel source from pnoe data
from services.context_generator import ContextGenerator
temp_context_gen = ContextGenerator()
temp_context_gen.load_data(pnoe_csv_path, str(spirometry_csv_path), None)
temp_context_gen.patient_info = {
"name": patient_info.get("first_name", ""),
"last_name": patient_info.get("last_name", ""),
"age": patient_info.get("age", 25),
"weight": weight_kg,
"fat_percentage": fat_pct,
"gender": gender,
}
rmr_metrics = temp_context_gen.calculate_rmr_and_fuel_source()
# Generate metabolism chart
metabolism_chart_b64 = self.graph_generator.generate_metabolism_chart(
rmr_metrics["rmr_kcal"], save_as_base64=True
)
graphs_dict["metabolism_chart"] = metabolism_chart_b64
# Generate fuel source chart
fuel_source_chart_b64 = self.graph_generator.generate_fuel_source_chart(
rmr_metrics["rest_fat_percentage"], save_as_base64=True
)
graphs_dict["fuel_source_chart"] = fuel_source_chart_b64
except Exception as e:
print(f"Warning: Could not generate metabolism/fuel source charts: {e}")
graphs_dict["metabolism_chart"] = ""
graphs_dict["fuel_source_chart"] = ""
# Step 5: Generate context for all pages
print("Step 5: Generating page contexts...")
patient_name = patient_info.get("patient_name", "")
self.context_generator.load_data(
pnoe_csv_path, str(spirometry_csv_path), seca_excel_path
pnoe_csv_path,
str(spirometry_csv_path),
None, # No SECA file
)
context_list = self.context_generator.generate_all_contexts(
# Set patient info manually
self.context_generator.patient_info = {
"name": patient_info.get("first_name", ""),
"last_name": patient_info.get("last_name", ""),
"age": patient_info.get("age", 25),
"weight": weight_kg,
"fat_percentage": fat_pct,
"gender": gender,
}
contexts = self.context_generator.generate_all_contexts(
patient_name, graphs_dict, metric_overrides=metric_overrides
)
@@ -396,7 +497,7 @@ class ReportGeneratorService:
analysis_data["graphs_count"] = len(graphs_generated)
# Step 6: Generate HTML
html_content = self.generate_html(patient_info, context_list)
html_content = self.generate_html(patient_info, contexts)
# Step 7: Generate PDF
if output_filename is None:
+1
View File
@@ -39,3 +39,4 @@
</html>
+9 -2
View File
@@ -84,9 +84,16 @@
class="mt-1 block w-full text-sm text-gray-500 file:mr-4 file:py-2 file:px-4 file:rounded-md file:border-0 file:text-sm file:font-semibold file:bg-indigo-50 file:text-indigo-700 hover:file:bg-indigo-100">
</div>
<div>
<label for="seca_excel" class="block text-sm font-medium text-gray-700">SECA Excel</label>
<input type="file" name="seca_excel" id="seca_excel" accept=".xlsx,.xls" required
<label for="fat_percentage" class="block text-sm font-medium text-gray-700">Body Fat Percentage (%)</label>
<input type="number" step="0.1" name="fat_percentage" id="fat_percentage" required min="0" max="100"
class="mt-1 block w-full rounded-md border-gray-300 shadow-sm focus:border-indigo-500 focus:ring-indigo-500 sm:text-sm px-3 py-2 border"
placeholder="22.5">
</div>
<div>
<label for="oxygenation_csv" class="block text-sm font-medium text-gray-700">Muscle Oxygenation CSV (Optional)</label>
<input type="file" name="oxygenation_csv" id="oxygenation_csv" accept=".csv"
class="mt-1 block w-full text-sm text-gray-500 file:mr-4 file:py-2 file:px-4 file:rounded-md file:border-0 file:text-sm file:font-semibold file:bg-indigo-50 file:text-indigo-700 hover:file:bg-indigo-100">
<p class="mt-1 text-xs text-gray-500">Upload NIRS muscle oxygen CSV file to generate TSI graph</p>
</div>
</div>
</div>
+17 -4
View File
@@ -72,7 +72,7 @@
},
{
"cell_type": "code",
"execution_count": 37,
"execution_count": null,
"id": "99116a35",
"metadata": {},
"outputs": [],
@@ -237,7 +237,7 @@
},
{
"cell_type": "code",
"execution_count": 41,
"execution_count": null,
"id": "470e871e",
"metadata": {},
"outputs": [
@@ -290,7 +290,14 @@
" # --- Chart data and positions ---\n",
" categories = ['Very Slow', 'Slow', 'Average', 'Fast', 'Very Fast']\n",
" positions = [1500, 3000, 4500, 6000, 7500]\n",
" kcal_value = 1386\n",
" # Step 1: Filter resting phase (usually lowest VO2 or MET values)\n",
" rest_phase = df[df['MET'] <= 1.1] # assuming <1.1 MET means rest\n",
"\n",
" # Step 2: Compute resting metabolic rate\n",
" rmr = rest_phase['EE(kcal/day)'].mean()\n",
"\n",
" print(f\"Estimated RMR from data: {rmr:.0f} kcal/day\")\n",
" kcal_value = rmr\n",
" # Position the indicator and highlight based on the kcal value\n",
" # For this example, we'll place it in the 'Very Slow' section.\n",
" indicator_pos = kcal_value\n",
@@ -349,7 +356,13 @@
" fig, ax = plt.subplots(figsize=(10, 2.5))\n",
"\n",
" # --- Chart data and positions ---\n",
" fat_percentage = 33\n",
" rest_phase = df[df['RER'] == 0.9] # filter rest data\n",
" fat_rest = rest_phase['FAT(%)'].mean()\n",
" carb_rest = rest_phase['CARBS(%)'].mean()\n",
"\n",
" print(f\"Resting phase fuel mix: Fats {fat_rest:.1f}%, Carbs {carb_rest:.1f}%\")\n",
"\n",
" fat_percentage = fat_rest\n",
" carb_percentage = 100 - fat_percentage\n",
" optimal_point = 75\n",
"\n",
+3 -9
View File
@@ -18,13 +18,12 @@
},
{
"cell_type": "code",
"execution_count": 19,
"execution_count": null,
"id": "da5ac3c1",
"metadata": {},
"outputs": [],
"source": [
"pnoe_df = pd.read_csv('data/pnoe_data.csv', delimiter=';')\n",
"patients_info = pd.read_excel('data/patients_data.xlsx')\n",
"spirometry_df = pd.read_csv('data/spirometry_data.csv')"
]
},
@@ -254,7 +253,7 @@
},
{
"cell_type": "code",
"execution_count": 11,
"execution_count": null,
"id": "2fa8ff13",
"metadata": {},
"outputs": [
@@ -270,15 +269,10 @@
}
],
"source": [
"def body_composition_chart(first_name='Keirstyn', last_name='Moran'):\n",
"def body_composition_chart(fat_percentage=22.4, weight_kg=70):\n",
"\n",
" \n",
" #=========================== Body Composition Donut Chart ========================#\n",
" patient_data = patients_info[(patients_info['FirstName'].str.contains(first_name, case=False, na=False)) & \n",
" (patients_info['LastName'].str.contains(last_name, case=False, na=False))]\n",
"# Get the fat mass percentage for Keirstyn\n",
" fat_percentage = patient_data['Adult_FMP'].iloc[0]\n",
" weight_kg = patient_data['Weight'].iloc[0]\n",
" lean_percentage = 100 - fat_percentage\n",
"\n",
"# Create donut chart\n",