Compare commits

...

2 Commits

10 changed files with 286 additions and 86 deletions
+7 -1
View File
@@ -2,4 +2,10 @@
data/ data/
.env .env
/graphs
/data
/reports
+7 -3
View File
@@ -12,7 +12,6 @@ from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, UploadFile from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from pydantic import BaseModel from pydantic import BaseModel
from services.report_generator import ReportGeneratorService from services.report_generator import ReportGeneratorService
app = FastAPI( app = FastAPI(
@@ -138,7 +137,7 @@ async def generate_report(
} }
# Generate report using the service # Generate report using the service
result = report_service.generate_report( result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path), spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path), pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path), seca_excel_path=str(seca_path),
@@ -153,9 +152,14 @@ async def generate_report(
) )
except Exception as e: except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}") # This will show in terminal
raise HTTPException( raise HTTPException(
status_code=500, status_code=500,
detail=f"Error generating report: {str(e)}", detail=f"Error generating report: {str(e)}\n{error_details}",
) )
finally: finally:
# Close file handles # Close file handles
+142 -42
View File
@@ -6,7 +6,6 @@ of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data.
""" """
from datetime import datetime from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
import pandas as pd import pandas as pd
@@ -35,28 +34,59 @@ class ContextGenerator:
def _preprocess_pnoe_data(self): def _preprocess_pnoe_data(self):
"""Apply preprocessing steps to Pnoe data""" """Apply preprocessing steps to Pnoe data"""
self.pnoe_df = self.pnoe_df.apply(pd.to_numeric, errors="ignore") # Convert numeric columns
self.pnoe_df["VO2 Pulse"] = self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"] for col in self.pnoe_df.columns:
self.pnoe_df["VO2 Breath"] = self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"] try:
self.pnoe_df["CHO"] = self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100 self.pnoe_df[col] = pd.to_numeric(self.pnoe_df[col])
self.pnoe_df["FAT"] = self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100 except (ValueError, TypeError):
pass
self.pnoe_df["VO2 Pulse"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
)
self.pnoe_df["VO2 Breath"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
)
self.pnoe_df["CHO"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
)
self.pnoe_df["FAT"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
)
window_size = 10 window_size = 10
columns_to_smooth = ["VO2(ml/min)", "VCO2(ml/min)", "HR(bpm)", "VT(l)", "BF(bpm)", "VE(l/min)", "VO2 Pulse", "VO2 Breath", "CHO", "FAT"] columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth: for col in columns_to_smooth:
if col in self.pnoe_df.columns: if col in self.pnoe_df.columns:
self.pnoe_df[f"{col}_smoothed"] = self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean() self.pnoe_df[f"{col}_smoothed"] = (
self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
def extract_patient_info(self, patient_name: str) -> Dict: def extract_patient_info(self, patient_name: str) -> Dict:
"""Extract patient information from SECA dataset""" """Extract patient information from SECA dataset"""
if self.seca_df is not None: if self.seca_df is not None:
patient_data = self.seca_df[self.seca_df["LastName"].str.contains(patient_name, case=False, na=False)] patient_data = self.seca_df[
self.seca_df["LastName"].str.contains(
patient_name, case=False, na=False
)
]
if not patient_data.empty: if not patient_data.empty:
row = patient_data.iloc[0] row = patient_data.iloc[0]
weight_kg = float(row.get("Weight", 0)) weight_kg = float(row.get("Weight", 0))
fat_pct = float(row.get("Adult_FMP", 0)) fat_pct = float(row.get("Adult_FMP", 0))
self.patient_info = { self.patient_info = {
"name": f"{row.get('FirstName', '')} {row.get('LastName', '')}", "name": f"{row.get('FirstName', '')} {row.get('LastName', '')}",
"first_name": row.get("FirstName", ""), "first_name": row.get("FirstName", ""),
@@ -75,9 +105,11 @@ class ContextGenerator:
"""Calculate spirometry-related metrics""" """Calculate spirometry-related metrics"""
metrics = {} metrics = {}
for param in ["FVC", "FEV1", "FEV1/FVC%"]: for param in ["FVC", "FEV1", "FEV1/FVC%"]:
row = self.spirometry_df.loc[self.spirometry_df["Parameters"].str.strip() == param] row = self.spirometry_df.loc[
self.spirometry_df["Parameters"].str.strip() == param
]
if not row.empty: if not row.empty:
param_key = param.lower().replace('/', '_').replace('%', '_pct') param_key = param.lower().replace("/", "_").replace("%", "_pct")
metrics[f"{param_key}_best"] = row["Best"].values[0] metrics[f"{param_key}_best"] = row["Best"].values[0]
metrics[f"{param_key}_pred"] = row["%Pred."].values[0] metrics[f"{param_key}_pred"] = row["%Pred."].values[0]
return metrics return metrics
@@ -87,21 +119,21 @@ class ContextGenerator:
metrics = {} metrics = {}
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max() metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"] metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]
peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax() peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax()
peak_vt_row = self.pnoe_df.loc[peak_vt_idx] peak_vt_row = self.pnoe_df.loc[peak_vt_idx]
metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"] metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"] metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx] fat_max_row = self.pnoe_df.loc[fat_max_idx]
metrics["fat_max_value"] = fat_max_row["FAT_smoothed"] metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"] metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]
vt1, vt2 = self._detect_thresholds() vt1, vt2 = self._detect_thresholds()
metrics["vt1"] = vt1 metrics["vt1"] = vt1
metrics["vt2"] = vt2 metrics["vt2"] = vt2
zones = self._calculate_hr_zones(vt1, vt2, fat_max_row) zones = self._calculate_hr_zones(vt1, vt2, fat_max_row)
metrics.update(zones) metrics.update(zones)
return metrics return metrics
@@ -110,25 +142,35 @@ class ContextGenerator:
"""Detect VT1 and VT2 thresholds""" """Detect VT1 and VT2 thresholds"""
condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"] condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
crossover_indices = condition[condition].index crossover_indices = condition[condition].index
vt1 = None vt1 = None
if len(crossover_indices) > 0: if len(crossover_indices) > 0:
vt1_idx = crossover_indices[0] vt1_idx = crossover_indices[0]
vt1_row = self.pnoe_df.loc[vt1_idx] vt1_row = self.pnoe_df.loc[vt1_idx]
vt1 = {"HeartRate": vt1_row["HR(bpm)_smoothed"], "Speed": vt1_row["Speed"], "Time": vt1_row["T(sec)"]} vt1 = {
"HeartRate": vt1_row["HR(bpm)_smoothed"],
"Speed": vt1_row["Speed"],
"Time": vt1_row["T(sec)"],
}
ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff() ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff()
second_derivative = ve_slope.diff() second_derivative = ve_slope.diff()
vt2_idx = second_derivative.idxmax() vt2_idx = second_derivative.idxmax()
vt2 = None vt2 = None
if pd.notna(vt2_idx): if pd.notna(vt2_idx):
vt2_row = self.pnoe_df.loc[vt2_idx] vt2_row = self.pnoe_df.loc[vt2_idx]
vt2 = {"HeartRate": vt2_row["HR(bpm)_smoothed"], "Speed": vt2_row["Speed"], "Time": vt2_row["T(sec)"]} vt2 = {
"HeartRate": vt2_row["HR(bpm)_smoothed"],
"Speed": vt2_row["Speed"],
"Time": vt2_row["T(sec)"],
}
return vt1, vt2 return vt1, vt2
def _calculate_hr_zones(self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series) -> Dict: def _calculate_hr_zones(
self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series
) -> Dict:
"""Calculate heart rate zones based on thresholds""" """Calculate heart rate zones based on thresholds"""
zones = {} zones = {}
if vt1 and vt2: if vt1 and vt2:
@@ -137,7 +179,7 @@ class ContextGenerator:
zone_3_start = vt1["HeartRate"] zone_3_start = vt1["HeartRate"]
zone_4_start = vt2["HeartRate"] - 10 zone_4_start = vt2["HeartRate"] - 10
zone_5_start = vt2["HeartRate"] + 10 zone_5_start = vt2["HeartRate"] + 10
zones["zone1_bpm"] = f"{int(zone_1_start)}-{int(zone_2_start)}bpm" zones["zone1_bpm"] = f"{int(zone_1_start)}-{int(zone_2_start)}bpm"
zones["zone2_bpm"] = f"{int(zone_2_start)}-{int(vt1['HeartRate'])}bpm" zones["zone2_bpm"] = f"{int(zone_2_start)}-{int(vt1['HeartRate'])}bpm"
zones["zone3_bpm"] = f"{int(zone_3_start)}-{int(zone_4_start)}bpm" zones["zone3_bpm"] = f"{int(zone_3_start)}-{int(zone_4_start)}bpm"
@@ -152,29 +194,87 @@ class ContextGenerator:
zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm" zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm"
return zones return zones
def generate_all_contexts(self, patient_name: str, graphs: Dict[str, str]) -> List[Dict]: def generate_all_contexts(
self, patient_name: str, graphs: Dict[str, str]
) -> List[Dict]:
"""Main method to generate all page contexts""" """Main method to generate all page contexts"""
self.extract_patient_info(patient_name) self.extract_patient_info(patient_name)
spirometry_metrics = self.calculate_spirometry_metrics() spirometry_metrics = self.calculate_spirometry_metrics()
pnoe_metrics = self.calculate_pnoe_metrics() pnoe_metrics = self.calculate_pnoe_metrics()
contexts = [] contexts = []
contexts.append({"name": self.patient_info["name"], "surname": self.patient_info["last_name"], "date": datetime.now().strftime("%B %d, %Y")}) contexts.append(
contexts.append({"patient_name": self.patient_info["name"], "test_date": datetime.now().strftime("%B %d, %Y")}) {
"name": self.patient_info["name"],
"surname": self.patient_info["last_name"],
"date": datetime.now().strftime("%B %d, %Y"),
}
)
contexts.append(
{
"patient_name": self.patient_info["name"],
"test_date": datetime.now().strftime("%B %d, %Y"),
}
)
for i in range(4): for i in range(4):
contexts.append({"patient_name": self.patient_info["name"], "page_number": i + 3}) contexts.append(
{"patient_name": self.patient_info["name"], "page_number": i + 3}
)
fev1_percentage = 0 fev1_percentage = 0
if spirometry_metrics.get("fvc_best"): if spirometry_metrics.get("fvc_best"):
fev1_percentage = (pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]) * 100 fev1_percentage = (
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
contexts.append({"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}", "peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}", "fev1_percentage": f"{fev1_percentage:.1f}", "lung_analysis_chart": graphs.get("spirometry_chart", ""), "respiratory_analysis_chart": graphs.get("respiratory", "")}) ) * 100
contexts.append({"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}", "age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}", "zone1_bpm": pnoe_metrics.get("zone1_bpm", ""), "zone2_bpm": pnoe_metrics.get("zone2_bpm", ""), "zone3_bpm": pnoe_metrics.get("zone3_bpm", ""), "zone4_bpm": pnoe_metrics.get("zone4_bpm", ""), "zone5_bpm": pnoe_metrics.get("zone5_bpm", ""), "vo2_pulse_chart": graphs.get("vo2_pulse", "")})
contexts.append({"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}", "fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}", "fuel_utilization_chart": graphs.get("fuel_utilization", ""), "fat_metabolism_chart": graphs.get("fat_metabolism", "")}) contexts.append(
contexts.append({"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}", "fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}", "lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}", "body_composition_chart": graphs.get("body_composition", ""), "body_fat_percent_chart": graphs.get("body_fat_percent", "")}) {
"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}",
"peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}",
"fev1_percentage": f"{fev1_percentage:.1f}",
"lung_analysis_chart": graphs.get("spirometry_chart", ""),
"respiratory_analysis_chart": graphs.get("respiratory", ""),
}
)
contexts.append(
{
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
"zone1_bpm": pnoe_metrics.get("zone1_bpm", ""),
"zone2_bpm": pnoe_metrics.get("zone2_bpm", ""),
"zone3_bpm": pnoe_metrics.get("zone3_bpm", ""),
"zone4_bpm": pnoe_metrics.get("zone4_bpm", ""),
"zone5_bpm": pnoe_metrics.get("zone5_bpm", ""),
"vo2_pulse_chart": graphs.get("vo2_pulse", ""),
}
)
contexts.append(
{
"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}",
"fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}",
"fuel_utilization_chart": graphs.get("fuel_utilization", ""),
"fat_metabolism_chart": graphs.get("fat_metabolism", ""),
}
)
contexts.append(
{
"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}",
"fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}",
"lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}",
"body_composition_chart": graphs.get("body_composition", ""),
"body_fat_percent_chart": graphs.get("body_fat_percent", ""),
}
)
for i in range(9): for i in range(9):
contexts.append({"patient_name": self.patient_info["name"], "page_number": i + 11, "vo2_breath_chart": graphs.get("vo2_breath", ""), "recovery_chart": graphs.get("recovery", "")}) contexts.append(
{
"patient_name": self.patient_info["name"],
"page_number": i + 11,
"vo2_breath_chart": graphs.get("vo2_breath", ""),
"recovery_chart": graphs.get("recovery", ""),
}
)
return contexts return contexts
+3
View File
@@ -8,6 +8,9 @@ Based on the analysis notebooks in services_dfdf/.
import base64 import base64
from pathlib import Path from pathlib import Path
import matplotlib
matplotlib.use("Agg") # Use non-interactive backend
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms import matplotlib.transforms as mtransforms
import numpy as np import numpy as np
+41 -29
View File
@@ -10,11 +10,10 @@ from typing import Any, Dict, List
import pandas as pd import pandas as pd
from jinja2 import Environment, FileSystemLoader from jinja2 import Environment, FileSystemLoader
from playwright.sync_api import sync_playwright from playwright.async_api import async_playwright
from services.context_generator import ContextGenerator
from app.services.context_generator import ContextGenerator from services.graph_generator import GraphGenerator
from app.services.graph_generator import GraphGenerator from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
from app.services.spirometry_table_extractor import extract_spirometry_table_from_pdf
class ReportGeneratorService: class ReportGeneratorService:
@@ -61,7 +60,13 @@ class ReportGeneratorService:
""" """
# Load data # Load data
df = pd.read_csv(pnoe_csv_path, delimiter=";") df = pd.read_csv(pnoe_csv_path, delimiter=";")
df = df.apply(pd.to_numeric, errors="ignore")
# Convert numeric columns (updated approach)
for col in df.columns:
try:
df[col] = pd.to_numeric(df[col])
except (ValueError, TypeError):
pass # Keep as-is if not numeric
# Calculate derived columns # Calculate derived columns
df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"] df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
@@ -260,7 +265,7 @@ class ReportGeneratorService:
return html_doc return html_doc
def html_to_pdf(self, html_content: str, pdf_path: str) -> None: async def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
""" """
Convert HTML content to PDF file. Convert HTML content to PDF file.
@@ -268,14 +273,14 @@ class ReportGeneratorService:
html_content: HTML content as string html_content: HTML content as string
pdf_path: Path where PDF should be saved pdf_path: Path where PDF should be saved
""" """
with sync_playwright() as p: async with async_playwright() as p:
browser = p.chromium.launch() browser = await p.chromium.launch()
page = browser.new_page() page = await browser.new_page()
page.set_content(html_content) await page.set_content(html_content)
page.pdf(path=pdf_path, format="A4", print_background=True) await page.pdf(path=pdf_path, format="A4", print_background=True)
browser.close() await browser.close()
def generate_report( async def generate_report(
self, self,
spirometry_pdf_path: str, spirometry_pdf_path: str,
pnoe_csv_path: str, pnoe_csv_path: str,
@@ -304,19 +309,18 @@ class ReportGeneratorService:
Dictionary containing report path, graphs generated, and analysis data Dictionary containing report path, graphs generated, and analysis data
""" """
# Step 1: Extract spirometry table from PDF # Step 1: Extract spirometry table from PDF
spirometry_csv_path = self.data_dir / "extracted_spirometry_table.csv" print("Step 1: Extracting spirometry data from PDF...")
extract_spirometry_table_from_pdf(spirometry_pdf_path) spirometry_csv_path = extract_spirometry_table_from_pdf(
spirometry_pdf_path, output_dir=str(self.data_dir)
# The extraction saves to current directory, move it to data_dir )
import shutil print(f"Spirometry data saved to: {spirometry_csv_path}")
if Path("extracted_spirometry_table.csv").exists():
shutil.move("extracted_spirometry_table.csv", spirometry_csv_path)
# Step 2: Process Pnoe data # Step 2: Process Pnoe data
print("Step 2: Processing Pnoe data...")
df = self.process_pnoe_data(pnoe_csv_path) df = self.process_pnoe_data(pnoe_csv_path)
# Step 3: Generate all graphs # Step 3: Generate all graphs
print("Step 3: Generating graphs...")
graphs_generated = self.generate_graphs(df) graphs_generated = self.generate_graphs(df)
# Create graph dictionary with base64 encoded images # Create graph dictionary with base64 encoded images
@@ -365,13 +369,20 @@ class ReportGeneratorService:
graphs_dict["body_fat_percent"] = body_fat_b64 graphs_dict["body_fat_percent"] = body_fat_b64
# Generate spirometry chart # Generate spirometry chart
spirometry_df = pd.read_csv(spirometry_csv_path) print("Step 4: Generating spirometry chart...")
spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart( try:
spirometry_df, save_as_base64=True spirometry_df = pd.read_csv(spirometry_csv_path)
) print(f"Spirometry data loaded: {len(spirometry_df)} rows")
graphs_dict["spirometry_chart"] = spirometry_chart_b64 spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
spirometry_df, save_as_base64=True
)
graphs_dict["spirometry_chart"] = spirometry_chart_b64
except Exception as e:
print(f"Warning: Could not generate spirometry chart: {e}")
graphs_dict["spirometry_chart"] = ""
# Step 4: Generate context for all pages # Step 5: Generate context for all pages
print("Step 5: Generating page contexts...")
self.context_generator.load_data( self.context_generator.load_data(
pnoe_csv_path, str(spirometry_csv_path), seca_excel_path pnoe_csv_path, str(spirometry_csv_path), seca_excel_path
) )
@@ -395,7 +406,8 @@ class ReportGeneratorService:
) )
report_path = self.reports_dir / output_filename report_path = self.reports_dir / output_filename
self.html_to_pdf(html_content, str(report_path)) print(f"Generating PDF report at {report_path}")
await self.html_to_pdf(html_content, str(report_path))
return { return {
"report_path": str(report_path), "report_path": str(report_path),
+86 -11
View File
@@ -13,7 +13,21 @@ def encode_pdf_to_base64(pdf_path):
return base64.b64encode(pdf_file.read()).decode("utf-8") return base64.b64encode(pdf_file.read()).decode("utf-8")
def extract_spirometry_table_from_pdf(pdf_path): def extract_spirometry_table_from_pdf(pdf_path, output_dir="data"):
"""
Extract spirometry table from PDF using AI and save as clean CSV.
Args:
pdf_path: Path to the spirometry PDF file
output_dir: Directory to save the extracted CSV
Returns:
Path to the saved CSV file
"""
import csv
import re
from pathlib import Path
url = "https://openrouter.ai/api/v1/chat/completions" url = "https://openrouter.ai/api/v1/chat/completions"
headers = { headers = {
"Authorization": f"Bearer {API_KEY_REF}", "Authorization": f"Bearer {API_KEY_REF}",
@@ -30,10 +44,17 @@ def extract_spirometry_table_from_pdf(pdf_path):
"content": [ "content": [
{ {
"type": "text", "type": "text",
"text": "Please extract the Spirometry table from the pdf and return the values in csv format, " "text": "Please extract the Spirometry table from the pdf and return ONLY the values in CSV format. "
"note that it is the unit of parameter that is beside it and it should not be a column. " "The CSV should have these columns: Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
"The '-' Should be treated as empty values." "Rules:\n"
"do not add 'csv' at the start or end of the response", "1. Include ONLY the data rows (FVC, FEV1, FEV1/FVC%, etc.)\n"
"2. Do NOT include units in the data (units are part of parameter name)\n"
"3. Use empty string for missing values (not '-' or 'N/A')\n"
"4. Do NOT add 'csv' markers or code blocks\n"
"5. First line should be the header\n"
"Example format:\n"
"Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
"FVC,4.50,4.75,3.20,4.80,99,-0.10",
}, },
{ {
"type": "file", "type": "file",
@@ -54,11 +75,65 @@ def extract_spirometry_table_from_pdf(pdf_path):
if "choices" in response_data and len(response_data["choices"]) > 0: if "choices" in response_data and len(response_data["choices"]) > 0:
content = response_data["choices"][0]["message"]["content"] content = response_data["choices"][0]["message"]["content"]
# Save to a CSV file # Clean the content - remove markdown code blocks if present
output_file = "extracted_spirometry_table.csv" content = re.sub(r"```csv\n?", "", content)
with open(output_file, "w", encoding="utf-8") as f: content = re.sub(r"```\n?", "", content)
f.write(content) content = content.strip()
return f"Extracted table saved to {output_file}" # Parse and validate CSV
lines = content.split("\n")
if not lines:
raise ValueError("No data extracted from PDF")
# Ensure output directory exists
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
output_file = output_path / "extracted_spirometry_table.csv"
# Write cleaned CSV with proper formatting
with open(output_file, "w", encoding="utf-8", newline="") as f:
# Parse the first line as header
header_line = lines[0].strip()
if "," in header_line:
header = [col.strip() for col in header_line.split(",")]
else:
# Default header if not provided
header = [
"Parameters",
"Pre",
"Best",
"LLN",
"Pred.",
"%Pred.",
"ZScore",
]
writer = csv.writer(f)
writer.writerow(header)
# Process data rows
for line in lines[1:]:
line = line.strip()
if not line:
continue
# Split by comma and clean each field
fields = [field.strip() for field in line.split(",")]
# Ensure we have the right number of fields
if len(fields) < len(header):
# Pad with empty strings
fields.extend([""] * (len(header) - len(fields)))
elif len(fields) > len(header):
# Take only the first N fields
fields = fields[: len(header)]
# Replace '-' or 'N/A' with empty string
fields = ["" if f in ["-", "N/A", "n/a", "NA"] else f for f in fields]
writer.writerow(fields)
return str(output_file)
else: else:
return "No content found in response" error_msg = response_data.get("error", {}).get("message", "Unknown error")
raise Exception(f"No content found in response: {error_msg}")