"""Build per-page template contexts for a patient performance report.

The module loads Pnoe breath-by-breath data, spirometry results and
(optionally) a SECA body-composition sheet, derives metrics from them and
packages the results as a list of dictionaries, one per report page.
"""

import base64
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import pandas as pd

try:
    import matplotlib.pyplot as plt
except ImportError:
    # Only chart rendering needs matplotlib; metric calculations must remain
    # usable without it. Chart methods raise a clear error when it is absent.
    plt = None


class ReportGenerator:
    """Load test data, derive metrics and assemble report page contexts."""

    # Number of samples in the rolling-mean window used for smoothing.
    SMOOTHING_WINDOW = 10

    # Columns that get a "<name>_smoothed" companion column when present.
    SMOOTHED_COLUMNS = (
        "VO2(ml/min)",
        "VCO2(ml/min)",
        "HR(bpm)",
        "VT(l)",
        "BF(bpm)",
        "VE(l/min)",
        "VO2 Pulse",
        "VO2 Breath",
        "CHO",
        "FAT",
    )

    def __init__(self, charts_dir: Union[str, Path] = "graphs"):
        """Create an empty generator and make sure the chart directory exists.

        Args:
            charts_dir: Directory where chart images are written. Defaults to
                ``"graphs"`` relative to the current working directory.
        """
        self.pnoe_df = None
        self.patient_df = None
        self.spirometry_df = None
        self.seca_df = None
        self.patient_info = {}
        self.charts_dir = Path(charts_dir)
        self.charts_dir.mkdir(parents=True, exist_ok=True)

    def load_data(
        self,
        pnoe_path: str,
        patient_path: str,
        spirometry_path: str,
        seca_path: Optional[str] = None,
    ):
        """Load all required datasets and preprocess the Pnoe data.

        Args:
            pnoe_path: Semicolon-delimited CSV with the Pnoe measurements.
            patient_path: CSV with patient data.
            spirometry_path: CSV with spirometry results.
            seca_path: Optional Excel workbook with SECA body-composition
                data; skipped when falsy.
        """
        self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
        self.patient_df = pd.read_csv(patient_path)
        self.spirometry_df = pd.read_csv(spirometry_path)
        if seca_path:
            self.seca_df = pd.read_excel(seca_path)

        # Apply preprocessing
        self._preprocess_data()

    @staticmethod
    def _to_numeric_where_possible(column: pd.Series) -> pd.Series:
        """Return *column* converted to numeric, or unchanged if that fails.

        Explicit replacement for ``pd.to_numeric(..., errors="ignore")``,
        which pandas has deprecated.
        """
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    @staticmethod
    def _nonzero(denominator: pd.Series) -> pd.Series:
        """Return *denominator* with zeros replaced by NaN.

        Dividing by the result yields NaN instead of +/-inf for zero
        readings, so the rolling mean skips them rather than propagating inf.
        """
        return denominator.where(denominator != 0)

    def _preprocess_data(self):
        """Convert columns to numeric, add derived columns and smooth them.

        Adds "VO2 Pulse" (VO2 / HR), "VO2 Breath" (VO2 / BF), "CHO" and "FAT"
        (energy expenditure split by the carbohydrate / fat percentages) and
        a rolling-mean "<name>_smoothed" column for every entry of
        ``SMOOTHED_COLUMNS`` that exists in the frame.
        """
        # Convert to numeric (columns that cannot be parsed stay as they are)
        df = self.pnoe_df.apply(self._to_numeric_where_possible)

        # Calculate derived columns
        vo2 = df["VO2(ml/min)"]
        energy = df["EE(kcal/min)"]
        df["VO2 Pulse"] = vo2 / self._nonzero(df["HR(bpm)"])
        df["VO2 Breath"] = vo2 / self._nonzero(df["BF(bpm)"])
        df["CHO"] = energy * df["CARBS(%)"] / 100
        df["FAT"] = energy * df["FAT(%)"] / 100

        # Apply smoothing; min_periods=1 keeps the first rows populated.
        for col in self.SMOOTHED_COLUMNS:
            if col in df.columns:
                df[f"{col}_smoothed"] = (
                    df[col]
                    .rolling(window=self.SMOOTHING_WINDOW, min_periods=1)
                    .mean()
                )

        self.pnoe_df = df

    @staticmethod
    def _value_or_default(row: pd.Series, column: str, default):
        """Return ``row[column]``, or *default* if it is missing or NaN/None."""
        value = row.get(column, default)
        return default if pd.isna(value) else value

    def extract_patient_info(self, last_name: str) -> Dict:
        """Extract patient information from the SECA sheet.

        The first row whose "LastName" contains *last_name* (case-insensitive,
        literal substring match) is used. Blank cells fall back to the same
        defaults as missing columns.

        Args:
            last_name: Surname (or part of it) to look up.

        Returns:
            ``self.patient_info``. It is left untouched when no SECA data is
            loaded or no row matches, so it may be empty or hold values set
            earlier.
        """
        if self.seca_df is None:
            return self.patient_info

        # regex=False: surnames are plain text, not patterns, so characters
        # such as "(", "." or "+" must be matched literally.
        matches = self.seca_df[
            self.seca_df["LastName"].str.contains(
                last_name, case=False, na=False, regex=False
            )
        ]
        if matches.empty:
            return self.patient_info

        row = matches.iloc[0]
        pick = self._value_or_default
        self.patient_info = {
            "name": f"{pick(row, 'FirstName', '')} {last_name}",
            "age": int(pick(row, "Age", 0)),
            "height": f"{pick(row, 'Height', '')}",
            "weight": float(pick(row, "Weight", 0)),
            "gender": str(pick(row, "Gender", "")).lower(),
            "fat_percentage": float(pick(row, "Adult_FMP", 0)),
        }
        return self.patient_info

    def calculate_spirometry_metrics(self) -> Dict:
        """Calculate spirometry-related metrics.

        Returns:
            For each of FVC, FEV1 and FEV1/FVC% found in the "Parameters"
            column, a ``<key>_best`` and ``<key>_pred`` entry taken from the
            "Best" and "%Pred." columns, where ``<key>`` is the lower-cased
            parameter with "/" replaced by "_" and "%" by "_pct". Parameters
            that are not present are omitted.
        """
        metrics = {}

        # Extract key spirometry values
        for param in ["FVC", "FEV1", "FEV1/FVC%"]:
            row = self.spirometry_df.loc[self.spirometry_df["Parameters"] == param]
            if row.empty:
                continue
            key = param.lower().replace("/", "_").replace("%", "_pct")
            metrics[f"{key}_best"] = row["Best"].values[0]
            metrics[f"{key}_pred"] = row["%Pred."].values[0]

        return metrics

    def calculate_pnoe_metrics(self) -> Dict:
        """Calculate all Pnoe-derived metrics.

        Requires preprocessed Pnoe data and ``patient_info`` containing
        "weight" and "age".

        Returns:
            Dictionary with VO2 max (absolute and per kg), peak tidal volume
            and the heart rate at that sample, the maximum smoothed FAT value
            and its heart rate, the heart-rate zone strings and the "vt1" /
            "vt2" threshold dictionaries (or ``None``).
        """
        df = self.pnoe_df
        metrics = {}

        # Basic metrics
        metrics["vo2_max"] = df["VO2(ml/min)_smoothed"].max()
        metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]

        # Peak VT
        peak_vt_row = df.loc[df["VT(l)_smoothed"].idxmax()]
        metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
        metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]

        # Fat burning metrics
        fat_max_row = df.loc[df["FAT_smoothed"].idxmax()]
        metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
        metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]

        # Calculate zones (simplified from your logic)
        metrics.update(self._calculate_hr_zones())

        # VT1/VT2 detection
        vt1, vt2 = self._detect_thresholds()
        metrics["vt1"] = vt1
        metrics["vt2"] = vt2

        return metrics

    def _threshold_point(self, index) -> Dict:
        """Return heart rate, speed and time of the Pnoe row at *index*."""
        row = self.pnoe_df.loc[index]
        return {
            "HeartRate": row["HR(bpm)_smoothed"],
            "Speed": row["Speed"],
            "Time": row["T(sec)"],
        }

    def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Detect VT1 and VT2 thresholds.

        Returns:
            ``(vt1, vt2)``; each is a dict with "HeartRate", "Speed" and
            "Time", or ``None`` when the threshold cannot be located.
        """
        # VT1: First crossover where carbs > fat
        carbs_dominant = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
        crossover_indices = carbs_dominant[carbs_dominant].index
        vt1 = None
        if len(crossover_indices) > 0:
            vt1 = self._threshold_point(crossover_indices[0])

        # VT2: Ventilation inflection (simplified) - the sample where the
        # second difference of smoothed ventilation is largest.
        second_derivative = self.pnoe_df["VE(l/min)_smoothed"].diff().diff()
        # With fewer than three samples every value is NaN; drop NaN first
        # because idxmax on an empty/all-NaN series raises or warns depending
        # on the pandas version.
        candidates = second_derivative.dropna()
        vt2 = None
        if not candidates.empty:
            vt2 = self._threshold_point(candidates.idxmax())

        return vt1, vt2

    def _calculate_hr_zones(self) -> Dict:
        """Calculate heart rate zones.

        Maximum heart rate is estimated as ``220 - age``; each zone is a
        percentage band of it, truncated to whole beats per minute.
        """
        max_hr = 220 - self.patient_info["age"]

        def bpm(fraction: float) -> int:
            return int(max_hr * fraction)

        # Simplified zone calculation - you can make this more sophisticated
        zones = {
            "zone1_bpm": f"{bpm(0.55)}-{bpm(0.65)}bpm",
            "zone2_bpm": f"{bpm(0.65)}-{bpm(0.75)}bpm",
            "zone3_bpm": f"{bpm(0.75)}-{bpm(0.85)}bpm",
            "zone4_bpm": f"{bpm(0.85)}-{bpm(0.95)}bpm",
            "zone5_bpm": f"{bpm(0.95)}+bpm",
        }

        return zones

    def generate_charts(self) -> Dict[str, str]:
        """Generate all charts and return base64 encoded versions."""
        charts = {}

        # Generate fuel utilization chart
        charts["fuel_utilization_chart"] = self._create_fuel_chart()

        # Generate VO2 pulse chart
        charts["vo2_pulse_chart"] = self._create_vo2_pulse_chart()

        # Generate body composition chart
        charts["body_composition_chart"] = self._create_body_comp_chart()

        # Add more chart generation methods...

        return charts

    def _create_fuel_chart(self) -> str:
        """Create and save the fuel utilization chart.

        Returns:
            The saved PNG as a base64 string.

        Raises:
            ImportError: If matplotlib is not installed.
        """
        if plt is None:
            raise ImportError("matplotlib is required to generate charts")

        # Use your existing chart code but make it dynamic
        speed_groups = self.pnoe_df.groupby("Speed").mean(numeric_only=True).round(1)
        # Drops the first and last speed group.
        # NOTE(review): presumably warm-up / cool-down stages - confirm.
        speed_groups = speed_groups.iloc[1:-1]
        # Prepared for the plotting code below; not consumed yet.
        filtered_data = speed_groups[  # noqa: F841
            (speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
        ]

        chart_path = self.charts_dir / "fuel_utilization_chart.png"
        fig = plt.figure(figsize=(15, 8))
        try:
            # ... your chart code here ...
            fig.savefig(chart_path, dpi=300)
        finally:
            # Always release the figure, even if saving fails.
            plt.close(fig)

        return self._image_to_base64(chart_path)

    def _create_vo2_pulse_chart(self) -> str:
        """Create VO2 pulse chart.

        Placeholder: no image is rendered yet, so this returns the encoded
        contents of an existing file at the expected path, or "" if none.
        """
        # Your VO2 pulse chart code here
        chart_path = self.charts_dir / "vo2_pulse_chart.png"
        # ... chart generation code ...
        return self._image_to_base64(chart_path)

    def _create_body_comp_chart(self) -> str:
        """Create body composition chart.

        Placeholder: no image is rendered yet, so this returns the encoded
        contents of an existing file at the expected path, or "" if none.
        """
        # Your body composition chart code here
        chart_path = self.charts_dir / "body_composition_chart.png"
        # ... chart generation code ...
        return self._image_to_base64(chart_path)

    def _image_to_base64(self, image_path: Path) -> str:
        """Return the file at *image_path* base64-encoded, or "" if missing."""
        try:
            with open(image_path, "rb") as image_file:
                raw = image_file.read()
        except FileNotFoundError:
            return ""
        return base64.b64encode(raw).decode("utf-8")

    def generate_all_contexts(
        self, last_name: str = "Moran", report_date: str = "July 29, 2025"
    ) -> List[Dict]:
        """Main method to generate all page contexts.

        Args:
            last_name: Surname used to look up the patient in the SECA data.
            report_date: Date string shown on the first page.

        Returns:
            A list with one context dictionary per report page (pages 2-6 are
            empty placeholders).
        """
        # Extract patient info
        self.extract_patient_info(last_name)

        # Calculate metrics
        spirometry_metrics = self.calculate_spirometry_metrics()
        pnoe_metrics = self.calculate_pnoe_metrics()

        # Generate charts
        charts = self.generate_charts()

        # Build contexts for each page
        contexts = []

        # Page 1
        contexts.append(
            {
                "name": self.patient_info["name"],
                "surname": last_name,
                "date": report_date,
            }
        )

        # Page 2-6 (add as needed)
        contexts.extend({} for _ in range(5))

        # Page 7 - Spirometry
        # NOTE(review): "fev1_percentage" is computed as peak VT relative to
        # the best FVC - confirm this is the intended quantity.
        contexts.append(
            {
                "peak_vt": pnoe_metrics["peak_vt"],
                "peak_vt_bpm": pnoe_metrics["peak_vt_hr"],
                "fev1_percentage": (
                    pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
                )
                * 100,
                "lung_analysis_chart": charts.get("spirometry_chart", ""),
                "respiratory_analysis_chart": charts.get("respiratory_chart", ""),
            }
        )

        # Page 8 - VO2 Max and Zones
        decade = self.patient_info["age"] // 10 * 10
        contexts.append(
            {
                "vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
                "age_range": f"{decade}-{decade + 9}",
                **pnoe_metrics,  # Include all zone calculations
            }
        )

        # Continue for all pages...
        # Add remaining pages as needed

        return contexts


# Usage for backend service
def generate_report(
    pnoe_file, patient_file, spirometry_file, seca_file=None, patient_name="Moran"
):
    """Main function for backend service.

    Loads the given files into a fresh ``ReportGenerator`` and returns the
    page contexts for the patient whose surname matches *patient_name*.
    """
    generator = ReportGenerator()
    generator.load_data(pnoe_file, patient_file, spirometry_file, seca_file)
    return generator.generate_all_contexts(patient_name)


# Example usage
if __name__ == "__main__":
    contexts = generate_report(
        "data/Pnoe_20250729_1550-Moran_Keirstyn.csv",
        "data/patient_data.csv",
        "data/spirometry_data.csv",
        "data/SECA body comp for all patients.xlsx",
    )
    print(f"Generated {len(contexts)} page contexts")