""" Context Generator Service This service processes all data files and generates context dictionaries for each page of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data. """ from datetime import datetime from typing import Any, Dict, Optional, Tuple import pandas as pd class ContextGenerator: """Generate context data for report pages""" def __init__(self): self.pnoe_df = None self.spirometry_df = None self.seca_df = None self.patient_info = {} def load_data( self, pnoe_path: str, spirometry_path: str, seca_path: Optional[str] = None, ): """Load all required datasets""" self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";") self.spirometry_df = pd.read_csv(spirometry_path) if seca_path: self.seca_df = pd.read_excel(seca_path) else: self.seca_df = None self._preprocess_pnoe_data() def _preprocess_pnoe_data(self): """Apply preprocessing steps to Pnoe data""" # Convert numeric columns for col in self.pnoe_df.columns: try: self.pnoe_df[col] = pd.to_numeric(self.pnoe_df[col]) except (ValueError, TypeError): pass self.pnoe_df["VO2 Pulse"] = ( self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"] ) self.pnoe_df["VO2 Breath"] = ( self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"] ) self.pnoe_df["CHO"] = ( self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100 ) self.pnoe_df["FAT"] = ( self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100 ) window_size = 10 columns_to_smooth = [ "VO2(ml/min)", "VCO2(ml/min)", "HR(bpm)", "VT(l)", "BF(bpm)", "VE(l/min)", "VO2 Pulse", "VO2 Breath", "CHO", "FAT", ] for col in columns_to_smooth: if col in self.pnoe_df.columns: self.pnoe_df[f"{col}_smoothed"] = ( self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean() ) def extract_patient_info(self, patient_name: str) -> Dict: """Extract patient information from SECA dataset or use provided patient_info""" if self.seca_df is not None: patient_data = self.seca_df[ self.seca_df["LastName"].str.contains( patient_name, case=False, na=False ) ] if not patient_data.empty: row = patient_data.iloc[0] weight_kg = float(row.get("Weight", 0)) fat_pct = float(row.get("Adult_FMP", 0)) self.patient_info = { "name": f"{row.get('FirstName', '')} {row.get('LastName', '')}", "first_name": row.get("FirstName", ""), "last_name": row.get("LastName", ""), "age": int(row.get("Age", 0)), "height": f"{row.get('Height', '')}", "weight": weight_kg, "gender": row.get("Gender", "").lower(), "fat_percentage": fat_pct, "fat_mass_lbs": weight_kg * fat_pct / 100 * 2.20462, "lean_mass_lbs": weight_kg * (1 - fat_pct / 100) * 2.20462, } # If patient_info is already set (from manual input), calculate fat_mass and lean_mass elif "weight" in self.patient_info and "fat_percentage" in self.patient_info: weight_kg = self.patient_info["weight"] fat_pct = self.patient_info["fat_percentage"] self.patient_info["fat_mass_lbs"] = weight_kg * fat_pct / 100 * 2.20462 self.patient_info["lean_mass_lbs"] = ( weight_kg * (1 - fat_pct / 100) * 2.20462 ) return self.patient_info def calculate_spirometry_metrics( self, metric_overrides: Optional[Dict] = None ) -> Dict: """Calculate spirometry-related metrics""" if metric_overrides is None: metric_overrides = {} metrics = {} for param in ["FVC", "FEV1", "FEV1/FVC%"]: param_key = param.lower().replace("/", "_").replace("%", "_pct") if f"{param_key}_best" in metric_overrides: metrics[f"{param_key}_best"] = float( metric_overrides[f"{param_key}_best"] ) else: row = self.spirometry_df.loc[ self.spirometry_df["Parameters"].str.strip() == param ] if not row.empty: value = row["Best"].values[0] if pd.notna(value): try: metrics[f"{param_key}_best"] = float(value) except (ValueError, TypeError): pass # Skip if conversion fails if f"{param_key}_pred" in metric_overrides: metrics[f"{param_key}_pred"] = float( metric_overrides[f"{param_key}_pred"] ) else: row = self.spirometry_df.loc[ self.spirometry_df["Parameters"].str.strip() == param ] if not row.empty: value = row["%Pred."].values[0] if pd.notna(value): try: metrics[f"{param_key}_pred"] = float(value) except (ValueError, TypeError): pass # Skip if conversion fails return metrics def calculate_pnoe_metrics(self, metric_overrides: Optional[Dict] = None) -> Dict: """Calculate all Pnoe-derived metrics""" if metric_overrides is None: metric_overrides = {} metrics = {} # VO2 Max metrics if "vo2_max" in metric_overrides: metrics["vo2_max"] = float(metric_overrides["vo2_max"]) else: metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max() if "vo2_max_per_kg" in metric_overrides: metrics["vo2_max_per_kg"] = float(metric_overrides["vo2_max_per_kg"]) else: metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"] # Peak VT metrics if "peak_vt" in metric_overrides: metrics["peak_vt"] = float(metric_overrides["peak_vt"]) # Need to get HR from override or calculate if "peak_vt_hr" in metric_overrides: metrics["peak_vt_hr"] = float(metric_overrides["peak_vt_hr"]) else: peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax() peak_vt_row = self.pnoe_df.loc[peak_vt_idx] metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"] else: peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax() peak_vt_row = self.pnoe_df.loc[peak_vt_idx] metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"] metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"] # Fat Max metrics if "fat_max_value" in metric_overrides: metrics["fat_max_value"] = float(metric_overrides["fat_max_value"]) if "fat_max_hr" in metric_overrides: metrics["fat_max_hr"] = float(metric_overrides["fat_max_hr"]) else: fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() fat_max_row = self.pnoe_df.loc[fat_max_idx] metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"] else: fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() fat_max_row = self.pnoe_df.loc[fat_max_idx] metrics["fat_max_value"] = fat_max_row["FAT_smoothed"] metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"] # VT1 and VT2 thresholds if "vt1" in metric_overrides: metrics["vt1"] = metric_overrides["vt1"] else: vt1, _ = self._detect_thresholds() metrics["vt1"] = vt1 if "vt2" in metric_overrides: metrics["vt2"] = metric_overrides["vt2"] else: _, vt2 = self._detect_thresholds() metrics["vt2"] = vt2 # Heart rate zones if any(f"zone{i}_bpm" in metric_overrides for i in range(1, 6)): for i in range(1, 6): zone_key = f"zone{i}_bpm" if zone_key in metric_overrides: metrics[zone_key] = metric_overrides[zone_key] else: fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() fat_max_row = self.pnoe_df.loc[fat_max_idx] zones = self._calculate_hr_zones( metrics["vt1"], metrics["vt2"], fat_max_row ) metrics.update(zones) return metrics def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]: """Detect VT1 and VT2 thresholds""" condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"] crossover_indices = condition[condition].index vt1 = None if len(crossover_indices) > 0: vt1_idx = crossover_indices[0] vt1_row = self.pnoe_df.loc[vt1_idx] vt1 = { "HeartRate": vt1_row["HR(bpm)_smoothed"], "Speed": vt1_row["Speed"], "Time": vt1_row["T(sec)"], } ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff() second_derivative = ve_slope.diff() vt2_idx = second_derivative.idxmax() vt2 = None if pd.notna(vt2_idx): vt2_row = self.pnoe_df.loc[vt2_idx] vt2 = { "HeartRate": vt2_row["HR(bpm)_smoothed"], "Speed": vt2_row["Speed"], "Time": vt2_row["T(sec)"], } return vt1, vt2 def _calculate_hr_zones( self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series ) -> Dict: """Calculate heart rate zones based on thresholds""" zones = {} if vt1 and vt2: zone_1_start = fat_max_row["HR(bpm)_smoothed"] - 15 zone_2_start = fat_max_row["HR(bpm)_smoothed"] zone_3_start = vt1["HeartRate"] zone_4_start = vt2["HeartRate"] - 10 zone_5_start = vt2["HeartRate"] + 10 zones["zone1_bpm"] = f"{int(zone_1_start)}-{int(zone_2_start)}bpm" zones["zone2_bpm"] = f"{int(zone_2_start)}-{int(vt1['HeartRate'])}bpm" zones["zone3_bpm"] = f"{int(zone_3_start)}-{int(zone_4_start)}bpm" zones["zone4_bpm"] = f"{int(zone_4_start)}-{int(zone_5_start)}bpm" zones["zone5_bpm"] = f"{int(zone_5_start)}+bpm" else: max_hr = 220 - self.patient_info["age"] zones["zone1_bpm"] = f"{int(max_hr * 0.55)}-{int(max_hr * 0.65)}bpm" zones["zone2_bpm"] = f"{int(max_hr * 0.65)}-{int(max_hr * 0.75)}bpm" zones["zone3_bpm"] = f"{int(max_hr * 0.75)}-{int(max_hr * 0.85)}bpm" zones["zone4_bpm"] = f"{int(max_hr * 0.85)}-{int(max_hr * 0.95)}bpm" zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm" return zones def _calculate_vo2_drop_points(self, pnoe_metrics: Dict) -> Dict: """Calculate VO2 Pulse and VO2 Breath drop points""" # Calculate slope of VO2 Pulse vo2_pulse_slope = self.pnoe_df["VO2 Pulse_smoothed"].diff() window = max(1, len(self.pnoe_df) // 3) # Ensure window is at least 1 vo2_pulse_slope_smoothed = vo2_pulse_slope.rolling( window=window, min_periods=1 ).mean() # Find where VO2 Pulse begins to drop (slope becomes negative) mask_pulse = vo2_pulse_slope_smoothed <= 0 drop_indices_pulse = mask_pulse[mask_pulse].index vo2_pulse_drop_bpm = None vo2_pulse_drop_zone = None if len(drop_indices_pulse) > 0: drop_idx = drop_indices_pulse[0] drop_row = self.pnoe_df.loc[drop_idx] vo2_pulse_drop_bpm = int(drop_row["HR(bpm)_smoothed"]) # Determine zone based on HR zones if pnoe_metrics.get("zone1_bpm") and vo2_pulse_drop_bpm: zones = [pnoe_metrics.get(f"zone{i}_bpm", "") for i in range(1, 6)] for i, zone_str in enumerate(zones, 1): if zone_str: zone_clean = zone_str.replace("bpm", "").strip() if "-" in zone_clean: parts = zone_clean.split("-") if len(parts) == 2: try: start, end = ( int(parts[0]), int(parts[1].replace("+", "")), ) if start <= vo2_pulse_drop_bpm <= end: vo2_pulse_drop_zone = f"Zone {i}" break except ValueError: pass elif "+" in zone_clean: # Zone 5 format: "180+bpm" try: start = int(zone_clean.replace("+", "")) if vo2_pulse_drop_bpm >= start: vo2_pulse_drop_zone = f"Zone {i}" break except ValueError: pass # Calculate slope of VO2 Breath vo2_breath_slope = self.pnoe_df["VO2 Breath_smoothed"].diff() vo2_breath_slope_smoothed = vo2_breath_slope.rolling( window=window, min_periods=1 ).mean() # Find where VO2 Breath begins to drop mask_breath = vo2_breath_slope_smoothed <= 0 drop_indices_breath = mask_breath[mask_breath].index vo2_breath_drop_bpm = None vo2_breath_drop_zone = None if len(drop_indices_breath) > 0: drop_idx = drop_indices_breath[0] drop_row = self.pnoe_df.loc[drop_idx] vo2_breath_drop_bpm = int(drop_row["HR(bpm)_smoothed"]) # Determine zone if pnoe_metrics.get("zone1_bpm") and vo2_breath_drop_bpm: zones = [pnoe_metrics.get(f"zone{i}_bpm", "") for i in range(1, 6)] for i, zone_str in enumerate(zones, 1): if zone_str: zone_clean = zone_str.replace("bpm", "").strip() if "-" in zone_clean: parts = zone_clean.split("-") if len(parts) == 2: try: start, end = ( int(parts[0]), int(parts[1].replace("+", "")), ) if start <= vo2_breath_drop_bpm <= end: vo2_breath_drop_zone = f"Zone {i}" break except ValueError: pass elif "+" in zone_clean: # Zone 5 format: "180+bpm" try: start = int(zone_clean.replace("+", "")) if vo2_breath_drop_bpm >= start: vo2_breath_drop_zone = f"Zone {i}" break except ValueError: pass return { "vo2_pulse_drop_bpm": vo2_pulse_drop_bpm or 180, "vo2_pulse_drop_zone": vo2_pulse_drop_zone or "Zone 4", "vo2_breath_drop_bpm": vo2_breath_drop_bpm or 173, "vo2_breath_drop_zone": vo2_breath_drop_zone or "Zone 3", } def _calculate_fat_metabolism_metrics(self, pnoe_metrics: Dict) -> Dict: """Calculate fat metabolism metrics for page 11""" fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() fat_max_row = self.pnoe_df.loc[fat_max_idx] fat_max_value = pnoe_metrics.get("fat_max_value", 0) fat_max_hr = pnoe_metrics.get("fat_max_hr", 0) max_hr = 220 - self.patient_info["age"] fat_max_heart_rate_pct = (fat_max_hr / max_hr * 100) if max_hr > 0 else 0 # Find carbs and fat crossover point crossover_idx = None for idx in self.pnoe_df.index: if ( self.pnoe_df.loc[idx, "CHO_smoothed"] > self.pnoe_df.loc[idx, "FAT_smoothed"] ): crossover_idx = idx break crossover_bpm = None crossover_heart_rate_pct = None if crossover_idx is not None: crossover_row = self.pnoe_df.loc[crossover_idx] crossover_bpm = int(crossover_row["HR(bpm)_smoothed"]) crossover_heart_rate_pct = ( (crossover_bpm / max_hr * 100) if max_hr > 0 else 0 ) # Get speed and incline at fat max fat_max_speed = fat_max_row.get("Speed", 0) fat_max_incline = ( fat_max_row.get("Incline", 2.0) if "Incline" in fat_max_row else 2.0 ) return { "fat_max_value": f"{fat_max_value:.2f}Kcals/min", "fat_max_heart_rate": f"{fat_max_heart_rate_pct:.0f}% of Max Heart Rate", "fat_max_bpm": f"{int(fat_max_hr)} bpm", "fat_max_optimal": "*Optimal 10-12Kcals/minute", "crossover_bpm": f"{crossover_bpm or 100}bpm", "crossover_heart_rate": f"{crossover_heart_rate_pct or 51:.0f}% of Max Heart Rate", "fat_metabolism_note": f"{crossover_bpm or 100}bpm at a speed of {fat_max_speed:.1f}mph and incline of {fat_max_incline:.0f}%", } def _calculate_recovery_metrics(self) -> Dict: """Calculate recovery metrics for page 11""" # Find peak exercise point (max HR) peak_idx = self.pnoe_df["HR(bpm)_smoothed"].idxmax() peak_hr = self.pnoe_df.loc[peak_idx, "HR(bpm)_smoothed"] peak_time = self.pnoe_df.loc[peak_idx, "T(sec)"] # Find recovery phase (after peak) recovery_df = self.pnoe_df[self.pnoe_df["T(sec)"] > peak_time].copy() if len(recovery_df) == 0: return { "cardiac_recovery_time": "(1 minute)", "cardiac_recovery_percentage": "33%", "metabolic_recovery_time": "(2 minute)", "metabolic_recovery_percentage": "65%", "breath_recovery_time": "(2.5 minute)", "breath_recovery_percentage": "76%", } # Cardiac recovery (1 minute) one_min_time = peak_time + 60 one_min_row = recovery_df[recovery_df["T(sec)"] <= one_min_time] if len(one_min_row) > 0: one_min_hr = one_min_row.iloc[-1]["HR(bpm)_smoothed"] cardiac_recovery_pct = ( ((peak_hr - one_min_hr) / peak_hr * 100) if peak_hr > 0 else 0 ) else: cardiac_recovery_pct = 33 # Metabolic recovery (2 minutes) - using VCO2 two_min_time = peak_time + 120 peak_vco2 = self.pnoe_df.loc[peak_idx, "VCO2(ml/min)_smoothed"] two_min_row = recovery_df[recovery_df["T(sec)"] <= two_min_time] if len(two_min_row) > 0: two_min_vco2 = two_min_row.iloc[-1]["VCO2(ml/min)_smoothed"] metabolic_recovery_pct = ( ((peak_vco2 - two_min_vco2) / peak_vco2 * 100) if peak_vco2 > 0 else 0 ) else: metabolic_recovery_pct = 65 # Breath frequency recovery (2.5 minutes) two_five_min_time = peak_time + 150 peak_bf = self.pnoe_df.loc[peak_idx, "BF(bpm)_smoothed"] two_five_min_row = recovery_df[recovery_df["T(sec)"] <= two_five_min_time] if len(two_five_min_row) > 0: two_five_min_bf = two_five_min_row.iloc[-1]["BF(bpm)_smoothed"] breath_recovery_pct = ( ((peak_bf - two_five_min_bf) / peak_bf * 100) if peak_bf > 0 else 0 ) else: breath_recovery_pct = 76 return { "cardiac_recovery_time": "(1 minute)", "cardiac_recovery_percentage": f"{int(cardiac_recovery_pct)}%", "metabolic_recovery_time": "(2 minute)", "metabolic_recovery_percentage": f"{int(metabolic_recovery_pct)}%", "breath_recovery_time": "(2.5 minute)", "breath_recovery_percentage": f"{int(breath_recovery_pct)}%", } def _calculate_resting_heart_rate_metrics(self) -> Dict: """Calculate resting heart rate metrics for page 11""" # Get resting HR from beginning of test rest_phase = self.pnoe_df.head(30) # First 30 seconds resting_hr = rest_phase["HR(bpm)_smoothed"].mean() age = self.patient_info.get("age", 30) gender = self.patient_info.get("gender", "female").lower() # Determine age range if 26 <= age <= 35: age_range = "26-35" elif 36 <= age <= 45: age_range = "36-45" elif 46 <= age <= 55: age_range = "46-55" else: age_range = "26-35" # Default # HR ranges based on gender and age (simplified) if gender == "female": hr_ranges = { "poor": "82bpm +", "below_avg": "75-81bpm", "average": "71-74bpm", "above_avg": "66-70bpm", "good": "62-65bpm", "excellent": "55-61bpm", "athlete": "44-54bpm", } else: # male hr_ranges = { "poor": "82bpm +", "below_avg": "75-81bpm", "average": "71-74bpm", "above_avg": "66-70bpm", "good": "62-65bpm", "excellent": "55-61bpm", "athlete": "44-54bpm", } return { "resting_heart_rate": f"{int(resting_hr)}bpm", "hr_age_range": age_range, "hr_poor": hr_ranges["poor"], "hr_below_avg": hr_ranges["below_avg"], "hr_average": hr_ranges["average"], "hr_above_avg": hr_ranges["above_avg"], "hr_good": hr_ranges["good"], "hr_excellent": hr_ranges["excellent"], "hr_athlete": hr_ranges["athlete"], } def _calculate_rhr_table_data(self, age: int, gender: str) -> dict: """ Calculate Resting Heart Rate reference table data. Args: age: Patient age gender: Patient gender Returns: Dictionary containing age_range and ranges """ # Determine age range if 18 <= age <= 25: age_range = "18-25" elif 26 <= age <= 35: age_range = "26-35" elif 36 <= age <= 45: age_range = "36-45" elif 46 <= age <= 55: age_range = "46-55" elif 56 <= age <= 65: age_range = "56-65" elif age > 65: age_range = "65+" else: age_range = "18-25" # default for under 18 # RHR Master Chart rhr_chart = { "male": { "18-25": { "Poor": (85, None), "Below Average": (79, 85), "Average": (74, 79), "Above Average": (70, 74), "Good": (66, 70), "Excellent": (61, 66), "Athlete": (40, 61), }, "26-35": { "Poor": (83, None), "Below Average": (77, 83), "Average": (73, 77), "Above Average": (69, 73), "Good": (65, 69), "Excellent": (60, 65), "Athlete": (42, 60), }, "36-45": { "Poor": (85, None), "Below Average": (79, 85), "Average": (74, 79), "Above Average": (70, 74), "Good": (65, 70), "Excellent": (60, 65), "Athlete": (45, 60), }, "46-55": { "Poor": (84, None), "Below Average": (78, 84), "Average": (74, 78), "Above Average": (70, 74), "Good": (66, 70), "Excellent": (61, 66), "Athlete": (48, 61), }, "56-65": { "Poor": (84, None), "Below Average": (78, 84), "Average": (74, 78), "Above Average": (70, 74), "Good": (65, 70), "Excellent": (60, 65), "Athlete": (50, 60), }, "65+": { "Poor": (84, None), "Below Average": (77, 84), "Average": (73, 77), "Above Average": (70, 73), "Good": (65, 70), "Excellent": (60, 65), "Athlete": (52, 60), }, }, "female": { "18-25": { "Poor": (82, None), "Below Average": (74, 82), "Average": (70, 74), "Above Average": (66, 70), "Good": (62, 66), "Excellent": (56, 62), "Athlete": (40, 56), }, "26-35": { "Poor": (82, None), "Below Average": (75, 82), "Average": (71, 75), "Above Average": (66, 71), "Good": (62, 66), "Excellent": (55, 62), "Athlete": (44, 55), }, "36-45": { "Poor": (83, None), "Below Average": (76, 83), "Average": (71, 76), "Above Average": (67, 71), "Good": (63, 67), "Excellent": (57, 63), "Athlete": (47, 57), }, "46-55": { "Poor": (84, None), "Below Average": (77, 84), "Average": (72, 77), "Above Average": (68, 72), "Good": (64, 68), "Excellent": (58, 64), "Athlete": (49, 58), }, "56-65": { "Poor": (82, None), "Below Average": (76, 82), "Average": (72, 76), "Above Average": (68, 72), "Good": (62, 68), "Excellent": (57, 62), "Athlete": (51, 57), }, "65+": { "Poor": (80, None), "Below Average": (74, 80), "Average": (70, 74), "Above Average": (66, 70), "Good": (62, 66), "Excellent": (56, 62), "Athlete": (52, 56), }, }, } gender_key = "male" if gender.lower().startswith("m") else "female" ranges = rhr_chart[gender_key][age_range] # Format ranges formatted_ranges = {} for category, (min_val, max_val) in ranges.items(): if max_val is None: formatted_ranges[category] = f"{min_val}bpm +" else: formatted_ranges[category] = f"{min_val}-{max_val}bpm" return { "age_range": age_range, "ranges": formatted_ranges, "raw_ranges": ranges, # Keep raw ranges for category determination } def _determine_rhr_category(self, rhr: float, age: int, gender: str) -> str: """Determine resting heart rate category based on value, age, and gender (matching notebook logic)""" rhr_table_info = self._calculate_rhr_table_data(age, gender) ranges = rhr_table_info["raw_ranges"] # Check Poor category first (open-ended at top) min_val, max_val = ranges["Poor"] if max_val is None and rhr >= min_val: return "Poor" # Check other categories from Below Average down to Athlete # For RHR, lower is better, so we check from highest to lowest for category in [ "Below Average", "Average", "Above Average", "Good", "Excellent", "Athlete", ]: min_val, max_val = ranges[category] # Check if value falls in this range (inclusive of min, exclusive of max) if min_val <= rhr < max_val: return category # If value is below all ranges (below Athlete minimum), return Athlete # This handles the case where rhr < min of Athlete return "Athlete" def _calculate_zone_metrics(self, pnoe_metrics: Dict) -> Dict: """Calculate detailed metrics for each heart rate zone based on actual data""" import math # Get zone boundaries fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() optimal_row = self.pnoe_df.loc[fat_max_idx] # Detect VT1 and VT2 vt1 = pnoe_metrics.get("vt1") vt2 = pnoe_metrics.get("vt2") if not vt1 or not vt2: # Return default values if thresholds not detected return {} # Define zone boundaries (from notebook logic) zone_1_start = math.floor(optimal_row["HR(bpm)_smoothed"] - 15) zone_2_start = math.floor(optimal_row["HR(bpm)_smoothed"]) zone_3_start = math.floor(vt1["HeartRate"]) zone_4_start = math.floor(vt2["HeartRate"] - 10) zone_5_start = math.floor(vt2["HeartRate"]) zone_1_end = zone_2_start zone_2_end = math.floor(vt1["HeartRate"]) zone_3_end = zone_4_start zone_4_end = zone_5_start zone_5_end = math.floor(vt2["HeartRate"] + 10) zones_list = [ ("Zone 1", zone_1_start, zone_1_end), ("Zone 2", zone_2_start, zone_2_end), ("Zone 3", zone_3_start, zone_3_end), ("Zone 4", zone_4_start, zone_4_end), ("Zone 5", zone_5_start, zone_5_end), ] ideal_breath_ranges = [ "Ideal Range: 15-20 breaths", "Ideal Range: 20-25 breaths", "Ideal Range: 25-30 breaths", "Ideal Range: 30-35 breaths", "Ideal Range: 40+ breaths", ] def speed_to_pace(s_mph): """Convert speed in mph to pace in min/km""" if s_mph <= 0: return 0, 0 s_kmh = s_mph * 1.60934 p_min = 60 / s_kmh p_m = int(p_min) p_s = int((p_min % 1) * 60) return p_m, p_s zone_data = [] for i, (name, start, end) in enumerate(zones_list): # Filter dataframe for the current zone mask = (self.pnoe_df["HR(bpm)_smoothed"] >= start) & ( self.pnoe_df["HR(bpm)_smoothed"] <= end ) zone_df = self.pnoe_df[mask] # HR BPM Range - match notebook exactly hr_bpm_str = f"{int(start)}-{int(end)} bpm" if not zone_df.empty: # Speed calculation - match notebook exactly speed_series = zone_df[zone_df["Speed"] > 0.1]["Speed"] if not speed_series.empty: min_speed = speed_series.min() max_speed = speed_series.max() if abs(min_speed - max_speed) < 0.1: speed_str = f"{min_speed:.1f} mph\n2% Incline" else: speed_str = f"{min_speed:.1f}-{max_speed:.1f} mph\n2% Incline" # Pace calculation (max speed -> min pace, min speed -> max pace) min_pace_m, min_pace_s = speed_to_pace(max_speed) max_pace_m, max_pace_s = speed_to_pace(min_speed) if min_pace_m == max_pace_m and min_pace_s == max_pace_s: pace_str = f"{min_pace_m}:{min_pace_s:02d} min/km Pace" else: pace_str = f"{max_pace_m}:{max_pace_s:02d}-{min_pace_m}:{min_pace_s:02d}\nmin/km Pace" else: speed_str = "-\n2% Incline" pace_str = "-" # Calories (using raw EE) - match notebook exactly avg_cals = zone_df["EE(kcal/min)"].mean() calories_str = f"Avg:\n{avg_cals:.1f} kcals/minute" # Carb utilization (g/min) - match notebook exactly avg_carbs_g = zone_df["CHO"].mean() / 4 carb_str = f"Avg: {avg_carbs_g:.1f}g/min\nCarb Utilization" # Breathing - match notebook exactly avg_breaths = zone_df["BF(bpm)_smoothed"].mean() breath_str = ( f"Avg: {int(avg_breaths)} breaths\n{ideal_breath_ranges[i]}" ) else: speed_str = "-\n2% Incline" pace_str = "-" calories_str = "-" carb_str = "-" breath_str = f"-\n{ideal_breath_ranges[i]}" zone_data.append( { "zone_name": name, "hr_bpm": hr_bpm_str, "speed": speed_str, "pace": pace_str, "calories": calories_str, "carb": carb_str, "breathing": breath_str, } ) return {"zones": zone_data} def _calculate_vo2_max_table_data(self, age: int, gender: str) -> Dict: """Calculate VO2 Max table data based on age and gender""" # VO2 Max Master Chart Data (from notebook - matching exact values) vo2_max_data = { "20-29 (M)": { "Very Poor": (29.0, 38.1), "Poor": (38.1, 44.9), "Fair": (44.9, 50.2), "Good": (50.2, 61.8), "Excellent": (57.1, 66.3), "Superior": (66.3, None), }, "30-39 (M)": { "Very Poor": (27.2, 34.1), "Poor": (34.1, 39.6), "Fair": (39.6, 45.2), "Good": (45.2, 51.6), "Excellent": (51.6, 59.8), "Superior": (59.8, None), }, "40-49 (M)": { "Very Poor": (24.2, 30.5), "Poor": (30.5, 35.7), "Fair": (35.7, 40.3), "Good": (40.3, 46.7), "Excellent": (46.7, 55.6), "Superior": (55.6, None), }, "50-59 (M)": { "Very Poor": (20.9, 26.1), "Poor": (26.1, 30.7), "Fair": (30.7, 35.1), "Good": (35.1, 41.2), "Excellent": (41.2, 50.7), "Superior": (50.7, None), }, "60-69 (M)": { "Very Poor": (17.4, 22.4), "Poor": (22.4, 26.6), "Fair": (26.6, 30.5), "Good": (30.5, 36.1), "Excellent": (36.1, 43.0), "Superior": (43.0, None), }, "20-29 (F)": { "Very Poor": (21.7, 28.6), "Poor": (28.6, 34.6), "Fair": (34.6, 40.6), "Good": (40.6, 46.5), "Excellent": (46.5, 56.0), "Superior": (56.0, None), }, "30-39 (F)": { "Very Poor": (19.0, 24.1), "Poor": (24.1, 28.2), "Fair": (28.2, 32.2), "Good": (32.2, 35.7), "Excellent": (35.7, 45.8), "Superior": (45.8, None), }, "40-49 (F)": { "Very Poor": (17.0, 21.3), "Poor": (21.3, 24.9), "Fair": (24.9, 28.7), "Good": (28.7, 34.0), "Excellent": (34.0, 41.7), "Superior": (41.7, None), }, "50-59 (F)": { "Very Poor": (16.0, 19.1), "Poor": (19.1, 24.4), "Fair": (21.8, 27.6), "Good": (25.2, 28.6), "Excellent": (28.6, 35.9), "Superior": (35.9, None), }, "60-69 (F)": { "Very Poor": (13.4, 16.5), "Poor": (16.5, 18.9), "Fair": (18.9, 21.2), "Good": (21.2, 24.6), "Excellent": (24.6, 29.4), "Superior": (29.4, None), }, } # Determine age bracket (matching notebook logic) if 20 <= age <= 29: age_key = "20-29" elif 30 <= age <= 39: age_key = "30-39" elif 40 <= age <= 49: age_key = "40-49" elif 50 <= age <= 59: age_key = "50-59" elif 60 <= age <= 69: age_key = "60-69" else: # Default to closest range if age < 20: age_key = "20-29" elif age >= 70: age_key = "60-69" else: age_key = "30-39" # fallback gender_key = "(M)" if gender.lower() == "male" else "(F)" key = f"{age_key} {gender_key}" ranges = vo2_max_data.get(key, vo2_max_data["30-39 (F)"]) # Default # Format the ranges for display result = {} for category, (min_val, max_val) in ranges.items(): if min_val is None: result[category] = f"<{max_val:.1f}" elif max_val is None: result[category] = f"{min_val:.1f}+" else: result[category] = f"{min_val:.1f}-{max_val:.1f}" return { "age_range": age_key, "ranges": result, "raw_ranges": ranges, # Keep raw ranges for category determination } def _determine_vo2_max_category(self, vo2_max: float, age: int, gender: str) -> str: """Determine VO2 max category based on value, age, and gender (matching notebook logic)""" vo2_max_table_info = self._calculate_vo2_max_table_data(age, gender) ranges = vo2_max_table_info["raw_ranges"] categories = ["Very Poor", "Poor", "Fair", "Good", "Excellent", "Superior"] # Check Superior category first (open-ended) min_val, max_val = ranges["Superior"] if max_val is None and vo2_max >= min_val: return "Superior" # Check other categories from Excellent down to Very Poor # Ranges are typically [min, max) - inclusive of min, exclusive of max for category in reversed( categories[:-1] ): # Exclude Superior as we already checked it min_val, max_val = ranges[category] # Check if value falls in this range (inclusive of min, exclusive of max) if min_val <= vo2_max < max_val: return category # If value is below all ranges, return Very Poor # This handles the case where vo2_max < min of Very Poor return "Very Poor" def calculate_rmr_and_fuel_source(self) -> Dict: """Calculate RMR and fuel source from pnoe data""" metrics = {} # Calculate RMR from resting phase (MET <= 1.1) if "MET" in self.pnoe_df.columns and "EE(kcal/day)" in self.pnoe_df.columns: rest_phase = self.pnoe_df[self.pnoe_df["MET"] <= 1.1] if not rest_phase.empty: rmr = rest_phase["EE(kcal/day)"].mean() metrics["rmr_kcal"] = float(rmr) else: # Fallback: use minimum EE(kcal/min) * 1440 (minutes per day) if "EE(kcal/min)" in self.pnoe_df.columns: min_ee = self.pnoe_df["EE(kcal/min)"].min() metrics["rmr_kcal"] = float(min_ee * 1440) else: metrics["rmr_kcal"] = 1500.0 # Default fallback else: # Fallback: estimate from weight (simplified) weight_kg = self.patient_info.get("weight", 70) gender = self.patient_info.get("gender", "female").lower() # Simplified RMR estimation: 22 kcal/kg/day for men, 20 for women if gender == "male": rmr = weight_kg * 22 else: rmr = weight_kg * 20 metrics["rmr_kcal"] = float(rmr) # Calculate fuel source from resting phase (RER == 0.9 or closest) if "RER" in self.pnoe_df.columns and "FAT(%)" in self.pnoe_df.columns: # Find rest phase with RER closest to 0.9 rest_phase = ( self.pnoe_df[self.pnoe_df["RER"] == 0.9].copy() if "RER" in self.pnoe_df.columns else self.pnoe_df.copy() ) if not rest_phase.empty: # Find row with RER closest to 0.9 if "RER" in rest_phase.columns: rest_phase["RER_diff"] = abs(rest_phase["RER"] - 0.9) closest_idx = rest_phase["RER_diff"].idxmin() fat_pct = rest_phase.loc[closest_idx, "FAT(%)"] metrics["rest_fat_percentage"] = float(fat_pct) else: # Use mean FAT(%) from rest phase metrics["rest_fat_percentage"] = float(rest_phase["FAT(%)"].mean()) else: # Fallback: use overall mean metrics["rest_fat_percentage"] = float(self.pnoe_df["FAT(%)"].mean()) else: # Fallback: use a default value metrics["rest_fat_percentage"] = 75.0 # Calculate caloric values for page 5 rmr = metrics["rmr_kcal"] neat = rmr * 0.25 # NEAT is typically 20-30% of RMR weight_loss_rate = 1.0 # 1 lb per week weight_loss_calories = 500.0 # 500 kcal deficit per day for 1 lb/week total_calories = rmr + neat - weight_loss_calories metrics["resting_calories"] = int(rmr) metrics["neat_calories"] = int(neat) metrics["weight_loss_calories"] = int(weight_loss_calories) metrics["weight_loss_rate"] = weight_loss_rate metrics["total_calories"] = int(total_calories) return metrics def generate_all_contexts( self, patient_name: str, graphs: Dict[str, str], metric_overrides: Optional[Dict] = None, graph_generator: Optional[Any] = None, ) -> Dict[str, Dict]: """Main method to generate all page contexts Returns: Dictionary with keys 'page_1', 'page_2', etc., each containing context data for that page """ if metric_overrides is None: metric_overrides = {} self.extract_patient_info(patient_name) # Extract metric overrides for spirometry and pnoe spirometry_overrides = metric_overrides.get("spirometry", {}) pnoe_overrides = metric_overrides.get("pnoe", {}) spirometry_metrics = self.calculate_spirometry_metrics(spirometry_overrides) pnoe_metrics = self.calculate_pnoe_metrics(pnoe_overrides) rmr_metrics = self.calculate_rmr_and_fuel_source() contexts = {} # Page 1 contexts["page_1"] = { "name": self.patient_info["name"], "surname": self.patient_info["last_name"], "date": datetime.now().strftime("%B %d, %Y"), } # Page 2 contexts["page_2"] = { "patient_name": self.patient_info["name"], "test_date": datetime.now().strftime("%B %d, %Y"), } # Pages 3, 6 (pages 4 and 5 are handled separately) for i in [0, 3]: # Skip indices 1 and 2 which are pages 4 and 5 contexts[f"page_{i + 3}"] = { "patient_name": self.patient_info["name"], "page_number": i + 3, } # Page 4 - Nutrition Guidelines with Body Composition contexts["page_4"] = { "patient_name": self.patient_info["name"], "page_number": 4, "fat_percentage": f"{self.patient_info['fat_percentage']:.1f}", "body_composition_chart": graphs.get("body_composition", ""), "body_fat_chart": graphs.get("body_fat_percent", ""), # Alias for template "body_fat_percent_chart": graphs.get( "body_fat_percent", "" ), # Keep for consistency } # Page 5 - Resting Metabolic Rate Assessment contexts["page_5"] = { "patient_name": self.patient_info["name"], "page_number": 5, "metabolism_chart": graphs.get("metabolism_chart", ""), "fuel_source_chart": graphs.get("fuel_source_chart", ""), "resting_calories": rmr_metrics.get("resting_calories", 1500), "neat_calories": rmr_metrics.get("neat_calories", 375), "weight_loss_calories": rmr_metrics.get("weight_loss_calories", 500), "weight_loss_rate": rmr_metrics.get("weight_loss_rate", 1.0), "total_calories": rmr_metrics.get("total_calories", 1375), } # Calculate FEV1 percentage for page 7 fev1_percentage = 0 if spirometry_metrics.get("fvc_best"): fev1_percentage = ( pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"] ) * 100 # Page 7 contexts["page_7"] = { "peak_vt": f"{pnoe_metrics['peak_vt']:.2f}", "peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}", "fev1_percentage": f"{fev1_percentage:.1f}", "lung_analysis_chart": graphs.get("spirometry_chart", ""), "respiratory_analysis_chart": graphs.get("respiratory", ""), } # Page 8 contexts["page_8"] = { "vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}", "age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}", "zone1_bpm": pnoe_metrics.get("zone1_bpm", ""), "zone2_bpm": pnoe_metrics.get("zone2_bpm", ""), "zone3_bpm": pnoe_metrics.get("zone3_bpm", ""), "zone4_bpm": pnoe_metrics.get("zone4_bpm", ""), "zone5_bpm": pnoe_metrics.get("zone5_bpm", ""), "vo2_pulse_chart": graphs.get("vo2_pulse", ""), } if graph_generator: # Calculate VO2 Max table data vo2_max_table_info = self._calculate_vo2_max_table_data( self.patient_info["age"], self.patient_info["gender"] ) # Determine patient's VO2 max category vo2_max_value = pnoe_metrics.get("vo2_max_per_kg", 0.0) category = self._determine_vo2_max_category( vo2_max_value, self.patient_info["age"], self.patient_info["gender"], ) # VO2 Max Table gender_label = ( "F" if self.patient_info["gender"].lower() == "female" else "M" ) age_range_label = f"{vo2_max_table_info['age_range']} ({gender_label})" vo2_max_columns = [ "Age", "Very Poor", "Poor", "Fair", "Good", "Excellent", "Superior", ] vo2_max_data = [ [ age_range_label, vo2_max_table_info["ranges"]["Very Poor"], vo2_max_table_info["ranges"]["Poor"], vo2_max_table_info["ranges"]["Fair"], vo2_max_table_info["ranges"]["Good"], vo2_max_table_info["ranges"]["Excellent"], vo2_max_table_info["ranges"]["Superior"], ] ] contexts["page_8"]["vo2_max_table"] = ( graph_generator.generate_vo2_max_table( data=vo2_max_data, columns=vo2_max_columns, vo2_max_value=vo2_max_value, category=category, save_as_base64=True, ) ) # Calculate zone metrics for the table zone_metrics = self._calculate_zone_metrics(pnoe_metrics) if zone_metrics.get("zones"): # Heart Rate Zones Table hr_zones_columns = ["Zone 1", "Zone 2", "Zone 3", "Zone 4", "Zone 5"] hr_zones_data = [ [ "Improves health and\nrecovery capacity", "Improves endurance\nand fat burning", "Improves Aerobic\nfitness", "Improves maximum\nperformance capacity", "Develops maximum\nperformance and speed", ], [ "55-65% of Max Heart Rate", "65-75% of Max Heart Rate", "80-85% of Max Heart Rate", "85-88% of Max Heart Rate", "90%+ of Max Heart Rate", ], [zone_metrics["zones"][i]["hr_bpm"] for i in range(5)], [zone_metrics["zones"][i]["speed"] for i in range(5)], [zone_metrics["zones"][i]["pace"] for i in range(5)], [zone_metrics["zones"][i]["calories"] for i in range(5)], [zone_metrics["zones"][i]["carb"] for i in range(5)], [zone_metrics["zones"][i]["breathing"] for i in range(5)], ] # Colors are now handled directly in the graph generator to match notebook # No need to pass cell_colors contexts["page_8"]["hr_zones_table"] = ( graph_generator.generate_heart_rate_zones_table( data=hr_zones_data, columns=hr_zones_columns, save_as_base64=True, ) ) # Page 9 contexts["page_9"] = { "fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}", "fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}", "fuel_utilization_chart": graphs.get("fuel_utilization", ""), "fat_metabolism_chart": graphs.get("fat_metabolism", ""), } # Page 10 - VO2 Pulse and VO2 Breath vo2_drop_metrics = self._calculate_vo2_drop_points(pnoe_metrics) contexts["page_10"] = { "vo2_pulse_chart": graphs.get("vo2_pulse", ""), "vo2_breath_chart": graphs.get("vo2_breath", ""), "vo2_pulse_drop_bpm": f"{vo2_drop_metrics['vo2_pulse_drop_bpm']} bpm", "vo2_pulse_drop_zone": vo2_drop_metrics["vo2_pulse_drop_zone"], "vo2_breath_drop_bpm": f"{vo2_drop_metrics['vo2_breath_drop_bpm']} bpm", "vo2_breath_drop_zone": vo2_drop_metrics["vo2_breath_drop_zone"], } # Page 11 - Fat Metabolism and Recovery fat_metabolism_metrics = self._calculate_fat_metabolism_metrics(pnoe_metrics) recovery_metrics = self._calculate_recovery_metrics() resting_hr_metrics = self._calculate_resting_heart_rate_metrics() contexts["page_11"] = { "fat_metabolism_chart": graphs.get("fat_metabolism", ""), "recovery_chart": graphs.get("recovery", ""), **fat_metabolism_metrics, **recovery_metrics, **resting_hr_metrics, } if graph_generator: # Page 11 Resting Heart Rate Table rhr_table_info = self._calculate_rhr_table_data( self.patient_info["age"], self.patient_info["gender"] ) # Get resting heart rate value and determine category # Extract numeric value from "53bpm" format (resting_hr_metrics already calculated above) rhr_value_str = resting_hr_metrics.get("resting_heart_rate", "0bpm") rhr_value = float(rhr_value_str.replace("bpm", "").strip()) category = self._determine_rhr_category( rhr_value, self.patient_info["age"], self.patient_info["gender"], ) gender_label = ( "F" if self.patient_info["gender"].lower().startswith("f") else "M" ) age_range_label = f"{rhr_table_info['age_range']} ({gender_label})" rhr_columns = [ "Age", "Poor", "Below Average", "Average", "Above Average", "Good", "Excellent", "Athlete", ] rhr_data = [ [ age_range_label, rhr_table_info["ranges"]["Poor"], rhr_table_info["ranges"]["Below Average"], rhr_table_info["ranges"]["Average"], rhr_table_info["ranges"]["Above Average"], rhr_table_info["ranges"]["Good"], rhr_table_info["ranges"]["Excellent"], rhr_table_info["ranges"]["Athlete"], ] ] contexts["page_11"]["rhr_table"] = ( graph_generator.generate_resting_heart_rate_table( data=rhr_data, columns=rhr_columns, rhr_value=rhr_value, category=category, save_as_base64=True, ) ) # Pages 12-17 for i in range(6): contexts[f"page_{i + 12}"] = { "patient_name": self.patient_info["name"], "page_number": i + 12, } # Page 18 - Glossary with Body Fat Percentage Master Chart contexts["page_18"] = { "patient_name": self.patient_info["name"], "page_number": 18, "body_fat_percentage_chart": graphs.get( "body_fat_percentage_master_chart", "" ), } # Page 19 contexts["page_19"] = { "patient_name": self.patient_info["name"], "page_number": 19, } return contexts