Add graph generation functionality and update charts

- Implemented GraphGenerator class for generating various physiological charts.
- Added methods for generating respiratory, fuel utilization, VO2 pulse, VO2 breath, fat metabolism, recovery, body fat percentage, body composition, and spirometry charts.
- Included functionality to save charts as PNG files or return them as base64 strings.
- Updated existing chart images in the graphs directory.
This commit is contained in:
bolade
2025-09-29 11:45:09 +01:00
parent 54e0189301
commit f52729d703
7 changed files with 1258 additions and 3 deletions
+316 -3
View File
@@ -1,6 +1,319 @@
import base64
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import matplotlib.pyplot as plt
import pandas as pd import pandas as pd
pnoe_df = pd.read_csv('data/pnoe_data.csv')
patient_df = pd.read_csv('data/patient_data.csv')
spirometry_df = pd.read_csv('data/spirometry_data.csv')
class ReportGenerator:
def __init__(self):
self.pnoe_df = None
self.patient_df = None
self.spirometry_df = None
self.seca_df = None
self.patient_info = {}
self.charts_dir = Path("graphs")
self.charts_dir.mkdir(exist_ok=True)
def load_data(
self,
pnoe_path: str,
patient_path: str,
spirometry_path: str,
seca_path: str = None,
):
"""Load all required datasets"""
self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
self.patient_df = pd.read_csv(patient_path)
self.spirometry_df = pd.read_csv(spirometry_path)
if seca_path:
self.seca_df = pd.read_excel(seca_path)
# Apply preprocessing
self._preprocess_data()
def _preprocess_data(self):
"""Apply preprocessing steps from your notebook"""
# Convert to numeric
self.pnoe_df = self.pnoe_df.apply(pd.to_numeric, errors="ignore")
# Calculate derived columns
self.pnoe_df["VO2 Pulse"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
)
self.pnoe_df["VO2 Breath"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
)
self.pnoe_df["CHO"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
)
self.pnoe_df["FAT"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
)
# Apply smoothing
window_size = 10
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in self.pnoe_df.columns:
self.pnoe_df[f"{col}_smoothed"] = (
self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
def extract_patient_info(self, last_name: str) -> Dict:
"""Extract patient information from datasets"""
if self.seca_df is not None:
patient_data = self.seca_df[
self.seca_df["LastName"].str.contains(last_name, case=False, na=False)
]
if not patient_data.empty:
row = patient_data.iloc[0]
self.patient_info = {
"name": f"{row.get('FirstName', '')} {last_name}",
"age": int(row.get("Age", 0)),
"height": f"{row.get('Height', '')}",
"weight": float(row.get("Weight", 0)),
"gender": row.get("Gender", "").lower(),
"fat_percentage": float(row.get("Adult_FMP", 0)),
}
return self.patient_info
def calculate_spirometry_metrics(self) -> Dict:
"""Calculate spirometry-related metrics"""
metrics = {}
# Extract key spirometry values
for param in ["FVC", "FEV1", "FEV1/FVC%"]:
row = self.spirometry_df.loc[self.spirometry_df["Parameters"] == param]
if not row.empty:
metrics[
f"{param.lower().replace('/', '_').replace('%', '_pct')}_best"
] = row["Best"].values[0]
metrics[
f"{param.lower().replace('/', '_').replace('%', '_pct')}_pred"
] = row["%Pred."].values[0]
return metrics
def calculate_pnoe_metrics(self) -> Dict:
"""Calculate all Pnoe-derived metrics"""
metrics = {}
# Basic metrics
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]
# Peak VT
peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax()
peak_vt_row = self.pnoe_df.loc[peak_vt_idx]
metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]
# Fat burning metrics
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx]
metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]
# Calculate zones (simplified from your logic)
metrics.update(self._calculate_hr_zones())
# VT1/VT2 detection
vt1, vt2 = self._detect_thresholds()
metrics["vt1"] = vt1
metrics["vt2"] = vt2
return metrics
def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Detect VT1 and VT2 thresholds"""
# VT1: First crossover where carbs > fat
condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
crossover_indices = condition[condition].index
vt1 = None
if len(crossover_indices) > 0:
vt1_idx = crossover_indices[0]
vt1_row = self.pnoe_df.loc[vt1_idx]
vt1 = {
"HeartRate": vt1_row["HR(bpm)_smoothed"],
"Speed": vt1_row["Speed"],
"Time": vt1_row["T(sec)"],
}
# VT2: Ventilation inflection (simplified)
ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff()
second_derivative = ve_slope.diff()
vt2_idx = second_derivative.idxmax()
vt2 = None
if pd.notna(vt2_idx):
vt2_row = self.pnoe_df.loc[vt2_idx]
vt2 = {
"HeartRate": vt2_row["HR(bpm)_smoothed"],
"Speed": vt2_row["Speed"],
"Time": vt2_row["T(sec)"],
}
return vt1, vt2
def _calculate_hr_zones(self) -> Dict:
"""Calculate heart rate zones"""
max_hr = 220 - self.patient_info["age"]
# Simplified zone calculation - you can make this more sophisticated
zones = {
"zone1_bpm": f"{int(max_hr * 0.55)}-{int(max_hr * 0.65)}bpm",
"zone2_bpm": f"{int(max_hr * 0.65)}-{int(max_hr * 0.75)}bpm",
"zone3_bpm": f"{int(max_hr * 0.75)}-{int(max_hr * 0.85)}bpm",
"zone4_bpm": f"{int(max_hr * 0.85)}-{int(max_hr * 0.95)}bpm",
"zone5_bpm": f"{int(max_hr * 0.95)}+bpm",
}
return zones
def generate_charts(self) -> Dict[str, str]:
"""Generate all charts and return base64 encoded versions"""
charts = {}
# Generate fuel utilization chart
charts["fuel_utilization_chart"] = self._create_fuel_chart()
# Generate VO2 pulse chart
charts["vo2_pulse_chart"] = self._create_vo2_pulse_chart()
# Generate body composition chart
charts["body_composition_chart"] = self._create_body_comp_chart()
# Add more chart generation methods...
return charts
def _create_fuel_chart(self) -> str:
"""Create and save fuel utilization chart"""
# Use your existing chart code but make it dynamic
speed_groups = self.pnoe_df.groupby("Speed").mean(numeric_only=True).round(1)
speed_groups = speed_groups.iloc[1:-1]
filtered_data = speed_groups[
(speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
]
plt.figure(figsize=(15, 8))
# ... your chart code here ...
chart_path = self.charts_dir / "fuel_utilization_chart.png"
plt.savefig(chart_path, dpi=300)
plt.close()
return self._image_to_base64(chart_path)
def _create_vo2_pulse_chart(self) -> str:
"""Create VO2 pulse chart"""
# Your VO2 pulse chart code here
chart_path = self.charts_dir / "vo2_pulse_chart.png"
# ... chart generation code ...
return self._image_to_base64(chart_path)
def _create_body_comp_chart(self) -> str:
"""Create body composition chart"""
# Your body composition chart code here
chart_path = self.charts_dir / "body_composition_chart.png"
# ... chart generation code ...
return self._image_to_base64(chart_path)
def _image_to_base64(self, image_path: Path) -> str:
"""Convert image to base64"""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
except FileNotFoundError:
return ""
def generate_all_contexts(self, last_name: str = "Moran") -> List[Dict]:
"""Main method to generate all page contexts"""
# Extract patient info
self.extract_patient_info(last_name)
# Calculate metrics
spirometry_metrics = self.calculate_spirometry_metrics()
pnoe_metrics = self.calculate_pnoe_metrics()
# Generate charts
charts = self.generate_charts()
# Build contexts for each page
contexts = []
# Page 1
contexts.append(
{
"name": self.patient_info["name"],
"surname": last_name,
"date": "July 29, 2025",
}
)
# Page 2-6 (add as needed)
for i in range(5):
contexts.append({})
# Page 7 - Spirometry
contexts.append(
{
"peak_vt": pnoe_metrics["peak_vt"],
"peak_vt_bpm": pnoe_metrics["peak_vt_hr"],
"fev1_percentage": (
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
)
* 100,
"lung_analysis_chart": charts.get("spirometry_chart", ""),
"respiratory_analysis_chart": charts.get("respiratory_chart", ""),
}
)
# Page 8 - VO2 Max and Zones
contexts.append(
{
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
**pnoe_metrics, # Include all zone calculations
}
)
# Continue for all pages...
# Add remaining pages as needed
return contexts
# Usage for backend service
def generate_report(
pnoe_file, patient_file, spirometry_file, seca_file=None, patient_name="Moran"
):
"""Main function for backend service"""
generator = ReportGenerator()
generator.load_data(pnoe_file, patient_file, spirometry_file, seca_file)
return generator.generate_all_contexts(patient_name)
# Example usage
if __name__ == "__main__":
contexts = generate_report(
"data/Pnoe_20250729_1550-Moran_Keirstyn.csv",
"data/patient_data.csv",
"data/spirometry_data.csv",
"data/SECA body comp for all patients.xlsx",
)
print(f"Generated {len(contexts)} page contexts")
+942
View File
@@ -0,0 +1,942 @@
import base64
from pathlib import Path
from typing import Dict
import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib.patches import FancyBboxPatch
class GraphGenerator:
def __init__(self, charts_dir: str = "graphs"):
"""Initialize the GraphGenerator with output directory for charts"""
self.charts_dir = Path(charts_dir)
self.charts_dir.mkdir(exist_ok=True)
def _image_to_base64(self, image_path: Path) -> str:
"""Convert image to base64 string"""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
except FileNotFoundError:
return ""
def generate_respiratory_chart(
self, df: pd.DataFrame, save_as_base64: bool = False
) -> str:
"""Generate respiratory chart showing VT and Speed over time"""
# Get phase times for background regions
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot VT with step-like appearance
sns.lineplot(data=df, x="T(sec)", y="VT(l)_smoothed", label="VT (L)")
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VT (L)")
ax1.grid(True, alpha=0.1)
ax1.set_ylim(0, min(8, df["VT(l)_smoothed"].max()))
# Plot speed as step function on secondary y-axis
ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
line2 = sns.lineplot(
data=df,
x="T(sec)",
y="Speed",
color="green",
ax=ax2,
drawstyle="steps-post",
linewidth=2,
label="Speed",
)
ax2.set_ylabel("Speed")
ax2.set_ylim(0, min(30, df["Speed"].max()) + 1)
# Remove default legends first
ax1.get_legend().remove()
ax2.get_legend().remove()
# Combine legends from both axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "respiratory.png"
plt.savefig(chart_path, dpi=300, bbox_inches="tight")
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fuel_utilization_chart(
self, df: pd.DataFrame, save_as_base64: bool = False
) -> str:
"""Generate fuel utilization chart with stacked bars showing fat vs carbs"""
# Group by speed and calculate mean for numeric columns only
speed_groups = df.groupby("Speed").mean(numeric_only=True).round(1)
speed_groups = speed_groups.iloc[1:-1]
filtered_data = speed_groups[
(speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
]
plt.figure(figsize=(15, 8))
plt.style.use("default")
# Create stage labels and positions
stage_labels = [f"Stage {i}" for i in range(1, len(filtered_data) + 1)]
x_positions = np.arange(len(filtered_data))
# Calculate fat and carbs energy expenditure from percentages
fat_ee = filtered_data["EE(kcal/min)"] * filtered_data["FAT(%)"] / 100
carbs_ee = filtered_data["EE(kcal/min)"] * filtered_data["CARBS(%)"] / 100
# Create the main axis for the stacked bars
ax1 = plt.gca()
# Create stacked bar chart with colors
ax1.bar(x_positions, fat_ee, color="#1f77b4", alpha=0.8, width=0.6, label="Fat")
ax1.bar(
x_positions,
carbs_ee,
bottom=fat_ee,
color="#ff7f0e",
alpha=0.8,
width=0.6,
label="Carbs",
)
# Set labels and formatting for primary axis
ax1.set_xlabel("", fontsize=12)
ax1.set_ylabel("Fuel (kcal/min)", fontsize=12)
ax1.set_ylim(0, 20)
# Add individual values on each bar segment
for i, (fat_val, carb_val, total_val) in enumerate(
zip(fat_ee, carbs_ee, filtered_data["EE(kcal/min)"])
):
if fat_val > 0.3: # Fat value
ax1.text(
i,
fat_val / 2,
f"{fat_val:.1f}",
ha="center",
va="center",
fontsize=9,
fontweight="bold",
color="white",
)
if carb_val > 0.3: # Carbs value
ax1.text(
i,
fat_val + carb_val / 2,
f"{carb_val:.1f}",
ha="center",
va="center",
fontsize=9,
fontweight="bold",
color="white",
)
# Total EE
ax1.text(
i,
total_val + 0.5,
f"{total_val:.1f} kcal",
ha="center",
va="bottom",
fontsize=10,
fontweight="bold",
color="black",
)
# Add speed labels below x-axis
for i, speed in enumerate(filtered_data.index):
ax1.text(i, -1.5, f"{speed:.1f} mph", ha="center", va="top", fontsize=9)
ax1.text(
i,
-2.8,
f"{speed * 1.609:.1f} min/km",
ha="center",
va="top",
fontsize=8,
color="gray",
)
# Create secondary y-axis for heart rate
ax2 = ax1.twinx()
# Plot heart rate line
ax2.plot(
x_positions,
filtered_data["HR(bpm)"],
marker="o",
linewidth=3,
markersize=8,
color="red",
label="Heart Rate",
)
# Set heart rate axis formatting
ax2.set_ylabel("Heart Rate (bpm)", fontsize=12, color="red")
ax2.tick_params(axis="y", labelcolor="red")
ax2.set_ylim(0, 220)
# Add HR values above the points
for i, hr in enumerate(filtered_data["HR(bpm)"]):
ax2.text(
i,
hr + 10,
f"{int(hr)}bpm",
ha="center",
va="bottom",
fontsize=10,
fontweight="bold",
color="red",
)
# Set x-axis formatting
ax1.set_xticks(x_positions)
ax1.set_xticklabels(stage_labels, fontsize=11)
# Create legend
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(
lines1 + lines2,
labels1 + labels2,
loc="upper left",
frameon=True,
fancybox=True,
shadow=True,
)
# Add grid
ax1.grid(True, alpha=0.3, linestyle="-", linewidth=0.5)
ax1.set_axisbelow(True)
# Adjust layout
plt.tight_layout()
plt.subplots_adjust(bottom=0.1, top=0.9)
chart_path = self.charts_dir / "fuel_utilization_chart.png"
plt.savefig(chart_path, dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_vo2_pulse_chart(
self, df: pd.DataFrame, save_as_base64: bool = False
) -> str:
"""Generate VO2 Pulse chart with heart rate and speed"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot VO2 Pulse
sns.lineplot(
data=df,
x="T(sec)",
y="VO2 Pulse_smoothed",
label="VO2 Pulse (mL/beat)",
color="blue",
)
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VO2 Pulse (mL/beat)")
ax1.set_ylim(0, df["VO2 Pulse_smoothed"].max())
ax1.grid(True, alpha=0.1)
# Create second y-axis for heart rate
ax2 = ax1.twinx()
sns.lineplot(
data=df,
x="T(sec)",
y="HR(bpm)_smoothed",
color="red",
ax=ax2,
linewidth=2,
label="Heart Rate (bpm)",
)
ax2.set_ylabel("Heart Rate (bpm)", color="red")
ax2.tick_params(axis="y", labelcolor="red")
ax2.set_ylim(0, df["HR(bpm)_smoothed"].max() + 1)
# Create third y-axis for speed
ax3 = ax1.twinx()
ax3.spines["right"].set_position(("outward", 60))
sns.lineplot(
data=df,
x="T(sec)",
y="Speed",
color="green",
ax=ax3,
drawstyle="steps-post",
linewidth=2,
label="Speed",
)
ax3.set_ylabel("Speed", color="green")
ax3.tick_params(axis="y", labelcolor="green")
ax3.set_ylim(0, df["Speed"].max() + 1)
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
# Remove default legends first
for ax in [ax1, ax2, ax3]:
if ax.get_legend():
ax.get_legend().remove()
# Combine legends from all axes
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
lines3, labels3 = ax3.get_legend_handles_labels()
ax1.legend(
lines1 + lines2 + lines3, labels1 + labels2 + labels3, loc="upper left"
)
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "vo2_pulse_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_vo2_breath_chart(
self, df: pd.DataFrame, save_as_base64: bool = False
) -> str:
"""Generate VO2 per Breath chart"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot VO2 per Breath
sns.lineplot(
data=df,
x="T(sec)",
y="VO2 Breath_smoothed",
label="VO2 per Breath (mL/breath)",
)
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VO2 per Breath (mL/breath)")
ax1.set_ylim(0, df["VO2 Breath_smoothed"].max() + 1)
ax1.grid(True, alpha=0.1)
# Plot speed as step function on secondary y-axis
ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
sns.lineplot(
data=df,
x="T(sec)",
y="Speed",
color="green",
ax=ax2,
drawstyle="steps-post",
linewidth=2,
label="Speed",
)
ax2.set_ylim(0, df["Speed"].max() + 1)
ax2.set_ylabel("Speed")
# Remove default legends first
ax1.get_legend().remove()
ax2.get_legend().remove()
# Combine legends from both axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "vo2_breath_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fat_metabolism_chart(
self, df: pd.DataFrame, save_as_base64: bool = False
) -> str:
"""Generate CHO and FAT metabolism chart"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot CHO
sns.lineplot(data=df, x="T(sec)", y="CHO_smoothed", label="CHO (kcal/min)")
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("CHO (kcal/min)")
ax1.grid(True, alpha=0.1)
# Plot FAT on secondary y-axis
ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
sns.lineplot(
data=df,
x="T(sec)",
y="FAT_smoothed",
color="green",
ax=ax2,
label="FAT (kcal/min)",
)
ax2.set_ylabel("FAT (kcal/min)")
ax2.set_ylim(0, 15)
# Remove default legends first
ax1.get_legend().remove()
ax2.get_legend().remove()
# Combine legends from both axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "fat_metabolism_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_recovery_chart(
self, df: pd.DataFrame, save_as_base64: bool = False
) -> str:
"""Generate recovery chart with VCO2, HR, and BF"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot VCO2
sns.lineplot(
data=df,
x="T(sec)",
y="VCO2(ml/min)_smoothed",
label="VCO2 (ml/min)",
color="blue",
)
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VCO2 (ml/min)")
ax1.set_ylim(0, df["VCO2(ml/min)"].max())
ax1.grid(True, alpha=0.1)
# Create second y-axis for heart rate
ax2 = ax1.twinx()
sns.lineplot(
data=df,
x="T(sec)",
y="HR(bpm)_smoothed",
color="red",
ax=ax2,
linewidth=2,
label="Heart Rate (bpm)",
)
ax2.set_ylabel("Heart Rate (bpm)", color="red")
ax2.set_ylim(df["HR(bpm)_smoothed"].min(), df["HR(bpm)_smoothed"].max() + 1)
ax2.tick_params(axis="y", labelcolor="red")
# Create third y-axis for breathing frequency
ax3 = ax1.twinx()
ax3.spines["right"].set_position(("outward", 60))
sns.lineplot(
data=df,
x="T(sec)",
y="BF(bpm)_smoothed",
color="green",
ax=ax3,
linewidth=2,
label="BF (bpm)",
)
ax3.set_ylabel("BF (bpm)", color="green")
ax3.tick_params(axis="y", labelcolor="green")
ax3.set_ylim(0, df["BF(bpm)_smoothed"].max() + 1)
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
# Remove default legends first
for ax in [ax1, ax2, ax3]:
if ax.get_legend():
ax.get_legend().remove()
# Combine legends from all axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
lines3, labels3 = ax3.get_legend_handles_labels()
ax1.legend(
lines1 + lines2 + lines3, labels1 + labels2 + labels3, loc="upper left"
)
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "recovery_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_body_fat_percentage_chart(
self,
gender: str,
age: int,
body_fat_percentage: float,
save_as_base64: bool = False,
) -> str:
"""Generate body fat percentage chart with ranges"""
# Define the segments with muted colors
segments = [
("#F8A8A8", 0, 15), # Muted Red/Salmon: 0% to 15%
("#FFEECC", 15, 5), # Pale Yellow/Cream: 15% to 20%
("#D0F0C0", 20, 15), # Pale Green/Mint: 20% to 35%
("#FFEECC", 35, 5), # Pale Yellow/Cream: 35% to 40%
("#F8A8A8", 40, 10), # Muted Red/Salmon: 40% to 50%
]
# Determine age group
if 20 <= age <= 39:
age_group = "20-39"
elif 40 <= age <= 59:
age_group = "40-59"
elif 60 <= age <= 79:
age_group = "60-79"
else:
age_group = "N/A"
demographic = f"{age_group}\n({gender[0].upper()})"
fig, ax = plt.subplots(figsize=(10, 2))
# Create the Segmented Bar
for color, start, length in segments:
ax.barh(
y=0,
width=length,
left=start,
height=1,
color=color,
edgecolor="black",
linewidth=0.5,
)
# Add the Indicator (Triangle)
ax.plot(
body_fat_percentage,
1.05,
marker="v",
color="black",
markersize=10,
clip_on=False,
transform=ax.get_xaxis_transform(),
)
# Set Axis Properties and Labels
ax.set_xlim(0, 50)
ax.set_xticks(range(0, 51, 5))
ax.set_yticks([])
ax.text(
-0.05,
0,
demographic,
transform=ax.get_yaxis_transform(),
va="center",
ha="right",
fontsize=12,
)
ax.set_xlim(0, 50)
ticks = range(0, 51, 5)
ax.set_xticks(ticks)
labels = [f"{t}%" for t in ticks]
ax.set_xticklabels(labels)
# Clean up spines and add small ticks
ax.spines["right"].set_visible(False)
ax.spines["top"].set_visible(False)
ax.spines["left"].set_visible(False)
ax.spines["bottom"].set_visible(True)
for x in range(0, 51, 5):
ax.plot(
[x, x],
[-0.05, -0.01],
color="black",
transform=ax.get_xaxis_transform(),
clip_on=False,
)
plt.tight_layout()
chart_path = self.charts_dir / "body_fat_percentage_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_body_composition_chart(
self, fat_mass_lbs: float, lean_mass_lbs: float, save_as_base64: bool = False
) -> str:
"""Generate donut chart for body composition"""
# Calculate percentages
total_weight = fat_mass_lbs + lean_mass_lbs
fat_percentage = (fat_mass_lbs / total_weight) * 100
lean_percentage = (lean_mass_lbs / total_weight) * 100
# Data for the chart
sizes = [fat_percentage, lean_percentage]
colors = ["#fde3ac", "#ff9966"] # Light yellow/tan and orange
plt.figure(figsize=(8, 8))
# Create the donut chart without labels first
wedges, texts, autotexts = plt.pie(
sizes,
autopct="", # Remove auto percentages
startangle=90,
wedgeprops=dict(width=0.5, edgecolor="w"),
colors=colors,
labels=["", ""],
) # Remove default labels
# Add custom text annotations positioned manually
plt.text(
-1,
1,
f"Fat Mass ({fat_mass_lbs:.1f}lbs)\n{fat_percentage:.1f}%",
fontsize=14,
fontweight="bold",
ha="center",
va="center",
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8),
)
plt.text(
1,
-1,
f"Lean Mass ({lean_mass_lbs:.1f}lbs)\n{lean_percentage:.1f}%",
fontsize=14,
fontweight="bold",
ha="center",
va="center",
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8),
)
# Set the title
plt.axis("equal") # Equal aspect ratio ensures that pie is drawn as a circle
chart_path = self.charts_dir / "body_composition_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=600)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_spirometry_chart(
self, spirometry_df: pd.DataFrame, save_as_base64: bool = False
) -> str:
"""Generate spirometry chart with Z-scores and ranges"""
# Coerce numeric columns
for col in ["Best", "LLN", "Pred.", "%Pred.", "ZScore"]:
if col in spirometry_df.columns:
spirometry_df[col] = pd.to_numeric(spirometry_df[col], errors="coerce")
# Select rows of interest and prepare display values
rows_map = {
"Lung Volume": "FVC",
"Lung Power": "FEV1",
"Power/Volume": "FEV1/FVC%",
}
records = []
for label, param in rows_map.items():
row = spirometry_df.loc[spirometry_df["Parameters"].str.strip() == param]
if row.empty:
continue
row = row.iloc[0]
records.append(
{
"label": label,
"param": param,
"best": row["Best"],
"pct": row["%Pred."],
"z": row["ZScore"],
}
)
# Figure setup
fig, axes = plt.subplots(
nrows=3,
ncols=1,
figsize=(11.5, 3.6),
sharex=True,
gridspec_kw={"hspace": 0.65},
)
x_min, x_max = -5, 3
# Segment colors: red -> orange -> yellow -> green
segments = [
(-5, -4, "#f4a7a7"), # red-ish
(-4, -3, "#f7c49a"), # orange-ish
(-3, -1.7, "#f6e3a3"), # yellow-ish
(-1.7, 3, "#c9f0cc"), # green-ish
]
ticks = np.arange(x_min, x_max + 1, 1)
labels = [str(i) for i in ticks]
# Plot each row
for ax, rec in zip(axes, records):
# Background segments
for a, b, color in segments:
ax.barh(
0, width=b - a, left=a, height=0.6, color=color, edgecolor="none"
)
# LLN (-1) and Predicted (0) markers
ax.axvline(0, color="black", lw=1)
# Z-score pointer (downward triangle) at top of each panel
if pd.notna(rec["z"]):
trans = mtransforms.blended_transform_factory(
ax.transData, ax.transAxes
)
ax.plot(
float(rec["z"]),
1.2,
marker="v",
markersize=12,
color="dimgray",
transform=trans,
clip_on=False,
)
# Labels, ticks, and styling
ax.set_title(
rec["label"], loc="left", fontsize=11, fontweight="bold", pad=2
)
ax.set_xlim(x_min, x_max)
ax.set_yticks([])
ax.set_xticks(ticks)
ax.set_xticklabels(labels, fontsize=8)
ax.set_xlabel("")
# Top annotations
axes[0].text(-1.7, 0.45, "LLN", ha="center", va="bottom", fontsize=9)
axes[0].text(0, 0.45, "Predicted", ha="center", va="bottom", fontsize=9)
# Right-side summary boxes
fig.subplots_adjust(right=0.78)
box_ax = fig.add_axes(
[0.805, 0.06, 0.18, 0.90]
) # [left, bottom, width, height]
box_ax.axis("off")
# Helper to draw a pill-shaped text box
def pill(ax, xy, text):
x, y = xy
# Draw rounded rectangle background
bbox = FancyBboxPatch(
(x - 0.48, y - 0.09),
0.96,
0.18,
boxstyle="round,pad=0.02,rounding_size=0.08",
ec="#dddddd",
fc="#f3f3f3",
linewidth=1.0,
)
ax.add_patch(bbox)
ax.text(
x,
y + 0.025,
text,
ha="center",
va="center",
fontsize=11,
fontweight="bold",
)
ax.text(
x,
y - 0.055,
"of predicted",
ha="center",
va="center",
fontsize=9,
color="#555555",
)
box_ax.set_xlim(0, 1)
box_ax.set_ylim(0, 1)
# Prepare display strings and positions (top to bottom)
right_items = []
for rec in records:
name = (
"FVC"
if rec["param"] == "FVC"
else ("FEV1" if rec["param"] == "FEV1" else "FEV1/FVC")
)
unit = "L" if rec["param"] in ("FVC", "FEV1") else "%"
value_fmt = f"{rec['best']:.2f}{unit}"
pct_fmt = f"{rec['pct']:.1f}%"
right_items.append((name, value_fmt, pct_fmt))
# Sort to match image order on the right (FVC, FEV1, FEV1/FVC)
order = ["FVC", "FEV1", "FEV1/FVC"]
right_items_sorted = [
next(item for item in right_items if item[0] == k) for k in order
]
ys = [0.82, 0.48, 0.15]
for (name, value_fmt, pct_fmt), y in zip(right_items_sorted, ys):
main_line = f"{name}\n{value_fmt}{pct_fmt}"
pill(box_ax, (0.5, y), main_line)
chart_path = self.charts_dir / "spirometry_chart.png"
plt.savefig(chart_path, dpi=300, bbox_inches="tight")
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_all_charts(
self,
pnoe_df: pd.DataFrame,
spirometry_df: pd.DataFrame,
patient_data: Dict,
save_as_base64: bool = False,
) -> Dict[str, str]:
"""Generate all charts at once and return dictionary of paths/base64 strings"""
charts = {}
# Generate physiological charts
charts["respiratory"] = self.generate_respiratory_chart(pnoe_df, save_as_base64)
charts["fuel_utilization_chart"] = self.generate_fuel_utilization_chart(
pnoe_df, save_as_base64
)
charts["vo2_pulse_chart"] = self.generate_vo2_pulse_chart(
pnoe_df, save_as_base64
)
charts["vo2_breath_chart"] = self.generate_vo2_breath_chart(
pnoe_df, save_as_base64
)
charts["fat_metabolism_chart"] = self.generate_fat_metabolism_chart(
pnoe_df, save_as_base64
)
charts["recovery_chart"] = self.generate_recovery_chart(pnoe_df, save_as_base64)
# Generate body composition charts
if (
"gender" in patient_data
and "age" in patient_data
and "fat_percentage" in patient_data
):
charts["body_fat_percentage_chart"] = (
self.generate_body_fat_percentage_chart(
patient_data["gender"],
patient_data["age"],
patient_data["fat_percentage"],
save_as_base64,
)
)
if "fat_mass_lbs" in patient_data and "lean_mass_lbs" in patient_data:
charts["body_composition_chart"] = self.generate_body_composition_chart(
patient_data["fat_mass_lbs"],
patient_data["lean_mass_lbs"],
save_as_base64,
)
# Generate spirometry chart
charts["spirometry_chart"] = self.generate_spirometry_chart(
spirometry_df, save_as_base64
)
return charts
# Example usage
if __name__ == "__main__":
# Initialize graph generator
generator = GraphGenerator()
# Load sample data (you would pass your actual dataframes)
pnoe_df = pd.read_csv("data/Pnoe_20250729_1550-Moran_Keirstyn.csv", delimiter=";")
spirometry_df = pd.read_csv("data/spirometry_data.csv")
# Preprocess pnoe data (same as in your notebook)
pnoe_df = pnoe_df.apply(pd.to_numeric, errors="ignore")
pnoe_df["VO2 Pulse"] = pnoe_df["VO2(ml/min)"] / pnoe_df["HR(bpm)"]
pnoe_df["VO2 Breath"] = pnoe_df["VO2(ml/min)"] / pnoe_df["BF(bpm)"]
pnoe_df["CHO"] = pnoe_df["EE(kcal/min)"] * pnoe_df["CARBS(%)"] / 100
pnoe_df["FAT"] = pnoe_df["EE(kcal/min)"] * pnoe_df["FAT(%)"] / 100
# Apply smoothing
window_size = 10
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in pnoe_df.columns:
pnoe_df[f"{col}_smoothed"] = (
pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
# Patient data
patient_data = {
"gender": "female",
"age": 25,
"fat_percentage": 22.4,
"fat_mass_lbs": 27.6,
"lean_mass_lbs": 95.4,
}
# Generate all charts
charts = generator.generate_all_charts(
pnoe_df, spirometry_df, patient_data, save_as_base64=True
)
print(f"Generated {len(charts)} charts:")
for chart_name in charts.keys():
print(f"- {chart_name}")
Binary file not shown.

Before

Width:  |  Height:  |  Size: 91 KiB

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 285 KiB

After

Width:  |  Height:  |  Size: 287 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 399 KiB

After

Width:  |  Height:  |  Size: 396 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 193 KiB

After

Width:  |  Height:  |  Size: 188 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 108 KiB

After

Width:  |  Height:  |  Size: 108 KiB