Compare commits
2 Commits
d66f3fd18b
...
0a735d88c8
| Author | SHA1 | Date | |
|---|---|---|---|
| 0a735d88c8 | |||
| 358898b7db |
+7
-1
@@ -2,4 +2,10 @@
|
||||
|
||||
data/
|
||||
|
||||
.env
|
||||
.env
|
||||
|
||||
/graphs
|
||||
|
||||
/data
|
||||
|
||||
/reports
|
||||
+7
-3
@@ -12,7 +12,6 @@ from pathlib import Path
|
||||
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
|
||||
from fastapi.responses import FileResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from services.report_generator import ReportGeneratorService
|
||||
|
||||
app = FastAPI(
|
||||
@@ -138,7 +137,7 @@ async def generate_report(
|
||||
}
|
||||
|
||||
# Generate report using the service
|
||||
result = report_service.generate_report(
|
||||
result = await report_service.generate_report(
|
||||
spirometry_pdf_path=str(spirometry_path),
|
||||
pnoe_csv_path=str(pnoe_path),
|
||||
seca_excel_path=str(seca_path),
|
||||
@@ -153,9 +152,14 @@ async def generate_report(
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
|
||||
error_details = traceback.format_exc()
|
||||
print(f"ERROR: {error_details}") # This will show in terminal
|
||||
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail=f"Error generating report: {str(e)}",
|
||||
detail=f"Error generating report: {str(e)}\n{error_details}",
|
||||
)
|
||||
finally:
|
||||
# Close file handles
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -6,7 +6,6 @@ of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import pandas as pd
|
||||
@@ -35,28 +34,59 @@ class ContextGenerator:
|
||||
|
||||
def _preprocess_pnoe_data(self):
|
||||
"""Apply preprocessing steps to Pnoe data"""
|
||||
self.pnoe_df = self.pnoe_df.apply(pd.to_numeric, errors="ignore")
|
||||
self.pnoe_df["VO2 Pulse"] = self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
|
||||
self.pnoe_df["VO2 Breath"] = self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
|
||||
self.pnoe_df["CHO"] = self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
|
||||
self.pnoe_df["FAT"] = self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
|
||||
|
||||
# Convert numeric columns
|
||||
for col in self.pnoe_df.columns:
|
||||
try:
|
||||
self.pnoe_df[col] = pd.to_numeric(self.pnoe_df[col])
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
self.pnoe_df["VO2 Pulse"] = (
|
||||
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
|
||||
)
|
||||
self.pnoe_df["VO2 Breath"] = (
|
||||
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
|
||||
)
|
||||
self.pnoe_df["CHO"] = (
|
||||
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
|
||||
)
|
||||
self.pnoe_df["FAT"] = (
|
||||
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
|
||||
)
|
||||
|
||||
window_size = 10
|
||||
columns_to_smooth = ["VO2(ml/min)", "VCO2(ml/min)", "HR(bpm)", "VT(l)", "BF(bpm)", "VE(l/min)", "VO2 Pulse", "VO2 Breath", "CHO", "FAT"]
|
||||
|
||||
columns_to_smooth = [
|
||||
"VO2(ml/min)",
|
||||
"VCO2(ml/min)",
|
||||
"HR(bpm)",
|
||||
"VT(l)",
|
||||
"BF(bpm)",
|
||||
"VE(l/min)",
|
||||
"VO2 Pulse",
|
||||
"VO2 Breath",
|
||||
"CHO",
|
||||
"FAT",
|
||||
]
|
||||
|
||||
for col in columns_to_smooth:
|
||||
if col in self.pnoe_df.columns:
|
||||
self.pnoe_df[f"{col}_smoothed"] = self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
|
||||
self.pnoe_df[f"{col}_smoothed"] = (
|
||||
self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
|
||||
)
|
||||
|
||||
def extract_patient_info(self, patient_name: str) -> Dict:
|
||||
"""Extract patient information from SECA dataset"""
|
||||
if self.seca_df is not None:
|
||||
patient_data = self.seca_df[self.seca_df["LastName"].str.contains(patient_name, case=False, na=False)]
|
||||
patient_data = self.seca_df[
|
||||
self.seca_df["LastName"].str.contains(
|
||||
patient_name, case=False, na=False
|
||||
)
|
||||
]
|
||||
if not patient_data.empty:
|
||||
row = patient_data.iloc[0]
|
||||
weight_kg = float(row.get("Weight", 0))
|
||||
fat_pct = float(row.get("Adult_FMP", 0))
|
||||
|
||||
|
||||
self.patient_info = {
|
||||
"name": f"{row.get('FirstName', '')} {row.get('LastName', '')}",
|
||||
"first_name": row.get("FirstName", ""),
|
||||
@@ -75,9 +105,11 @@ class ContextGenerator:
|
||||
"""Calculate spirometry-related metrics"""
|
||||
metrics = {}
|
||||
for param in ["FVC", "FEV1", "FEV1/FVC%"]:
|
||||
row = self.spirometry_df.loc[self.spirometry_df["Parameters"].str.strip() == param]
|
||||
row = self.spirometry_df.loc[
|
||||
self.spirometry_df["Parameters"].str.strip() == param
|
||||
]
|
||||
if not row.empty:
|
||||
param_key = param.lower().replace('/', '_').replace('%', '_pct')
|
||||
param_key = param.lower().replace("/", "_").replace("%", "_pct")
|
||||
metrics[f"{param_key}_best"] = row["Best"].values[0]
|
||||
metrics[f"{param_key}_pred"] = row["%Pred."].values[0]
|
||||
return metrics
|
||||
@@ -87,21 +119,21 @@ class ContextGenerator:
|
||||
metrics = {}
|
||||
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
|
||||
metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]
|
||||
|
||||
|
||||
peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax()
|
||||
peak_vt_row = self.pnoe_df.loc[peak_vt_idx]
|
||||
metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
|
||||
metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]
|
||||
|
||||
|
||||
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
|
||||
fat_max_row = self.pnoe_df.loc[fat_max_idx]
|
||||
metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
|
||||
metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]
|
||||
|
||||
|
||||
vt1, vt2 = self._detect_thresholds()
|
||||
metrics["vt1"] = vt1
|
||||
metrics["vt2"] = vt2
|
||||
|
||||
|
||||
zones = self._calculate_hr_zones(vt1, vt2, fat_max_row)
|
||||
metrics.update(zones)
|
||||
return metrics
|
||||
@@ -110,25 +142,35 @@ class ContextGenerator:
|
||||
"""Detect VT1 and VT2 thresholds"""
|
||||
condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
|
||||
crossover_indices = condition[condition].index
|
||||
|
||||
|
||||
vt1 = None
|
||||
if len(crossover_indices) > 0:
|
||||
vt1_idx = crossover_indices[0]
|
||||
vt1_row = self.pnoe_df.loc[vt1_idx]
|
||||
vt1 = {"HeartRate": vt1_row["HR(bpm)_smoothed"], "Speed": vt1_row["Speed"], "Time": vt1_row["T(sec)"]}
|
||||
|
||||
vt1 = {
|
||||
"HeartRate": vt1_row["HR(bpm)_smoothed"],
|
||||
"Speed": vt1_row["Speed"],
|
||||
"Time": vt1_row["T(sec)"],
|
||||
}
|
||||
|
||||
ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff()
|
||||
second_derivative = ve_slope.diff()
|
||||
vt2_idx = second_derivative.idxmax()
|
||||
|
||||
|
||||
vt2 = None
|
||||
if pd.notna(vt2_idx):
|
||||
vt2_row = self.pnoe_df.loc[vt2_idx]
|
||||
vt2 = {"HeartRate": vt2_row["HR(bpm)_smoothed"], "Speed": vt2_row["Speed"], "Time": vt2_row["T(sec)"]}
|
||||
|
||||
vt2 = {
|
||||
"HeartRate": vt2_row["HR(bpm)_smoothed"],
|
||||
"Speed": vt2_row["Speed"],
|
||||
"Time": vt2_row["T(sec)"],
|
||||
}
|
||||
|
||||
return vt1, vt2
|
||||
|
||||
def _calculate_hr_zones(self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series) -> Dict:
|
||||
def _calculate_hr_zones(
|
||||
self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series
|
||||
) -> Dict:
|
||||
"""Calculate heart rate zones based on thresholds"""
|
||||
zones = {}
|
||||
if vt1 and vt2:
|
||||
@@ -137,7 +179,7 @@ class ContextGenerator:
|
||||
zone_3_start = vt1["HeartRate"]
|
||||
zone_4_start = vt2["HeartRate"] - 10
|
||||
zone_5_start = vt2["HeartRate"] + 10
|
||||
|
||||
|
||||
zones["zone1_bpm"] = f"{int(zone_1_start)}-{int(zone_2_start)}bpm"
|
||||
zones["zone2_bpm"] = f"{int(zone_2_start)}-{int(vt1['HeartRate'])}bpm"
|
||||
zones["zone3_bpm"] = f"{int(zone_3_start)}-{int(zone_4_start)}bpm"
|
||||
@@ -152,29 +194,87 @@ class ContextGenerator:
|
||||
zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm"
|
||||
return zones
|
||||
|
||||
def generate_all_contexts(self, patient_name: str, graphs: Dict[str, str]) -> List[Dict]:
|
||||
def generate_all_contexts(
|
||||
self, patient_name: str, graphs: Dict[str, str]
|
||||
) -> List[Dict]:
|
||||
"""Main method to generate all page contexts"""
|
||||
self.extract_patient_info(patient_name)
|
||||
spirometry_metrics = self.calculate_spirometry_metrics()
|
||||
pnoe_metrics = self.calculate_pnoe_metrics()
|
||||
|
||||
|
||||
contexts = []
|
||||
contexts.append({"name": self.patient_info["name"], "surname": self.patient_info["last_name"], "date": datetime.now().strftime("%B %d, %Y")})
|
||||
contexts.append({"patient_name": self.patient_info["name"], "test_date": datetime.now().strftime("%B %d, %Y")})
|
||||
|
||||
contexts.append(
|
||||
{
|
||||
"name": self.patient_info["name"],
|
||||
"surname": self.patient_info["last_name"],
|
||||
"date": datetime.now().strftime("%B %d, %Y"),
|
||||
}
|
||||
)
|
||||
contexts.append(
|
||||
{
|
||||
"patient_name": self.patient_info["name"],
|
||||
"test_date": datetime.now().strftime("%B %d, %Y"),
|
||||
}
|
||||
)
|
||||
|
||||
for i in range(4):
|
||||
contexts.append({"patient_name": self.patient_info["name"], "page_number": i + 3})
|
||||
|
||||
contexts.append(
|
||||
{"patient_name": self.patient_info["name"], "page_number": i + 3}
|
||||
)
|
||||
|
||||
fev1_percentage = 0
|
||||
if spirometry_metrics.get("fvc_best"):
|
||||
fev1_percentage = (pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]) * 100
|
||||
|
||||
contexts.append({"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}", "peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}", "fev1_percentage": f"{fev1_percentage:.1f}", "lung_analysis_chart": graphs.get("spirometry_chart", ""), "respiratory_analysis_chart": graphs.get("respiratory", "")})
|
||||
contexts.append({"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}", "age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}", "zone1_bpm": pnoe_metrics.get("zone1_bpm", ""), "zone2_bpm": pnoe_metrics.get("zone2_bpm", ""), "zone3_bpm": pnoe_metrics.get("zone3_bpm", ""), "zone4_bpm": pnoe_metrics.get("zone4_bpm", ""), "zone5_bpm": pnoe_metrics.get("zone5_bpm", ""), "vo2_pulse_chart": graphs.get("vo2_pulse", "")})
|
||||
contexts.append({"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}", "fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}", "fuel_utilization_chart": graphs.get("fuel_utilization", ""), "fat_metabolism_chart": graphs.get("fat_metabolism", "")})
|
||||
contexts.append({"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}", "fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}", "lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}", "body_composition_chart": graphs.get("body_composition", ""), "body_fat_percent_chart": graphs.get("body_fat_percent", "")})
|
||||
|
||||
fev1_percentage = (
|
||||
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
|
||||
) * 100
|
||||
|
||||
contexts.append(
|
||||
{
|
||||
"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}",
|
||||
"peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}",
|
||||
"fev1_percentage": f"{fev1_percentage:.1f}",
|
||||
"lung_analysis_chart": graphs.get("spirometry_chart", ""),
|
||||
"respiratory_analysis_chart": graphs.get("respiratory", ""),
|
||||
}
|
||||
)
|
||||
contexts.append(
|
||||
{
|
||||
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
|
||||
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
|
||||
"zone1_bpm": pnoe_metrics.get("zone1_bpm", ""),
|
||||
"zone2_bpm": pnoe_metrics.get("zone2_bpm", ""),
|
||||
"zone3_bpm": pnoe_metrics.get("zone3_bpm", ""),
|
||||
"zone4_bpm": pnoe_metrics.get("zone4_bpm", ""),
|
||||
"zone5_bpm": pnoe_metrics.get("zone5_bpm", ""),
|
||||
"vo2_pulse_chart": graphs.get("vo2_pulse", ""),
|
||||
}
|
||||
)
|
||||
contexts.append(
|
||||
{
|
||||
"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}",
|
||||
"fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}",
|
||||
"fuel_utilization_chart": graphs.get("fuel_utilization", ""),
|
||||
"fat_metabolism_chart": graphs.get("fat_metabolism", ""),
|
||||
}
|
||||
)
|
||||
contexts.append(
|
||||
{
|
||||
"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}",
|
||||
"fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}",
|
||||
"lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}",
|
||||
"body_composition_chart": graphs.get("body_composition", ""),
|
||||
"body_fat_percent_chart": graphs.get("body_fat_percent", ""),
|
||||
}
|
||||
)
|
||||
|
||||
for i in range(9):
|
||||
contexts.append({"patient_name": self.patient_info["name"], "page_number": i + 11, "vo2_breath_chart": graphs.get("vo2_breath", ""), "recovery_chart": graphs.get("recovery", "")})
|
||||
|
||||
contexts.append(
|
||||
{
|
||||
"patient_name": self.patient_info["name"],
|
||||
"page_number": i + 11,
|
||||
"vo2_breath_chart": graphs.get("vo2_breath", ""),
|
||||
"recovery_chart": graphs.get("recovery", ""),
|
||||
}
|
||||
)
|
||||
|
||||
return contexts
|
||||
|
||||
@@ -8,6 +8,9 @@ Based on the analysis notebooks in services_dfdf/.
|
||||
import base64
|
||||
from pathlib import Path
|
||||
|
||||
import matplotlib
|
||||
|
||||
matplotlib.use("Agg") # Use non-interactive backend
|
||||
import matplotlib.pyplot as plt
|
||||
import matplotlib.transforms as mtransforms
|
||||
import numpy as np
|
||||
|
||||
@@ -10,11 +10,10 @@ from typing import Any, Dict, List
|
||||
|
||||
import pandas as pd
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
from playwright.sync_api import sync_playwright
|
||||
|
||||
from app.services.context_generator import ContextGenerator
|
||||
from app.services.graph_generator import GraphGenerator
|
||||
from app.services.spirometry_table_extractor import extract_spirometry_table_from_pdf
|
||||
from playwright.async_api import async_playwright
|
||||
from services.context_generator import ContextGenerator
|
||||
from services.graph_generator import GraphGenerator
|
||||
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
|
||||
|
||||
|
||||
class ReportGeneratorService:
|
||||
@@ -61,7 +60,13 @@ class ReportGeneratorService:
|
||||
"""
|
||||
# Load data
|
||||
df = pd.read_csv(pnoe_csv_path, delimiter=";")
|
||||
df = df.apply(pd.to_numeric, errors="ignore")
|
||||
|
||||
# Convert numeric columns (updated approach)
|
||||
for col in df.columns:
|
||||
try:
|
||||
df[col] = pd.to_numeric(df[col])
|
||||
except (ValueError, TypeError):
|
||||
pass # Keep as-is if not numeric
|
||||
|
||||
# Calculate derived columns
|
||||
df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
|
||||
@@ -260,7 +265,7 @@ class ReportGeneratorService:
|
||||
|
||||
return html_doc
|
||||
|
||||
def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
|
||||
async def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
|
||||
"""
|
||||
Convert HTML content to PDF file.
|
||||
|
||||
@@ -268,14 +273,14 @@ class ReportGeneratorService:
|
||||
html_content: HTML content as string
|
||||
pdf_path: Path where PDF should be saved
|
||||
"""
|
||||
with sync_playwright() as p:
|
||||
browser = p.chromium.launch()
|
||||
page = browser.new_page()
|
||||
page.set_content(html_content)
|
||||
page.pdf(path=pdf_path, format="A4", print_background=True)
|
||||
browser.close()
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch()
|
||||
page = await browser.new_page()
|
||||
await page.set_content(html_content)
|
||||
await page.pdf(path=pdf_path, format="A4", print_background=True)
|
||||
await browser.close()
|
||||
|
||||
def generate_report(
|
||||
async def generate_report(
|
||||
self,
|
||||
spirometry_pdf_path: str,
|
||||
pnoe_csv_path: str,
|
||||
@@ -304,19 +309,18 @@ class ReportGeneratorService:
|
||||
Dictionary containing report path, graphs generated, and analysis data
|
||||
"""
|
||||
# Step 1: Extract spirometry table from PDF
|
||||
spirometry_csv_path = self.data_dir / "extracted_spirometry_table.csv"
|
||||
extract_spirometry_table_from_pdf(spirometry_pdf_path)
|
||||
|
||||
# The extraction saves to current directory, move it to data_dir
|
||||
import shutil
|
||||
|
||||
if Path("extracted_spirometry_table.csv").exists():
|
||||
shutil.move("extracted_spirometry_table.csv", spirometry_csv_path)
|
||||
print("Step 1: Extracting spirometry data from PDF...")
|
||||
spirometry_csv_path = extract_spirometry_table_from_pdf(
|
||||
spirometry_pdf_path, output_dir=str(self.data_dir)
|
||||
)
|
||||
print(f"Spirometry data saved to: {spirometry_csv_path}")
|
||||
|
||||
# Step 2: Process Pnoe data
|
||||
print("Step 2: Processing Pnoe data...")
|
||||
df = self.process_pnoe_data(pnoe_csv_path)
|
||||
|
||||
# Step 3: Generate all graphs
|
||||
print("Step 3: Generating graphs...")
|
||||
graphs_generated = self.generate_graphs(df)
|
||||
|
||||
# Create graph dictionary with base64 encoded images
|
||||
@@ -365,13 +369,20 @@ class ReportGeneratorService:
|
||||
graphs_dict["body_fat_percent"] = body_fat_b64
|
||||
|
||||
# Generate spirometry chart
|
||||
spirometry_df = pd.read_csv(spirometry_csv_path)
|
||||
spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
|
||||
spirometry_df, save_as_base64=True
|
||||
)
|
||||
graphs_dict["spirometry_chart"] = spirometry_chart_b64
|
||||
print("Step 4: Generating spirometry chart...")
|
||||
try:
|
||||
spirometry_df = pd.read_csv(spirometry_csv_path)
|
||||
print(f"Spirometry data loaded: {len(spirometry_df)} rows")
|
||||
spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
|
||||
spirometry_df, save_as_base64=True
|
||||
)
|
||||
graphs_dict["spirometry_chart"] = spirometry_chart_b64
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not generate spirometry chart: {e}")
|
||||
graphs_dict["spirometry_chart"] = ""
|
||||
|
||||
# Step 4: Generate context for all pages
|
||||
# Step 5: Generate context for all pages
|
||||
print("Step 5: Generating page contexts...")
|
||||
self.context_generator.load_data(
|
||||
pnoe_csv_path, str(spirometry_csv_path), seca_excel_path
|
||||
)
|
||||
@@ -395,7 +406,8 @@ class ReportGeneratorService:
|
||||
)
|
||||
|
||||
report_path = self.reports_dir / output_filename
|
||||
self.html_to_pdf(html_content, str(report_path))
|
||||
print(f"Generating PDF report at {report_path}")
|
||||
await self.html_to_pdf(html_content, str(report_path))
|
||||
|
||||
return {
|
||||
"report_path": str(report_path),
|
||||
|
||||
@@ -13,7 +13,21 @@ def encode_pdf_to_base64(pdf_path):
|
||||
return base64.b64encode(pdf_file.read()).decode("utf-8")
|
||||
|
||||
|
||||
def extract_spirometry_table_from_pdf(pdf_path):
|
||||
def extract_spirometry_table_from_pdf(pdf_path, output_dir="data"):
|
||||
"""
|
||||
Extract spirometry table from PDF using AI and save as clean CSV.
|
||||
|
||||
Args:
|
||||
pdf_path: Path to the spirometry PDF file
|
||||
output_dir: Directory to save the extracted CSV
|
||||
|
||||
Returns:
|
||||
Path to the saved CSV file
|
||||
"""
|
||||
import csv
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
url = "https://openrouter.ai/api/v1/chat/completions"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {API_KEY_REF}",
|
||||
@@ -30,10 +44,17 @@ def extract_spirometry_table_from_pdf(pdf_path):
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": "Please extract the Spirometry table from the pdf and return the values in csv format, "
|
||||
"note that it is the unit of parameter that is beside it and it should not be a column. "
|
||||
"The '-' Should be treated as empty values."
|
||||
"do not add 'csv' at the start or end of the response",
|
||||
"text": "Please extract the Spirometry table from the pdf and return ONLY the values in CSV format. "
|
||||
"The CSV should have these columns: Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
|
||||
"Rules:\n"
|
||||
"1. Include ONLY the data rows (FVC, FEV1, FEV1/FVC%, etc.)\n"
|
||||
"2. Do NOT include units in the data (units are part of parameter name)\n"
|
||||
"3. Use empty string for missing values (not '-' or 'N/A')\n"
|
||||
"4. Do NOT add 'csv' markers or code blocks\n"
|
||||
"5. First line should be the header\n"
|
||||
"Example format:\n"
|
||||
"Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
|
||||
"FVC,4.50,4.75,3.20,4.80,99,-0.10",
|
||||
},
|
||||
{
|
||||
"type": "file",
|
||||
@@ -54,11 +75,65 @@ def extract_spirometry_table_from_pdf(pdf_path):
|
||||
if "choices" in response_data and len(response_data["choices"]) > 0:
|
||||
content = response_data["choices"][0]["message"]["content"]
|
||||
|
||||
# Save to a CSV file
|
||||
output_file = "extracted_spirometry_table.csv"
|
||||
with open(output_file, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
# Clean the content - remove markdown code blocks if present
|
||||
content = re.sub(r"```csv\n?", "", content)
|
||||
content = re.sub(r"```\n?", "", content)
|
||||
content = content.strip()
|
||||
|
||||
return f"Extracted table saved to {output_file}"
|
||||
# Parse and validate CSV
|
||||
lines = content.split("\n")
|
||||
if not lines:
|
||||
raise ValueError("No data extracted from PDF")
|
||||
|
||||
# Ensure output directory exists
|
||||
output_path = Path(output_dir)
|
||||
output_path.mkdir(exist_ok=True)
|
||||
output_file = output_path / "extracted_spirometry_table.csv"
|
||||
|
||||
# Write cleaned CSV with proper formatting
|
||||
with open(output_file, "w", encoding="utf-8", newline="") as f:
|
||||
# Parse the first line as header
|
||||
header_line = lines[0].strip()
|
||||
if "," in header_line:
|
||||
header = [col.strip() for col in header_line.split(",")]
|
||||
else:
|
||||
# Default header if not provided
|
||||
header = [
|
||||
"Parameters",
|
||||
"Pre",
|
||||
"Best",
|
||||
"LLN",
|
||||
"Pred.",
|
||||
"%Pred.",
|
||||
"ZScore",
|
||||
]
|
||||
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(header)
|
||||
|
||||
# Process data rows
|
||||
for line in lines[1:]:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# Split by comma and clean each field
|
||||
fields = [field.strip() for field in line.split(",")]
|
||||
|
||||
# Ensure we have the right number of fields
|
||||
if len(fields) < len(header):
|
||||
# Pad with empty strings
|
||||
fields.extend([""] * (len(header) - len(fields)))
|
||||
elif len(fields) > len(header):
|
||||
# Take only the first N fields
|
||||
fields = fields[: len(header)]
|
||||
|
||||
# Replace '-' or 'N/A' with empty string
|
||||
fields = ["" if f in ["-", "N/A", "n/a", "NA"] else f for f in fields]
|
||||
|
||||
writer.writerow(fields)
|
||||
|
||||
return str(output_file)
|
||||
else:
|
||||
return "No content found in response"
|
||||
error_msg = response_data.get("error", {}).get("message", "Unknown error")
|
||||
raise Exception(f"No content found in response: {error_msg}")
|
||||
|
||||
Reference in New Issue
Block a user