diff --git a/context_generator.py b/context_generator.py index b46bc41..5f80f1a 100644 --- a/context_generator.py +++ b/context_generator.py @@ -1,6 +1,319 @@ +import base64 +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import matplotlib.pyplot as plt import pandas as pd -pnoe_df = pd.read_csv('data/pnoe_data.csv') -patient_df = pd.read_csv('data/patient_data.csv') -spirometry_df = pd.read_csv('data/spirometry_data.csv') +class ReportGenerator: + def __init__(self): + self.pnoe_df = None + self.patient_df = None + self.spirometry_df = None + self.seca_df = None + self.patient_info = {} + self.charts_dir = Path("graphs") + self.charts_dir.mkdir(exist_ok=True) + + def load_data( + self, + pnoe_path: str, + patient_path: str, + spirometry_path: str, + seca_path: str = None, + ): + """Load all required datasets""" + self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";") + self.patient_df = pd.read_csv(patient_path) + self.spirometry_df = pd.read_csv(spirometry_path) + if seca_path: + self.seca_df = pd.read_excel(seca_path) + + # Apply preprocessing + self._preprocess_data() + + def _preprocess_data(self): + """Apply preprocessing steps from your notebook""" + # Convert to numeric + self.pnoe_df = self.pnoe_df.apply(pd.to_numeric, errors="ignore") + + # Calculate derived columns + self.pnoe_df["VO2 Pulse"] = ( + self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"] + ) + self.pnoe_df["VO2 Breath"] = ( + self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"] + ) + self.pnoe_df["CHO"] = ( + self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100 + ) + self.pnoe_df["FAT"] = ( + self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100 + ) + + # Apply smoothing + window_size = 10 + columns_to_smooth = [ + "VO2(ml/min)", + "VCO2(ml/min)", + "HR(bpm)", + "VT(l)", + "BF(bpm)", + "VE(l/min)", + "VO2 Pulse", + "VO2 Breath", + "CHO", + "FAT", + ] + + for col in columns_to_smooth: + if col in self.pnoe_df.columns: + self.pnoe_df[f"{col}_smoothed"] = ( + self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean() + ) + + def extract_patient_info(self, last_name: str) -> Dict: + """Extract patient information from datasets""" + if self.seca_df is not None: + patient_data = self.seca_df[ + self.seca_df["LastName"].str.contains(last_name, case=False, na=False) + ] + if not patient_data.empty: + row = patient_data.iloc[0] + self.patient_info = { + "name": f"{row.get('FirstName', '')} {last_name}", + "age": int(row.get("Age", 0)), + "height": f"{row.get('Height', '')}", + "weight": float(row.get("Weight", 0)), + "gender": row.get("Gender", "").lower(), + "fat_percentage": float(row.get("Adult_FMP", 0)), + } + return self.patient_info + + def calculate_spirometry_metrics(self) -> Dict: + """Calculate spirometry-related metrics""" + metrics = {} + + # Extract key spirometry values + for param in ["FVC", "FEV1", "FEV1/FVC%"]: + row = self.spirometry_df.loc[self.spirometry_df["Parameters"] == param] + if not row.empty: + metrics[ + f"{param.lower().replace('/', '_').replace('%', '_pct')}_best" + ] = row["Best"].values[0] + metrics[ + f"{param.lower().replace('/', '_').replace('%', '_pct')}_pred" + ] = row["%Pred."].values[0] + + return metrics + + def calculate_pnoe_metrics(self) -> Dict: + """Calculate all Pnoe-derived metrics""" + metrics = {} + + # Basic metrics + metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max() + metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"] + + # Peak VT + peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax() + peak_vt_row = self.pnoe_df.loc[peak_vt_idx] + metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"] + metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"] + + # Fat burning metrics + fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax() + fat_max_row = self.pnoe_df.loc[fat_max_idx] + metrics["fat_max_value"] = fat_max_row["FAT_smoothed"] + metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"] + + # Calculate zones (simplified from your logic) + metrics.update(self._calculate_hr_zones()) + + # VT1/VT2 detection + vt1, vt2 = self._detect_thresholds() + metrics["vt1"] = vt1 + metrics["vt2"] = vt2 + + return metrics + + def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]: + """Detect VT1 and VT2 thresholds""" + # VT1: First crossover where carbs > fat + condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"] + crossover_indices = condition[condition].index + + vt1 = None + if len(crossover_indices) > 0: + vt1_idx = crossover_indices[0] + vt1_row = self.pnoe_df.loc[vt1_idx] + vt1 = { + "HeartRate": vt1_row["HR(bpm)_smoothed"], + "Speed": vt1_row["Speed"], + "Time": vt1_row["T(sec)"], + } + + # VT2: Ventilation inflection (simplified) + ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff() + second_derivative = ve_slope.diff() + vt2_idx = second_derivative.idxmax() + + vt2 = None + if pd.notna(vt2_idx): + vt2_row = self.pnoe_df.loc[vt2_idx] + vt2 = { + "HeartRate": vt2_row["HR(bpm)_smoothed"], + "Speed": vt2_row["Speed"], + "Time": vt2_row["T(sec)"], + } + + return vt1, vt2 + + def _calculate_hr_zones(self) -> Dict: + """Calculate heart rate zones""" + max_hr = 220 - self.patient_info["age"] + + # Simplified zone calculation - you can make this more sophisticated + zones = { + "zone1_bpm": f"{int(max_hr * 0.55)}-{int(max_hr * 0.65)}bpm", + "zone2_bpm": f"{int(max_hr * 0.65)}-{int(max_hr * 0.75)}bpm", + "zone3_bpm": f"{int(max_hr * 0.75)}-{int(max_hr * 0.85)}bpm", + "zone4_bpm": f"{int(max_hr * 0.85)}-{int(max_hr * 0.95)}bpm", + "zone5_bpm": f"{int(max_hr * 0.95)}+bpm", + } + return zones + + def generate_charts(self) -> Dict[str, str]: + """Generate all charts and return base64 encoded versions""" + charts = {} + + # Generate fuel utilization chart + charts["fuel_utilization_chart"] = self._create_fuel_chart() + + # Generate VO2 pulse chart + charts["vo2_pulse_chart"] = self._create_vo2_pulse_chart() + + # Generate body composition chart + charts["body_composition_chart"] = self._create_body_comp_chart() + + # Add more chart generation methods... + + return charts + + def _create_fuel_chart(self) -> str: + """Create and save fuel utilization chart""" + # Use your existing chart code but make it dynamic + speed_groups = self.pnoe_df.groupby("Speed").mean(numeric_only=True).round(1) + speed_groups = speed_groups.iloc[1:-1] + filtered_data = speed_groups[ + (speed_groups.index >= 3.5) & (speed_groups.index <= 7.5) + ] + + plt.figure(figsize=(15, 8)) + # ... your chart code here ... + + chart_path = self.charts_dir / "fuel_utilization_chart.png" + plt.savefig(chart_path, dpi=300) + plt.close() + + return self._image_to_base64(chart_path) + + def _create_vo2_pulse_chart(self) -> str: + """Create VO2 pulse chart""" + # Your VO2 pulse chart code here + chart_path = self.charts_dir / "vo2_pulse_chart.png" + # ... chart generation code ... + return self._image_to_base64(chart_path) + + def _create_body_comp_chart(self) -> str: + """Create body composition chart""" + # Your body composition chart code here + chart_path = self.charts_dir / "body_composition_chart.png" + # ... chart generation code ... + return self._image_to_base64(chart_path) + + def _image_to_base64(self, image_path: Path) -> str: + """Convert image to base64""" + try: + with open(image_path, "rb") as image_file: + return base64.b64encode(image_file.read()).decode("utf-8") + except FileNotFoundError: + return "" + + def generate_all_contexts(self, last_name: str = "Moran") -> List[Dict]: + """Main method to generate all page contexts""" + # Extract patient info + self.extract_patient_info(last_name) + + # Calculate metrics + spirometry_metrics = self.calculate_spirometry_metrics() + pnoe_metrics = self.calculate_pnoe_metrics() + + # Generate charts + charts = self.generate_charts() + + # Build contexts for each page + contexts = [] + + # Page 1 + contexts.append( + { + "name": self.patient_info["name"], + "surname": last_name, + "date": "July 29, 2025", + } + ) + + # Page 2-6 (add as needed) + for i in range(5): + contexts.append({}) + + # Page 7 - Spirometry + contexts.append( + { + "peak_vt": pnoe_metrics["peak_vt"], + "peak_vt_bpm": pnoe_metrics["peak_vt_hr"], + "fev1_percentage": ( + pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"] + ) + * 100, + "lung_analysis_chart": charts.get("spirometry_chart", ""), + "respiratory_analysis_chart": charts.get("respiratory_chart", ""), + } + ) + + # Page 8 - VO2 Max and Zones + contexts.append( + { + "vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}", + "age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}", + **pnoe_metrics, # Include all zone calculations + } + ) + + # Continue for all pages... + # Add remaining pages as needed + + return contexts + + +# Usage for backend service +def generate_report( + pnoe_file, patient_file, spirometry_file, seca_file=None, patient_name="Moran" +): + """Main function for backend service""" + generator = ReportGenerator() + generator.load_data(pnoe_file, patient_file, spirometry_file, seca_file) + return generator.generate_all_contexts(patient_name) + + +# Example usage +if __name__ == "__main__": + contexts = generate_report( + "data/Pnoe_20250729_1550-Moran_Keirstyn.csv", + "data/patient_data.csv", + "data/spirometry_data.csv", + "data/SECA body comp for all patients.xlsx", + ) + print(f"Generated {len(contexts)} page contexts") diff --git a/graph_generator.py b/graph_generator.py new file mode 100644 index 0000000..3158a06 --- /dev/null +++ b/graph_generator.py @@ -0,0 +1,942 @@ +import base64 +from pathlib import Path +from typing import Dict + +import matplotlib.pyplot as plt +import matplotlib.transforms as mtransforms +import numpy as np +import pandas as pd +import seaborn as sns +from matplotlib.patches import FancyBboxPatch + + +class GraphGenerator: + def __init__(self, charts_dir: str = "graphs"): + """Initialize the GraphGenerator with output directory for charts""" + self.charts_dir = Path(charts_dir) + self.charts_dir.mkdir(exist_ok=True) + + def _image_to_base64(self, image_path: Path) -> str: + """Convert image to base64 string""" + try: + with open(image_path, "rb") as image_file: + return base64.b64encode(image_file.read()).decode("utf-8") + except FileNotFoundError: + return "" + + def generate_respiratory_chart( + self, df: pd.DataFrame, save_as_base64: bool = False + ) -> str: + """Generate respiratory chart showing VT and Speed over time""" + # Get phase times for background regions + first_unique_phase = df.drop_duplicates(subset="PHASE") + phase_times = first_unique_phase["T(sec)"].tolist() + + plt.figure(figsize=(18, 5)) + ax1 = plt.subplot() + + # Plot VT with step-like appearance + sns.lineplot(data=df, x="T(sec)", y="VT(l)_smoothed", label="VT (L)") + ax1.set_xlabel("Time (sec)") + ax1.set_ylabel("VT (L)") + ax1.grid(True, alpha=0.1) + ax1.set_ylim(0, min(8, df["VT(l)_smoothed"].max())) + + # Plot speed as step function on secondary y-axis + ax2 = ax1.twinx() + ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) + line2 = sns.lineplot( + data=df, + x="T(sec)", + y="Speed", + color="green", + ax=ax2, + drawstyle="steps-post", + linewidth=2, + label="Speed", + ) + ax2.set_ylabel("Speed") + ax2.set_ylim(0, min(30, df["Speed"].max()) + 1) + + # Remove default legends first + ax1.get_legend().remove() + ax2.get_legend().remove() + + # Combine legends from both axes in the top left + lines1, labels1 = ax1.get_legend_handles_labels() + lines2, labels2 = ax2.get_legend_handles_labels() + ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left") + + # Add colored background regions + if len(phase_times) >= 4: + ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue") + ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple") + ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen") + ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue") + + chart_path = self.charts_dir / "respiratory.png" + plt.savefig(chart_path, dpi=300, bbox_inches="tight") + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_fuel_utilization_chart( + self, df: pd.DataFrame, save_as_base64: bool = False + ) -> str: + """Generate fuel utilization chart with stacked bars showing fat vs carbs""" + # Group by speed and calculate mean for numeric columns only + speed_groups = df.groupby("Speed").mean(numeric_only=True).round(1) + speed_groups = speed_groups.iloc[1:-1] + filtered_data = speed_groups[ + (speed_groups.index >= 3.5) & (speed_groups.index <= 7.5) + ] + + plt.figure(figsize=(15, 8)) + plt.style.use("default") + + # Create stage labels and positions + stage_labels = [f"Stage {i}" for i in range(1, len(filtered_data) + 1)] + x_positions = np.arange(len(filtered_data)) + + # Calculate fat and carbs energy expenditure from percentages + fat_ee = filtered_data["EE(kcal/min)"] * filtered_data["FAT(%)"] / 100 + carbs_ee = filtered_data["EE(kcal/min)"] * filtered_data["CARBS(%)"] / 100 + + # Create the main axis for the stacked bars + ax1 = plt.gca() + + # Create stacked bar chart with colors + ax1.bar(x_positions, fat_ee, color="#1f77b4", alpha=0.8, width=0.6, label="Fat") + ax1.bar( + x_positions, + carbs_ee, + bottom=fat_ee, + color="#ff7f0e", + alpha=0.8, + width=0.6, + label="Carbs", + ) + + # Set labels and formatting for primary axis + ax1.set_xlabel("", fontsize=12) + ax1.set_ylabel("Fuel (kcal/min)", fontsize=12) + ax1.set_ylim(0, 20) + + # Add individual values on each bar segment + for i, (fat_val, carb_val, total_val) in enumerate( + zip(fat_ee, carbs_ee, filtered_data["EE(kcal/min)"]) + ): + if fat_val > 0.3: # Fat value + ax1.text( + i, + fat_val / 2, + f"{fat_val:.1f}", + ha="center", + va="center", + fontsize=9, + fontweight="bold", + color="white", + ) + if carb_val > 0.3: # Carbs value + ax1.text( + i, + fat_val + carb_val / 2, + f"{carb_val:.1f}", + ha="center", + va="center", + fontsize=9, + fontweight="bold", + color="white", + ) + # Total EE + ax1.text( + i, + total_val + 0.5, + f"{total_val:.1f} kcal", + ha="center", + va="bottom", + fontsize=10, + fontweight="bold", + color="black", + ) + + # Add speed labels below x-axis + for i, speed in enumerate(filtered_data.index): + ax1.text(i, -1.5, f"{speed:.1f} mph", ha="center", va="top", fontsize=9) + ax1.text( + i, + -2.8, + f"{speed * 1.609:.1f} min/km", + ha="center", + va="top", + fontsize=8, + color="gray", + ) + + # Create secondary y-axis for heart rate + ax2 = ax1.twinx() + + # Plot heart rate line + ax2.plot( + x_positions, + filtered_data["HR(bpm)"], + marker="o", + linewidth=3, + markersize=8, + color="red", + label="Heart Rate", + ) + + # Set heart rate axis formatting + ax2.set_ylabel("Heart Rate (bpm)", fontsize=12, color="red") + ax2.tick_params(axis="y", labelcolor="red") + ax2.set_ylim(0, 220) + + # Add HR values above the points + for i, hr in enumerate(filtered_data["HR(bpm)"]): + ax2.text( + i, + hr + 10, + f"{int(hr)}bpm", + ha="center", + va="bottom", + fontsize=10, + fontweight="bold", + color="red", + ) + + # Set x-axis formatting + ax1.set_xticks(x_positions) + ax1.set_xticklabels(stage_labels, fontsize=11) + + # Create legend + lines1, labels1 = ax1.get_legend_handles_labels() + lines2, labels2 = ax2.get_legend_handles_labels() + ax1.legend( + lines1 + lines2, + labels1 + labels2, + loc="upper left", + frameon=True, + fancybox=True, + shadow=True, + ) + + # Add grid + ax1.grid(True, alpha=0.3, linestyle="-", linewidth=0.5) + ax1.set_axisbelow(True) + + # Adjust layout + plt.tight_layout() + plt.subplots_adjust(bottom=0.1, top=0.9) + + chart_path = self.charts_dir / "fuel_utilization_chart.png" + plt.savefig(chart_path, dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_vo2_pulse_chart( + self, df: pd.DataFrame, save_as_base64: bool = False + ) -> str: + """Generate VO2 Pulse chart with heart rate and speed""" + first_unique_phase = df.drop_duplicates(subset="PHASE") + phase_times = first_unique_phase["T(sec)"].tolist() + + plt.figure(figsize=(18, 5)) + ax1 = plt.subplot() + + # Plot VO2 Pulse + sns.lineplot( + data=df, + x="T(sec)", + y="VO2 Pulse_smoothed", + label="VO2 Pulse (mL/beat)", + color="blue", + ) + ax1.set_xlabel("Time (sec)") + ax1.set_ylabel("VO2 Pulse (mL/beat)") + ax1.set_ylim(0, df["VO2 Pulse_smoothed"].max()) + ax1.grid(True, alpha=0.1) + + # Create second y-axis for heart rate + ax2 = ax1.twinx() + sns.lineplot( + data=df, + x="T(sec)", + y="HR(bpm)_smoothed", + color="red", + ax=ax2, + linewidth=2, + label="Heart Rate (bpm)", + ) + ax2.set_ylabel("Heart Rate (bpm)", color="red") + ax2.tick_params(axis="y", labelcolor="red") + ax2.set_ylim(0, df["HR(bpm)_smoothed"].max() + 1) + + # Create third y-axis for speed + ax3 = ax1.twinx() + ax3.spines["right"].set_position(("outward", 60)) + sns.lineplot( + data=df, + x="T(sec)", + y="Speed", + color="green", + ax=ax3, + drawstyle="steps-post", + linewidth=2, + label="Speed", + ) + ax3.set_ylabel("Speed", color="green") + ax3.tick_params(axis="y", labelcolor="green") + ax3.set_ylim(0, df["Speed"].max() + 1) + + ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) + + # Remove default legends first + for ax in [ax1, ax2, ax3]: + if ax.get_legend(): + ax.get_legend().remove() + + # Combine legends from all axes + lines1, labels1 = ax1.get_legend_handles_labels() + lines2, labels2 = ax2.get_legend_handles_labels() + lines3, labels3 = ax3.get_legend_handles_labels() + ax1.legend( + lines1 + lines2 + lines3, labels1 + labels2 + labels3, loc="upper left" + ) + + # Add colored background regions + if len(phase_times) >= 4: + ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue") + ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple") + ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen") + ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue") + + chart_path = self.charts_dir / "vo2_pulse_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_vo2_breath_chart( + self, df: pd.DataFrame, save_as_base64: bool = False + ) -> str: + """Generate VO2 per Breath chart""" + first_unique_phase = df.drop_duplicates(subset="PHASE") + phase_times = first_unique_phase["T(sec)"].tolist() + + plt.figure(figsize=(18, 5)) + ax1 = plt.subplot() + + # Plot VO2 per Breath + sns.lineplot( + data=df, + x="T(sec)", + y="VO2 Breath_smoothed", + label="VO2 per Breath (mL/breath)", + ) + ax1.set_xlabel("Time (sec)") + ax1.set_ylabel("VO2 per Breath (mL/breath)") + ax1.set_ylim(0, df["VO2 Breath_smoothed"].max() + 1) + ax1.grid(True, alpha=0.1) + + # Plot speed as step function on secondary y-axis + ax2 = ax1.twinx() + ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) + sns.lineplot( + data=df, + x="T(sec)", + y="Speed", + color="green", + ax=ax2, + drawstyle="steps-post", + linewidth=2, + label="Speed", + ) + ax2.set_ylim(0, df["Speed"].max() + 1) + ax2.set_ylabel("Speed") + + # Remove default legends first + ax1.get_legend().remove() + ax2.get_legend().remove() + + # Combine legends from both axes in the top left + lines1, labels1 = ax1.get_legend_handles_labels() + lines2, labels2 = ax2.get_legend_handles_labels() + ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left") + + # Add colored background regions + if len(phase_times) >= 4: + ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue") + ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple") + ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen") + ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue") + + chart_path = self.charts_dir / "vo2_breath_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_fat_metabolism_chart( + self, df: pd.DataFrame, save_as_base64: bool = False + ) -> str: + """Generate CHO and FAT metabolism chart""" + first_unique_phase = df.drop_duplicates(subset="PHASE") + phase_times = first_unique_phase["T(sec)"].tolist() + + plt.figure(figsize=(18, 5)) + ax1 = plt.subplot() + + # Plot CHO + sns.lineplot(data=df, x="T(sec)", y="CHO_smoothed", label="CHO (kcal/min)") + ax1.set_xlabel("Time (sec)") + ax1.set_ylabel("CHO (kcal/min)") + ax1.grid(True, alpha=0.1) + + # Plot FAT on secondary y-axis + ax2 = ax1.twinx() + ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) + sns.lineplot( + data=df, + x="T(sec)", + y="FAT_smoothed", + color="green", + ax=ax2, + label="FAT (kcal/min)", + ) + ax2.set_ylabel("FAT (kcal/min)") + ax2.set_ylim(0, 15) + + # Remove default legends first + ax1.get_legend().remove() + ax2.get_legend().remove() + + # Combine legends from both axes in the top left + lines1, labels1 = ax1.get_legend_handles_labels() + lines2, labels2 = ax2.get_legend_handles_labels() + ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left") + + # Add colored background regions + if len(phase_times) >= 4: + ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue") + ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple") + ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen") + ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue") + + chart_path = self.charts_dir / "fat_metabolism_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_recovery_chart( + self, df: pd.DataFrame, save_as_base64: bool = False + ) -> str: + """Generate recovery chart with VCO2, HR, and BF""" + first_unique_phase = df.drop_duplicates(subset="PHASE") + phase_times = first_unique_phase["T(sec)"].tolist() + + plt.figure(figsize=(18, 5)) + ax1 = plt.subplot() + + # Plot VCO2 + sns.lineplot( + data=df, + x="T(sec)", + y="VCO2(ml/min)_smoothed", + label="VCO2 (ml/min)", + color="blue", + ) + ax1.set_xlabel("Time (sec)") + ax1.set_ylabel("VCO2 (ml/min)") + ax1.set_ylim(0, df["VCO2(ml/min)"].max()) + ax1.grid(True, alpha=0.1) + + # Create second y-axis for heart rate + ax2 = ax1.twinx() + sns.lineplot( + data=df, + x="T(sec)", + y="HR(bpm)_smoothed", + color="red", + ax=ax2, + linewidth=2, + label="Heart Rate (bpm)", + ) + ax2.set_ylabel("Heart Rate (bpm)", color="red") + ax2.set_ylim(df["HR(bpm)_smoothed"].min(), df["HR(bpm)_smoothed"].max() + 1) + ax2.tick_params(axis="y", labelcolor="red") + + # Create third y-axis for breathing frequency + ax3 = ax1.twinx() + ax3.spines["right"].set_position(("outward", 60)) + sns.lineplot( + data=df, + x="T(sec)", + y="BF(bpm)_smoothed", + color="green", + ax=ax3, + linewidth=2, + label="BF (bpm)", + ) + ax3.set_ylabel("BF (bpm)", color="green") + ax3.tick_params(axis="y", labelcolor="green") + ax3.set_ylim(0, df["BF(bpm)_smoothed"].max() + 1) + ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) + + # Remove default legends first + for ax in [ax1, ax2, ax3]: + if ax.get_legend(): + ax.get_legend().remove() + + # Combine legends from all axes in the top left + lines1, labels1 = ax1.get_legend_handles_labels() + lines2, labels2 = ax2.get_legend_handles_labels() + lines3, labels3 = ax3.get_legend_handles_labels() + ax1.legend( + lines1 + lines2 + lines3, labels1 + labels2 + labels3, loc="upper left" + ) + + # Add colored background regions + if len(phase_times) >= 4: + ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue") + ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple") + ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen") + ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue") + + chart_path = self.charts_dir / "recovery_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_body_fat_percentage_chart( + self, + gender: str, + age: int, + body_fat_percentage: float, + save_as_base64: bool = False, + ) -> str: + """Generate body fat percentage chart with ranges""" + # Define the segments with muted colors + segments = [ + ("#F8A8A8", 0, 15), # Muted Red/Salmon: 0% to 15% + ("#FFEECC", 15, 5), # Pale Yellow/Cream: 15% to 20% + ("#D0F0C0", 20, 15), # Pale Green/Mint: 20% to 35% + ("#FFEECC", 35, 5), # Pale Yellow/Cream: 35% to 40% + ("#F8A8A8", 40, 10), # Muted Red/Salmon: 40% to 50% + ] + + # Determine age group + if 20 <= age <= 39: + age_group = "20-39" + elif 40 <= age <= 59: + age_group = "40-59" + elif 60 <= age <= 79: + age_group = "60-79" + else: + age_group = "N/A" + + demographic = f"{age_group}\n({gender[0].upper()})" + + fig, ax = plt.subplots(figsize=(10, 2)) + + # Create the Segmented Bar + for color, start, length in segments: + ax.barh( + y=0, + width=length, + left=start, + height=1, + color=color, + edgecolor="black", + linewidth=0.5, + ) + + # Add the Indicator (Triangle) + ax.plot( + body_fat_percentage, + 1.05, + marker="v", + color="black", + markersize=10, + clip_on=False, + transform=ax.get_xaxis_transform(), + ) + + # Set Axis Properties and Labels + ax.set_xlim(0, 50) + ax.set_xticks(range(0, 51, 5)) + ax.set_yticks([]) + ax.text( + -0.05, + 0, + demographic, + transform=ax.get_yaxis_transform(), + va="center", + ha="right", + fontsize=12, + ) + + ax.set_xlim(0, 50) + ticks = range(0, 51, 5) + ax.set_xticks(ticks) + labels = [f"{t}%" for t in ticks] + ax.set_xticklabels(labels) + + # Clean up spines and add small ticks + ax.spines["right"].set_visible(False) + ax.spines["top"].set_visible(False) + ax.spines["left"].set_visible(False) + ax.spines["bottom"].set_visible(True) + + for x in range(0, 51, 5): + ax.plot( + [x, x], + [-0.05, -0.01], + color="black", + transform=ax.get_xaxis_transform(), + clip_on=False, + ) + + plt.tight_layout() + + chart_path = self.charts_dir / "body_fat_percentage_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=300) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_body_composition_chart( + self, fat_mass_lbs: float, lean_mass_lbs: float, save_as_base64: bool = False + ) -> str: + """Generate donut chart for body composition""" + # Calculate percentages + total_weight = fat_mass_lbs + lean_mass_lbs + fat_percentage = (fat_mass_lbs / total_weight) * 100 + lean_percentage = (lean_mass_lbs / total_weight) * 100 + + # Data for the chart + sizes = [fat_percentage, lean_percentage] + colors = ["#fde3ac", "#ff9966"] # Light yellow/tan and orange + + plt.figure(figsize=(8, 8)) + + # Create the donut chart without labels first + wedges, texts, autotexts = plt.pie( + sizes, + autopct="", # Remove auto percentages + startangle=90, + wedgeprops=dict(width=0.5, edgecolor="w"), + colors=colors, + labels=["", ""], + ) # Remove default labels + + # Add custom text annotations positioned manually + plt.text( + -1, + 1, + f"Fat Mass ({fat_mass_lbs:.1f}lbs)\n{fat_percentage:.1f}%", + fontsize=14, + fontweight="bold", + ha="center", + va="center", + bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8), + ) + + plt.text( + 1, + -1, + f"Lean Mass ({lean_mass_lbs:.1f}lbs)\n{lean_percentage:.1f}%", + fontsize=14, + fontweight="bold", + ha="center", + va="center", + bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8), + ) + + # Set the title + plt.axis("equal") # Equal aspect ratio ensures that pie is drawn as a circle + + chart_path = self.charts_dir / "body_composition_chart.png" + plt.savefig(chart_path, bbox_inches="tight", dpi=600) + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_spirometry_chart( + self, spirometry_df: pd.DataFrame, save_as_base64: bool = False + ) -> str: + """Generate spirometry chart with Z-scores and ranges""" + # Coerce numeric columns + for col in ["Best", "LLN", "Pred.", "%Pred.", "ZScore"]: + if col in spirometry_df.columns: + spirometry_df[col] = pd.to_numeric(spirometry_df[col], errors="coerce") + + # Select rows of interest and prepare display values + rows_map = { + "Lung Volume": "FVC", + "Lung Power": "FEV1", + "Power/Volume": "FEV1/FVC%", + } + + records = [] + for label, param in rows_map.items(): + row = spirometry_df.loc[spirometry_df["Parameters"].str.strip() == param] + if row.empty: + continue + row = row.iloc[0] + records.append( + { + "label": label, + "param": param, + "best": row["Best"], + "pct": row["%Pred."], + "z": row["ZScore"], + } + ) + + # Figure setup + fig, axes = plt.subplots( + nrows=3, + ncols=1, + figsize=(11.5, 3.6), + sharex=True, + gridspec_kw={"hspace": 0.65}, + ) + + x_min, x_max = -5, 3 + # Segment colors: red -> orange -> yellow -> green + segments = [ + (-5, -4, "#f4a7a7"), # red-ish + (-4, -3, "#f7c49a"), # orange-ish + (-3, -1.7, "#f6e3a3"), # yellow-ish + (-1.7, 3, "#c9f0cc"), # green-ish + ] + + ticks = np.arange(x_min, x_max + 1, 1) + labels = [str(i) for i in ticks] + + # Plot each row + for ax, rec in zip(axes, records): + # Background segments + for a, b, color in segments: + ax.barh( + 0, width=b - a, left=a, height=0.6, color=color, edgecolor="none" + ) + + # LLN (-1) and Predicted (0) markers + ax.axvline(0, color="black", lw=1) + + # Z-score pointer (downward triangle) at top of each panel + if pd.notna(rec["z"]): + trans = mtransforms.blended_transform_factory( + ax.transData, ax.transAxes + ) + ax.plot( + float(rec["z"]), + 1.2, + marker="v", + markersize=12, + color="dimgray", + transform=trans, + clip_on=False, + ) + + # Labels, ticks, and styling + ax.set_title( + rec["label"], loc="left", fontsize=11, fontweight="bold", pad=2 + ) + ax.set_xlim(x_min, x_max) + ax.set_yticks([]) + ax.set_xticks(ticks) + ax.set_xticklabels(labels, fontsize=8) + ax.set_xlabel("") + + # Top annotations + axes[0].text(-1.7, 0.45, "LLN", ha="center", va="bottom", fontsize=9) + axes[0].text(0, 0.45, "Predicted", ha="center", va="bottom", fontsize=9) + + # Right-side summary boxes + fig.subplots_adjust(right=0.78) + box_ax = fig.add_axes( + [0.805, 0.06, 0.18, 0.90] + ) # [left, bottom, width, height] + box_ax.axis("off") + + # Helper to draw a pill-shaped text box + def pill(ax, xy, text): + x, y = xy + # Draw rounded rectangle background + bbox = FancyBboxPatch( + (x - 0.48, y - 0.09), + 0.96, + 0.18, + boxstyle="round,pad=0.02,rounding_size=0.08", + ec="#dddddd", + fc="#f3f3f3", + linewidth=1.0, + ) + ax.add_patch(bbox) + ax.text( + x, + y + 0.025, + text, + ha="center", + va="center", + fontsize=11, + fontweight="bold", + ) + ax.text( + x, + y - 0.055, + "of predicted", + ha="center", + va="center", + fontsize=9, + color="#555555", + ) + + box_ax.set_xlim(0, 1) + box_ax.set_ylim(0, 1) + + # Prepare display strings and positions (top to bottom) + right_items = [] + for rec in records: + name = ( + "FVC" + if rec["param"] == "FVC" + else ("FEV1" if rec["param"] == "FEV1" else "FEV1/FVC") + ) + unit = "L" if rec["param"] in ("FVC", "FEV1") else "%" + value_fmt = f"{rec['best']:.2f}{unit}" + pct_fmt = f"{rec['pct']:.1f}%" + right_items.append((name, value_fmt, pct_fmt)) + + # Sort to match image order on the right (FVC, FEV1, FEV1/FVC) + order = ["FVC", "FEV1", "FEV1/FVC"] + right_items_sorted = [ + next(item for item in right_items if item[0] == k) for k in order + ] + + ys = [0.82, 0.48, 0.15] + for (name, value_fmt, pct_fmt), y in zip(right_items_sorted, ys): + main_line = f"{name}\n{value_fmt} → {pct_fmt}" + pill(box_ax, (0.5, y), main_line) + + chart_path = self.charts_dir / "spirometry_chart.png" + plt.savefig(chart_path, dpi=300, bbox_inches="tight") + plt.close() + + return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) + + def generate_all_charts( + self, + pnoe_df: pd.DataFrame, + spirometry_df: pd.DataFrame, + patient_data: Dict, + save_as_base64: bool = False, + ) -> Dict[str, str]: + """Generate all charts at once and return dictionary of paths/base64 strings""" + charts = {} + + # Generate physiological charts + charts["respiratory"] = self.generate_respiratory_chart(pnoe_df, save_as_base64) + charts["fuel_utilization_chart"] = self.generate_fuel_utilization_chart( + pnoe_df, save_as_base64 + ) + charts["vo2_pulse_chart"] = self.generate_vo2_pulse_chart( + pnoe_df, save_as_base64 + ) + charts["vo2_breath_chart"] = self.generate_vo2_breath_chart( + pnoe_df, save_as_base64 + ) + charts["fat_metabolism_chart"] = self.generate_fat_metabolism_chart( + pnoe_df, save_as_base64 + ) + charts["recovery_chart"] = self.generate_recovery_chart(pnoe_df, save_as_base64) + + # Generate body composition charts + if ( + "gender" in patient_data + and "age" in patient_data + and "fat_percentage" in patient_data + ): + charts["body_fat_percentage_chart"] = ( + self.generate_body_fat_percentage_chart( + patient_data["gender"], + patient_data["age"], + patient_data["fat_percentage"], + save_as_base64, + ) + ) + + if "fat_mass_lbs" in patient_data and "lean_mass_lbs" in patient_data: + charts["body_composition_chart"] = self.generate_body_composition_chart( + patient_data["fat_mass_lbs"], + patient_data["lean_mass_lbs"], + save_as_base64, + ) + + # Generate spirometry chart + charts["spirometry_chart"] = self.generate_spirometry_chart( + spirometry_df, save_as_base64 + ) + + return charts + + +# Example usage +if __name__ == "__main__": + # Initialize graph generator + generator = GraphGenerator() + + # Load sample data (you would pass your actual dataframes) + pnoe_df = pd.read_csv("data/Pnoe_20250729_1550-Moran_Keirstyn.csv", delimiter=";") + spirometry_df = pd.read_csv("data/spirometry_data.csv") + + # Preprocess pnoe data (same as in your notebook) + pnoe_df = pnoe_df.apply(pd.to_numeric, errors="ignore") + pnoe_df["VO2 Pulse"] = pnoe_df["VO2(ml/min)"] / pnoe_df["HR(bpm)"] + pnoe_df["VO2 Breath"] = pnoe_df["VO2(ml/min)"] / pnoe_df["BF(bpm)"] + pnoe_df["CHO"] = pnoe_df["EE(kcal/min)"] * pnoe_df["CARBS(%)"] / 100 + pnoe_df["FAT"] = pnoe_df["EE(kcal/min)"] * pnoe_df["FAT(%)"] / 100 + + # Apply smoothing + window_size = 10 + columns_to_smooth = [ + "VO2(ml/min)", + "VCO2(ml/min)", + "HR(bpm)", + "VT(l)", + "BF(bpm)", + "VE(l/min)", + "VO2 Pulse", + "VO2 Breath", + "CHO", + "FAT", + ] + for col in columns_to_smooth: + if col in pnoe_df.columns: + pnoe_df[f"{col}_smoothed"] = ( + pnoe_df[col].rolling(window=window_size, min_periods=1).mean() + ) + + # Patient data + patient_data = { + "gender": "female", + "age": 25, + "fat_percentage": 22.4, + "fat_mass_lbs": 27.6, + "lean_mass_lbs": 95.4, + } + + # Generate all charts + charts = generator.generate_all_charts( + pnoe_df, spirometry_df, patient_data, save_as_base64=True + ) + + print(f"Generated {len(charts)} charts:") + for chart_name in charts.keys(): + print(f"- {chart_name}") diff --git a/graphs/body_fat_percentage_chart.png b/graphs/body_fat_percentage_chart.png index 7e24de3..f9218ea 100644 Binary files a/graphs/body_fat_percentage_chart.png and b/graphs/body_fat_percentage_chart.png differ diff --git a/graphs/fat_metabolism_chart.png b/graphs/fat_metabolism_chart.png index 5ca5510..6960491 100644 Binary files a/graphs/fat_metabolism_chart.png and b/graphs/fat_metabolism_chart.png differ diff --git a/graphs/recovery_chart.png b/graphs/recovery_chart.png index 2f21e05..57c6feb 100644 Binary files a/graphs/recovery_chart.png and b/graphs/recovery_chart.png differ diff --git a/graphs/respiratory.png b/graphs/respiratory.png index 2228e5a..ffa0ee6 100644 Binary files a/graphs/respiratory.png and b/graphs/respiratory.png differ diff --git a/graphs/spirometry_chart.png b/graphs/spirometry_chart.png index 152f6a3..0c8b5b5 100644 Binary files a/graphs/spirometry_chart.png and b/graphs/spirometry_chart.png differ