""" FastAPI application for medical report generation. This API provides a single endpoint that accepts all required files and patient information, then generates a comprehensive medical report. """ import os import shutil import tempfile import uuid from pathlib import Path from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from jinja2 import Environment, FileSystemLoader from pydantic import BaseModel from services.report_generator import ReportGeneratorService from starlette.middleware.base import BaseHTTPMiddleware from starlette.middleware.sessions import SessionMiddleware from starlette.requests import Request as StarletteRequest app = FastAPI( title="Medical Report Generation API", description="API for generating medical performance reports with analysis and graphs", version="2.0.0", ) # Add session middleware app.add_middleware( SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-secret-key-change-in-production"), ) # Add security headers middleware to allow external scripts class SecurityHeadersMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: StarletteRequest, call_next): response = await call_next(request) # Allow external scripts and styles (for Tailwind CDN) # Only add CSP for HTML responses content_type = response.headers.get("content-type", "").lower() if "text/html" in content_type: response.headers["Content-Security-Policy"] = ( "default-src 'self'; script-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com https:; img-src 'self' data: https:;" ) return response app.add_middleware(SecurityHeadersMiddleware) # Mount static files (if static directory exists) static_dir = Path("static") if static_dir.exists(): app.mount("/static", StaticFiles(directory="static"), name="static") # Setup templates jinja_env = Environment(loader=FileSystemLoader("app/templates")) # Define output directories GRAPHS_DIR = Path("graphs") GRAPHS_DIR.mkdir(exist_ok=True) REPORTS_DIR = Path("reports") REPORTS_DIR.mkdir(exist_ok=True) TEMP_DIR = Path("temp") TEMP_DIR.mkdir(exist_ok=True) # Initialize report generator service report_service = ReportGeneratorService( template_dir="app/report_gen", graphs_dir=str(GRAPHS_DIR), reports_dir=str(REPORTS_DIR), ) class ReportResponse(BaseModel): message: str report_path: str graphs_generated: list analysis_data: dict def render_template(template_name: str, context: dict) -> HTMLResponse: """Helper function to render Jinja2 templates""" template = jinja_env.get_template(template_name) html_content = template.render(**context) return HTMLResponse(content=html_content, media_type="text/html") @app.get("/", response_class=HTMLResponse) async def root(request: Request): """Root endpoint - Upload form page""" return render_template( "upload.html", {"request": request, "session": request.session} ) @app.post("/upload") async def upload_files( request: Request, first_name: str = Form(...), last_name: str = Form(...), age: int = Form(...), height: str = Form(...), weight: str = Form(...), gender: str = Form(...), fat_percentage: float = Form(...), focus: str = Form(default="Endurance"), session_id: str = Form(default="default"), spirometry_pdf: UploadFile = File(...), pnoe_csv: UploadFile = File(...), oxygenation_csv: UploadFile = File(None), ): """Handle file upload and generate report""" # Validate file types if not spirometry_pdf.filename.endswith(".pdf"): return render_template( "upload.html", { "request": request, "session": request.session, "error": "Spirometry file must be a PDF", }, ) if not pnoe_csv.filename.endswith(".csv"): return render_template( "upload.html", { "request": request, "session": request.session, "error": "Pnoe file must be a CSV", }, ) # Validate oxygenation CSV if provided if oxygenation_csv and oxygenation_csv.filename: if not oxygenation_csv.filename.endswith(".csv"): return render_template( "upload.html", { "request": request, "session": request.session, "error": "Oxygenation file must be a CSV", }, ) # Create session-specific temp directory session_uuid = str(uuid.uuid4()) session_temp_dir = TEMP_DIR / session_uuid session_temp_dir.mkdir(exist_ok=True, parents=True) # Save uploaded files spirometry_path = session_temp_dir / f"spirometry_{spirometry_pdf.filename}" pnoe_path = session_temp_dir / f"pnoe_{pnoe_csv.filename}" oxygenation_path = None try: # Write files with open(spirometry_path, "wb") as f: shutil.copyfileobj(spirometry_pdf.file, f) with open(pnoe_path, "wb") as f: shutil.copyfileobj(pnoe_csv.file, f) # Save oxygenation CSV if provided if oxygenation_csv and oxygenation_csv.filename: oxygenation_path = ( session_temp_dir / f"oxygenation_{oxygenation_csv.filename}" ) with open(oxygenation_path, "wb") as f: shutil.copyfileobj(oxygenation_csv.file, f) # Prepare patient information patient_name = f"{first_name} {last_name}" patient_info = { "patient_name": patient_name, "first_name": first_name, "last_name": last_name, "age": age, "height": height, "weight": weight, "gender": gender, "fat_percentage": fat_percentage, "focus": focus, "session_id": session_id, } # Generate report oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None result = await report_service.generate_report( spirometry_pdf_path=str(spirometry_path), pnoe_csv_path=str(pnoe_path), patient_info=patient_info, oxygenation_csv_path=oxygenation_csv_path, ) # Store in session request.session["patient_info"] = patient_info request.session["temp_dir"] = str(session_temp_dir) request.session["report_path"] = result["report_path"] request.session["graphs_generated"] = result["graphs_generated"] request.session["analysis_data"] = result["analysis_data"] # Extract spirometry CSV path (it's saved in data_dir by the service) from pathlib import Path as PathLib from services.context_generator import ContextGenerator from services.spirometry_table_extractor import ( extract_spirometry_table_from_pdf, ) # The spirometry CSV is extracted during report generation # We need to find it or extract it again data_dir = PathLib("data") spirometry_csv_path = ( data_dir / f"spirometry_{Path(spirometry_pdf.filename).stem}.csv" ) # If it doesn't exist, extract it if not spirometry_csv_path.exists(): spirometry_csv_path = extract_spirometry_table_from_pdf( str(spirometry_path), output_dir=str(data_dir) ) spirometry_csv_path = PathLib(spirometry_csv_path) # Get calculated metrics for display and editing context_gen = ContextGenerator() context_gen.load_data( str(pnoe_path), str(spirometry_csv_path), None, # No SECA file needed anymore ) # Set patient info manually since we're not reading from SECA weight_kg = float(weight.replace("lbs", "").replace("kg", "").strip()) if "lbs" in weight.lower(): weight_kg = weight_kg / 2.20462 # Convert lbs to kg context_gen.patient_info = { "name": first_name, "last_name": last_name, "age": age, "weight": weight_kg, "fat_percentage": fat_percentage, "gender": gender, } spirometry_metrics = context_gen.calculate_spirometry_metrics() pnoe_metrics = context_gen.calculate_pnoe_metrics() # Store metrics in session request.session["metrics"] = { "spirometry": spirometry_metrics, "pnoe": pnoe_metrics, } request.session["spirometry_csv_path"] = str(spirometry_csv_path) return RedirectResponse(url="/preview", status_code=303) except Exception as e: import traceback error_details = traceback.format_exc() print(f"ERROR: {error_details}") return render_template( "upload.html", { "request": request, "session": request.session, "error": f"Error generating report: {str(e)}", }, ) finally: # Close file handles spirometry_pdf.file.close() pnoe_csv.file.close() if oxygenation_csv and oxygenation_csv.filename: oxygenation_csv.file.close() @app.get("/preview", response_class=HTMLResponse) async def preview(request: Request): """Preview generated report""" if not request.session.get("report_path"): return RedirectResponse(url="/", status_code=303) return render_template( "preview.html", {"request": request, "session": request.session} ) @app.get("/graphs/{filename}") async def serve_graph(filename: str): """Serve graph images""" graph_path = GRAPHS_DIR / filename if not graph_path.exists(): raise HTTPException(status_code=404, detail="Graph not found") return FileResponse(path=graph_path, media_type="image/png") @app.get("/edit", response_class=HTMLResponse) async def edit_form(request: Request): """Display edit metrics form""" if not request.session.get("metrics"): return RedirectResponse(url="/", status_code=303) return render_template( "edit.html", {"request": request, "session": request.session} ) @app.post("/edit") async def edit_metrics(request: Request): """Handle metric edits and regenerate report""" if not request.session.get("temp_dir") or not request.session.get("patient_info"): return RedirectResponse(url="/", status_code=303) # Get form data form_data = await request.form() # Build metric overrides metric_overrides = {"pnoe": {}, "spirometry": {}} # Pnoe overrides if form_data.get("vo2_max"): metric_overrides["pnoe"]["vo2_max"] = float(form_data["vo2_max"]) if form_data.get("vo2_max_per_kg"): metric_overrides["pnoe"]["vo2_max_per_kg"] = float(form_data["vo2_max_per_kg"]) if form_data.get("peak_vt"): metric_overrides["pnoe"]["peak_vt"] = float(form_data["peak_vt"]) if form_data.get("peak_vt_hr"): metric_overrides["pnoe"]["peak_vt_hr"] = float(form_data["peak_vt_hr"]) if form_data.get("fat_max_value"): metric_overrides["pnoe"]["fat_max_value"] = float(form_data["fat_max_value"]) if form_data.get("fat_max_hr"): metric_overrides["pnoe"]["fat_max_hr"] = float(form_data["fat_max_hr"]) # VT1 and VT2 overrides if ( form_data.get("vt1_hr") or form_data.get("vt1_speed") or form_data.get("vt1_time") ): metric_overrides["pnoe"]["vt1"] = { "HeartRate": float(form_data.get("vt1_hr", 0)), "Speed": float(form_data.get("vt1_speed", 0)), "Time": float(form_data.get("vt1_time", 0)), } if ( form_data.get("vt2_hr") or form_data.get("vt2_speed") or form_data.get("vt2_time") ): metric_overrides["pnoe"]["vt2"] = { "HeartRate": float(form_data.get("vt2_hr", 0)), "Speed": float(form_data.get("vt2_speed", 0)), "Time": float(form_data.get("vt2_time", 0)), } # Heart rate zones for i in range(1, 6): zone_key = f"zone{i}_bpm" if form_data.get(zone_key): metric_overrides["pnoe"][zone_key] = form_data[zone_key] # Spirometry overrides if form_data.get("fvc_best"): metric_overrides["spirometry"]["fvc_best"] = float(form_data["fvc_best"]) if form_data.get("fvc_pred"): metric_overrides["spirometry"]["fvc_pred"] = float(form_data["fvc_pred"]) if form_data.get("fev1_best"): metric_overrides["spirometry"]["fev1_best"] = float(form_data["fev1_best"]) if form_data.get("fev1_pred"): metric_overrides["spirometry"]["fev1_pred"] = float(form_data["fev1_pred"]) if form_data.get("fev1_fvc_pct_best"): metric_overrides["spirometry"]["fev1_fvc_pct_best"] = float( form_data["fev1_fvc_pct_best"] ) if form_data.get("fev1_fvc_pct_pred"): metric_overrides["spirometry"]["fev1_fvc_pct_pred"] = float( form_data["fev1_fvc_pct_pred"] ) try: # Get file paths from session temp_dir = Path(request.session["temp_dir"]) patient_info = request.session["patient_info"] # Find files in temp directory spirometry_path = None pnoe_path = None oxygenation_path = None for file_path in temp_dir.iterdir(): if file_path.name.startswith("spirometry_"): spirometry_path = file_path elif file_path.name.startswith("pnoe_"): pnoe_path = file_path elif file_path.name.startswith("oxygenation_"): oxygenation_path = file_path if not all([spirometry_path, pnoe_path]): raise ValueError("Could not find all required uploaded files") # Regenerate report with overrides oxygenation_csv_path = str(oxygenation_path) if oxygenation_path else None result = await report_service.generate_report( spirometry_pdf_path=str(spirometry_path), pnoe_csv_path=str(pnoe_path), patient_info=patient_info, metric_overrides=metric_overrides if (metric_overrides["pnoe"] or metric_overrides["spirometry"]) else None, oxygenation_csv_path=oxygenation_csv_path, ) # Update session with new report request.session["report_path"] = result["report_path"] request.session["graphs_generated"] = result["graphs_generated"] request.session["analysis_data"] = result["analysis_data"] # Recalculate metrics with overrides from services.context_generator import ContextGenerator context_gen = ContextGenerator() spirometry_csv_path = request.session.get("spirometry_csv_path", "") if not spirometry_csv_path or not Path(spirometry_csv_path).exists(): from pathlib import Path as PathLib from services.spirometry_table_extractor import ( extract_spirometry_table_from_pdf, ) data_dir = PathLib("data") spirometry_csv_path = extract_spirometry_table_from_pdf( str(spirometry_path), output_dir=str(data_dir) ) spirometry_csv_path = str(PathLib(spirometry_csv_path)) context_gen.load_data( str(pnoe_path), spirometry_csv_path, None, # No SECA file ) # Set patient info manually weight_str = patient_info.get("weight", "0") weight_kg = float(weight_str.replace("lbs", "").replace("kg", "").strip()) if "lbs" in weight_str.lower(): weight_kg = weight_kg / 2.20462 # Convert lbs to kg context_gen.patient_info = { "name": patient_info.get("first_name", ""), "last_name": patient_info.get("last_name", ""), "age": patient_info.get("age", 25), "weight": weight_kg, "fat_percentage": patient_info.get("fat_percentage", 0), "gender": patient_info.get("gender", "female"), } context_gen.extract_patient_info(patient_info.get("last_name", "")) spirometry_overrides = metric_overrides.get("spirometry", {}) pnoe_overrides = metric_overrides.get("pnoe", {}) spirometry_metrics = context_gen.calculate_spirometry_metrics( spirometry_overrides ) pnoe_metrics = context_gen.calculate_pnoe_metrics(pnoe_overrides) # Update metrics in session request.session["metrics"] = { "spirometry": spirometry_metrics, "pnoe": pnoe_metrics, } request.session["spirometry_csv_path"] = spirometry_csv_path return RedirectResponse(url="/preview", status_code=303) except Exception as e: import traceback error_details = traceback.format_exc() print(f"ERROR: {error_details}") return render_template( "edit.html", { "request": request, "session": request.session, "error": f"Error regenerating report: {str(e)}", }, ) @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy", "service": "report-generation-api"} @app.post("/generate-report", response_model=ReportResponse) async def generate_report( patient_name: str = Form(..., description="Patient name"), age: int = Form(..., description="Patient age"), height: str = Form(..., description="Patient height (e.g., 5'4\")"), weight: str = Form(..., description="Patient weight (e.g., 123lbs)"), focus: str = Form(default="Endurance", description="Training focus"), session_id: str = Form(default="default", description="Session ID"), spirometry_pdf: UploadFile = File(..., description="Spirometry PDF file"), pnoe_csv: UploadFile = File(..., description="Pnoe CSV file"), seca_excel: UploadFile = File(..., description="SECA Excel file"), ): """ Generate a comprehensive medical report from uploaded files. This endpoint accepts all required files and patient information, processes the data, generates graphs, and returns a PDF report. Args: spirometry_pdf: Spirometry PDF file pnoe_csv: Pnoe CSV data file seca_excel: SECA body composition Excel file patient_name: Name of the patient age: Patient age height: Patient height weight: Patient weight focus: Training focus (default: Endurance) session_id: Session identifier (default: default) Returns: ReportResponse with report path, graphs generated, and analysis data """ # Validate file types if not spirometry_pdf.filename.endswith(".pdf"): raise HTTPException(status_code=400, detail="Spirometry file must be a PDF") if not pnoe_csv.filename.endswith(".csv"): raise HTTPException(status_code=400, detail="Pnoe file must be a CSV") if not seca_excel.filename.endswith((".xlsx", ".xls")): raise HTTPException( status_code=400, detail="SECA file must be an Excel file (.xlsx or .xls)" ) # Create temporary directory for uploaded files with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) # Save uploaded files temporarily spirometry_path = temp_path / f"spirometry_{spirometry_pdf.filename}" pnoe_path = temp_path / f"pnoe_{pnoe_csv.filename}" seca_path = temp_path / f"seca_{seca_excel.filename}" try: # Write files with open(spirometry_path, "wb") as f: shutil.copyfileobj(spirometry_pdf.file, f) with open(pnoe_path, "wb") as f: shutil.copyfileobj(pnoe_csv.file, f) with open(seca_path, "wb") as f: shutil.copyfileobj(seca_excel.file, f) # Prepare patient information patient_info = { "patient_name": patient_name, "age": age, "height": height, "weight": weight, "focus": focus, "session_id": session_id, } # Generate report using the service result = await report_service.generate_report( spirometry_pdf_path=str(spirometry_path), pnoe_csv_path=str(pnoe_path), seca_excel_path=str(seca_path), patient_info=patient_info, ) return ReportResponse( message="Report generated successfully", report_path=result["report_path"], graphs_generated=result["graphs_generated"], analysis_data=result["analysis_data"], ) except Exception as e: import traceback error_details = traceback.format_exc() print(f"ERROR: {error_details}") # This will show in terminal raise HTTPException( status_code=500, detail=f"Error generating report: {str(e)}\n{error_details}", ) finally: # Close file handles spirometry_pdf.file.close() pnoe_csv.file.close() seca_excel.file.close() @app.get("/download-report/{filename}") async def download_report(filename: str): """ Download a generated report. Args: filename: Name of the report file Returns: PDF file """ file_path = REPORTS_DIR / filename if not file_path.exists(): raise HTTPException(status_code=404, detail="Report not found") return FileResponse( path=file_path, media_type="application/pdf", filename=filename, ) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)