diff --git a/.gitignore b/.gitignore index bd16166..78abf35 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ .venv -data/ \ No newline at end of file +data/ + +.env \ No newline at end of file diff --git a/app/__pycache__/graph_generator.cpython-312.pyc b/app/__pycache__/graph_generator.cpython-312.pyc new file mode 100644 index 0000000..f90b866 Binary files /dev/null and b/app/__pycache__/graph_generator.cpython-312.pyc differ diff --git a/app/api.py b/app/api.py deleted file mode 100644 index c513151..0000000 --- a/app/api.py +++ /dev/null @@ -1,533 +0,0 @@ -""" -FastAPI application for report generation with file uploads. - -This API allows users to: -1. Upload required files (Spirometry PDF, Pnoe CSV, SECA Excel) -2. Generate reports with graphs and analysis -""" - -import shutil -from pathlib import Path -from typing import Dict, Optional - -import pandas as pd -from fastapi import FastAPI, File, HTTPException, UploadFile -from fastapi.responses import FileResponse -from pydantic import BaseModel - -from graph_generator import GraphGenerator - -app = FastAPI( - title="Medical Report Generation API", - description="API for generating medical performance reports with analysis and graphs", - version="1.0.0", -) - -# Define upload directory -UPLOAD_DIR = Path("uploads") -UPLOAD_DIR.mkdir(exist_ok=True) - -# Define output directories -GRAPHS_DIR = Path("graphs") -GRAPHS_DIR.mkdir(exist_ok=True) -REPORTS_DIR = Path("reports") -REPORTS_DIR.mkdir(exist_ok=True) - -# Storage for uploaded files metadata -uploaded_files_store: Dict[str, Dict[str, str]] = {} - - -class FileUploadResponse(BaseModel): - message: str - filename: str - file_type: str - file_path: str - - -class ReportRequest(BaseModel): - patient_name: str - age: int - height: str - weight: str - focus: str = "Endurance" - session_id: Optional[str] = "default" - - -class ReportResponse(BaseModel): - message: str - report_path: str - graphs_generated: list - analysis_data: dict - - -@app.get("/") -async def root(): - """Root endpoint with API information""" - return { - "message": "Medical Report Generation API", - "version": "1.0.0", - "endpoints": { - "upload_spirometry": "/upload/spirometry", - "upload_pnoe": "/upload/pnoe", - "upload_seca": "/upload/seca", - "generate_report": "/generate-report", - "list_uploads": "/uploads", - "health": "/health", - }, - } - - -@app.get("/health") -async def health_check(): - """Health check endpoint""" - return {"status": "healthy", "service": "report-generation-api"} - - -@app.post("/upload/spirometry", response_model=FileUploadResponse) -async def upload_spirometry_pdf( - file: UploadFile = File(...), session_id: str = "default" -): - """ - Upload Spirometry PDF file for analysis. - - Args: - file: Spirometry PDF file - session_id: Session identifier to group files together (default: "default") - - Returns: - FileUploadResponse with upload details - """ - if not file.filename.endswith(".pdf"): - raise HTTPException(status_code=400, detail="Only PDF files are allowed") - - # Create session directory - session_dir = UPLOAD_DIR / session_id - session_dir.mkdir(exist_ok=True) - - # Save file - file_path = session_dir / f"spirometry_{file.filename}" - with open(file_path, "wb") as buffer: - shutil.copyfileobj(file.file, buffer) - - # Store metadata - if session_id not in uploaded_files_store: - uploaded_files_store[session_id] = {} - - uploaded_files_store[session_id]["spirometry_pdf"] = str(file_path) - - return FileUploadResponse( - message="Spirometry PDF uploaded successfully", - filename=file.filename, - file_type="spirometry_pdf", - file_path=str(file_path), - ) - - -@app.post("/upload/pnoe", response_model=FileUploadResponse) -async def upload_pnoe_csv(file: UploadFile = File(...), session_id: str = "default"): - """ - Upload Pnoe CSV file for metabolic analysis. - - Args: - file: Pnoe CSV file - session_id: Session identifier to group files together (default: "default") - - Returns: - FileUploadResponse with upload details - """ - if not file.filename.endswith(".csv"): - raise HTTPException(status_code=400, detail="Only CSV files are allowed") - - # Create session directory - session_dir = UPLOAD_DIR / session_id - session_dir.mkdir(exist_ok=True) - - # Save file - file_path = session_dir / f"pnoe_{file.filename}" - with open(file_path, "wb") as buffer: - shutil.copyfileobj(file.file, buffer) - - # Store metadata - if session_id not in uploaded_files_store: - uploaded_files_store[session_id] = {} - - uploaded_files_store[session_id]["pnoe_csv"] = str(file_path) - - return FileUploadResponse( - message="Pnoe CSV uploaded successfully", - filename=file.filename, - file_type="pnoe_csv", - file_path=str(file_path), - ) - - -@app.post("/upload/seca", response_model=FileUploadResponse) -async def upload_seca_excel(file: UploadFile = File(...), session_id: str = "default"): - """ - Upload SECA body composition Excel file. - - Args: - file: SECA Excel file (.xlsx) - session_id: Session identifier to group files together (default: "default") - - Returns: - FileUploadResponse with upload details - """ - if not file.filename.endswith((".xlsx", ".xls")): - raise HTTPException( - status_code=400, detail="Only Excel files (.xlsx, .xls) are allowed" - ) - - # Create session directory - session_dir = UPLOAD_DIR / session_id - session_dir.mkdir(exist_ok=True) - - # Save file - file_path = session_dir / f"seca_{file.filename}" - with open(file_path, "wb") as buffer: - shutil.copyfileobj(file.file, buffer) - - # Store metadata - if session_id not in uploaded_files_store: - uploaded_files_store[session_id] = {} - - uploaded_files_store[session_id]["seca_excel"] = str(file_path) - - return FileUploadResponse( - message="SECA Excel uploaded successfully", - filename=file.filename, - file_type="seca_excel", - file_path=str(file_path), - ) - - -@app.get("/uploads") -async def list_uploads(session_id: str = "default"): - """ - List all uploaded files for a session. - - Args: - session_id: Session identifier (default: "default") - - Returns: - Dictionary of uploaded files - """ - if session_id not in uploaded_files_store: - return {"session_id": session_id, "files": {}, "message": "No files uploaded"} - - return { - "session_id": session_id, - "files": uploaded_files_store[session_id], - "files_count": len(uploaded_files_store[session_id]), - } - - -@app.post("/generate-report", response_model=ReportResponse) -async def generate_report(report_request: ReportRequest): - """ - Generate a comprehensive medical report with graphs and analysis. - - Args: - report_request: Report configuration including patient details - - Returns: - ReportResponse with report path and analysis data - """ - session_id = report_request.session_id - - # Check if all required files are uploaded - if session_id not in uploaded_files_store: - raise HTTPException( - status_code=400, - detail=f"No files found for session '{session_id}'. Please upload files first.", - ) - - files = uploaded_files_store[session_id] - required_files = ["spirometry_pdf", "pnoe_csv", "seca_excel"] - missing_files = [f for f in required_files if f not in files] - - if missing_files: - raise HTTPException( - status_code=400, - detail=f"Missing required files: {', '.join(missing_files)}. Please upload all files first.", - ) - - try: - # Initialize graph generator - graph_gen = GraphGenerator(charts_dir=str(GRAPHS_DIR)) - - # Load and process Pnoe data - df = pd.read_csv(files["pnoe_csv"], delimiter=";") - df = df.apply(pd.to_numeric, errors="ignore") - - # Calculate derived columns - df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"] - df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"] - df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100 - df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100 - - # Smooth columns - window_size = 10 - columns_to_smooth = [ - "VO2(ml/min)", - "VCO2(ml/min)", - "HR(bpm)", - "VT(l)", - "BF(bpm)", - "VE(l/min)", - "VO2 Pulse", - "VO2 Breath", - "CHO", - "FAT", - ] - - for col in columns_to_smooth: - if col in df.columns: - df[f"{col}_smoothed"] = ( - df[col].rolling(window=window_size, min_periods=1).mean() - ) - - # Generate graphs - graphs_generated = [] - - # Generate all available graphs from the graph generator - try: - respiratory_path = graph_gen.generate_respiratory_chart( - df, save_as_base64=False - ) - graphs_generated.append( - {"name": "respiratory", "path": str(respiratory_path)} - ) - except Exception as e: - print(f"Warning: Could not generate respiratory chart: {e}") - - try: - fuel_util_path = graph_gen.generate_fuel_utilization_chart( - df, save_as_base64=False - ) - graphs_generated.append( - {"name": "fuel_utilization", "path": str(fuel_util_path)} - ) - except Exception as e: - print(f"Warning: Could not generate fuel utilization chart: {e}") - - try: - vo2_pulse_path = graph_gen.generate_vo2_pulse_chart( - df, save_as_base64=False - ) - graphs_generated.append({"name": "vo2_pulse", "path": str(vo2_pulse_path)}) - except Exception as e: - print(f"Warning: Could not generate VO2 pulse chart: {e}") - - try: - vo2_breath_path = graph_gen.generate_vo2_breath_chart( - df, save_as_base64=False - ) - graphs_generated.append( - {"name": "vo2_breath", "path": str(vo2_breath_path)} - ) - except Exception as e: - print(f"Warning: Could not generate VO2 breath chart: {e}") - - try: - fat_metabolism_path = graph_gen.generate_fat_metabolism_chart( - df, save_as_base64=False - ) - graphs_generated.append( - {"name": "fat_metabolism", "path": str(fat_metabolism_path)} - ) - except Exception as e: - print(f"Warning: Could not generate fat metabolism chart: {e}") - - try: - recovery_path = graph_gen.generate_recovery_chart(df, save_as_base64=False) - graphs_generated.append({"name": "recovery", "path": str(recovery_path)}) - except Exception as e: - print(f"Warning: Could not generate recovery chart: {e}") - - # Calculate basic analysis metrics - analysis_data = { - "vo2_max": float(df["VO2(ml/min)_smoothed"].max()) - if "VO2(ml/min)_smoothed" in df.columns - else 0, - "peak_vt": float(df["VT(l)_smoothed"].max()) - if "VT(l)_smoothed" in df.columns - else 0, - "max_hr": float(df["HR(bpm)_smoothed"].max()) - if "HR(bpm)_smoothed" in df.columns - else 0, - "graphs_count": len(graphs_generated), - } - - # Generate PDF report using existing main.py logic - from jinja2 import Environment, FileSystemLoader - - from context import context_list - from main import html_string_to_pdf - - env = Environment(loader=FileSystemLoader("report_gen")) - html_pages = [] - - header_context = { - "patient_name": report_request.patient_name, - "age": report_request.age, - "height": report_request.height, - "weight": report_request.weight, - "focus": report_request.focus, - } - - footer_context = [ - { - "contact_email": "info@ishplabs.com", - "website": "www.ishplabs.com", - "social": "@ishplabs", - "page_number": i + 1, - } - for i in range(len(context_list)) - ] - - header_html = env.get_template("header.html").render(header_context) - footer_html_list = [ - env.get_template("footer.html").render(context) - for context in footer_context - ] - - for i, context in enumerate(context_list): - template = env.get_template(f"page_{i + 1}.html").render(context) - - if (i + 1) > 2: - full_html = f""" -
-
- {header_html} -
-
- {template} -
-
- {footer_html_list[i]} -
-
- """ - html_pages.append(full_html) - else: - html_pages.append(template) - - # Combine with page breaks - final_html = "
".join(html_pages) - - # Wrap in full HTML document - html_doc = f""" - - - - - - - - - {final_html} - - - """ - - # Generate PDF - report_filename = ( - f"report_{report_request.patient_name.replace(' ', '_')}_{session_id}.pdf" - ) - report_path = REPORTS_DIR / report_filename - html_string_to_pdf(html_doc, str(report_path)) - - return ReportResponse( - message="Report generated successfully", - report_path=str(report_path), - graphs_generated=graphs_generated, - analysis_data=analysis_data, - ) - - except Exception as e: - raise HTTPException( - status_code=500, detail=f"Error generating report: {str(e)}" - ) - - -@app.get("/download-report/{filename}") -async def download_report(filename: str): - """ - Download a generated report. - - Args: - filename: Name of the report file - - Returns: - PDF file - """ - file_path = REPORTS_DIR / filename - - if not file_path.exists(): - raise HTTPException(status_code=404, detail="Report not found") - - return FileResponse( - path=file_path, - media_type="application/pdf", - filename=filename, - ) - - -@app.delete("/uploads/{session_id}") -async def delete_session_uploads(session_id: str): - """ - Delete all uploaded files for a session. - - Args: - session_id: Session identifier - - Returns: - Success message - """ - if session_id not in uploaded_files_store: - raise HTTPException(status_code=404, detail="Session not found") - - # Delete files - session_dir = UPLOAD_DIR / session_id - if session_dir.exists(): - shutil.rmtree(session_dir) - - # Remove from store - del uploaded_files_store[session_id] - - return {"message": f"Session '{session_id}' deleted successfully"} - - -if __name__ == "__main__": - import uvicorn - - uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..ae2fcb0 --- /dev/null +++ b/app/main.py @@ -0,0 +1,193 @@ +""" +FastAPI application for medical report generation. + +This API provides a single endpoint that accepts all required files +and patient information, then generates a comprehensive medical report. +""" + +import shutil +import tempfile +from pathlib import Path + +from fastapi import FastAPI, File, Form, HTTPException, UploadFile +from fastapi.responses import FileResponse +from pydantic import BaseModel + +from services.report_generator import ReportGeneratorService + +app = FastAPI( + title="Medical Report Generation API", + description="API for generating medical performance reports with analysis and graphs", + version="2.0.0", +) + +# Define output directories +GRAPHS_DIR = Path("graphs") +GRAPHS_DIR.mkdir(exist_ok=True) + +REPORTS_DIR = Path("reports") +REPORTS_DIR.mkdir(exist_ok=True) + +# Initialize report generator service +report_service = ReportGeneratorService( + template_dir="app/report_gen", + graphs_dir=str(GRAPHS_DIR), + reports_dir=str(REPORTS_DIR), +) + + +class ReportResponse(BaseModel): + message: str + report_path: str + graphs_generated: list + analysis_data: dict + + +@app.get("/") +async def root(): + """Root endpoint with API information""" + return { + "message": "Medical Report Generation API", + "version": "2.0.0", + "endpoints": { + "generate_report": "POST /generate-report", + "download_report": "GET /download-report/{filename}", + "health": "GET /health", + }, + } + + +@app.get("/health") +async def health_check(): + """Health check endpoint""" + return {"status": "healthy", "service": "report-generation-api"} + + +@app.post("/generate-report", response_model=ReportResponse) +async def generate_report( + patient_name: str = Form(..., description="Patient name"), + age: int = Form(..., description="Patient age"), + height: str = Form(..., description="Patient height (e.g., 5'4\")"), + weight: str = Form(..., description="Patient weight (e.g., 123lbs)"), + focus: str = Form(default="Endurance", description="Training focus"), + session_id: str = Form(default="default", description="Session ID"), + spirometry_pdf: UploadFile = File(..., description="Spirometry PDF file"), + pnoe_csv: UploadFile = File(..., description="Pnoe CSV file"), + seca_excel: UploadFile = File(..., description="SECA Excel file"), +): + """ + Generate a comprehensive medical report from uploaded files. + + This endpoint accepts all required files and patient information, + processes the data, generates graphs, and returns a PDF report. + + Args: + spirometry_pdf: Spirometry PDF file + pnoe_csv: Pnoe CSV data file + seca_excel: SECA body composition Excel file + patient_name: Name of the patient + age: Patient age + height: Patient height + weight: Patient weight + focus: Training focus (default: Endurance) + session_id: Session identifier (default: default) + + Returns: + ReportResponse with report path, graphs generated, and analysis data + """ + # Validate file types + if not spirometry_pdf.filename.endswith(".pdf"): + raise HTTPException(status_code=400, detail="Spirometry file must be a PDF") + + if not pnoe_csv.filename.endswith(".csv"): + raise HTTPException(status_code=400, detail="Pnoe file must be a CSV") + + if not seca_excel.filename.endswith((".xlsx", ".xls")): + raise HTTPException( + status_code=400, detail="SECA file must be an Excel file (.xlsx or .xls)" + ) + + # Create temporary directory for uploaded files + with tempfile.TemporaryDirectory() as temp_dir: + temp_path = Path(temp_dir) + + # Save uploaded files temporarily + spirometry_path = temp_path / f"spirometry_{spirometry_pdf.filename}" + pnoe_path = temp_path / f"pnoe_{pnoe_csv.filename}" + seca_path = temp_path / f"seca_{seca_excel.filename}" + + try: + # Write files + with open(spirometry_path, "wb") as f: + shutil.copyfileobj(spirometry_pdf.file, f) + + with open(pnoe_path, "wb") as f: + shutil.copyfileobj(pnoe_csv.file, f) + + with open(seca_path, "wb") as f: + shutil.copyfileobj(seca_excel.file, f) + + # Prepare patient information + patient_info = { + "patient_name": patient_name, + "age": age, + "height": height, + "weight": weight, + "focus": focus, + "session_id": session_id, + } + + # Generate report using the service + result = report_service.generate_report( + spirometry_pdf_path=str(spirometry_path), + pnoe_csv_path=str(pnoe_path), + seca_excel_path=str(seca_path), + patient_info=patient_info, + ) + + return ReportResponse( + message="Report generated successfully", + report_path=result["report_path"], + graphs_generated=result["graphs_generated"], + analysis_data=result["analysis_data"], + ) + + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"Error generating report: {str(e)}", + ) + finally: + # Close file handles + spirometry_pdf.file.close() + pnoe_csv.file.close() + seca_excel.file.close() + + +@app.get("/download-report/{filename}") +async def download_report(filename: str): + """ + Download a generated report. + + Args: + filename: Name of the report file + + Returns: + PDF file + """ + file_path = REPORTS_DIR / filename + + if not file_path.exists(): + raise HTTPException(status_code=404, detail="Report not found") + + return FileResponse( + path=file_path, + media_type="application/pdf", + filename=filename, + ) + + +if __name__ == "__main__": + import uvicorn + + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/app/graph_generator.py b/app/services/graph_generator.py similarity index 100% rename from app/graph_generator.py rename to app/services/graph_generator.py diff --git a/app/services/main.py b/app/services/main.py deleted file mode 100644 index 5d152cb..0000000 --- a/app/services/main.py +++ /dev/null @@ -1,124 +0,0 @@ -from jinja2 import Environment, FileSystemLoader -from playwright.sync_api import sync_playwright - -from context import context_list - -env = Environment(loader=FileSystemLoader("report_gen")) - -html_pages = [] - -header_context = { - "patient_name": "Keirstyn Moran", - "age": 34, - "height": "5'4\"", - "weight": "123lbs", - "focus": "Endurance", -} - -footer_context = [ - { - "contact_email": "info@ishplabs.com ", - "website": "www.ishplabs.com", - "social": "@ishplabs", - "page_number": i + 1, - } - for i in range(len(context_list)) -] - - -header_html = env.get_template("header.html").render(header_context) -footer_html_list = [ - env.get_template("footer.html").render(context) for context in footer_context -] - -for i, context in enumerate(context_list): - template = env.get_template(f"page_{i + 1}.html").render(context) - - if (i + 1) > 2: - full_html = f""" -
-
- {header_html} -
-
- {template} -
-
- {footer_html_list[i]} -
-
- """ - html_pages.append(full_html) - else: - html_pages.append(template) - -# Combine with page breaks -final_html = "
".join(html_pages) -# Wrap in full HTML document -html_doc = f""" - - - - - - - - - {final_html} - - -""" - - -# Generate PDF - - -def html_string_to_pdf(html_content, pdf_path): - with sync_playwright() as p: - browser = p.chromium.launch() - page = browser.new_page() - - # Set the HTML directly - page.set_content(html_content) - - # Export to PDF - page.pdf(path=pdf_path, format="A4", print_background=True) - - browser.close() - - -html_string_to_pdf(html_doc, "multi_page_report.pdf") -# pdfkit.from_string(html_doc, "truth_report.pdf", options=options) - -print("✅ PDF generated: multi_page_report.pdf") diff --git a/app/services/report_generator.py b/app/services/report_generator.py new file mode 100644 index 0000000..a1f6ced --- /dev/null +++ b/app/services/report_generator.py @@ -0,0 +1,318 @@ +""" +Report Generator Service + +This service handles the generation of medical reports from uploaded files. +It processes data, generates graphs, and creates PDF reports. +""" + +from pathlib import Path +from typing import Any, Dict, List + +import pandas as pd +from jinja2 import Environment, FileSystemLoader +from playwright.sync_api import sync_playwright + +from app.services.context import context_list +from app.services.graph_generator import GraphGenerator + + +class ReportGeneratorService: + """Service for generating medical performance reports""" + + def __init__( + self, + template_dir: str = "app/report_gen", + graphs_dir: str = "graphs", + reports_dir: str = "reports", + ): + """ + Initialize the report generator service. + + Args: + template_dir: Directory containing Jinja2 templates + graphs_dir: Directory to save generated graphs + reports_dir: Directory to save generated reports + """ + self.template_dir = template_dir + self.graphs_dir = Path(graphs_dir) + self.reports_dir = Path(reports_dir) + self.graph_generator = GraphGenerator(charts_dir=str(graphs_dir)) + self.env = Environment(loader=FileSystemLoader(template_dir)) + + # Ensure directories exist + self.graphs_dir.mkdir(exist_ok=True) + self.reports_dir.mkdir(exist_ok=True) + + def process_pnoe_data(self, pnoe_csv_path: str) -> pd.DataFrame: + """ + Load and process Pnoe CSV data. + + Args: + pnoe_csv_path: Path to Pnoe CSV file + + Returns: + Processed DataFrame with smoothed columns + """ + # Load data + df = pd.read_csv(pnoe_csv_path, delimiter=";") + df = df.apply(pd.to_numeric, errors="ignore") + + # Calculate derived columns + df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"] + df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"] + df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100 + df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100 + + # Smooth columns + window_size = 10 + columns_to_smooth = [ + "VO2(ml/min)", + "VCO2(ml/min)", + "HR(bpm)", + "VT(l)", + "BF(bpm)", + "VE(l/min)", + "VO2 Pulse", + "VO2 Breath", + "CHO", + "FAT", + ] + + for col in columns_to_smooth: + if col in df.columns: + df[f"{col}_smoothed"] = ( + df[col].rolling(window=window_size, min_periods=1).mean() + ) + + return df + + def generate_graphs(self, df: pd.DataFrame) -> List[Dict[str, str]]: + """ + Generate all required graphs from processed data. + + Args: + df: Processed DataFrame with smoothed columns + + Returns: + List of dictionaries containing graph names and paths + """ + graphs_generated = [] + + # List of graphs to generate + graph_methods = [ + ("respiratory", self.graph_generator.generate_respiratory_chart), + ("fuel_utilization", self.graph_generator.generate_fuel_utilization_chart), + ("vo2_pulse", self.graph_generator.generate_vo2_pulse_chart), + ("vo2_breath", self.graph_generator.generate_vo2_breath_chart), + ("fat_metabolism", self.graph_generator.generate_fat_metabolism_chart), + ("recovery", self.graph_generator.generate_recovery_chart), + ] + + for name, method in graph_methods: + try: + path = method(df, save_as_base64=False) + graphs_generated.append({"name": name, "path": str(path)}) + except Exception as e: + print(f"Warning: Could not generate {name} chart: {e}") + + return graphs_generated + + def calculate_analysis_metrics(self, df: pd.DataFrame) -> Dict[str, Any]: + """ + Calculate basic analysis metrics from processed data. + + Args: + df: Processed DataFrame with smoothed columns + + Returns: + Dictionary containing analysis metrics + """ + return { + "vo2_max": float(df["VO2(ml/min)_smoothed"].max()) + if "VO2(ml/min)_smoothed" in df.columns + else 0, + "peak_vt": float(df["VT(l)_smoothed"].max()) + if "VT(l)_smoothed" in df.columns + else 0, + "max_hr": float(df["HR(bpm)_smoothed"].max()) + if "HR(bpm)_smoothed" in df.columns + else 0, + } + + def generate_html(self, patient_info: Dict[str, Any]) -> str: + """ + Generate HTML content for the report. + + Args: + patient_info: Dictionary containing patient information + (patient_name, age, height, weight, focus) + + Returns: + Complete HTML document as string + """ + html_pages = [] + + # Header context + header_context = { + "patient_name": patient_info.get("patient_name", ""), + "age": patient_info.get("age", ""), + "height": patient_info.get("height", ""), + "weight": patient_info.get("weight", ""), + "focus": patient_info.get("focus", "Endurance"), + } + + # Footer context + footer_context = [ + { + "contact_email": "info@ishplabs.com", + "website": "www.ishplabs.com", + "social": "@ishplabs", + "page_number": i + 1, + } + for i in range(len(context_list)) + ] + + # Render header + header_html = self.env.get_template("header.html").render(header_context) + + # Render footers + footer_html_list = [ + self.env.get_template("footer.html").render(context) + for context in footer_context + ] + + # Render pages + for i, context in enumerate(context_list): + template = self.env.get_template(f"page_{i + 1}.html").render(context) + + if (i + 1) > 2: + full_html = f""" +
+
+ {header_html} +
+
+ {template} +
+
+ {footer_html_list[i]} +
+
+ """ + html_pages.append(full_html) + else: + html_pages.append(template) + + # Combine with page breaks + final_html = "
".join(html_pages) + + # Wrap in full HTML document + html_doc = f""" + + + + + + + + + {final_html} + + + """ + + return html_doc + + def html_to_pdf(self, html_content: str, pdf_path: str) -> None: + """ + Convert HTML content to PDF file. + + Args: + html_content: HTML content as string + pdf_path: Path where PDF should be saved + """ + with sync_playwright() as p: + browser = p.chromium.launch() + page = browser.new_page() + page.set_content(html_content) + page.pdf(path=pdf_path, format="A4", print_background=True) + browser.close() + + def generate_report( + self, + spirometry_pdf_path: str, + pnoe_csv_path: str, + seca_excel_path: str, + patient_info: Dict[str, Any], + output_filename: str = None, + ) -> Dict[str, Any]: + """ + Generate complete medical report from uploaded files. + + Args: + spirometry_pdf_path: Path to Spirometry PDF file + pnoe_csv_path: Path to Pnoe CSV file + seca_excel_path: Path to SECA Excel file + patient_info: Dictionary containing patient information + output_filename: Optional custom output filename + + Returns: + Dictionary containing report path, graphs generated, and analysis data + """ + # Process data + df = self.process_pnoe_data(pnoe_csv_path) + + # Generate graphs + graphs_generated = self.generate_graphs(df) + + # Calculate analysis metrics + analysis_data = self.calculate_analysis_metrics(df) + analysis_data["graphs_count"] = len(graphs_generated) + + # Generate HTML + html_content = self.generate_html(patient_info) + + # Generate PDF + if output_filename is None: + patient_name = patient_info.get("patient_name", "Unknown") + session_id = patient_info.get("session_id", "default") + output_filename = ( + f"report_{patient_name.replace(' ', '_')}_{session_id}.pdf" + ) + + report_path = self.reports_dir / output_filename + self.html_to_pdf(html_content, str(report_path)) + + return { + "report_path": str(report_path), + "graphs_generated": graphs_generated, + "analysis_data": analysis_data, + } diff --git a/app/services/spirometry_table_extractor.py b/app/services/spirometry_table_extractor.py new file mode 100644 index 0000000..79f3901 --- /dev/null +++ b/app/services/spirometry_table_extractor.py @@ -0,0 +1,64 @@ +import base64 +import os + +import requests +from dotenv import load_dotenv + +load_dotenv() +API_KEY_REF = os.getenv("OPENROUTER_API_KEY") + + +def encode_pdf_to_base64(pdf_path): + with open(pdf_path, "rb") as pdf_file: + return base64.b64encode(pdf_file.read()).decode("utf-8") + + +def extract_spirometry_table_from_pdf(pdf_path): + url = "https://openrouter.ai/api/v1/chat/completions" + headers = { + "Authorization": f"Bearer {API_KEY_REF}", + "Content-Type": "application/json", + } + + # Read and encode the PDF + base64_pdf = encode_pdf_to_base64(pdf_path) + data_url = f"data:application/pdf;base64,{base64_pdf}" + + messages = [ + { + "role": "user", + "content": [ + { + "type": "text", + "text": "Please extract the Spirometry table from the pdf and return the values in csv format, " + "note that it is the unit of parameter that is beside it and it should not be a column. " + "The '-' Should be treated as empty values." + "do not add 'csv' at the start or end of the response", + }, + { + "type": "file", + "file": {"filename": "document.pdf", "file_data": data_url}, + }, + ], + } + ] + + payload = { + "model": "google/gemini-2.5-flash-lite", + "messages": messages, + } + + response = requests.post(url, headers=headers, json=payload) + response_data = response.json() + + if "choices" in response_data and len(response_data["choices"]) > 0: + content = response_data["choices"][0]["message"]["content"] + + # Save to a CSV file + output_file = "extracted_spirometry_table.csv" + with open(output_file, "w", encoding="utf-8") as f: + f.write(content) + + return f"Extracted table saved to {output_file}" + else: + return "No content found in response" diff --git a/app/services_dfdf/__init__.py b/app/services_dfdf/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/analysis.ipynb b/app/services_dfdf/analysis.ipynb similarity index 100% rename from app/services/analysis.ipynb rename to app/services_dfdf/analysis.ipynb diff --git a/app/services_dfdf/context_generator.py b/app/services_dfdf/context_generator.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/notebook.ipynb b/app/services_dfdf/notebook.ipynb similarity index 100% rename from app/services/notebook.ipynb rename to app/services_dfdf/notebook.ipynb diff --git a/app/services_dfdf/report_generator.py b/app/services_dfdf/report_generator.py new file mode 100644 index 0000000..a1f6ced --- /dev/null +++ b/app/services_dfdf/report_generator.py @@ -0,0 +1,318 @@ +""" +Report Generator Service + +This service handles the generation of medical reports from uploaded files. +It processes data, generates graphs, and creates PDF reports. +""" + +from pathlib import Path +from typing import Any, Dict, List + +import pandas as pd +from jinja2 import Environment, FileSystemLoader +from playwright.sync_api import sync_playwright + +from app.services.context import context_list +from app.services.graph_generator import GraphGenerator + + +class ReportGeneratorService: + """Service for generating medical performance reports""" + + def __init__( + self, + template_dir: str = "app/report_gen", + graphs_dir: str = "graphs", + reports_dir: str = "reports", + ): + """ + Initialize the report generator service. + + Args: + template_dir: Directory containing Jinja2 templates + graphs_dir: Directory to save generated graphs + reports_dir: Directory to save generated reports + """ + self.template_dir = template_dir + self.graphs_dir = Path(graphs_dir) + self.reports_dir = Path(reports_dir) + self.graph_generator = GraphGenerator(charts_dir=str(graphs_dir)) + self.env = Environment(loader=FileSystemLoader(template_dir)) + + # Ensure directories exist + self.graphs_dir.mkdir(exist_ok=True) + self.reports_dir.mkdir(exist_ok=True) + + def process_pnoe_data(self, pnoe_csv_path: str) -> pd.DataFrame: + """ + Load and process Pnoe CSV data. + + Args: + pnoe_csv_path: Path to Pnoe CSV file + + Returns: + Processed DataFrame with smoothed columns + """ + # Load data + df = pd.read_csv(pnoe_csv_path, delimiter=";") + df = df.apply(pd.to_numeric, errors="ignore") + + # Calculate derived columns + df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"] + df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"] + df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100 + df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100 + + # Smooth columns + window_size = 10 + columns_to_smooth = [ + "VO2(ml/min)", + "VCO2(ml/min)", + "HR(bpm)", + "VT(l)", + "BF(bpm)", + "VE(l/min)", + "VO2 Pulse", + "VO2 Breath", + "CHO", + "FAT", + ] + + for col in columns_to_smooth: + if col in df.columns: + df[f"{col}_smoothed"] = ( + df[col].rolling(window=window_size, min_periods=1).mean() + ) + + return df + + def generate_graphs(self, df: pd.DataFrame) -> List[Dict[str, str]]: + """ + Generate all required graphs from processed data. + + Args: + df: Processed DataFrame with smoothed columns + + Returns: + List of dictionaries containing graph names and paths + """ + graphs_generated = [] + + # List of graphs to generate + graph_methods = [ + ("respiratory", self.graph_generator.generate_respiratory_chart), + ("fuel_utilization", self.graph_generator.generate_fuel_utilization_chart), + ("vo2_pulse", self.graph_generator.generate_vo2_pulse_chart), + ("vo2_breath", self.graph_generator.generate_vo2_breath_chart), + ("fat_metabolism", self.graph_generator.generate_fat_metabolism_chart), + ("recovery", self.graph_generator.generate_recovery_chart), + ] + + for name, method in graph_methods: + try: + path = method(df, save_as_base64=False) + graphs_generated.append({"name": name, "path": str(path)}) + except Exception as e: + print(f"Warning: Could not generate {name} chart: {e}") + + return graphs_generated + + def calculate_analysis_metrics(self, df: pd.DataFrame) -> Dict[str, Any]: + """ + Calculate basic analysis metrics from processed data. + + Args: + df: Processed DataFrame with smoothed columns + + Returns: + Dictionary containing analysis metrics + """ + return { + "vo2_max": float(df["VO2(ml/min)_smoothed"].max()) + if "VO2(ml/min)_smoothed" in df.columns + else 0, + "peak_vt": float(df["VT(l)_smoothed"].max()) + if "VT(l)_smoothed" in df.columns + else 0, + "max_hr": float(df["HR(bpm)_smoothed"].max()) + if "HR(bpm)_smoothed" in df.columns + else 0, + } + + def generate_html(self, patient_info: Dict[str, Any]) -> str: + """ + Generate HTML content for the report. + + Args: + patient_info: Dictionary containing patient information + (patient_name, age, height, weight, focus) + + Returns: + Complete HTML document as string + """ + html_pages = [] + + # Header context + header_context = { + "patient_name": patient_info.get("patient_name", ""), + "age": patient_info.get("age", ""), + "height": patient_info.get("height", ""), + "weight": patient_info.get("weight", ""), + "focus": patient_info.get("focus", "Endurance"), + } + + # Footer context + footer_context = [ + { + "contact_email": "info@ishplabs.com", + "website": "www.ishplabs.com", + "social": "@ishplabs", + "page_number": i + 1, + } + for i in range(len(context_list)) + ] + + # Render header + header_html = self.env.get_template("header.html").render(header_context) + + # Render footers + footer_html_list = [ + self.env.get_template("footer.html").render(context) + for context in footer_context + ] + + # Render pages + for i, context in enumerate(context_list): + template = self.env.get_template(f"page_{i + 1}.html").render(context) + + if (i + 1) > 2: + full_html = f""" +
+
+ {header_html} +
+
+ {template} +
+
+ {footer_html_list[i]} +
+
+ """ + html_pages.append(full_html) + else: + html_pages.append(template) + + # Combine with page breaks + final_html = "
".join(html_pages) + + # Wrap in full HTML document + html_doc = f""" + + + + + + + + + {final_html} + + + """ + + return html_doc + + def html_to_pdf(self, html_content: str, pdf_path: str) -> None: + """ + Convert HTML content to PDF file. + + Args: + html_content: HTML content as string + pdf_path: Path where PDF should be saved + """ + with sync_playwright() as p: + browser = p.chromium.launch() + page = browser.new_page() + page.set_content(html_content) + page.pdf(path=pdf_path, format="A4", print_background=True) + browser.close() + + def generate_report( + self, + spirometry_pdf_path: str, + pnoe_csv_path: str, + seca_excel_path: str, + patient_info: Dict[str, Any], + output_filename: str = None, + ) -> Dict[str, Any]: + """ + Generate complete medical report from uploaded files. + + Args: + spirometry_pdf_path: Path to Spirometry PDF file + pnoe_csv_path: Path to Pnoe CSV file + seca_excel_path: Path to SECA Excel file + patient_info: Dictionary containing patient information + output_filename: Optional custom output filename + + Returns: + Dictionary containing report path, graphs generated, and analysis data + """ + # Process data + df = self.process_pnoe_data(pnoe_csv_path) + + # Generate graphs + graphs_generated = self.generate_graphs(df) + + # Calculate analysis metrics + analysis_data = self.calculate_analysis_metrics(df) + analysis_data["graphs_count"] = len(graphs_generated) + + # Generate HTML + html_content = self.generate_html(patient_info) + + # Generate PDF + if output_filename is None: + patient_name = patient_info.get("patient_name", "Unknown") + session_id = patient_info.get("session_id", "default") + output_filename = ( + f"report_{patient_name.replace(' ', '_')}_{session_id}.pdf" + ) + + report_path = self.reports_dir / output_filename + self.html_to_pdf(html_content, str(report_path)) + + return { + "report_path": str(report_path), + "graphs_generated": graphs_generated, + "analysis_data": analysis_data, + }