Add HTML templates for medical report generator with navigation, upload, edit, and preview functionalities

- Created base template with navigation and layout structure
- Implemented upload.html for patient data and file uploads
- Developed edit.html for editing calculated metrics
- Added preview.html for displaying generated report previews
- Enhanced user experience with Tailwind CSS styling
This commit is contained in:
bolade
2025-11-17 17:15:44 +01:00
parent 4f97691ff9
commit 83f50882e2
14 changed files with 1726 additions and 1770 deletions
+335 -13
View File
@@ -5,13 +5,17 @@ This API provides a single endpoint that accepts all required files
and patient information, then generates a comprehensive medical report.
"""
import os
import shutil
import tempfile
import uuid
from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, HTMLResponse, RedirectResponse
from jinja2 import Environment, FileSystemLoader
from pydantic import BaseModel
from starlette.middleware.sessions import SessionMiddleware
from services.report_generator import ReportGeneratorService
app = FastAPI(
@@ -20,6 +24,12 @@ app = FastAPI(
version="2.0.0",
)
# Add session middleware
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY", "your-secret-key-change-in-production"))
# Setup templates
jinja_env = Environment(loader=FileSystemLoader("app/templates"))
# Define output directories
GRAPHS_DIR = Path("graphs")
GRAPHS_DIR.mkdir(exist_ok=True)
@@ -27,6 +37,9 @@ GRAPHS_DIR.mkdir(exist_ok=True)
REPORTS_DIR = Path("reports")
REPORTS_DIR.mkdir(exist_ok=True)
TEMP_DIR = Path("temp")
TEMP_DIR.mkdir(exist_ok=True)
# Initialize report generator service
report_service = ReportGeneratorService(
template_dir="app/report_gen",
@@ -42,18 +55,327 @@ class ReportResponse(BaseModel):
analysis_data: dict
@app.get("/")
async def root():
"""Root endpoint with API information"""
return {
"message": "Medical Report Generation API",
"version": "2.0.0",
"endpoints": {
"generate_report": "POST /generate-report",
"download_report": "GET /download-report/{filename}",
"health": "GET /health",
},
def render_template(template_name: str, context: dict) -> HTMLResponse:
"""Helper function to render Jinja2 templates"""
template = jinja_env.get_template(template_name)
html_content = template.render(**context)
return HTMLResponse(content=html_content)
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
"""Root endpoint - Upload form page"""
return render_template("upload.html", {"request": request, "session": request.session})
@app.post("/upload")
async def upload_files(
request: Request,
first_name: str = Form(...),
last_name: str = Form(...),
age: int = Form(...),
height: str = Form(...),
weight: str = Form(...),
gender: str = Form(...),
focus: str = Form(default="Endurance"),
session_id: str = Form(default="default"),
spirometry_pdf: UploadFile = File(...),
pnoe_csv: UploadFile = File(...),
seca_excel: UploadFile = File(...),
):
"""Handle file upload and generate report"""
# Validate file types
if not spirometry_pdf.filename.endswith(".pdf"):
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": "Spirometry file must be a PDF"
})
if not pnoe_csv.filename.endswith(".csv"):
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": "Pnoe file must be a CSV"
})
if not seca_excel.filename.endswith((".xlsx", ".xls")):
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": "SECA file must be an Excel file (.xlsx or .xls)"
})
# Create session-specific temp directory
session_uuid = str(uuid.uuid4())
session_temp_dir = TEMP_DIR / session_uuid
session_temp_dir.mkdir(exist_ok=True, parents=True)
# Save uploaded files
spirometry_path = session_temp_dir / f"spirometry_{spirometry_pdf.filename}"
pnoe_path = session_temp_dir / f"pnoe_{pnoe_csv.filename}"
seca_path = session_temp_dir / f"seca_{seca_excel.filename}"
try:
# Write files
with open(spirometry_path, "wb") as f:
shutil.copyfileobj(spirometry_pdf.file, f)
with open(pnoe_path, "wb") as f:
shutil.copyfileobj(pnoe_csv.file, f)
with open(seca_path, "wb") as f:
shutil.copyfileobj(seca_excel.file, f)
# Prepare patient information
patient_name = f"{first_name} {last_name}"
patient_info = {
"patient_name": patient_name,
"first_name": first_name,
"last_name": last_name,
"age": age,
"height": height,
"weight": weight,
"gender": gender,
"focus": focus,
"session_id": session_id,
}
# Generate report
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path),
patient_info=patient_info,
)
# Store in session
request.session["patient_info"] = patient_info
request.session["temp_dir"] = str(session_temp_dir)
request.session["report_path"] = result["report_path"]
request.session["graphs_generated"] = result["graphs_generated"]
request.session["analysis_data"] = result["analysis_data"]
# Extract spirometry CSV path (it's saved in data_dir by the service)
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
from services.context_generator import ContextGenerator
from pathlib import Path as PathLib
# The spirometry CSV is extracted during report generation
# We need to find it or extract it again
data_dir = PathLib("data")
spirometry_csv_path = data_dir / f"spirometry_{Path(spirometry_pdf.filename).stem}.csv"
# If it doesn't exist, extract it
if not spirometry_csv_path.exists():
spirometry_csv_path = extract_spirometry_table_from_pdf(
str(spirometry_path), output_dir=str(data_dir)
)
spirometry_csv_path = PathLib(spirometry_csv_path)
# Get calculated metrics for display and editing
context_gen = ContextGenerator()
context_gen.load_data(
str(pnoe_path),
str(spirometry_csv_path),
str(seca_path)
)
context_gen.extract_patient_info(last_name) # Extract patient info
spirometry_metrics = context_gen.calculate_spirometry_metrics()
pnoe_metrics = context_gen.calculate_pnoe_metrics()
# Store metrics in session
request.session["metrics"] = {
"spirometry": spirometry_metrics,
"pnoe": pnoe_metrics,
}
request.session["spirometry_csv_path"] = str(spirometry_csv_path)
return RedirectResponse(url="/preview", status_code=303)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}")
return render_template("upload.html", {
"request": request,
"session": request.session,
"error": f"Error generating report: {str(e)}"
})
finally:
# Close file handles
spirometry_pdf.file.close()
pnoe_csv.file.close()
seca_excel.file.close()
@app.get("/preview", response_class=HTMLResponse)
async def preview(request: Request):
"""Preview generated report"""
if not request.session.get("report_path"):
return RedirectResponse(url="/", status_code=303)
return render_template("preview.html", {"request": request, "session": request.session})
@app.get("/graphs/{filename}")
async def serve_graph(filename: str):
"""Serve graph images"""
graph_path = GRAPHS_DIR / filename
if not graph_path.exists():
raise HTTPException(status_code=404, detail="Graph not found")
return FileResponse(path=graph_path, media_type="image/png")
@app.get("/edit", response_class=HTMLResponse)
async def edit_form(request: Request):
"""Display edit metrics form"""
if not request.session.get("metrics"):
return RedirectResponse(url="/", status_code=303)
return render_template("edit.html", {"request": request, "session": request.session})
@app.post("/edit")
async def edit_metrics(request: Request):
"""Handle metric edits and regenerate report"""
if not request.session.get("temp_dir") or not request.session.get("patient_info"):
return RedirectResponse(url="/", status_code=303)
# Get form data
form_data = await request.form()
# Build metric overrides
metric_overrides = {
"pnoe": {},
"spirometry": {}
}
# Pnoe overrides
if form_data.get("vo2_max"):
metric_overrides["pnoe"]["vo2_max"] = float(form_data["vo2_max"])
if form_data.get("vo2_max_per_kg"):
metric_overrides["pnoe"]["vo2_max_per_kg"] = float(form_data["vo2_max_per_kg"])
if form_data.get("peak_vt"):
metric_overrides["pnoe"]["peak_vt"] = float(form_data["peak_vt"])
if form_data.get("peak_vt_hr"):
metric_overrides["pnoe"]["peak_vt_hr"] = float(form_data["peak_vt_hr"])
if form_data.get("fat_max_value"):
metric_overrides["pnoe"]["fat_max_value"] = float(form_data["fat_max_value"])
if form_data.get("fat_max_hr"):
metric_overrides["pnoe"]["fat_max_hr"] = float(form_data["fat_max_hr"])
# VT1 and VT2 overrides
if form_data.get("vt1_hr") or form_data.get("vt1_speed") or form_data.get("vt1_time"):
metric_overrides["pnoe"]["vt1"] = {
"HeartRate": float(form_data.get("vt1_hr", 0)),
"Speed": float(form_data.get("vt1_speed", 0)),
"Time": float(form_data.get("vt1_time", 0))
}
if form_data.get("vt2_hr") or form_data.get("vt2_speed") or form_data.get("vt2_time"):
metric_overrides["pnoe"]["vt2"] = {
"HeartRate": float(form_data.get("vt2_hr", 0)),
"Speed": float(form_data.get("vt2_speed", 0)),
"Time": float(form_data.get("vt2_time", 0))
}
# Heart rate zones
for i in range(1, 6):
zone_key = f"zone{i}_bpm"
if form_data.get(zone_key):
metric_overrides["pnoe"][zone_key] = form_data[zone_key]
# Spirometry overrides
if form_data.get("fvc_best"):
metric_overrides["spirometry"]["fvc_best"] = float(form_data["fvc_best"])
if form_data.get("fvc_pred"):
metric_overrides["spirometry"]["fvc_pred"] = float(form_data["fvc_pred"])
if form_data.get("fev1_best"):
metric_overrides["spirometry"]["fev1_best"] = float(form_data["fev1_best"])
if form_data.get("fev1_pred"):
metric_overrides["spirometry"]["fev1_pred"] = float(form_data["fev1_pred"])
if form_data.get("fev1_fvc_pct_best"):
metric_overrides["spirometry"]["fev1_fvc_pct_best"] = float(form_data["fev1_fvc_pct_best"])
if form_data.get("fev1_fvc_pct_pred"):
metric_overrides["spirometry"]["fev1_fvc_pct_pred"] = float(form_data["fev1_fvc_pct_pred"])
try:
# Get file paths from session
temp_dir = Path(request.session["temp_dir"])
patient_info = request.session["patient_info"]
# Find files in temp directory
spirometry_path = None
pnoe_path = None
seca_path = None
for file_path in temp_dir.iterdir():
if file_path.name.startswith("spirometry_"):
spirometry_path = file_path
elif file_path.name.startswith("pnoe_"):
pnoe_path = file_path
elif file_path.name.startswith("seca_"):
seca_path = file_path
if not all([spirometry_path, pnoe_path, seca_path]):
raise ValueError("Could not find all uploaded files")
# Regenerate report with overrides
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path),
patient_info=patient_info,
metric_overrides=metric_overrides if (metric_overrides["pnoe"] or metric_overrides["spirometry"]) else None,
)
# Update session with new report
request.session["report_path"] = result["report_path"]
request.session["graphs_generated"] = result["graphs_generated"]
request.session["analysis_data"] = result["analysis_data"]
# Recalculate metrics with overrides
from services.context_generator import ContextGenerator
context_gen = ContextGenerator()
spirometry_csv_path = request.session.get("spirometry_csv_path", "")
if not spirometry_csv_path or not Path(spirometry_csv_path).exists():
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
from pathlib import Path as PathLib
data_dir = PathLib("data")
spirometry_csv_path = extract_spirometry_table_from_pdf(
str(spirometry_path), output_dir=str(data_dir)
)
spirometry_csv_path = str(PathLib(spirometry_csv_path))
context_gen.load_data(
str(pnoe_path),
spirometry_csv_path,
str(seca_path)
)
context_gen.extract_patient_info(patient_info.get("last_name", ""))
spirometry_overrides = metric_overrides.get("spirometry", {})
pnoe_overrides = metric_overrides.get("pnoe", {})
spirometry_metrics = context_gen.calculate_spirometry_metrics(spirometry_overrides)
pnoe_metrics = context_gen.calculate_pnoe_metrics(pnoe_overrides)
# Update metrics in session
request.session["metrics"] = {
"spirometry": spirometry_metrics,
"pnoe": pnoe_metrics,
}
request.session["spirometry_csv_path"] = spirometry_csv_path
return RedirectResponse(url="/preview", status_code=303)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}")
return render_template("edit.html", {
"request": request,
"session": request.session,
"error": f"Error regenerating report: {str(e)}"
})
@app.get("/health")