Compare commits

...

2 Commits

10 changed files with 286 additions and 86 deletions
+7 -1
View File
@@ -2,4 +2,10 @@
data/
.env
.env
/graphs
/data
/reports
+7 -3
View File
@@ -12,7 +12,6 @@ from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel
from services.report_generator import ReportGeneratorService
app = FastAPI(
@@ -138,7 +137,7 @@ async def generate_report(
}
# Generate report using the service
result = report_service.generate_report(
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path),
@@ -153,9 +152,14 @@ async def generate_report(
)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}") # This will show in terminal
raise HTTPException(
status_code=500,
detail=f"Error generating report: {str(e)}",
detail=f"Error generating report: {str(e)}\n{error_details}",
)
finally:
# Close file handles
+142 -42
View File
@@ -6,7 +6,6 @@ of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data.
"""
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
@@ -35,28 +34,59 @@ class ContextGenerator:
def _preprocess_pnoe_data(self):
"""Apply preprocessing steps to Pnoe data"""
self.pnoe_df = self.pnoe_df.apply(pd.to_numeric, errors="ignore")
self.pnoe_df["VO2 Pulse"] = self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
self.pnoe_df["VO2 Breath"] = self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
self.pnoe_df["CHO"] = self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
self.pnoe_df["FAT"] = self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
# Convert numeric columns
for col in self.pnoe_df.columns:
try:
self.pnoe_df[col] = pd.to_numeric(self.pnoe_df[col])
except (ValueError, TypeError):
pass
self.pnoe_df["VO2 Pulse"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
)
self.pnoe_df["VO2 Breath"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
)
self.pnoe_df["CHO"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
)
self.pnoe_df["FAT"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
)
window_size = 10
columns_to_smooth = ["VO2(ml/min)", "VCO2(ml/min)", "HR(bpm)", "VT(l)", "BF(bpm)", "VE(l/min)", "VO2 Pulse", "VO2 Breath", "CHO", "FAT"]
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in self.pnoe_df.columns:
self.pnoe_df[f"{col}_smoothed"] = self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
self.pnoe_df[f"{col}_smoothed"] = (
self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
def extract_patient_info(self, patient_name: str) -> Dict:
"""Extract patient information from SECA dataset"""
if self.seca_df is not None:
patient_data = self.seca_df[self.seca_df["LastName"].str.contains(patient_name, case=False, na=False)]
patient_data = self.seca_df[
self.seca_df["LastName"].str.contains(
patient_name, case=False, na=False
)
]
if not patient_data.empty:
row = patient_data.iloc[0]
weight_kg = float(row.get("Weight", 0))
fat_pct = float(row.get("Adult_FMP", 0))
self.patient_info = {
"name": f"{row.get('FirstName', '')} {row.get('LastName', '')}",
"first_name": row.get("FirstName", ""),
@@ -75,9 +105,11 @@ class ContextGenerator:
"""Calculate spirometry-related metrics"""
metrics = {}
for param in ["FVC", "FEV1", "FEV1/FVC%"]:
row = self.spirometry_df.loc[self.spirometry_df["Parameters"].str.strip() == param]
row = self.spirometry_df.loc[
self.spirometry_df["Parameters"].str.strip() == param
]
if not row.empty:
param_key = param.lower().replace('/', '_').replace('%', '_pct')
param_key = param.lower().replace("/", "_").replace("%", "_pct")
metrics[f"{param_key}_best"] = row["Best"].values[0]
metrics[f"{param_key}_pred"] = row["%Pred."].values[0]
return metrics
@@ -87,21 +119,21 @@ class ContextGenerator:
metrics = {}
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]
peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax()
peak_vt_row = self.pnoe_df.loc[peak_vt_idx]
metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx]
metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]
vt1, vt2 = self._detect_thresholds()
metrics["vt1"] = vt1
metrics["vt2"] = vt2
zones = self._calculate_hr_zones(vt1, vt2, fat_max_row)
metrics.update(zones)
return metrics
@@ -110,25 +142,35 @@ class ContextGenerator:
"""Detect VT1 and VT2 thresholds"""
condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
crossover_indices = condition[condition].index
vt1 = None
if len(crossover_indices) > 0:
vt1_idx = crossover_indices[0]
vt1_row = self.pnoe_df.loc[vt1_idx]
vt1 = {"HeartRate": vt1_row["HR(bpm)_smoothed"], "Speed": vt1_row["Speed"], "Time": vt1_row["T(sec)"]}
vt1 = {
"HeartRate": vt1_row["HR(bpm)_smoothed"],
"Speed": vt1_row["Speed"],
"Time": vt1_row["T(sec)"],
}
ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff()
second_derivative = ve_slope.diff()
vt2_idx = second_derivative.idxmax()
vt2 = None
if pd.notna(vt2_idx):
vt2_row = self.pnoe_df.loc[vt2_idx]
vt2 = {"HeartRate": vt2_row["HR(bpm)_smoothed"], "Speed": vt2_row["Speed"], "Time": vt2_row["T(sec)"]}
vt2 = {
"HeartRate": vt2_row["HR(bpm)_smoothed"],
"Speed": vt2_row["Speed"],
"Time": vt2_row["T(sec)"],
}
return vt1, vt2
def _calculate_hr_zones(self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series) -> Dict:
def _calculate_hr_zones(
self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series
) -> Dict:
"""Calculate heart rate zones based on thresholds"""
zones = {}
if vt1 and vt2:
@@ -137,7 +179,7 @@ class ContextGenerator:
zone_3_start = vt1["HeartRate"]
zone_4_start = vt2["HeartRate"] - 10
zone_5_start = vt2["HeartRate"] + 10
zones["zone1_bpm"] = f"{int(zone_1_start)}-{int(zone_2_start)}bpm"
zones["zone2_bpm"] = f"{int(zone_2_start)}-{int(vt1['HeartRate'])}bpm"
zones["zone3_bpm"] = f"{int(zone_3_start)}-{int(zone_4_start)}bpm"
@@ -152,29 +194,87 @@ class ContextGenerator:
zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm"
return zones
def generate_all_contexts(self, patient_name: str, graphs: Dict[str, str]) -> List[Dict]:
def generate_all_contexts(
self, patient_name: str, graphs: Dict[str, str]
) -> List[Dict]:
"""Main method to generate all page contexts"""
self.extract_patient_info(patient_name)
spirometry_metrics = self.calculate_spirometry_metrics()
pnoe_metrics = self.calculate_pnoe_metrics()
contexts = []
contexts.append({"name": self.patient_info["name"], "surname": self.patient_info["last_name"], "date": datetime.now().strftime("%B %d, %Y")})
contexts.append({"patient_name": self.patient_info["name"], "test_date": datetime.now().strftime("%B %d, %Y")})
contexts.append(
{
"name": self.patient_info["name"],
"surname": self.patient_info["last_name"],
"date": datetime.now().strftime("%B %d, %Y"),
}
)
contexts.append(
{
"patient_name": self.patient_info["name"],
"test_date": datetime.now().strftime("%B %d, %Y"),
}
)
for i in range(4):
contexts.append({"patient_name": self.patient_info["name"], "page_number": i + 3})
contexts.append(
{"patient_name": self.patient_info["name"], "page_number": i + 3}
)
fev1_percentage = 0
if spirometry_metrics.get("fvc_best"):
fev1_percentage = (pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]) * 100
contexts.append({"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}", "peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}", "fev1_percentage": f"{fev1_percentage:.1f}", "lung_analysis_chart": graphs.get("spirometry_chart", ""), "respiratory_analysis_chart": graphs.get("respiratory", "")})
contexts.append({"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}", "age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}", "zone1_bpm": pnoe_metrics.get("zone1_bpm", ""), "zone2_bpm": pnoe_metrics.get("zone2_bpm", ""), "zone3_bpm": pnoe_metrics.get("zone3_bpm", ""), "zone4_bpm": pnoe_metrics.get("zone4_bpm", ""), "zone5_bpm": pnoe_metrics.get("zone5_bpm", ""), "vo2_pulse_chart": graphs.get("vo2_pulse", "")})
contexts.append({"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}", "fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}", "fuel_utilization_chart": graphs.get("fuel_utilization", ""), "fat_metabolism_chart": graphs.get("fat_metabolism", "")})
contexts.append({"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}", "fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}", "lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}", "body_composition_chart": graphs.get("body_composition", ""), "body_fat_percent_chart": graphs.get("body_fat_percent", "")})
fev1_percentage = (
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
) * 100
contexts.append(
{
"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}",
"peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}",
"fev1_percentage": f"{fev1_percentage:.1f}",
"lung_analysis_chart": graphs.get("spirometry_chart", ""),
"respiratory_analysis_chart": graphs.get("respiratory", ""),
}
)
contexts.append(
{
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
"zone1_bpm": pnoe_metrics.get("zone1_bpm", ""),
"zone2_bpm": pnoe_metrics.get("zone2_bpm", ""),
"zone3_bpm": pnoe_metrics.get("zone3_bpm", ""),
"zone4_bpm": pnoe_metrics.get("zone4_bpm", ""),
"zone5_bpm": pnoe_metrics.get("zone5_bpm", ""),
"vo2_pulse_chart": graphs.get("vo2_pulse", ""),
}
)
contexts.append(
{
"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}",
"fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}",
"fuel_utilization_chart": graphs.get("fuel_utilization", ""),
"fat_metabolism_chart": graphs.get("fat_metabolism", ""),
}
)
contexts.append(
{
"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}",
"fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}",
"lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}",
"body_composition_chart": graphs.get("body_composition", ""),
"body_fat_percent_chart": graphs.get("body_fat_percent", ""),
}
)
for i in range(9):
contexts.append({"patient_name": self.patient_info["name"], "page_number": i + 11, "vo2_breath_chart": graphs.get("vo2_breath", ""), "recovery_chart": graphs.get("recovery", "")})
contexts.append(
{
"patient_name": self.patient_info["name"],
"page_number": i + 11,
"vo2_breath_chart": graphs.get("vo2_breath", ""),
"recovery_chart": graphs.get("recovery", ""),
}
)
return contexts
+3
View File
@@ -8,6 +8,9 @@ Based on the analysis notebooks in services_dfdf/.
import base64
from pathlib import Path
import matplotlib
matplotlib.use("Agg") # Use non-interactive backend
import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms
import numpy as np
+41 -29
View File
@@ -10,11 +10,10 @@ from typing import Any, Dict, List
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from playwright.sync_api import sync_playwright
from app.services.context_generator import ContextGenerator
from app.services.graph_generator import GraphGenerator
from app.services.spirometry_table_extractor import extract_spirometry_table_from_pdf
from playwright.async_api import async_playwright
from services.context_generator import ContextGenerator
from services.graph_generator import GraphGenerator
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
class ReportGeneratorService:
@@ -61,7 +60,13 @@ class ReportGeneratorService:
"""
# Load data
df = pd.read_csv(pnoe_csv_path, delimiter=";")
df = df.apply(pd.to_numeric, errors="ignore")
# Convert numeric columns (updated approach)
for col in df.columns:
try:
df[col] = pd.to_numeric(df[col])
except (ValueError, TypeError):
pass # Keep as-is if not numeric
# Calculate derived columns
df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
@@ -260,7 +265,7 @@ class ReportGeneratorService:
return html_doc
def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
async def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
"""
Convert HTML content to PDF file.
@@ -268,14 +273,14 @@ class ReportGeneratorService:
html_content: HTML content as string
pdf_path: Path where PDF should be saved
"""
with sync_playwright() as p:
browser = p.chromium.launch()
page = browser.new_page()
page.set_content(html_content)
page.pdf(path=pdf_path, format="A4", print_background=True)
browser.close()
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.set_content(html_content)
await page.pdf(path=pdf_path, format="A4", print_background=True)
await browser.close()
def generate_report(
async def generate_report(
self,
spirometry_pdf_path: str,
pnoe_csv_path: str,
@@ -304,19 +309,18 @@ class ReportGeneratorService:
Dictionary containing report path, graphs generated, and analysis data
"""
# Step 1: Extract spirometry table from PDF
spirometry_csv_path = self.data_dir / "extracted_spirometry_table.csv"
extract_spirometry_table_from_pdf(spirometry_pdf_path)
# The extraction saves to current directory, move it to data_dir
import shutil
if Path("extracted_spirometry_table.csv").exists():
shutil.move("extracted_spirometry_table.csv", spirometry_csv_path)
print("Step 1: Extracting spirometry data from PDF...")
spirometry_csv_path = extract_spirometry_table_from_pdf(
spirometry_pdf_path, output_dir=str(self.data_dir)
)
print(f"Spirometry data saved to: {spirometry_csv_path}")
# Step 2: Process Pnoe data
print("Step 2: Processing Pnoe data...")
df = self.process_pnoe_data(pnoe_csv_path)
# Step 3: Generate all graphs
print("Step 3: Generating graphs...")
graphs_generated = self.generate_graphs(df)
# Create graph dictionary with base64 encoded images
@@ -365,13 +369,20 @@ class ReportGeneratorService:
graphs_dict["body_fat_percent"] = body_fat_b64
# Generate spirometry chart
spirometry_df = pd.read_csv(spirometry_csv_path)
spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
spirometry_df, save_as_base64=True
)
graphs_dict["spirometry_chart"] = spirometry_chart_b64
print("Step 4: Generating spirometry chart...")
try:
spirometry_df = pd.read_csv(spirometry_csv_path)
print(f"Spirometry data loaded: {len(spirometry_df)} rows")
spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
spirometry_df, save_as_base64=True
)
graphs_dict["spirometry_chart"] = spirometry_chart_b64
except Exception as e:
print(f"Warning: Could not generate spirometry chart: {e}")
graphs_dict["spirometry_chart"] = ""
# Step 4: Generate context for all pages
# Step 5: Generate context for all pages
print("Step 5: Generating page contexts...")
self.context_generator.load_data(
pnoe_csv_path, str(spirometry_csv_path), seca_excel_path
)
@@ -395,7 +406,8 @@ class ReportGeneratorService:
)
report_path = self.reports_dir / output_filename
self.html_to_pdf(html_content, str(report_path))
print(f"Generating PDF report at {report_path}")
await self.html_to_pdf(html_content, str(report_path))
return {
"report_path": str(report_path),
+86 -11
View File
@@ -13,7 +13,21 @@ def encode_pdf_to_base64(pdf_path):
return base64.b64encode(pdf_file.read()).decode("utf-8")
def extract_spirometry_table_from_pdf(pdf_path):
def extract_spirometry_table_from_pdf(pdf_path, output_dir="data"):
"""
Extract spirometry table from PDF using AI and save as clean CSV.
Args:
pdf_path: Path to the spirometry PDF file
output_dir: Directory to save the extracted CSV
Returns:
Path to the saved CSV file
"""
import csv
import re
from pathlib import Path
url = "https://openrouter.ai/api/v1/chat/completions"
headers = {
"Authorization": f"Bearer {API_KEY_REF}",
@@ -30,10 +44,17 @@ def extract_spirometry_table_from_pdf(pdf_path):
"content": [
{
"type": "text",
"text": "Please extract the Spirometry table from the pdf and return the values in csv format, "
"note that it is the unit of parameter that is beside it and it should not be a column. "
"The '-' Should be treated as empty values."
"do not add 'csv' at the start or end of the response",
"text": "Please extract the Spirometry table from the pdf and return ONLY the values in CSV format. "
"The CSV should have these columns: Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
"Rules:\n"
"1. Include ONLY the data rows (FVC, FEV1, FEV1/FVC%, etc.)\n"
"2. Do NOT include units in the data (units are part of parameter name)\n"
"3. Use empty string for missing values (not '-' or 'N/A')\n"
"4. Do NOT add 'csv' markers or code blocks\n"
"5. First line should be the header\n"
"Example format:\n"
"Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
"FVC,4.50,4.75,3.20,4.80,99,-0.10",
},
{
"type": "file",
@@ -54,11 +75,65 @@ def extract_spirometry_table_from_pdf(pdf_path):
if "choices" in response_data and len(response_data["choices"]) > 0:
content = response_data["choices"][0]["message"]["content"]
# Save to a CSV file
output_file = "extracted_spirometry_table.csv"
with open(output_file, "w", encoding="utf-8") as f:
f.write(content)
# Clean the content - remove markdown code blocks if present
content = re.sub(r"```csv\n?", "", content)
content = re.sub(r"```\n?", "", content)
content = content.strip()
return f"Extracted table saved to {output_file}"
# Parse and validate CSV
lines = content.split("\n")
if not lines:
raise ValueError("No data extracted from PDF")
# Ensure output directory exists
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
output_file = output_path / "extracted_spirometry_table.csv"
# Write cleaned CSV with proper formatting
with open(output_file, "w", encoding="utf-8", newline="") as f:
# Parse the first line as header
header_line = lines[0].strip()
if "," in header_line:
header = [col.strip() for col in header_line.split(",")]
else:
# Default header if not provided
header = [
"Parameters",
"Pre",
"Best",
"LLN",
"Pred.",
"%Pred.",
"ZScore",
]
writer = csv.writer(f)
writer.writerow(header)
# Process data rows
for line in lines[1:]:
line = line.strip()
if not line:
continue
# Split by comma and clean each field
fields = [field.strip() for field in line.split(",")]
# Ensure we have the right number of fields
if len(fields) < len(header):
# Pad with empty strings
fields.extend([""] * (len(header) - len(fields)))
elif len(fields) > len(header):
# Take only the first N fields
fields = fields[: len(header)]
# Replace '-' or 'N/A' with empty string
fields = ["" if f in ["-", "N/A", "n/a", "NA"] else f for f in fields]
writer.writerow(fields)
return str(output_file)
else:
return "No content found in response"
error_msg = response_data.get("error", {}).get("message", "Unknown error")
raise Exception(f"No content found in response: {error_msg}")