Files

320 lines
11 KiB
Python
Raw Permalink Normal View History

import base64
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import matplotlib.pyplot as plt
import pandas as pd
class ReportGenerator:
def __init__(self):
self.pnoe_df = None
self.patient_df = None
self.spirometry_df = None
self.seca_df = None
self.patient_info = {}
self.charts_dir = Path("graphs")
self.charts_dir.mkdir(exist_ok=True)
def load_data(
self,
pnoe_path: str,
patient_path: str,
spirometry_path: str,
seca_path: str = None,
):
"""Load all required datasets"""
self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
self.patient_df = pd.read_csv(patient_path)
self.spirometry_df = pd.read_csv(spirometry_path)
if seca_path:
self.seca_df = pd.read_excel(seca_path)
# Apply preprocessing
self._preprocess_data()
def _preprocess_data(self):
"""Apply preprocessing steps from your notebook"""
# Convert to numeric
self.pnoe_df = self.pnoe_df.apply(pd.to_numeric, errors="ignore")
# Calculate derived columns
self.pnoe_df["VO2 Pulse"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
)
self.pnoe_df["VO2 Breath"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
)
self.pnoe_df["CHO"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
)
self.pnoe_df["FAT"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
)
# Apply smoothing
window_size = 10
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in self.pnoe_df.columns:
self.pnoe_df[f"{col}_smoothed"] = (
self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
def extract_patient_info(self, last_name: str) -> Dict:
"""Extract patient information from datasets"""
if self.seca_df is not None:
patient_data = self.seca_df[
self.seca_df["LastName"].str.contains(last_name, case=False, na=False)
]
if not patient_data.empty:
row = patient_data.iloc[0]
self.patient_info = {
"name": f"{row.get('FirstName', '')} {last_name}",
"age": int(row.get("Age", 0)),
"height": f"{row.get('Height', '')}",
"weight": float(row.get("Weight", 0)),
"gender": row.get("Gender", "").lower(),
"fat_percentage": float(row.get("Adult_FMP", 0)),
}
return self.patient_info
def calculate_spirometry_metrics(self) -> Dict:
"""Calculate spirometry-related metrics"""
metrics = {}
# Extract key spirometry values
for param in ["FVC", "FEV1", "FEV1/FVC%"]:
row = self.spirometry_df.loc[self.spirometry_df["Parameters"] == param]
if not row.empty:
metrics[
f"{param.lower().replace('/', '_').replace('%', '_pct')}_best"
] = row["Best"].values[0]
metrics[
f"{param.lower().replace('/', '_').replace('%', '_pct')}_pred"
] = row["%Pred."].values[0]
return metrics
def calculate_pnoe_metrics(self) -> Dict:
"""Calculate all Pnoe-derived metrics"""
metrics = {}
# Basic metrics
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]
# Peak VT
peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax()
peak_vt_row = self.pnoe_df.loc[peak_vt_idx]
metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]
# Fat burning metrics
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx]
metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]
# Calculate zones (simplified from your logic)
metrics.update(self._calculate_hr_zones())
# VT1/VT2 detection
vt1, vt2 = self._detect_thresholds()
metrics["vt1"] = vt1
metrics["vt2"] = vt2
return metrics
def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Detect VT1 and VT2 thresholds"""
# VT1: First crossover where carbs > fat
condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
crossover_indices = condition[condition].index
vt1 = None
if len(crossover_indices) > 0:
vt1_idx = crossover_indices[0]
vt1_row = self.pnoe_df.loc[vt1_idx]
vt1 = {
"HeartRate": vt1_row["HR(bpm)_smoothed"],
"Speed": vt1_row["Speed"],
"Time": vt1_row["T(sec)"],
}
# VT2: Ventilation inflection (simplified)
ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff()
second_derivative = ve_slope.diff()
vt2_idx = second_derivative.idxmax()
vt2 = None
if pd.notna(vt2_idx):
vt2_row = self.pnoe_df.loc[vt2_idx]
vt2 = {
"HeartRate": vt2_row["HR(bpm)_smoothed"],
"Speed": vt2_row["Speed"],
"Time": vt2_row["T(sec)"],
}
return vt1, vt2
def _calculate_hr_zones(self) -> Dict:
"""Calculate heart rate zones"""
max_hr = 220 - self.patient_info["age"]
# Simplified zone calculation - you can make this more sophisticated
zones = {
"zone1_bpm": f"{int(max_hr * 0.55)}-{int(max_hr * 0.65)}bpm",
"zone2_bpm": f"{int(max_hr * 0.65)}-{int(max_hr * 0.75)}bpm",
"zone3_bpm": f"{int(max_hr * 0.75)}-{int(max_hr * 0.85)}bpm",
"zone4_bpm": f"{int(max_hr * 0.85)}-{int(max_hr * 0.95)}bpm",
"zone5_bpm": f"{int(max_hr * 0.95)}+bpm",
}
return zones
def generate_charts(self) -> Dict[str, str]:
"""Generate all charts and return base64 encoded versions"""
charts = {}
# Generate fuel utilization chart
charts["fuel_utilization_chart"] = self._create_fuel_chart()
# Generate VO2 pulse chart
charts["vo2_pulse_chart"] = self._create_vo2_pulse_chart()
# Generate body composition chart
charts["body_composition_chart"] = self._create_body_comp_chart()
# Add more chart generation methods...
return charts
def _create_fuel_chart(self) -> str:
"""Create and save fuel utilization chart"""
# Use your existing chart code but make it dynamic
speed_groups = self.pnoe_df.groupby("Speed").mean(numeric_only=True).round(1)
speed_groups = speed_groups.iloc[1:-1]
filtered_data = speed_groups[
(speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
]
plt.figure(figsize=(15, 8))
# ... your chart code here ...
chart_path = self.charts_dir / "fuel_utilization_chart.png"
plt.savefig(chart_path, dpi=300)
plt.close()
return self._image_to_base64(chart_path)
def _create_vo2_pulse_chart(self) -> str:
"""Create VO2 pulse chart"""
# Your VO2 pulse chart code here
chart_path = self.charts_dir / "vo2_pulse_chart.png"
# ... chart generation code ...
return self._image_to_base64(chart_path)
def _create_body_comp_chart(self) -> str:
"""Create body composition chart"""
# Your body composition chart code here
chart_path = self.charts_dir / "body_composition_chart.png"
# ... chart generation code ...
return self._image_to_base64(chart_path)
def _image_to_base64(self, image_path: Path) -> str:
"""Convert image to base64"""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
except FileNotFoundError:
return ""
def generate_all_contexts(self, last_name: str = "Moran") -> List[Dict]:
"""Main method to generate all page contexts"""
# Extract patient info
self.extract_patient_info(last_name)
# Calculate metrics
spirometry_metrics = self.calculate_spirometry_metrics()
pnoe_metrics = self.calculate_pnoe_metrics()
# Generate charts
charts = self.generate_charts()
# Build contexts for each page
contexts = []
# Page 1
contexts.append(
{
"name": self.patient_info["name"],
"surname": last_name,
"date": "July 29, 2025",
}
)
# Page 2-6 (add as needed)
for i in range(5):
contexts.append({})
# Page 7 - Spirometry
contexts.append(
{
"peak_vt": pnoe_metrics["peak_vt"],
"peak_vt_bpm": pnoe_metrics["peak_vt_hr"],
"fev1_percentage": (
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
)
* 100,
"lung_analysis_chart": charts.get("spirometry_chart", ""),
"respiratory_analysis_chart": charts.get("respiratory_chart", ""),
}
)
# Page 8 - VO2 Max and Zones
contexts.append(
{
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
**pnoe_metrics, # Include all zone calculations
}
)
# Continue for all pages...
# Add remaining pages as needed
return contexts
# Usage for backend service
def generate_report(
pnoe_file, patient_file, spirometry_file, seca_file=None, patient_name="Moran"
):
"""Main function for backend service"""
generator = ReportGenerator()
generator.load_data(pnoe_file, patient_file, spirometry_file, seca_file)
return generator.generate_all_contexts(patient_name)
# Example usage
if __name__ == "__main__":
contexts = generate_report(
"data/Pnoe_20250729_1550-Moran_Keirstyn.csv",
"data/patient_data.csv",
"data/spirometry_data.csv",
"data/SECA body comp for all patients.xlsx",
)
print(f"Generated {len(contexts)} page contexts")