2025-09-29 11:45:09 +01:00
|
|
|
import base64
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
|
|
|
|
|
|
import matplotlib.pyplot as plt
|
2025-09-29 09:54:05 +01:00
|
|
|
import pandas as pd
|
|
|
|
|
|
2025-09-29 11:17:32 +01:00
|
|
|
|
2025-09-29 11:45:09 +01:00
|
|
|
class ReportGenerator:
    """Turn Pnoe, spirometry and SECA exports into per-page report contexts.

    Typical use: call :meth:`load_data` once, then
    :meth:`generate_all_contexts` to obtain one context dictionary per
    report page.
    """

    # Number of rows in the rolling-mean window used to smooth Pnoe series.
    SMOOTHING_WINDOW = 10

    # Pnoe columns (raw and derived) that receive a "<name>_smoothed" twin.
    SMOOTHED_COLUMNS = (
        "VO2(ml/min)",
        "VCO2(ml/min)",
        "HR(bpm)",
        "VT(l)",
        "BF(bpm)",
        "VE(l/min)",
        "VO2 Pulse",
        "VO2 Breath",
        "CHO",
        "FAT",
    )

    def __init__(self):
        # Raw datasets; populated by load_data().
        self.pnoe_df = None
        self.patient_df = None
        self.spirometry_df = None
        self.seca_df = None
        # Demographics of the current patient; set by extract_patient_info().
        self.patient_info = {}
        # Chart images are written here, relative to the working directory.
        self.charts_dir = Path("graphs")
        self.charts_dir.mkdir(exist_ok=True)

    def load_data(
        self,
        pnoe_path: str,
        patient_path: str,
        spirometry_path: str,
        seca_path: Optional[str] = None,
    ):
        """Read every input file and preprocess the Pnoe data.

        Args:
            pnoe_path: Pnoe CSV export; read with ``;`` as the delimiter.
            patient_path: Patient CSV.
            spirometry_path: Spirometry CSV.
            seca_path: Optional SECA Excel workbook; skipped when falsy.
        """
        self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
        self.patient_df = pd.read_csv(patient_path)
        self.spirometry_df = pd.read_csv(spirometry_path)
        if seca_path:
            self.seca_df = pd.read_excel(seca_path)

        self._preprocess_data()

    @staticmethod
    def _to_numeric_or_keep(column: pd.Series) -> pd.Series:
        """Return ``column`` converted to a numeric dtype, or unchanged if it cannot be."""
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    @staticmethod
    def _value_or(value, default):
        """Return ``default`` when ``value`` is ``None`` or a missing scalar (NaN/NA)."""
        if value is None:
            return default
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return default
        return value

    def _preprocess_data(self):
        """Add derived and smoothed columns to ``self.pnoe_df``.

        Raises:
            KeyError: If a column needed for the derived metrics is absent.
        """
        # errors="ignore" is deprecated in pandas.to_numeric; converting each
        # column explicitly keeps the same "leave it alone if unparsable" result.
        self.pnoe_df = self.pnoe_df.apply(self._to_numeric_or_keep)
        df = self.pnoe_df

        # Derived columns.
        df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
        df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"]
        df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
        df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100

        # Rolling mean; min_periods=1 so the first rows are averaged over
        # whatever data is available instead of becoming NaN.
        for col in self.SMOOTHED_COLUMNS:
            if col in df.columns:
                df[f"{col}_smoothed"] = (
                    df[col].rolling(window=self.SMOOTHING_WINDOW, min_periods=1).mean()
                )

    def extract_patient_info(self, last_name: str) -> Dict:
        """Look the patient up in the SECA sheet and cache their details.

        The first row whose ``LastName`` contains ``last_name`` (literal,
        case-insensitive substring) is used. Blank cells fall back to
        ``""`` for text fields and ``0`` for numeric ones.

        Args:
            last_name: Surname to search for.

        Returns:
            ``self.patient_info``. It is left unchanged when no SECA data
            is loaded or no row matches.
        """
        if self.seca_df is not None:
            # regex=False: surnames may contain regex metacharacters
            # (parentheses, dots, ...) and must be matched literally.
            matches = self.seca_df[
                self.seca_df["LastName"].str.contains(
                    last_name, case=False, na=False, regex=False
                )
            ]
            if not matches.empty:
                row = matches.iloc[0]
                first_name = self._value_or(row.get("FirstName"), "")
                height = self._value_or(row.get("Height"), "")
                self.patient_info = {
                    "name": f"{first_name} {last_name}",
                    "age": int(self._value_or(row.get("Age"), 0)),
                    "height": f"{height}",
                    "weight": float(self._value_or(row.get("Weight"), 0)),
                    "gender": str(self._value_or(row.get("Gender"), "")).lower(),
                    "fat_percentage": float(
                        self._value_or(row.get("Adult_FMP"), 0)
                    ),
                }
        return self.patient_info

    def calculate_spirometry_metrics(self) -> Dict:
        """Collect the ``Best`` and ``%Pred.`` values for FVC, FEV1 and FEV1/FVC%.

        Returns:
            Mapping with ``<param>_best`` / ``<param>_pred`` keys, where
            ``<param>`` is ``fvc``, ``fev1`` or ``fev1_fvc_pct``. Parameters
            not present in the spirometry table are omitted.
        """
        metrics = {}
        for param in ["FVC", "FEV1", "FEV1/FVC%"]:
            row = self.spirometry_df.loc[self.spirometry_df["Parameters"] == param]
            if row.empty:
                continue
            key = param.lower().replace("/", "_").replace("%", "_pct")
            metrics[f"{key}_best"] = row["Best"].values[0]
            metrics[f"{key}_pred"] = row["%Pred."].values[0]
        return metrics

    def calculate_pnoe_metrics(self) -> Dict:
        """Compute the Pnoe-derived metrics used by the report pages.

        Requires :meth:`extract_patient_info` to have populated
        ``patient_info`` with ``weight`` and ``age``.

        Returns:
            Mapping containing VO2 max (absolute and per kg), peak tidal
            volume and fat-max values with their heart rates, the
            ``zone*_bpm`` strings, and the ``vt1`` / ``vt2`` threshold
            points (each a dict or ``None``).
        """
        df = self.pnoe_df
        metrics = {}

        # Basic metrics.
        metrics["vo2_max"] = df["VO2(ml/min)_smoothed"].max()
        metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]

        # Peak tidal volume and the heart rate at that row.
        peak_vt_row = df.loc[df["VT(l)_smoothed"].idxmax()]
        metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
        metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]

        # Maximal fat value and the heart rate at that row.
        fat_max_row = df.loc[df["FAT_smoothed"].idxmax()]
        metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
        metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]

        metrics.update(self._calculate_hr_zones())

        vt1, vt2 = self._detect_thresholds()
        metrics["vt1"] = vt1
        metrics["vt2"] = vt2

        return metrics

    @staticmethod
    def _threshold_point(row) -> Dict:
        """Summarise one Pnoe row as a heart rate / speed / time mapping."""
        return {
            "HeartRate": row["HR(bpm)_smoothed"],
            "Speed": row["Speed"],
            "Time": row["T(sec)"],
        }

    def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Locate the VT1 and VT2 points in the smoothed Pnoe data.

        VT1 is the first row where smoothed CHO exceeds smoothed FAT.
        VT2 is the row where the second difference of smoothed VE is
        largest (a simplified inflection estimate).

        Returns:
            ``(vt1, vt2)``; either element is ``None`` when it cannot be
            determined (no CHO > FAT row, or fewer than three usable VE
            samples).
        """
        df = self.pnoe_df

        carbs_dominant = df["CHO_smoothed"] > df["FAT_smoothed"]
        crossover_indices = carbs_dominant[carbs_dominant].index
        vt1 = None
        if len(crossover_indices) > 0:
            vt1 = self._threshold_point(df.loc[crossover_indices[0]])

        ve_curvature = df["VE(l/min)_smoothed"].diff().diff()
        vt2 = None
        # With fewer than three samples every second difference is NaN and
        # idxmax() has nothing to pick (it raises or warns depending on the
        # pandas version), so only call it when a value exists.
        if ve_curvature.notna().any():
            vt2 = self._threshold_point(df.loc[ve_curvature.idxmax()])

        return vt1, vt2

    def _calculate_hr_zones(self) -> Dict:
        """Return five heart-rate zone labels based on ``220 - age``.

        Zone bounds are fixed fractions of the estimated maximum heart
        rate, truncated to whole beats per minute.
        """
        max_hr = 220 - self.patient_info["age"]

        # Simplified zone calculation.
        return {
            "zone1_bpm": f"{int(max_hr * 0.55)}-{int(max_hr * 0.65)}bpm",
            "zone2_bpm": f"{int(max_hr * 0.65)}-{int(max_hr * 0.75)}bpm",
            "zone3_bpm": f"{int(max_hr * 0.75)}-{int(max_hr * 0.85)}bpm",
            "zone4_bpm": f"{int(max_hr * 0.85)}-{int(max_hr * 0.95)}bpm",
            "zone5_bpm": f"{int(max_hr * 0.95)}+bpm",
        }

    def generate_charts(self) -> Dict[str, str]:
        """Render every chart and return them base64-encoded, keyed by name."""
        return {
            "fuel_utilization_chart": self._create_fuel_chart(),
            "vo2_pulse_chart": self._create_vo2_pulse_chart(),
            "body_composition_chart": self._create_body_comp_chart(),
            # Add more chart generation methods here.
        }

    def _create_fuel_chart(self) -> str:
        """Save the fuel utilization chart as a PNG and return it base64-encoded.

        NOTE: the plotting itself is still a placeholder, so the saved
        figure is currently empty.
        """
        speed_groups = self.pnoe_df.groupby("Speed").mean(numeric_only=True).round(1)
        # Drop the first and last speed groups.
        speed_groups = speed_groups.iloc[1:-1]
        # TODO: plot this; it is prepared for the chart code that is not
        # written yet and is otherwise unused.
        filtered_data = speed_groups[  # noqa: F841
            (speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
        ]

        chart_path = self.charts_dir / "fuel_utilization_chart.png"
        fig = plt.figure(figsize=(15, 8))
        try:
            # ... chart code goes here ...
            fig.savefig(chart_path, dpi=300)
        finally:
            # Always release the figure, even if saving fails.
            plt.close(fig)

        return self._image_to_base64(chart_path)

    def _create_vo2_pulse_chart(self) -> str:
        """Return the VO2 pulse chart base64-encoded.

        NOTE: rendering is not implemented yet; this only encodes whatever
        image already exists at the chart path (``""`` if none exists).
        """
        chart_path = self.charts_dir / "vo2_pulse_chart.png"
        # ... chart generation code ...
        return self._image_to_base64(chart_path)

    def _create_body_comp_chart(self) -> str:
        """Return the body composition chart base64-encoded.

        NOTE: rendering is not implemented yet; this only encodes whatever
        image already exists at the chart path (``""`` if none exists).
        """
        chart_path = self.charts_dir / "body_composition_chart.png"
        # ... chart generation code ...
        return self._image_to_base64(chart_path)

    def _image_to_base64(self, image_path: Path) -> str:
        """Return the file's bytes base64-encoded as text, or ``""`` if it does not exist."""
        try:
            with open(image_path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode("utf-8")
        except FileNotFoundError:
            return ""

    def generate_all_contexts(
        self, last_name: str = "Moran", report_date: str = "July 29, 2025"
    ) -> List[Dict]:
        """Build the list of per-page template contexts.

        Args:
            last_name: Surname used to look the patient up in the SECA data.
            report_date: Date string shown on the cover page.

        Returns:
            One dict per page: the cover page, five empty placeholders
            (pages 2-6), the spirometry page and the VO2 max / zones page.

        Raises:
            KeyError: If ``patient_info`` lacks a field the report needs,
                for example because the patient could not be found.
        """
        self.extract_patient_info(last_name)

        spirometry_metrics = self.calculate_spirometry_metrics()
        pnoe_metrics = self.calculate_pnoe_metrics()
        charts = self.generate_charts()

        contexts = []

        # Page 1
        contexts.append(
            {
                "name": self.patient_info["name"],
                "surname": last_name,
                "date": report_date,
            }
        )

        # Pages 2-6 (fill in as needed); each page gets its own dict.
        contexts.extend({} for _ in range(5))

        # Page 7 - Spirometry
        # NOTE(review): "fev1_percentage" is computed as peak VT relative to
        # the best FVC, not from FEV1 - confirm this is what the template
        # expects. It is None when the FVC row is absent or zero, matching
        # calculate_spirometry_metrics(), which tolerates missing rows.
        fvc_best = spirometry_metrics.get("fvc_best")
        peak_vt_vs_fvc = (
            (pnoe_metrics["peak_vt"] / fvc_best) * 100 if fvc_best else None
        )
        contexts.append(
            {
                "peak_vt": pnoe_metrics["peak_vt"],
                "peak_vt_bpm": pnoe_metrics["peak_vt_hr"],
                "fev1_percentage": peak_vt_vs_fvc,
                "lung_analysis_chart": charts.get("spirometry_chart", ""),
                "respiratory_analysis_chart": charts.get("respiratory_chart", ""),
            }
        )

        # Page 8 - VO2 Max and Zones
        decade_start = self.patient_info["age"] // 10 * 10
        contexts.append(
            {
                "vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
                "age_range": f"{decade_start}-{decade_start + 9}",
                **pnoe_metrics,  # Include all zone calculations
            }
        )

        # Add remaining pages as needed.

        return contexts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Usage for backend service
|
|
|
|
|
def generate_report(
    pnoe_file, patient_file, spirometry_file, seca_file=None, patient_name="Moran"
):
    """Load the given input files and return the report's page contexts.

    Entry point for the backend service: creates a fresh
    ``ReportGenerator``, feeds it the four input files (the SECA file is
    optional) and returns what ``generate_all_contexts`` produces for
    ``patient_name``.
    """
    report_generator = ReportGenerator()
    report_generator.load_data(
        pnoe_file,
        patient_file,
        spirometry_file,
        seca_file,
    )
    page_contexts = report_generator.generate_all_contexts(patient_name)
    return page_contexts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Example usage
|
|
|
|
|
if __name__ == "__main__":
    # Sample inputs, in the positional order generate_report() takes them:
    # Pnoe export, patient data, spirometry data, SECA workbook.
    input_files = (
        "data/Pnoe_20250729_1550-Moran_Keirstyn.csv",
        "data/patient_data.csv",
        "data/spirometry_data.csv",
        "data/SECA body comp for all patients.xlsx",
    )
    contexts = generate_report(*input_files)
    print(f"Generated {len(contexts)} page contexts")
|