feat: Implement report generator service for medical reports

- Added ReportGeneratorService to handle generation of medical reports from uploaded files.
- Implemented methods for processing Pnoe CSV data, generating graphs, and calculating analysis metrics.
- Integrated Jinja2 for HTML report generation with customizable templates.
- Added functionality to convert HTML content to PDF using Playwright.
- Ensured proper directory structure for saving generated graphs and reports.
This commit is contained in:
bolade
2025-10-03 21:41:00 +01:00
parent 1d8136d6ad
commit 11ee6b192f
13 changed files with 896 additions and 658 deletions
+3 -1
View File
@@ -1,3 +1,5 @@
.venv
data/
data/
.env
Binary file not shown.
-533
View File
@@ -1,533 +0,0 @@
"""
FastAPI application for report generation with file uploads.
This API allows users to:
1. Upload required files (Spirometry PDF, Pnoe CSV, SECA Excel)
2. Generate reports with graphs and analysis
"""
import shutil
from pathlib import Path
from typing import Dict, Optional
import pandas as pd
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel
from graph_generator import GraphGenerator
# FastAPI application object; the route decorators in this module register on it.
app = FastAPI(
    title="Medical Report Generation API",
    description="API for generating medical performance reports with analysis and graphs",
    version="1.0.0",
)

# Working directories, relative to the process's current working directory:
# uploads (per-session input files), graphs and reports (generated output).
UPLOAD_DIR = Path("uploads")
GRAPHS_DIR = Path("graphs")
REPORTS_DIR = Path("reports")
for _required_dir in (UPLOAD_DIR, GRAPHS_DIR, REPORTS_DIR):
    _required_dir.mkdir(exist_ok=True)

# In-memory registry of uploads: session id -> {file type -> saved file path}.
# It lives only as long as the process does.
uploaded_files_store: Dict[str, Dict[str, str]] = dict()
class FileUploadResponse(BaseModel):
    """Response body returned by the upload endpoints."""

    # Human-readable confirmation text.
    message: str
    # File name as supplied by the client.
    filename: str
    # Key the file is registered under (e.g. "spirometry_pdf").
    file_type: str
    # Location the file was written to on the server.
    file_path: str
class ReportRequest(BaseModel):
    """Request body for ``/generate-report``: patient details plus the session to use."""

    # Values rendered into the report header.
    patient_name: str
    age: int
    height: str
    weight: str
    focus: str = "Endurance"
    # Selects which group of previously uploaded files the report is built from.
    session_id: Optional[str] = "default"
class ReportResponse(BaseModel):
    """Response body for ``/generate-report``."""

    # Human-readable status text.
    message: str
    # Path of the generated PDF on the server.
    report_path: str
    # One {"name": ..., "path": ...} entry per chart that was produced.
    graphs_generated: list
    # Summary metrics computed from the Pnoe data.
    analysis_data: dict
@app.get("/")
async def root():
    """Describe the API: its name, version and the paths of its endpoints."""
    endpoint_paths = {
        "upload_spirometry": "/upload/spirometry",
        "upload_pnoe": "/upload/pnoe",
        "upload_seca": "/upload/seca",
        "generate_report": "/generate-report",
        "list_uploads": "/uploads",
        "health": "/health",
    }
    return {
        "message": "Medical Report Generation API",
        "version": "1.0.0",
        "endpoints": endpoint_paths,
    }
@app.get("/health")
async def health_check():
    """Report that the service is up; returns a fixed status payload."""
    status_payload = {"status": "healthy", "service": "report-generation-api"}
    return status_payload
def _save_session_upload(
    file: UploadFile,
    session_id: str,
    *,
    file_type: str,
    prefix: str,
    allowed_suffixes: tuple,
    rejection_detail: str,
    success_message: str,
) -> FileUploadResponse:
    """
    Validate one uploaded file, store it in the session directory and register it.

    Args:
        file: The uploaded file
        session_id: Session identifier the file belongs to
        file_type: Key used in ``uploaded_files_store`` and in the response
        prefix: Text prepended to the stored file name
        allowed_suffixes: Lower-case file extensions that are accepted
        rejection_detail: Error text returned when the extension is not accepted
        success_message: Text returned on success

    Returns:
        FileUploadResponse with upload details

    Raises:
        HTTPException: 400 for a disallowed extension or an unusable session id
    """
    # Both values come from the client. Keep only the last path component of
    # the file name and compare the extension case-insensitively; a missing
    # file name is rejected the same way as a wrong extension.
    client_name = Path(file.filename or "").name
    if not client_name.lower().endswith(allowed_suffixes):
        raise HTTPException(status_code=400, detail=rejection_detail)

    # The session id becomes a directory name under UPLOAD_DIR, so it must be
    # a single plain path component ("..", "." or "a/b" would escape or alias
    # the upload root).
    if (
        not session_id
        or session_id in {".", ".."}
        or Path(session_id).name != session_id
    ):
        raise HTTPException(status_code=400, detail="Invalid session_id")

    # Create session directory
    session_dir = UPLOAD_DIR / session_id
    session_dir.mkdir(parents=True, exist_ok=True)

    # Save file
    file_path = session_dir / f"{prefix}_{client_name}"
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)

    # Store metadata
    uploaded_files_store.setdefault(session_id, {})[file_type] = str(file_path)

    return FileUploadResponse(
        message=success_message,
        filename=file.filename,
        file_type=file_type,
        file_path=str(file_path),
    )


@app.post("/upload/spirometry", response_model=FileUploadResponse)
async def upload_spirometry_pdf(
    file: UploadFile = File(...), session_id: str = "default"
):
    """
    Upload Spirometry PDF file for analysis.

    Args:
        file: Spirometry PDF file
        session_id: Session identifier to group files together (default: "default")

    Returns:
        FileUploadResponse with upload details
    """
    return _save_session_upload(
        file,
        session_id,
        file_type="spirometry_pdf",
        prefix="spirometry",
        allowed_suffixes=(".pdf",),
        rejection_detail="Only PDF files are allowed",
        success_message="Spirometry PDF uploaded successfully",
    )


@app.post("/upload/pnoe", response_model=FileUploadResponse)
async def upload_pnoe_csv(file: UploadFile = File(...), session_id: str = "default"):
    """
    Upload Pnoe CSV file for metabolic analysis.

    Args:
        file: Pnoe CSV file
        session_id: Session identifier to group files together (default: "default")

    Returns:
        FileUploadResponse with upload details
    """
    return _save_session_upload(
        file,
        session_id,
        file_type="pnoe_csv",
        prefix="pnoe",
        allowed_suffixes=(".csv",),
        rejection_detail="Only CSV files are allowed",
        success_message="Pnoe CSV uploaded successfully",
    )


@app.post("/upload/seca", response_model=FileUploadResponse)
async def upload_seca_excel(file: UploadFile = File(...), session_id: str = "default"):
    """
    Upload SECA body composition Excel file.

    Args:
        file: SECA Excel file (.xlsx or .xls)
        session_id: Session identifier to group files together (default: "default")

    Returns:
        FileUploadResponse with upload details
    """
    return _save_session_upload(
        file,
        session_id,
        file_type="seca_excel",
        prefix="seca",
        allowed_suffixes=(".xlsx", ".xls"),
        rejection_detail="Only Excel files (.xlsx, .xls) are allowed",
        success_message="SECA Excel uploaded successfully",
    )
@app.get("/uploads")
async def list_uploads(session_id: str = "default"):
    """
    Show which files have been registered for a session.

    Args:
        session_id: Session identifier (default: "default")

    Returns:
        Dictionary with the session id and its files; includes a count when the
        session is known, or a "No files uploaded" message when it is not
    """
    session_files = uploaded_files_store.get(session_id)
    if session_files is None:
        return {"session_id": session_id, "files": {}, "message": "No files uploaded"}

    return {
        "session_id": session_id,
        "files": session_files,
        "files_count": len(session_files),
    }
@app.post("/generate-report", response_model=ReportResponse)
async def generate_report(report_request: ReportRequest):
    """
    Generate a comprehensive medical report with graphs and analysis.

    The Pnoe CSV registered for the session is loaded, derived and smoothed
    columns are added, the charts are generated, and the report pages are
    rendered from the Jinja2 templates and written as a PDF in REPORTS_DIR.

    Args:
        report_request: Report configuration including patient details

    Returns:
        ReportResponse with report path and analysis data

    Raises:
        HTTPException: 400 when the session or one of its required files is
            missing, 500 when report generation fails
    """
    import asyncio
    import functools

    session_id = report_request.session_id

    # Check if all required files are uploaded
    if session_id not in uploaded_files_store:
        raise HTTPException(
            status_code=400,
            detail=f"No files found for session '{session_id}'. Please upload files first.",
        )
    files = uploaded_files_store[session_id]
    required_files = ["spirometry_pdf", "pnoe_csv", "seca_excel"]
    missing_files = [f for f in required_files if f not in files]
    if missing_files:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required files: {', '.join(missing_files)}. Please upload all files first.",
        )

    def _to_numeric_or_keep(column):
        # Replacement for the deprecated pd.to_numeric(errors="ignore"):
        # convert when possible, otherwise return the column unchanged.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    def _path_safe(text):
        # The value ends up inside a file name; neutralise path separators and
        # control characters so it cannot point outside REPORTS_DIR.
        return "".join("_" if ch in "/\\" or ord(ch) < 32 else ch for ch in text)

    try:
        # Initialize graph generator
        graph_gen = GraphGenerator(charts_dir=str(GRAPHS_DIR))

        # Load and process Pnoe data
        df = pd.read_csv(files["pnoe_csv"], delimiter=";")
        df = df.apply(_to_numeric_or_keep)

        # Calculate derived columns
        df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
        df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"]
        df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
        df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100

        # Smooth columns with a rolling mean; min_periods=1 keeps the first rows
        window_size = 10
        columns_to_smooth = [
            "VO2(ml/min)",
            "VCO2(ml/min)",
            "HR(bpm)",
            "VT(l)",
            "BF(bpm)",
            "VE(l/min)",
            "VO2 Pulse",
            "VO2 Breath",
            "CHO",
            "FAT",
        ]
        for col in columns_to_smooth:
            if col in df.columns:
                df[f"{col}_smoothed"] = (
                    df[col].rolling(window=window_size, min_periods=1).mean()
                )

        # Generate graphs. Each chart is best-effort: a failure is reported as
        # a warning and the remaining charts are still attempted.
        chart_specs = [
            ("respiratory", "respiratory", "generate_respiratory_chart"),
            ("fuel_utilization", "fuel utilization", "generate_fuel_utilization_chart"),
            ("vo2_pulse", "VO2 pulse", "generate_vo2_pulse_chart"),
            ("vo2_breath", "VO2 breath", "generate_vo2_breath_chart"),
            ("fat_metabolism", "fat metabolism", "generate_fat_metabolism_chart"),
            ("recovery", "recovery", "generate_recovery_chart"),
        ]
        graphs_generated = []
        for chart_name, chart_label, method_name in chart_specs:
            try:
                chart_path = getattr(graph_gen, method_name)(df, save_as_base64=False)
                graphs_generated.append({"name": chart_name, "path": str(chart_path)})
            except Exception as e:
                print(f"Warning: Could not generate {chart_label} chart: {e}")

        # Calculate basic analysis metrics
        analysis_data = {
            "vo2_max": float(df["VO2(ml/min)_smoothed"].max())
            if "VO2(ml/min)_smoothed" in df.columns
            else 0,
            "peak_vt": float(df["VT(l)_smoothed"].max())
            if "VT(l)_smoothed" in df.columns
            else 0,
            "max_hr": float(df["HR(bpm)_smoothed"].max())
            if "HR(bpm)_smoothed" in df.columns
            else 0,
            "graphs_count": len(graphs_generated),
        }

        # Generate PDF report using existing main.py logic
        # NOTE(review): importing `main` executes that module's top-level code;
        # if it still renders a sample report at import time, that runs here
        # too - confirm.
        from jinja2 import Environment, FileSystemLoader
        from context import context_list
        from main import html_string_to_pdf

        env = Environment(loader=FileSystemLoader("report_gen"))
        html_pages = []
        header_context = {
            "patient_name": report_request.patient_name,
            "age": report_request.age,
            "height": report_request.height,
            "weight": report_request.weight,
            "focus": report_request.focus,
        }
        footer_context = [
            {
                "contact_email": "info@ishplabs.com",
                "website": "www.ishplabs.com",
                "social": "@ishplabs",
                "page_number": page_number,
            }
            for page_number, _ in enumerate(context_list, start=1)
        ]
        header_html = env.get_template("header.html").render(header_context)
        footer_template = env.get_template("footer.html")
        footer_html_list = [footer_template.render(context) for context in footer_context]

        for i, context in enumerate(context_list):
            template = env.get_template(f"page_{i + 1}.html").render(context)
            # The first two pages are used as rendered; later pages are wrapped
            # with the shared header and their numbered footer.
            if (i + 1) > 2:
                full_html = f"""
<div class="page flex flex-col justify-between">
<div>
{header_html}
</div>
<main class="flex-grow p-4">
{template}
</main>
<div class="border-t text-center text-sm text-gray-600">
{footer_html_list[i]}
</div>
</div>
"""
                html_pages.append(full_html)
            else:
                html_pages.append(template)

        # Combine with page breaks
        final_html = "<div class='page-break'></div>".join(html_pages)

        # Wrap in full HTML document
        html_doc = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
<style>
html, body {{
height: 100%;
margin: 0;
padding: 0;
}}
.page-break {{ page-break-after: always; }}
.page {{
height: 100vh;
min-height: 100vh;
display: flex;
flex-direction: column;
}}
.page main {{
flex: 1;
overflow: hidden;
}}
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
img {{
max-height: 300px;
}}
.chart-large {{
max-height: 500px !important;
}}
</style>
</head>
<body class="m-0 p-0">
{final_html}
</body>
</html>
"""

        # Generate PDF
        safe_patient = _path_safe(report_request.patient_name.replace(" ", "_"))
        safe_session = _path_safe(str(session_id))
        report_filename = f"report_{safe_patient}_{safe_session}.pdf"
        report_path = REPORTS_DIR / report_filename

        # The PDF helper drives Playwright's synchronous API, which refuses to
        # start on a thread that is running an asyncio event loop; run it in a
        # worker thread so it works and does not block the server.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None, functools.partial(html_string_to_pdf, html_doc, str(report_path))
        )

        return ReportResponse(
            message="Report generated successfully",
            report_path=str(report_path),
            graphs_generated=graphs_generated,
            analysis_data=analysis_data,
        )
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error generating report: {str(e)}"
        ) from e
@app.get("/download-report/{filename}")
async def download_report(filename: str):
    """
    Download a generated report.

    Args:
        filename: Name of the report file inside REPORTS_DIR

    Returns:
        PDF file

    Raises:
        HTTPException: 404 when the name is not a plain file name or no such
            report file exists
    """
    # Only a bare file name may be served: anything that carries a directory
    # part (or is "." / "..") could resolve outside REPORTS_DIR.
    is_plain_name = (
        filename not in {"", ".", ".."}
        and "\\" not in filename
        and Path(filename).name == filename
    )
    if not is_plain_name:
        raise HTTPException(status_code=404, detail="Report not found")

    file_path = REPORTS_DIR / filename
    # is_file() rather than exists(): a directory cannot be sent as a PDF.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="Report not found")

    return FileResponse(
        path=file_path,
        media_type="application/pdf",
        filename=filename,
    )
@app.delete("/uploads/{session_id}")
async def delete_session_uploads(session_id: str):
    """
    Delete all uploaded files for a session.

    Args:
        session_id: Session identifier

    Returns:
        Success message

    Raises:
        HTTPException: 404 when the session is not registered
    """
    if session_id not in uploaded_files_store:
        raise HTTPException(status_code=404, detail="Session not found")

    # Delete files. The directory is removed only when it really lies below
    # UPLOAD_DIR, so an id such as "." or one containing ".." can never wipe
    # the upload root itself or a directory outside it.
    session_dir = UPLOAD_DIR / session_id
    uploads_root = UPLOAD_DIR.resolve()
    if uploads_root in session_dir.resolve().parents and session_dir.is_dir():
        shutil.rmtree(session_dir)

    # Remove from store
    del uploaded_files_store[session_id]
    return {"message": f"Session '{session_id}' deleted successfully"}
if __name__ == "__main__":
    # Running this file directly starts a uvicorn server for the app,
    # listening on all interfaces on port 8000.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)
+193
View File
@@ -0,0 +1,193 @@
"""
FastAPI application for medical report generation.
This API provides a single endpoint that accepts all required files
and patient information, then generates a comprehensive medical report.
"""
import shutil
import tempfile
from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel
from services.report_generator import ReportGeneratorService
# FastAPI application object; the route decorators in this module register on it.
app = FastAPI(
    title="Medical Report Generation API",
    description="API for generating medical performance reports with analysis and graphs",
    version="2.0.0",
)

# Output directories, relative to the process's current working directory.
GRAPHS_DIR = Path("graphs")
REPORTS_DIR = Path("reports")
for _output_dir in (GRAPHS_DIR, REPORTS_DIR):
    _output_dir.mkdir(exist_ok=True)

# Single service instance shared by all requests; it renders the templates in
# "app/report_gen" and writes into the two directories above.
report_service = ReportGeneratorService(
    template_dir="app/report_gen",
    graphs_dir=str(GRAPHS_DIR),
    reports_dir=str(REPORTS_DIR),
)
class ReportResponse(BaseModel):
    """Response body for ``/generate-report``."""

    # Human-readable status text.
    message: str
    # Path of the generated PDF on the server.
    report_path: str
    # Entries describing the charts that were produced.
    graphs_generated: list
    # Summary metrics returned by the report service.
    analysis_data: dict
@app.get("/")
async def root():
    """Describe the API: its name, version and available endpoints."""
    endpoint_summary = {
        "generate_report": "POST /generate-report",
        "download_report": "GET /download-report/{filename}",
        "health": "GET /health",
    }
    return {
        "message": "Medical Report Generation API",
        "version": "2.0.0",
        "endpoints": endpoint_summary,
    }
@app.get("/health")
async def health_check():
    """Report that the service is up; returns a fixed status payload."""
    status_payload = {"status": "healthy", "service": "report-generation-api"}
    return status_payload
@app.post("/generate-report", response_model=ReportResponse)
async def generate_report(
    patient_name: str = Form(..., description="Patient name"),
    age: int = Form(..., description="Patient age"),
    height: str = Form(..., description="Patient height (e.g., 5'4\")"),
    weight: str = Form(..., description="Patient weight (e.g., 123lbs)"),
    focus: str = Form(default="Endurance", description="Training focus"),
    session_id: str = Form(default="default", description="Session ID"),
    spirometry_pdf: UploadFile = File(..., description="Spirometry PDF file"),
    pnoe_csv: UploadFile = File(..., description="Pnoe CSV file"),
    seca_excel: UploadFile = File(..., description="SECA Excel file"),
):
    """
    Generate a comprehensive medical report from uploaded files.

    The three uploads are validated by file extension, written to a temporary
    directory that is removed when the request ends, and handed to the report
    service together with the patient information.

    Args:
        spirometry_pdf: Spirometry PDF file
        pnoe_csv: Pnoe CSV data file
        seca_excel: SECA body composition Excel file
        patient_name: Name of the patient
        age: Patient age
        height: Patient height
        weight: Patient weight
        focus: Training focus (default: Endurance)
        session_id: Session identifier (default: default)

    Returns:
        ReportResponse with report path, graphs generated, and analysis data

    Raises:
        HTTPException: 400 for a file with the wrong extension, 500 when
            report generation fails
    """
    import asyncio
    import functools

    def _checked_name(upload, suffixes, detail):
        # The file name is client-controlled: keep only its last path
        # component, compare the extension case-insensitively, and treat a
        # missing name like a wrong extension.
        base_name = Path(upload.filename or "").name
        if not base_name.lower().endswith(suffixes):
            raise HTTPException(status_code=400, detail=detail)
        return base_name

    # Validate file types
    spirometry_name = _checked_name(
        spirometry_pdf, (".pdf",), "Spirometry file must be a PDF"
    )
    pnoe_name = _checked_name(pnoe_csv, (".csv",), "Pnoe file must be a CSV")
    seca_name = _checked_name(
        seca_excel,
        (".xlsx", ".xls"),
        "SECA file must be an Excel file (.xlsx or .xls)",
    )

    # Create temporary directory for uploaded files
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)

        # Save uploaded files temporarily
        spirometry_path = temp_path / f"spirometry_{spirometry_name}"
        pnoe_path = temp_path / f"pnoe_{pnoe_name}"
        seca_path = temp_path / f"seca_{seca_name}"

        try:
            # Write files
            for upload, destination in (
                (spirometry_pdf, spirometry_path),
                (pnoe_csv, pnoe_path),
                (seca_excel, seca_path),
            ):
                with open(destination, "wb") as f:
                    shutil.copyfileobj(upload.file, f)

            # Prepare patient information
            patient_info = {
                "patient_name": patient_name,
                "age": age,
                "height": height,
                "weight": weight,
                "focus": focus,
                "session_id": session_id,
            }

            # Generate report using the service. The service is synchronous
            # and renders the PDF with Playwright's sync API, which refuses to
            # start on a thread that is running an asyncio event loop, so the
            # call is made from a worker thread.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                functools.partial(
                    report_service.generate_report,
                    spirometry_pdf_path=str(spirometry_path),
                    pnoe_csv_path=str(pnoe_path),
                    seca_excel_path=str(seca_path),
                    patient_info=patient_info,
                ),
            )

            return ReportResponse(
                message="Report generated successfully",
                report_path=result["report_path"],
                graphs_generated=result["graphs_generated"],
                analysis_data=result["analysis_data"],
            )
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"Error generating report: {str(e)}",
            ) from e
        finally:
            # Close file handles
            spirometry_pdf.file.close()
            pnoe_csv.file.close()
            seca_excel.file.close()
@app.get("/download-report/{filename}")
async def download_report(filename: str):
    """
    Download a generated report.

    Args:
        filename: Name of the report file inside REPORTS_DIR

    Returns:
        PDF file

    Raises:
        HTTPException: 404 when the name is not a plain file name or no such
            report file exists
    """
    # Only a bare file name may be served: anything that carries a directory
    # part (or is "." / "..") could resolve outside REPORTS_DIR.
    is_plain_name = (
        filename not in {"", ".", ".."}
        and "\\" not in filename
        and Path(filename).name == filename
    )
    if not is_plain_name:
        raise HTTPException(status_code=404, detail="Report not found")

    file_path = REPORTS_DIR / filename
    # is_file() rather than exists(): a directory cannot be sent as a PDF.
    if not file_path.is_file():
        raise HTTPException(status_code=404, detail="Report not found")

    return FileResponse(
        path=file_path,
        media_type="application/pdf",
        filename=filename,
    )
if __name__ == "__main__":
    # Running this file directly starts a uvicorn server for the app,
    # listening on all interfaces on port 8000.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)
-124
View File
@@ -1,124 +0,0 @@
from jinja2 import Environment, FileSystemLoader
from playwright.sync_api import sync_playwright
from context import context_list
# Templates are looked up in the "report_gen" directory, relative to the
# current working directory.
env = Environment(loader=FileSystemLoader("report_gen"))

# Sample patient details rendered into the header of the demo report.
header_context = {
    "patient_name": "Keirstyn Moran",
    "age": 34,
    "height": "5'4\"",
    "weight": "123lbs",
    "focus": "Endurance",
}


def build_report_html():
    """Render every page in ``context_list`` and return one complete HTML document.

    Page ``i`` is rendered from the template ``page_{i}.html``. The first two
    pages are used as rendered; later pages are wrapped with the shared header
    and a footer carrying the page number. Pages are joined with page-break
    markers and embedded in a document that loads Tailwind and the print CSS.
    """
    html_pages = []
    footer_context = [
        {
            "contact_email": "info@ishplabs.com",
            "website": "www.ishplabs.com",
            "social": "@ishplabs",
            "page_number": page_number,
        }
        for page_number, _ in enumerate(context_list, start=1)
    ]
    header_html = env.get_template("header.html").render(header_context)
    footer_template = env.get_template("footer.html")
    footer_html_list = [footer_template.render(context) for context in footer_context]

    for i, context in enumerate(context_list):
        template = env.get_template(f"page_{i + 1}.html").render(context)
        if (i + 1) > 2:
            full_html = f"""
<div class="page flex flex-col justify-between">
<div>
{header_html}
</div>
<main class="flex-grow p-4">
{template}
</main>
<div class="border-t text-center text-sm text-gray-600">
{footer_html_list[i]}
</div>
</div>
"""
            html_pages.append(full_html)
        else:
            html_pages.append(template)

    # Combine with page breaks
    final_html = "<div class='page-break'></div>".join(html_pages)

    # Wrap in full HTML document
    html_doc = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
<style>
html, body {{
height: 100%;
margin: 0;
padding: 0;
}}
.page-break {{ page-break-after: always; }}
.page {{
height: 100vh;
min-height: 100vh;
display: flex;
flex-direction: column;
}}
.page main {{
flex: 1;
overflow: hidden;
}}
/* Reset margins and padding everywhere */
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
/* Prevent images from being too large */
img {{
max-height: 300px;
}}
/* Larger images for specific charts */
.chart-large {{
max-height: 500px !important;
}}
</style>
</head>
<body class="m-0 p-0">
{final_html}
</body>
</html>
"""
    return html_doc


def html_string_to_pdf(html_content, pdf_path):
    """Render an HTML string to an A4 PDF file using headless Chromium.

    Args:
        html_content: Complete HTML document as a string
        pdf_path: Path the PDF is written to
    """
    with sync_playwright() as p:
        browser = p.chromium.launch()
        try:
            page = browser.new_page()
            # Set the HTML directly
            page.set_content(html_content)
            # Export to PDF
            page.pdf(path=pdf_path, format="A4", print_background=True)
        finally:
            # Close the browser even when rendering fails.
            browser.close()


# Build the demo report only when run as a script, so that importing this
# module for html_string_to_pdf does not render templates or write a PDF.
if __name__ == "__main__":
    html_string_to_pdf(build_report_html(), "multi_page_report.pdf")
    print("✅ PDF generated: multi_page_report.pdf")
+318
View File
@@ -0,0 +1,318 @@
"""
Report Generator Service
This service handles the generation of medical reports from uploaded files.
It processes data, generates graphs, and creates PDF reports.
"""
from pathlib import Path
from typing import Any, Dict, List
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from playwright.sync_api import sync_playwright
from app.services.context import context_list
from app.services.graph_generator import GraphGenerator
class ReportGeneratorService:
    """Service for generating medical performance reports.

    It loads and smooths Pnoe CSV data, asks the graph generator for the
    charts, computes summary metrics, renders the Jinja2 page templates into
    one HTML document and converts that document to a PDF with Playwright.
    """

    def __init__(
        self,
        template_dir: str = "app/report_gen",
        graphs_dir: str = "graphs",
        reports_dir: str = "reports",
    ):
        """
        Initialize the report generator service.

        Args:
            template_dir: Directory containing Jinja2 templates
            graphs_dir: Directory to save generated graphs
            reports_dir: Directory to save generated reports
        """
        self.template_dir = template_dir
        self.graphs_dir = Path(graphs_dir)
        self.reports_dir = Path(reports_dir)

        # Ensure directories exist; parents=True so nested locations such as
        # "output/graphs" work as well.
        self.graphs_dir.mkdir(parents=True, exist_ok=True)
        self.reports_dir.mkdir(parents=True, exist_ok=True)

        self.graph_generator = GraphGenerator(charts_dir=str(graphs_dir))
        self.env = Environment(loader=FileSystemLoader(template_dir))

    @staticmethod
    def _to_numeric_or_keep(column):
        """Return ``column`` converted to numbers, or unchanged if it cannot be."""
        # Replacement for the deprecated pd.to_numeric(errors="ignore").
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    @staticmethod
    def _filename_component(value) -> str:
        """Return ``value`` as text that is safe to embed in a file name."""
        # Path separators and control characters are replaced so the result
        # cannot point outside the directory the file is written to.
        return "".join(
            "_" if ch in "/\\" or ord(ch) < 32 else ch for ch in str(value)
        )

    def process_pnoe_data(self, pnoe_csv_path: str) -> pd.DataFrame:
        """
        Load and process Pnoe CSV data.

        The file is read as semicolon-delimited. Derived columns ("VO2 Pulse",
        "VO2 Breath", "CHO", "FAT") are added, then a ``<name>_smoothed``
        rolling-mean column is added for each listed column that exists.

        Args:
            pnoe_csv_path: Path to Pnoe CSV file

        Returns:
            Processed DataFrame with smoothed columns

        Raises:
            KeyError: If a column needed for the derived values is missing
        """
        # Load data
        df = pd.read_csv(pnoe_csv_path, delimiter=";")
        df = df.apply(self._to_numeric_or_keep)

        # Calculate derived columns
        df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
        df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"]
        df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
        df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100

        # Smooth columns; min_periods=1 keeps values for the first rows too
        window_size = 10
        columns_to_smooth = [
            "VO2(ml/min)",
            "VCO2(ml/min)",
            "HR(bpm)",
            "VT(l)",
            "BF(bpm)",
            "VE(l/min)",
            "VO2 Pulse",
            "VO2 Breath",
            "CHO",
            "FAT",
        ]
        for col in columns_to_smooth:
            if col in df.columns:
                df[f"{col}_smoothed"] = (
                    df[col].rolling(window=window_size, min_periods=1).mean()
                )
        return df

    def generate_graphs(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """
        Generate all required graphs from processed data.

        Each chart is best-effort: when one fails a warning is printed and the
        remaining charts are still attempted.

        Args:
            df: Processed DataFrame with smoothed columns

        Returns:
            List of dictionaries containing graph names and paths
        """
        graphs_generated = []

        # List of graphs to generate
        graph_methods = [
            ("respiratory", self.graph_generator.generate_respiratory_chart),
            ("fuel_utilization", self.graph_generator.generate_fuel_utilization_chart),
            ("vo2_pulse", self.graph_generator.generate_vo2_pulse_chart),
            ("vo2_breath", self.graph_generator.generate_vo2_breath_chart),
            ("fat_metabolism", self.graph_generator.generate_fat_metabolism_chart),
            ("recovery", self.graph_generator.generate_recovery_chart),
        ]
        for name, method in graph_methods:
            try:
                path = method(df, save_as_base64=False)
                graphs_generated.append({"name": name, "path": str(path)})
            except Exception as e:
                print(f"Warning: Could not generate {name} chart: {e}")
        return graphs_generated

    def calculate_analysis_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculate basic analysis metrics from processed data.

        Each metric is the maximum of a smoothed column, or 0 when that column
        is not present.

        Args:
            df: Processed DataFrame with smoothed columns

        Returns:
            Dictionary containing analysis metrics
        """
        return {
            "vo2_max": float(df["VO2(ml/min)_smoothed"].max())
            if "VO2(ml/min)_smoothed" in df.columns
            else 0,
            "peak_vt": float(df["VT(l)_smoothed"].max())
            if "VT(l)_smoothed" in df.columns
            else 0,
            "max_hr": float(df["HR(bpm)_smoothed"].max())
            if "HR(bpm)_smoothed" in df.columns
            else 0,
        }

    def generate_html(self, patient_info: Dict[str, Any]) -> str:
        """
        Generate HTML content for the report.

        Page ``i`` is rendered from ``page_{i}.html`` with the matching entry
        of ``context_list``. The first two pages are used as rendered; later
        pages are wrapped with the shared header and a numbered footer.

        Args:
            patient_info: Dictionary containing patient information
                (patient_name, age, height, weight, focus)

        Returns:
            Complete HTML document as string
        """
        html_pages = []

        # Header context
        # NOTE(review): the environment is created without autoescaping, so
        # these client-supplied values reach header.html as-is unless the
        # template escapes them - confirm.
        header_context = {
            "patient_name": patient_info.get("patient_name", ""),
            "age": patient_info.get("age", ""),
            "height": patient_info.get("height", ""),
            "weight": patient_info.get("weight", ""),
            "focus": patient_info.get("focus", "Endurance"),
        }

        # Footer context: one per page, numbered from 1
        footer_context = [
            {
                "contact_email": "info@ishplabs.com",
                "website": "www.ishplabs.com",
                "social": "@ishplabs",
                "page_number": page_number,
            }
            for page_number, _ in enumerate(context_list, start=1)
        ]

        # Render header
        header_html = self.env.get_template("header.html").render(header_context)

        # Render footers
        footer_template = self.env.get_template("footer.html")
        footer_html_list = [
            footer_template.render(context) for context in footer_context
        ]

        # Render pages
        for i, context in enumerate(context_list):
            template = self.env.get_template(f"page_{i + 1}.html").render(context)
            if (i + 1) > 2:
                full_html = f"""
<div class="page flex flex-col justify-between">
<div>
{header_html}
</div>
<main class="flex-grow p-4">
{template}
</main>
<div class="border-t text-center text-sm text-gray-600">
{footer_html_list[i]}
</div>
</div>
"""
                html_pages.append(full_html)
            else:
                html_pages.append(template)

        # Combine with page breaks
        final_html = "<div class='page-break'></div>".join(html_pages)

        # Wrap in full HTML document
        html_doc = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
<style>
html, body {{
height: 100%;
margin: 0;
padding: 0;
}}
.page-break {{ page-break-after: always; }}
.page {{
height: 100vh;
min-height: 100vh;
display: flex;
flex-direction: column;
}}
.page main {{
flex: 1;
overflow: hidden;
}}
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
img {{
max-height: 300px;
}}
.chart-large {{
max-height: 500px !important;
}}
</style>
</head>
<body class="m-0 p-0">
{final_html}
</body>
</html>
"""
        return html_doc

    def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
        """
        Convert HTML content to PDF file.

        Uses Playwright's synchronous API, so it must not be called from a
        thread that is running an asyncio event loop.

        Args:
            html_content: HTML content as string
            pdf_path: Path where PDF should be saved
        """
        with sync_playwright() as p:
            browser = p.chromium.launch()
            try:
                page = browser.new_page()
                page.set_content(html_content)
                page.pdf(path=pdf_path, format="A4", print_background=True)
            finally:
                # Close the browser even when rendering fails.
                browser.close()

    def generate_report(
        self,
        spirometry_pdf_path: str,
        pnoe_csv_path: str,
        seca_excel_path: str,
        patient_info: Dict[str, Any],
        output_filename: str = None,
    ) -> Dict[str, Any]:
        """
        Generate complete medical report from uploaded files.

        Only the Pnoe CSV is read here; the spirometry and SECA paths are
        accepted but not used by this method. The computed metrics and graph
        paths are returned to the caller; the HTML pages are rendered from
        ``context_list`` and the patient information.

        Args:
            spirometry_pdf_path: Path to Spirometry PDF file
            pnoe_csv_path: Path to Pnoe CSV file
            seca_excel_path: Path to SECA Excel file
            patient_info: Dictionary containing patient information
            output_filename: Optional custom output filename

        Returns:
            Dictionary containing report path, graphs generated, and analysis data
        """
        # Process data
        df = self.process_pnoe_data(pnoe_csv_path)

        # Generate graphs
        graphs_generated = self.generate_graphs(df)

        # Calculate analysis metrics
        analysis_data = self.calculate_analysis_metrics(df)
        analysis_data["graphs_count"] = len(graphs_generated)

        # Generate HTML
        html_content = self.generate_html(patient_info)

        # Generate PDF
        if output_filename is None:
            # Both values may come straight from a client request, so they are
            # made path-safe before being embedded in the file name.
            patient_name = self._filename_component(
                str(patient_info.get("patient_name", "Unknown")).replace(" ", "_")
            )
            session_id = self._filename_component(
                patient_info.get("session_id", "default")
            )
            output_filename = f"report_{patient_name}_{session_id}.pdf"
        report_path = self.reports_dir / output_filename
        self.html_to_pdf(html_content, str(report_path))

        return {
            "report_path": str(report_path),
            "graphs_generated": graphs_generated,
            "analysis_data": analysis_data,
        }
@@ -0,0 +1,64 @@
import base64
import os
import requests
from dotenv import load_dotenv
# Load variables from a .env file (if any) into the process environment, then
# read the OpenRouter API key; the value is None when the variable is not set.
load_dotenv()
API_KEY_REF = os.environ.get("OPENROUTER_API_KEY")
def encode_pdf_to_base64(pdf_path):
    """Return the contents of the file at ``pdf_path`` as a base64 text string."""
    with open(pdf_path, "rb") as source:
        raw_bytes = source.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def extract_spirometry_table_from_pdf(
    pdf_path, output_file="extracted_spirometry_table.csv", timeout=120
):
    """Ask the OpenRouter chat API to extract the spirometry table from a PDF.

    The PDF is sent base64-encoded as a data URL together with a text prompt.
    When the response contains a choice with text content, that content is
    written to ``output_file``.

    Args:
        pdf_path: Path to the spirometry PDF
        output_file: Path the extracted CSV text is written to
        timeout: Seconds to wait for the API before giving up

    Returns:
        A status message: where the table was saved, or that the response had
        no content
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {API_KEY_REF}",
        "Content-Type": "application/json",
    }

    # Read and encode the PDF
    base64_pdf = encode_pdf_to_base64(pdf_path)
    data_url = f"data:application/pdf;base64,{base64_pdf}"

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    # Adjacent literals are concatenated, so each sentence
                    # ends with a space to keep them apart in the prompt.
                    "text": "Please extract the Spirometry table from the pdf and return the values in csv format, "
                    "note that it is the unit of parameter that is beside it and it should not be a column. "
                    "The '-' Should be treated as empty values. "
                    "do not add 'csv' at the start or end of the response",
                },
                {
                    "type": "file",
                    "file": {"filename": "document.pdf", "file_data": data_url},
                },
            ],
        }
    ]
    payload = {
        "model": "google/gemini-2.5-flash-lite",
        "messages": messages,
    }

    # Without a timeout the call can wait on the remote service indefinitely.
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)

    # An error page or empty body is not JSON; treat it like a response
    # without content instead of crashing.
    try:
        response_data = response.json()
    except ValueError:
        return "No content found in response"

    choices = response_data.get("choices") if isinstance(response_data, dict) else None
    if choices:
        content = choices[0]["message"]["content"]
        if isinstance(content, str):
            # Save to a CSV file
            with open(output_file, "w", encoding="utf-8") as f:
                f.write(content)
            return f"Extracted table saved to {output_file}"
    return "No content found in response"
View File
+318
View File
@@ -0,0 +1,318 @@
"""
Report Generator Service
This service handles the generation of medical reports from uploaded files.
It processes data, generates graphs, and creates PDF reports.
"""
from pathlib import Path
from typing import Any, Dict, List
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from playwright.sync_api import sync_playwright
from app.services.context import context_list
from app.services.graph_generator import GraphGenerator
class ReportGeneratorService:
"""Service for generating medical performance reports"""
def __init__(
    self,
    template_dir: str = "app/report_gen",
    graphs_dir: str = "graphs",
    reports_dir: str = "reports",
):
    """
    Initialize the report generator service.

    Args:
        template_dir: Directory containing Jinja2 templates
        graphs_dir: Directory to save generated graphs
        reports_dir: Directory to save generated reports
    """
    self.template_dir = template_dir
    self.graphs_dir = Path(graphs_dir)
    self.reports_dir = Path(reports_dir)

    # Ensure directories exist; parents=True so nested locations such as
    # "output/graphs" work as well.
    self.graphs_dir.mkdir(parents=True, exist_ok=True)
    self.reports_dir.mkdir(parents=True, exist_ok=True)

    self.graph_generator = GraphGenerator(charts_dir=str(graphs_dir))
    self.env = Environment(loader=FileSystemLoader(template_dir))
def process_pnoe_data(self, pnoe_csv_path: str) -> pd.DataFrame:
    """
    Load and process Pnoe CSV data.

    The file is read as semicolon-delimited. Derived columns ("VO2 Pulse",
    "VO2 Breath", "CHO", "FAT") are added, then a ``<name>_smoothed``
    rolling-mean column is added for each listed column that exists.

    Args:
        pnoe_csv_path: Path to Pnoe CSV file

    Returns:
        Processed DataFrame with smoothed columns

    Raises:
        KeyError: If a column needed for the derived values is missing
    """

    def _to_numeric_or_keep(column):
        # Replacement for the deprecated pd.to_numeric(errors="ignore"):
        # convert when possible, otherwise return the column unchanged.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    # Load data
    df = pd.read_csv(pnoe_csv_path, delimiter=";")
    df = df.apply(_to_numeric_or_keep)

    # Calculate derived columns
    df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
    df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"]
    df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
    df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100

    # Smooth columns; min_periods=1 keeps values for the first rows too
    window_size = 10
    columns_to_smooth = [
        "VO2(ml/min)",
        "VCO2(ml/min)",
        "HR(bpm)",
        "VT(l)",
        "BF(bpm)",
        "VE(l/min)",
        "VO2 Pulse",
        "VO2 Breath",
        "CHO",
        "FAT",
    ]
    for col in columns_to_smooth:
        if col in df.columns:
            df[f"{col}_smoothed"] = (
                df[col].rolling(window=window_size, min_periods=1).mean()
            )
    return df
def generate_graphs(self, df: pd.DataFrame) -> List[Dict[str, str]]:
"""
Generate all required graphs from processed data.
Args:
df: Processed DataFrame with smoothed columns
Returns:
List of dictionaries containing graph names and paths
"""
graphs_generated = []
# List of graphs to generate
graph_methods = [
("respiratory", self.graph_generator.generate_respiratory_chart),
("fuel_utilization", self.graph_generator.generate_fuel_utilization_chart),
("vo2_pulse", self.graph_generator.generate_vo2_pulse_chart),
("vo2_breath", self.graph_generator.generate_vo2_breath_chart),
("fat_metabolism", self.graph_generator.generate_fat_metabolism_chart),
("recovery", self.graph_generator.generate_recovery_chart),
]
for name, method in graph_methods:
try:
path = method(df, save_as_base64=False)
graphs_generated.append({"name": name, "path": str(path)})
except Exception as e:
print(f"Warning: Could not generate {name} chart: {e}")
return graphs_generated
def calculate_analysis_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate basic analysis metrics from processed data.
Args:
df: Processed DataFrame with smoothed columns
Returns:
Dictionary containing analysis metrics
"""
return {
"vo2_max": float(df["VO2(ml/min)_smoothed"].max())
if "VO2(ml/min)_smoothed" in df.columns
else 0,
"peak_vt": float(df["VT(l)_smoothed"].max())
if "VT(l)_smoothed" in df.columns
else 0,
"max_hr": float(df["HR(bpm)_smoothed"].max())
if "HR(bpm)_smoothed" in df.columns
else 0,
}
def generate_html(self, patient_info: Dict[str, Any]) -> str:
"""
Generate HTML content for the report.
Args:
patient_info: Dictionary containing patient information
(patient_name, age, height, weight, focus)
Returns:
Complete HTML document as string
"""
html_pages = []
# Header context
header_context = {
"patient_name": patient_info.get("patient_name", ""),
"age": patient_info.get("age", ""),
"height": patient_info.get("height", ""),
"weight": patient_info.get("weight", ""),
"focus": patient_info.get("focus", "Endurance"),
}
# Footer context
footer_context = [
{
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": i + 1,
}
for i in range(len(context_list))
]
# Render header
header_html = self.env.get_template("header.html").render(header_context)
# Render footers
footer_html_list = [
self.env.get_template("footer.html").render(context)
for context in footer_context
]
# Render pages
for i, context in enumerate(context_list):
template = self.env.get_template(f"page_{i + 1}.html").render(context)
if (i + 1) > 2:
full_html = f"""
<div class="page flex flex-col justify-between">
<div>
{header_html}
</div>
<main class="flex-grow p-4">
{template}
</main>
<div class="border-t text-center text-sm text-gray-600">
{footer_html_list[i]}
</div>
</div>
"""
html_pages.append(full_html)
else:
html_pages.append(template)
# Combine with page breaks
final_html = "<div class='page-break'></div>".join(html_pages)
# Wrap in full HTML document
html_doc = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
<style>
html, body {{
height: 100%;
margin: 0;
padding: 0;
}}
.page-break {{ page-break-after: always; }}
.page {{
height: 100vh;
min-height: 100vh;
display: flex;
flex-direction: column;
}}
.page main {{
flex: 1;
overflow: hidden;
}}
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
img {{
max-height: 300px;
}}
.chart-large {{
max-height: 500px !important;
}}
</style>
</head>
<body class="m-0 p-0">
{final_html}
</body>
</html>
"""
return html_doc
def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
"""
Convert HTML content to PDF file.
Args:
html_content: HTML content as string
pdf_path: Path where PDF should be saved
"""
with sync_playwright() as p:
browser = p.chromium.launch()
page = browser.new_page()
page.set_content(html_content)
page.pdf(path=pdf_path, format="A4", print_background=True)
browser.close()
def generate_report(
self,
spirometry_pdf_path: str,
pnoe_csv_path: str,
seca_excel_path: str,
patient_info: Dict[str, Any],
output_filename: str = None,
) -> Dict[str, Any]:
"""
Generate complete medical report from uploaded files.
Args:
spirometry_pdf_path: Path to Spirometry PDF file
pnoe_csv_path: Path to Pnoe CSV file
seca_excel_path: Path to SECA Excel file
patient_info: Dictionary containing patient information
output_filename: Optional custom output filename
Returns:
Dictionary containing report path, graphs generated, and analysis data
"""
# Process data
df = self.process_pnoe_data(pnoe_csv_path)
# Generate graphs
graphs_generated = self.generate_graphs(df)
# Calculate analysis metrics
analysis_data = self.calculate_analysis_metrics(df)
analysis_data["graphs_count"] = len(graphs_generated)
# Generate HTML
html_content = self.generate_html(patient_info)
# Generate PDF
if output_filename is None:
patient_name = patient_info.get("patient_name", "Unknown")
session_id = patient_info.get("session_id", "default")
output_filename = (
f"report_{patient_name.replace(' ', '_')}_{session_id}.pdf"
)
report_path = self.reports_dir / output_filename
self.html_to_pdf(html_content, str(report_path))
return {
"report_path": str(report_path),
"graphs_generated": graphs_generated,
"analysis_data": analysis_data,
}