Files
bio-performx/app/main.py
T

636 lines
22 KiB
Python

"""
FastAPI application for medical report generation.
This API provides a single endpoint that accepts all required files
and patient information, then generates a comprehensive medical report.
"""
import os
import shutil
import tempfile
import uuid
from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from jinja2 import Environment, FileSystemLoader
from pydantic import BaseModel
from services.report_generator import ReportGeneratorService
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.sessions import SessionMiddleware
from starlette.requests import Request as StarletteRequest
app = FastAPI(
title="Medical Report Generation API",
description="API for generating medical performance reports with analysis and graphs",
version="2.0.0",
)
# Add session middleware
app.add_middleware(
SessionMiddleware,
secret_key=os.getenv("SECRET_KEY", "your-secret-key-change-in-production"),
)
# Add security headers middleware to allow external scripts
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: StarletteRequest, call_next):
response = await call_next(request)
# Allow external scripts and styles (for Tailwind CDN)
# Only add CSP for HTML responses
content_type = response.headers.get("content-type", "").lower()
if "text/html" in content_type:
response.headers["Content-Security-Policy"] = (
"default-src 'self'; script-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; img-src 'self' data: https:;"
)
return response
app.add_middleware(SecurityHeadersMiddleware)
# Mount static files (if static directory exists)
static_dir = Path("static")
if static_dir.exists():
app.mount("/static", StaticFiles(directory="static"), name="static")
# Setup templates
jinja_env = Environment(loader=FileSystemLoader("app/templates"))
# Define output directories
GRAPHS_DIR = Path("graphs")
GRAPHS_DIR.mkdir(exist_ok=True)
REPORTS_DIR = Path("reports")
REPORTS_DIR.mkdir(exist_ok=True)
TEMP_DIR = Path("temp")
TEMP_DIR.mkdir(exist_ok=True)
# Initialize report generator service
report_service = ReportGeneratorService(
template_dir="app/report_gen",
graphs_dir=str(GRAPHS_DIR),
reports_dir=str(REPORTS_DIR),
)
class ReportResponse(BaseModel):
message: str
report_path: str
graphs_generated: list
analysis_data: dict
def render_template(template_name: str, context: dict) -> HTMLResponse:
"""Helper function to render Jinja2 templates"""
template = jinja_env.get_template(template_name)
html_content = template.render(**context)
return HTMLResponse(content=html_content, media_type="text/html")
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Root endpoint - Upload form page"""
return render_template(
"upload.html", {"request": request, "session": request.session}
)
@app.post("/upload")
async def upload_files(
request: Request,
first_name: str = Form(...),
last_name: str = Form(...),
age: int = Form(...),
height: str = Form(...),
weight: str = Form(...),
gender: str = Form(...),
fat_percentage: float = Form(...),
focus: str = Form(default="Endurance"),
session_id: str = Form(default="default"),
next_testing_date: str = Form(...),
spirometry_pdf: UploadFile = File(...),
pnoe_csv: UploadFile = File(...),
oxygenation_csv: UploadFile = File(None),
):
"""Handle file upload and generate report"""
# Validate file types
if not spirometry_pdf.filename.endswith(".pdf"):
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": "Spirometry file must be a PDF",
},
)
if not pnoe_csv.filename.endswith(".csv"):
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": "Pnoe file must be a CSV",
},
)
# Validate oxygenation CSV if provided
if oxygenation_csv and oxygenation_csv.filename:
if not oxygenation_csv.filename.endswith(".csv"):
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": "Oxygenation file must be a CSV",
},
)
# Create session-specific temp directory
session_uuid = str(uuid.uuid4())
session_temp_dir = TEMP_DIR / session_uuid
session_temp_dir.mkdir(exist_ok=True, parents=True)
# Save uploaded files
spirometry_path = session_temp_dir / f"spirometry_{spirometry_pdf.filename}"
pnoe_path = session_temp_dir / f"pnoe_{pnoe_csv.filename}"
oxygenation_path = None
try:
# Write files
with open(spirometry_path, "wb") as f:
shutil.copyfileobj(spirometry_pdf.file, f)
with open(pnoe_path, "wb") as f:
shutil.copyfileobj(pnoe_csv.file, f)
# Save oxygenation CSV if provided
if oxygenation_csv and oxygenation_csv.filename:
oxygenation_path = (
session_temp_dir / f"oxygenation_{oxygenation_csv.filename}"
)
with open(oxygenation_path, "wb") as f:
shutil.copyfileobj(oxygenation_csv.file, f)
# Prepare patient information
patient_name = f"{first_name} {last_name}"
patient_info = {
"patient_name": patient_name,
"first_name": first_name,
"last_name": last_name,
"age": age,
"height": height,
"weight": weight,
"gender": gender,
"fat_percentage": fat_percentage,
"focus": focus,
"session_id": session_id,
"next_testing_date": next_testing_date,
}
# Generate report
oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
patient_info=patient_info,
oxygenation_csv_path=oxygenation_csv_path,
)
# Store in session
request.session["patient_info"] = patient_info
request.session["temp_dir"] = str(session_temp_dir)
request.session["report_path"] = result["report_path"]
request.session["graphs_generated"] = result["graphs_generated"]
request.session["analysis_data"] = result["analysis_data"]
# Extract spirometry CSV path (it's saved in data_dir by the service)
from pathlib import Path as PathLib
from services.context_generator import ContextGenerator
from services.spirometry_table_extractor import (
extract_spirometry_table_from_pdf,
)
# The spirometry CSV is extracted during report generation
# We need to find it or extract it again
data_dir = PathLib("data")
spirometry_csv_path = (
data_dir / f"spirometry_{Path(spirometry_pdf.filename).stem}.csv"
)
# If it doesn't exist, extract it
if not spirometry_csv_path.exists():
spirometry_csv_path = extract_spirometry_table_from_pdf(
str(spirometry_path), output_dir=str(data_dir)
)
spirometry_csv_path = PathLib(spirometry_csv_path)
# Get calculated metrics for display and editing
context_gen = ContextGenerator()
context_gen.load_data(
str(pnoe_path),
str(spirometry_csv_path),
None, # No SECA file needed anymore
str(oxygenation_path) if oxygenation_path else None, # Oxygenation CSV
)
# Set patient info manually since we're not reading from SECA
weight_kg = float(weight.replace("lbs", "").replace("kg", "").strip())
if "lbs" in weight.lower():
weight_kg = weight_kg / 2.20462 # Convert lbs to kg
context_gen.patient_info = {
"name": first_name,
"last_name": last_name,
"age": age,
"weight": weight_kg,
"fat_percentage": fat_percentage,
"gender": gender,
}
spirometry_metrics = context_gen.calculate_spirometry_metrics()
pnoe_metrics = context_gen.calculate_pnoe_metrics()
# Store metrics in session
request.session["metrics"] = {
"spirometry": spirometry_metrics,
"pnoe": pnoe_metrics,
}
request.session["spirometry_csv_path"] = str(spirometry_csv_path)
return RedirectResponse(url="/preview", status_code=303)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}")
return render_template(
"upload.html",
{
"request": request,
"session": request.session,
"error": f"Error generating report: {str(e)}",
},
)
finally:
# Close file handles
spirometry_pdf.file.close()
pnoe_csv.file.close()
if oxygenation_csv and oxygenation_csv.filename:
oxygenation_csv.file.close()
@app.get("/preview", response_class=HTMLResponse)
async def preview(request: Request):
"""Preview generated report"""
if not request.session.get("report_path"):
return RedirectResponse(url="/", status_code=303)
return render_template(
"preview.html", {"request": request, "session": request.session}
)
@app.get("/graphs/{filename}")
async def serve_graph(filename: str):
"""Serve graph images"""
graph_path = GRAPHS_DIR / filename
if not graph_path.exists():
raise HTTPException(status_code=404, detail="Graph not found")
return FileResponse(path=graph_path, media_type="image/png")
@app.get("/edit", response_class=HTMLResponse)
async def edit_form(request: Request):
"""Display edit metrics form"""
if not request.session.get("metrics"):
return RedirectResponse(url="/", status_code=303)
return render_template(
"edit.html", {"request": request, "session": request.session}
)
@app.post("/edit")
async def edit_metrics(request: Request):
"""Handle metric edits and regenerate report"""
if not request.session.get("temp_dir") or not request.session.get("patient_info"):
return RedirectResponse(url="/", status_code=303)
# Get form data
form_data = await request.form()
# Build metric overrides
metric_overrides = {"pnoe": {}, "spirometry": {}}
# Pnoe overrides
if form_data.get("vo2_max"):
metric_overrides["pnoe"]["vo2_max"] = float(form_data["vo2_max"])
if form_data.get("vo2_max_per_kg"):
metric_overrides["pnoe"]["vo2_max_per_kg"] = float(form_data["vo2_max_per_kg"])
if form_data.get("peak_vt"):
metric_overrides["pnoe"]["peak_vt"] = float(form_data["peak_vt"])
if form_data.get("peak_vt_hr"):
metric_overrides["pnoe"]["peak_vt_hr"] = float(form_data["peak_vt_hr"])
if form_data.get("fat_max_value"):
metric_overrides["pnoe"]["fat_max_value"] = float(form_data["fat_max_value"])
if form_data.get("fat_max_hr"):
metric_overrides["pnoe"]["fat_max_hr"] = float(form_data["fat_max_hr"])
# VT1 and VT2 overrides
if (
form_data.get("vt1_hr")
or form_data.get("vt1_speed")
or form_data.get("vt1_time")
):
metric_overrides["pnoe"]["vt1"] = {
"HeartRate": float(form_data.get("vt1_hr", 0)),
"Speed": float(form_data.get("vt1_speed", 0)),
"Time": float(form_data.get("vt1_time", 0)),
}
if (
form_data.get("vt2_hr")
or form_data.get("vt2_speed")
or form_data.get("vt2_time")
):
metric_overrides["pnoe"]["vt2"] = {
"HeartRate": float(form_data.get("vt2_hr", 0)),
"Speed": float(form_data.get("vt2_speed", 0)),
"Time": float(form_data.get("vt2_time", 0)),
}
# Heart rate zones
for i in range(1, 6):
zone_key = f"zone{i}_bpm"
if form_data.get(zone_key):
metric_overrides["pnoe"][zone_key] = form_data[zone_key]
# Spirometry overrides
if form_data.get("fvc_best"):
metric_overrides["spirometry"]["fvc_best"] = float(form_data["fvc_best"])
if form_data.get("fvc_pred"):
metric_overrides["spirometry"]["fvc_pred"] = float(form_data["fvc_pred"])
if form_data.get("fev1_best"):
metric_overrides["spirometry"]["fev1_best"] = float(form_data["fev1_best"])
if form_data.get("fev1_pred"):
metric_overrides["spirometry"]["fev1_pred"] = float(form_data["fev1_pred"])
if form_data.get("fev1_fvc_pct_best"):
metric_overrides["spirometry"]["fev1_fvc_pct_best"] = float(
form_data["fev1_fvc_pct_best"]
)
if form_data.get("fev1_fvc_pct_pred"):
metric_overrides["spirometry"]["fev1_fvc_pct_pred"] = float(
form_data["fev1_fvc_pct_pred"]
)
try:
# Get file paths from session
temp_dir = Path(request.session["temp_dir"])
patient_info = request.session["patient_info"]
# Find files in temp directory
spirometry_path = None
pnoe_path = None
oxygenation_path = None
for file_path in temp_dir.iterdir():
if file_path.name.startswith("spirometry_"):
spirometry_path = file_path
elif file_path.name.startswith("pnoe_"):
pnoe_path = file_path
elif file_path.name.startswith("oxygenation_"):
oxygenation_path = file_path
if not all([spirometry_path, pnoe_path]):
raise ValueError("Could not find all required uploaded files")
# Regenerate report with overrides
oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
patient_info=patient_info,
metric_overrides=metric_overrides
if (metric_overrides["pnoe"] or metric_overrides["spirometry"])
else None,
oxygenation_csv_path=oxygenation_csv_path,
)
# Update session with new report
request.session["report_path"] = result["report_path"]
request.session["graphs_generated"] = result["graphs_generated"]
request.session["analysis_data"] = result["analysis_data"]
# Recalculate metrics with overrides
from services.context_generator import ContextGenerator
context_gen = ContextGenerator()
spirometry_csv_path = request.session.get("spirometry_csv_path", "")
if not spirometry_csv_path or not Path(spirometry_csv_path).exists():
from pathlib import Path as PathLib
from services.spirometry_table_extractor import (
extract_spirometry_table_from_pdf,
)
data_dir = PathLib("data")
spirometry_csv_path = extract_spirometry_table_from_pdf(
str(spirometry_path), output_dir=str(data_dir)
)
spirometry_csv_path = str(PathLib(spirometry_csv_path))
context_gen.load_data(
str(pnoe_path),
spirometry_csv_path,
None, # No SECA file
str(oxygenation_path) if oxygenation_path else None, # Oxygenation CSV
)
# Set patient info manually
weight_str = patient_info.get("weight", "0")
weight_kg = float(weight_str.replace("lbs", "").replace("kg", "").strip())
if "lbs" in weight_str.lower():
weight_kg = weight_kg / 2.20462 # Convert lbs to kg
context_gen.patient_info = {
"name": patient_info.get("first_name", ""),
"last_name": patient_info.get("last_name", ""),
"age": patient_info.get("age", 25),
"weight": weight_kg,
"fat_percentage": patient_info.get("fat_percentage", 0),
"gender": patient_info.get("gender", "female"),
}
context_gen.extract_patient_info(patient_info.get("last_name", ""))
spirometry_overrides = metric_overrides.get("spirometry", {})
pnoe_overrides = metric_overrides.get("pnoe", {})
spirometry_metrics = context_gen.calculate_spirometry_metrics(
spirometry_overrides
)
pnoe_metrics = context_gen.calculate_pnoe_metrics(pnoe_overrides)
# Update metrics in session
request.session["metrics"] = {
"spirometry": spirometry_metrics,
"pnoe": pnoe_metrics,
}
request.session["spirometry_csv_path"] = spirometry_csv_path
return RedirectResponse(url="/preview", status_code=303)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}")
return render_template(
"edit.html",
{
"request": request,
"session": request.session,
"error": f"Error regenerating report: {str(e)}",
},
)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "report-generation-api"}
@app.post("/generate-report", response_model=ReportResponse)
async def generate_report(
patient_name: str = Form(..., description="Patient name"),
age: int = Form(..., description="Patient age"),
height: str = Form(..., description="Patient height (e.g., 5'4\")"),
weight: str = Form(..., description="Patient weight (e.g., 123lbs)"),
focus: str = Form(default="Endurance", description="Training focus"),
session_id: str = Form(default="default", description="Session ID"),
spirometry_pdf: UploadFile = File(..., description="Spirometry PDF file"),
pnoe_csv: UploadFile = File(..., description="Pnoe CSV file"),
seca_excel: UploadFile = File(..., description="SECA Excel file"),
):
"""
Generate a comprehensive medical report from uploaded files.
This endpoint accepts all required files and patient information,
processes the data, generates graphs, and returns a PDF report.
Args:
spirometry_pdf: Spirometry PDF file
pnoe_csv: Pnoe CSV data file
seca_excel: SECA body composition Excel file
patient_name: Name of the patient
age: Patient age
height: Patient height
weight: Patient weight
focus: Training focus (default: Endurance)
session_id: Session identifier (default: default)
Returns:
ReportResponse with report path, graphs generated, and analysis data
"""
# Validate file types
if not spirometry_pdf.filename.endswith(".pdf"):
raise HTTPException(status_code=400, detail="Spirometry file must be a PDF")
if not pnoe_csv.filename.endswith(".csv"):
raise HTTPException(status_code=400, detail="Pnoe file must be a CSV")
if not seca_excel.filename.endswith((".xlsx", ".xls")):
raise HTTPException(
status_code=400, detail="SECA file must be an Excel file (.xlsx or .xls)"
)
# Create temporary directory for uploaded files
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Save uploaded files temporarily
spirometry_path = temp_path / f"spirometry_{spirometry_pdf.filename}"
pnoe_path = temp_path / f"pnoe_{pnoe_csv.filename}"
seca_path = temp_path / f"seca_{seca_excel.filename}"
try:
# Write files
with open(spirometry_path, "wb") as f:
shutil.copyfileobj(spirometry_pdf.file, f)
with open(pnoe_path, "wb") as f:
shutil.copyfileobj(pnoe_csv.file, f)
with open(seca_path, "wb") as f:
shutil.copyfileobj(seca_excel.file, f)
# Prepare patient information
patient_info = {
"patient_name": patient_name,
"age": age,
"height": height,
"weight": weight,
"focus": focus,
"session_id": session_id,
}
# Generate report using the service
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path),
patient_info=patient_info,
)
return ReportResponse(
message="Report generated successfully",
report_path=result["report_path"],
graphs_generated=result["graphs_generated"],
analysis_data=result["analysis_data"],
)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}") # This will show in terminal
raise HTTPException(
status_code=500,
detail=f"Error generating report: {str(e)}\n{error_details}",
)
finally:
# Close file handles
spirometry_pdf.file.close()
pnoe_csv.file.close()
seca_excel.file.close()
@app.get("/download-report/{filename}")
async def download_report(filename: str):
"""
Download a generated report.
Args:
filename: Name of the report file
Returns:
PDF file
"""
file_path = REPORTS_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="Report not found")
return FileResponse(
path=file_path,
media_type="application/pdf",
filename=filename,
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)