""" FastAPI application for report generation with file uploads. This API allows users to: 1. Upload required files (Spirometry PDF, Pnoe CSV, SECA Excel) 2. Generate reports with graphs and analysis """ import shutil from pathlib import Path from typing import Dict, Optional import pandas as pd from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.responses import FileResponse from pydantic import BaseModel from graph_generator import GraphGenerator app = FastAPI( title="Medical Report Generation API", description="API for generating medical performance reports with analysis and graphs", version="1.0.0", ) # Define upload directory UPLOAD_DIR = Path("uploads") UPLOAD_DIR.mkdir(exist_ok=True) # Define output directories GRAPHS_DIR = Path("graphs") GRAPHS_DIR.mkdir(exist_ok=True) REPORTS_DIR = Path("reports") REPORTS_DIR.mkdir(exist_ok=True) # Storage for uploaded files metadata uploaded_files_store: Dict[str, Dict[str, str]] = {} class FileUploadResponse(BaseModel): message: str filename: str file_type: str file_path: str class ReportRequest(BaseModel): patient_name: str age: int height: str weight: str focus: str = "Endurance" session_id: Optional[str] = "default" class ReportResponse(BaseModel): message: str report_path: str graphs_generated: list analysis_data: dict @app.get("/") async def root(): """Root endpoint with API information""" return { "message": "Medical Report Generation API", "version": "1.0.0", "endpoints": { "upload_spirometry": "/upload/spirometry", "upload_pnoe": "/upload/pnoe", "upload_seca": "/upload/seca", "generate_report": "/generate-report", "list_uploads": "/uploads", "health": "/health", }, } @app.get("/health") async def health_check(): """Health check endpoint""" return {"status": "healthy", "service": "report-generation-api"} @app.post("/upload/spirometry", response_model=FileUploadResponse) async def upload_spirometry_pdf( file: UploadFile = File(...), session_id: str = "default" ): """ Upload Spirometry PDF file for analysis. Args: file: Spirometry PDF file session_id: Session identifier to group files together (default: "default") Returns: FileUploadResponse with upload details """ if not file.filename.endswith(".pdf"): raise HTTPException(status_code=400, detail="Only PDF files are allowed") # Create session directory session_dir = UPLOAD_DIR / session_id session_dir.mkdir(exist_ok=True) # Save file file_path = session_dir / f"spirometry_{file.filename}" with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # Store metadata if session_id not in uploaded_files_store: uploaded_files_store[session_id] = {} uploaded_files_store[session_id]["spirometry_pdf"] = str(file_path) return FileUploadResponse( message="Spirometry PDF uploaded successfully", filename=file.filename, file_type="spirometry_pdf", file_path=str(file_path), ) @app.post("/upload/pnoe", response_model=FileUploadResponse) async def upload_pnoe_csv(file: UploadFile = File(...), session_id: str = "default"): """ Upload Pnoe CSV file for metabolic analysis. Args: file: Pnoe CSV file session_id: Session identifier to group files together (default: "default") Returns: FileUploadResponse with upload details """ if not file.filename.endswith(".csv"): raise HTTPException(status_code=400, detail="Only CSV files are allowed") # Create session directory session_dir = UPLOAD_DIR / session_id session_dir.mkdir(exist_ok=True) # Save file file_path = session_dir / f"pnoe_{file.filename}" with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # Store metadata if session_id not in uploaded_files_store: uploaded_files_store[session_id] = {} uploaded_files_store[session_id]["pnoe_csv"] = str(file_path) return FileUploadResponse( message="Pnoe CSV uploaded successfully", filename=file.filename, file_type="pnoe_csv", file_path=str(file_path), ) @app.post("/upload/seca", response_model=FileUploadResponse) async def upload_seca_excel(file: UploadFile = File(...), session_id: str = "default"): """ Upload SECA body composition Excel file. Args: file: SECA Excel file (.xlsx) session_id: Session identifier to group files together (default: "default") Returns: FileUploadResponse with upload details """ if not file.filename.endswith((".xlsx", ".xls")): raise HTTPException( status_code=400, detail="Only Excel files (.xlsx, .xls) are allowed" ) # Create session directory session_dir = UPLOAD_DIR / session_id session_dir.mkdir(exist_ok=True) # Save file file_path = session_dir / f"seca_{file.filename}" with open(file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # Store metadata if session_id not in uploaded_files_store: uploaded_files_store[session_id] = {} uploaded_files_store[session_id]["seca_excel"] = str(file_path) return FileUploadResponse( message="SECA Excel uploaded successfully", filename=file.filename, file_type="seca_excel", file_path=str(file_path), ) @app.get("/uploads") async def list_uploads(session_id: str = "default"): """ List all uploaded files for a session. Args: session_id: Session identifier (default: "default") Returns: Dictionary of uploaded files """ if session_id not in uploaded_files_store: return {"session_id": session_id, "files": {}, "message": "No files uploaded"} return { "session_id": session_id, "files": uploaded_files_store[session_id], "files_count": len(uploaded_files_store[session_id]), } @app.post("/generate-report", response_model=ReportResponse) async def generate_report(report_request: ReportRequest): """ Generate a comprehensive medical report with graphs and analysis. Args: report_request: Report configuration including patient details Returns: ReportResponse with report path and analysis data """ session_id = report_request.session_id # Check if all required files are uploaded if session_id not in uploaded_files_store: raise HTTPException( status_code=400, detail=f"No files found for session '{session_id}'. Please upload files first.", ) files = uploaded_files_store[session_id] required_files = ["spirometry_pdf", "pnoe_csv", "seca_excel"] missing_files = [f for f in required_files if f not in files] if missing_files: raise HTTPException( status_code=400, detail=f"Missing required files: {', '.join(missing_files)}. Please upload all files first.", ) try: # Initialize graph generator graph_gen = GraphGenerator(charts_dir=str(GRAPHS_DIR)) # Load and process Pnoe data df = pd.read_csv(files["pnoe_csv"], delimiter=";") df = df.apply(pd.to_numeric, errors="ignore") # Calculate derived columns df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"] df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"] df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100 df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100 # Smooth columns window_size = 10 columns_to_smooth = [ "VO2(ml/min)", "VCO2(ml/min)", "HR(bpm)", "VT(l)", "BF(bpm)", "VE(l/min)", "VO2 Pulse", "VO2 Breath", "CHO", "FAT", ] for col in columns_to_smooth: if col in df.columns: df[f"{col}_smoothed"] = ( df[col].rolling(window=window_size, min_periods=1).mean() ) # Generate graphs graphs_generated = [] # Generate all available graphs from the graph generator try: respiratory_path = graph_gen.generate_respiratory_chart( df, save_as_base64=False ) graphs_generated.append( {"name": "respiratory", "path": str(respiratory_path)} ) except Exception as e: print(f"Warning: Could not generate respiratory chart: {e}") try: fuel_util_path = graph_gen.generate_fuel_utilization_chart( df, save_as_base64=False ) graphs_generated.append( {"name": "fuel_utilization", "path": str(fuel_util_path)} ) except Exception as e: print(f"Warning: Could not generate fuel utilization chart: {e}") try: vo2_pulse_path = graph_gen.generate_vo2_pulse_chart( df, save_as_base64=False ) graphs_generated.append({"name": "vo2_pulse", "path": str(vo2_pulse_path)}) except Exception as e: print(f"Warning: Could not generate VO2 pulse chart: {e}") try: vo2_breath_path = graph_gen.generate_vo2_breath_chart( df, save_as_base64=False ) graphs_generated.append( {"name": "vo2_breath", "path": str(vo2_breath_path)} ) except Exception as e: print(f"Warning: Could not generate VO2 breath chart: {e}") try: fat_metabolism_path = graph_gen.generate_fat_metabolism_chart( df, save_as_base64=False ) graphs_generated.append( {"name": "fat_metabolism", "path": str(fat_metabolism_path)} ) except Exception as e: print(f"Warning: Could not generate fat metabolism chart: {e}") try: recovery_path = graph_gen.generate_recovery_chart(df, save_as_base64=False) graphs_generated.append({"name": "recovery", "path": str(recovery_path)}) except Exception as e: print(f"Warning: Could not generate recovery chart: {e}") # Calculate basic analysis metrics analysis_data = { "vo2_max": float(df["VO2(ml/min)_smoothed"].max()) if "VO2(ml/min)_smoothed" in df.columns else 0, "peak_vt": float(df["VT(l)_smoothed"].max()) if "VT(l)_smoothed" in df.columns else 0, "max_hr": float(df["HR(bpm)_smoothed"].max()) if "HR(bpm)_smoothed" in df.columns else 0, "graphs_count": len(graphs_generated), } # Generate PDF report using existing main.py logic from jinja2 import Environment, FileSystemLoader from context import context_list from main import html_string_to_pdf env = Environment(loader=FileSystemLoader("report_gen")) html_pages = [] header_context = { "patient_name": report_request.patient_name, "age": report_request.age, "height": report_request.height, "weight": report_request.weight, "focus": report_request.focus, } footer_context = [ { "contact_email": "info@ishplabs.com", "website": "www.ishplabs.com", "social": "@ishplabs", "page_number": i + 1, } for i in range(len(context_list)) ] header_html = env.get_template("header.html").render(header_context) footer_html_list = [ env.get_template("footer.html").render(context) for context in footer_context ] for i, context in enumerate(context_list): template = env.get_template(f"page_{i + 1}.html").render(context) if (i + 1) > 2: full_html = f"""