Files
bio-performx/app/services/graph_generator.py
T
2025-11-28 12:11:00 +01:00

2056 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Graph Generator Service
This service generates all the charts and visualizations required for the medical report.
Based on the analysis notebooks in services_dfdf/.
"""
import base64
import io
from pathlib import Path
import matplotlib
matplotlib.use("Agg") # Use non-interactive backend
import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib.patches import FancyBboxPatch
class GraphGenerator:
"""Generate all charts for medical reports"""
def __init__(self, charts_dir: str = "graphs"):
"""
Initialize the graph generator.
Args:
charts_dir: Directory to save generated charts
"""
self.charts_dir = Path(charts_dir)
self.charts_dir.mkdir(exist_ok=True)
def _image_to_base64(self, image_path: Path) -> str:
"""
Convert image file to base64 string.
Args:
image_path: Path to image file
Returns:
Base64 encoded string
"""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
except FileNotFoundError:
return ""
def generate_respiratory_chart(
self, df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate respiratory chart (VT and Speed over time).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot VT
sns.lineplot(data=df, x="T(sec)", y="VT(l)_smoothed", label="VT (L)")
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VT (L)")
ax1.grid(True, alpha=0.1)
ax1.set_ylim(0, min(8, df["VT(l)_smoothed"].max()))
# Plot speed on secondary y-axis
ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
sns.lineplot(
data=df,
x="T(sec)",
y="Speed",
color="green",
ax=ax2,
drawstyle="steps-post",
linewidth=2,
label="Speed",
)
ax2.set_ylabel("Speed")
ax2.set_ylim(0, min(30, df["Speed"].max()) + 1)
# Combine legends
ax1.get_legend().remove()
ax2.get_legend().remove()
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "respiratory.png"
plt.savefig(chart_path, dpi=300, bbox_inches="tight")
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fuel_utilization_chart(
self, df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate fuel utilization chart (CHO vs FAT by stage).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Group by speed and calculate mean
speed_groups = df.groupby("Speed").mean(numeric_only=True).round(1)
speed_groups = speed_groups.iloc[1:-1]
# Filter data
filtered_data = speed_groups[
(speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
]
plt.figure(figsize=(15, 8))
plt.style.use("default")
stage_labels = [f"Stage {i}" for i in range(1, len(filtered_data) + 1)]
x_positions = np.arange(len(filtered_data))
# Calculate fat and carbs energy expenditure
fat_ee = filtered_data["EE(kcal/min)"] * filtered_data["FAT(%)"] / 100
carbs_ee = filtered_data["EE(kcal/min)"] * filtered_data["CARBS(%)"] / 100
ax1 = plt.gca()
# Create stacked bar chart
ax1.bar(
x_positions,
fat_ee,
color="#1f77b4",
alpha=0.8,
width=0.6,
label="Fat",
)
ax1.bar(
x_positions,
carbs_ee,
bottom=fat_ee,
color="#ff7f0e",
alpha=0.8,
width=0.6,
label="Carbs",
)
ax1.set_xlabel("", fontsize=12)
ax1.set_ylabel("Fuel (kcal/min)", fontsize=12)
ax1.set_ylim(0, 20)
# Add values on bars
for i, (fat_val, carb_val, total_val) in enumerate(
zip(fat_ee, carbs_ee, filtered_data["EE(kcal/min)"])
):
if fat_val > 0.3:
ax1.text(
i,
fat_val / 2,
f"{fat_val:.1f}",
ha="center",
va="center",
fontsize=9,
fontweight="bold",
color="white",
)
if carb_val > 0.3:
ax1.text(
i,
fat_val + carb_val / 2,
f"{carb_val:.1f}",
ha="center",
va="center",
fontsize=9,
fontweight="bold",
color="white",
)
ax1.text(
i,
total_val + 0.5,
f"{total_val:.1f} kcal",
ha="center",
va="bottom",
fontsize=10,
fontweight="bold",
color="black",
)
# Add speed labels
for i, speed in enumerate(filtered_data.index):
ax1.text(i, -1.5, f"{speed:.1f} mph", ha="center", va="top", fontsize=9)
ax1.text(
i,
-2.8,
f"{speed * 1.609:.1f} min/km",
ha="center",
va="top",
fontsize=8,
color="gray",
)
# Create secondary y-axis for heart rate
ax2 = ax1.twinx()
ax2.plot(
x_positions,
filtered_data["HR(bpm)"],
marker="o",
linewidth=3,
markersize=8,
color="red",
label="Heart Rate",
)
ax2.set_ylabel("Heart Rate (bpm)", fontsize=12, color="red")
ax2.tick_params(axis="y", labelcolor="red")
ax2.set_ylim(0, 220)
# Add HR values
for i, hr in enumerate(filtered_data["HR(bpm)"]):
ax2.text(
i,
hr + 10,
f"{int(hr)}bpm",
ha="center",
va="bottom",
fontsize=10,
fontweight="bold",
color="red",
)
ax1.set_xticks(x_positions)
ax1.set_xticklabels(stage_labels, fontsize=11)
# Create legend
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(
lines1 + lines2,
labels1 + labels2,
loc="upper left",
frameon=True,
fancybox=True,
shadow=True,
)
ax1.grid(True, alpha=0.3, linestyle="-", linewidth=0.5)
ax1.set_axisbelow(True)
plt.tight_layout()
plt.subplots_adjust(bottom=0.1, top=0.9)
chart_path = self.charts_dir / "fuel_utilization_chart.png"
plt.savefig(chart_path, dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_vo2_pulse_chart(
self, df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate VO2 Pulse chart with HR and Speed.
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot VO2 Pulse
sns.lineplot(
data=df,
x="T(sec)",
y="VO2 Pulse_smoothed",
label="VO2 Pulse (mL/beat)",
color="blue",
)
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VO2 Pulse (mL/beat)")
ax1.set_ylim(0, df["VO2 Pulse_smoothed"].max())
ax1.grid(True, alpha=0.1)
# Create second y-axis for heart rate
ax2 = ax1.twinx()
sns.lineplot(
data=df,
x="T(sec)",
y="HR(bpm)_smoothed",
color="red",
ax=ax2,
linewidth=2,
label="Heart Rate (bpm)",
)
ax2.set_ylabel("Heart Rate (bpm)", color="red")
ax2.tick_params(axis="y", labelcolor="red")
ax2.set_ylim(0, df["HR(bpm)_smoothed"].max() + 1)
# Create third y-axis for speed
ax3 = ax1.twinx()
ax3.spines["right"].set_position(("outward", 60))
sns.lineplot(
data=df,
x="T(sec)",
y="Speed",
color="green",
ax=ax3,
drawstyle="steps-post",
linewidth=2,
label="Speed",
)
ax3.set_ylabel("Speed", color="green")
ax3.tick_params(axis="y", labelcolor="green")
ax3.set_ylim(0, df["Speed"].max() + 1)
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
# Combine legends
if ax1.get_legend():
ax1.get_legend().remove()
if ax2.get_legend():
ax2.get_legend().remove()
if ax3.get_legend():
ax3.get_legend().remove()
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
lines3, labels3 = ax3.get_legend_handles_labels()
ax1.legend(
lines1 + lines2 + lines3, labels1 + labels2 + labels3, loc="upper left"
)
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "vo2_pulse_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_vo2_breath_chart(
self, df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate VO2 per Breath chart.
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
sns.lineplot(
data=df,
x="T(sec)",
y="VO2 Breath_smoothed",
label="VO2 per Breath (mL/breath)",
)
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VO2 per Breath (mL/breath)")
ax1.set_ylim(0, df["VO2 Breath_smoothed"].max() + 1)
ax1.grid(True, alpha=0.1)
# Plot speed on secondary y-axis
ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
sns.lineplot(
data=df,
x="T(sec)",
y="Speed",
color="green",
ax=ax2,
drawstyle="steps-post",
linewidth=2,
label="Speed",
)
ax2.set_ylim(0, df["Speed"].max() + 1)
ax2.set_ylabel("Speed")
# Combine legends
ax1.get_legend().remove()
ax2.get_legend().remove()
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "vo2_breath_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fat_metabolism_chart(
self, df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate fat metabolism chart (CHO vs FAT over time).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot CHO
sns.lineplot(data=df, x="T(sec)", y="CHO_smoothed", label="CHO (kcal/min)")
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("CHO (g/min)")
ax1.grid(True, alpha=0.1)
# Plot FAT on secondary y-axis
ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
sns.lineplot(
data=df,
x="T(sec)",
y="FAT_smoothed",
color="green",
ax=ax2,
label="FAT (kcal/min)",
)
ax2.set_ylabel("FAT (kcal/min)")
ax2.set_ylim(0, 15)
# Combine legends
ax1.get_legend().remove()
ax2.get_legend().remove()
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "fat_metabolism_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_recovery_chart(
self, df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate recovery chart (VCO2, HR, and BF).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5))
ax1 = plt.subplot()
# Plot VCO2
sns.lineplot(
data=df,
x="T(sec)",
y="VCO2(ml/min)_smoothed",
label="VCO2 (ml/min)",
color="blue",
)
ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VO2 Pulse (mL/beat)")
ax1.set_ylim(0, df["VCO2(ml/min)"].max())
ax1.grid(True, alpha=0.1)
# Create second y-axis for heart rate
ax2 = ax1.twinx()
sns.lineplot(
data=df,
x="T(sec)",
y="HR(bpm)_smoothed",
color="red",
ax=ax2,
linewidth=2,
label="Heart Rate (bpm)",
)
ax2.set_ylabel("Heart Rate (bpm)", color="red")
ax2.set_ylim(df["HR(bpm)_smoothed"].min(), df["HR(bpm)_smoothed"].max() + 1)
ax2.tick_params(axis="y", labelcolor="red")
# Create third y-axis for BF
ax3 = ax1.twinx()
ax3.spines["right"].set_position(("outward", 60))
sns.lineplot(
data=df,
x="T(sec)",
y="BF(bpm)_smoothed",
color="green",
ax=ax3,
linewidth=2,
label="BF (bpm)",
)
ax3.set_ylabel("BF (bpm)", color="green")
ax3.tick_params(axis="y", labelcolor="green")
ax3.set_ylim(0, df["BF(bpm)_smoothed"].max() + 1)
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
# Combine legends
if ax1.get_legend():
ax1.get_legend().remove()
if ax2.get_legend():
ax2.get_legend().remove()
if ax3.get_legend():
ax3.get_legend().remove()
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
lines3, labels3 = ax3.get_legend_handles_labels()
ax1.legend(
lines1 + lines2 + lines3, labels1 + labels2 + labels3, loc="upper left"
)
# Add colored background regions
if len(phase_times) >= 4:
ax1.axvspan(0, phase_times[1], alpha=0.2, color="lightblue")
ax1.axvspan(phase_times[1], phase_times[2], alpha=0.2, color="purple")
ax1.axvspan(phase_times[2], phase_times[3], alpha=0.2, color="lightgreen")
ax1.axvspan(phase_times[3], df["T(sec)"].max(), alpha=0.2, color="blue")
chart_path = self.charts_dir / "recovery_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_tsi_chart(
self, oxygenation_df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate TSI (Tissue Saturation Index) chart with trend lines per stage.
Args:
oxygenation_df: DataFrame with Time, TSI, and TSI-second columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
from numpy.polynomial.polynomial import Polynomial
plt.figure(figsize=(12, 5.5))
# Plot TSI (Left Leg)
plt.plot(
oxygenation_df["Time"],
oxygenation_df["TSI"],
label="TSI (Left Leg)",
color="steelblue",
linewidth=2,
)
# Plot TSI2 (Right Leg)
plt.plot(
oxygenation_df["Time"],
oxygenation_df["TSI-second"],
label="TSI2 (Right Leg)",
color="orange",
linewidth=2,
)
# Define time intervals for stages (adjust these based on your test protocol)
max_time = oxygenation_df["Time"].max()
intervals = [
(0, 250),
(250, 500),
(500, 750),
(750, 1000),
(1000, 1250),
(1250, 1500),
(1500, max_time),
]
# Calculate and plot trend lines for each interval
for start_time, end_time in intervals:
# Filter data for this interval
mask_interval = (oxygenation_df["Time"] >= start_time) & (
oxygenation_df["Time"] <= end_time
)
# TSI (Left Leg) trend for this interval
mask_left = mask_interval & ~oxygenation_df["TSI"].isna()
if mask_left.sum() > 1: # Need at least 2 points for a line
x_left = oxygenation_df.loc[mask_left, "Time"]
y_left = oxygenation_df.loc[mask_left, "TSI"]
coefs_left = Polynomial.fit(x_left, y_left, 1).convert().coef
trend_left = coefs_left[0] + coefs_left[1] * x_left
plt.plot(
x_left,
trend_left,
color="black",
linestyle="--",
linewidth=2,
alpha=0.8,
)
# TSI-second (Right Leg) trend for this interval
mask_right = mask_interval & ~oxygenation_df["TSI-second"].isna()
if mask_right.sum() > 1: # Need at least 2 points for a line
x_right = oxygenation_df.loc[mask_right, "Time"]
y_right = oxygenation_df.loc[mask_right, "TSI-second"]
coefs_right = Polynomial.fit(x_right, y_right, 1).convert().coef
trend_right = coefs_right[0] + coefs_right[1] * x_right
plt.plot(
x_right,
trend_right,
color="black",
linestyle="--",
linewidth=2,
alpha=0.8,
)
plt.xlabel("Time (s)")
plt.ylabel("TSI (%)")
plt.title("TSI (Left) and TSI2 (Right) with Black Slope Lines per Stage")
plt.legend(fontsize=10, loc="upper right")
plt.grid(alpha=0.25)
plt.tight_layout()
chart_path = self.charts_dir / "tsi_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=160)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_body_composition_chart(
self, fat_mass_lbs: float, lean_mass_lbs: float, save_as_base64: bool = True
) -> str:
"""
Generate body composition donut chart.
Args:
fat_mass_lbs: Fat mass in pounds
lean_mass_lbs: Lean mass in pounds
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Calculate percentages
total_weight = fat_mass_lbs + lean_mass_lbs
fat_percentage = (fat_mass_lbs / total_weight) * 100
lean_percentage = (lean_mass_lbs / total_weight) * 100
sizes = [fat_percentage, lean_percentage]
colors = ["#fde3ac", "#ff9966"]
plt.figure(figsize=(8, 8))
# Create donut chart
plt.pie(
sizes,
autopct="",
startangle=90,
wedgeprops=dict(width=0.5, edgecolor="w"),
colors=colors,
labels=["", ""],
)
# Add custom text annotations
plt.text(
-1,
1,
f"Fat Mass ({fat_mass_lbs:.1f}lbs)\n{fat_percentage:.1f}%",
fontsize=14,
fontweight="bold",
ha="center",
va="center",
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8),
)
plt.text(
1,
-1,
f"Lean Mass ({lean_mass_lbs:.1f}lbs)\n{lean_percentage:.1f}%",
fontsize=14,
fontweight="bold",
ha="center",
va="center",
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8),
)
plt.axis("equal")
chart_path = self.charts_dir / "body_composition_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=600)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_body_fat_percent_chart(
self,
fat_percentage: float,
age: int,
gender: str,
save_as_base64: bool = True,
) -> str:
"""
Generate body fat percentage chart.
Args:
fat_percentage: Body fat percentage
age: Patient age
gender: Patient gender ('male' or 'female')
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Determine age group
if 20 <= age <= 39:
age_group = "20-39"
elif 40 <= age <= 59:
age_group = "40-59"
elif 60 <= age <= 79:
age_group = "60-79"
else:
age_group = "20-39" # Default
gender_abbrev = "M" if gender.lower() == "male" else "F"
demographic = f"{age_group}\n({gender_abbrev})"
# Define segments based on gender and age group
if gender.lower() == "female":
if age_group == "20-39":
segments = [
("#F8A8A8", 0, 15), # Bad: 0-15%
("#FFEECC", 15, 5), # Okay: 15-20%
("#D0F0C0", 20, 15), # Good: 20-35%
("#FFEECC", 35, 5), # Okay: 35-40%
("#F8A8A8", 40, 10), # Bad: 40-50%
]
else: # 40-59 and 60-79 have same ranges for females
segments = [
("#F8A8A8", 0, 20), # Bad: 0-20%
("#FFEECC", 20, 5), # Okay: 20-25%
("#D0F0C0", 25, 10), # Good: 25-35%
("#FFEECC", 35, 5), # Okay: 35-40%
("#F8A8A8", 40, 10), # Bad: 40-50%
]
else: # male
if age_group == "20-39":
segments = [
("#F8A8A8", 0, 5), # Bad: 0-5%
("#FFEECC", 5, 5), # Okay: 5-10%
("#D0F0C0", 10, 10), # Good: 10-20%
("#FFEECC", 20, 5), # Okay: 20-25%
("#F8A8A8", 25, 25), # Bad: 25-50%
]
elif age_group == "40-59":
segments = [
("#F8A8A8", 0, 5), # Bad: 0-5%
("#FFEECC", 5, 5), # Okay: 5-10%
("#D0F0C0", 10, 10), # Good: 10-20%
("#FFEECC", 20, 10), # Okay: 20-30%
("#F8A8A8", 30, 20), # Bad: 30-50%
]
else: # 60-79
segments = [
("#F8A8A8", 0, 5), # Bad: 0-5%
("#FFEECC", 5, 5), # Okay: 5-10%
("#D0F0C0", 10, 15), # Good: 10-25%
("#FFEECC", 25, 5), # Okay: 25-30%
("#F8A8A8", 30, 20), # Bad: 30-50%
]
fig, ax = plt.subplots(figsize=(10, 2))
# Create the segmented bar
for color, start, length in segments:
ax.barh(
y=0,
width=length,
left=start,
height=1,
color=color,
edgecolor="black",
linewidth=0.5,
)
# Add the indicator (triangle)
ax.plot(
fat_percentage,
1.05,
marker="v",
color="black",
markersize=10,
clip_on=False,
transform=ax.get_xaxis_transform(),
)
# Set axis properties
ax.set_xlim(0, 50)
ax.set_xticks(range(0, 51, 5))
ax.set_yticks([])
ax.text(
-0.05,
0,
demographic,
transform=ax.get_yaxis_transform(),
va="center",
ha="right",
fontsize=12,
)
ticks = range(0, 51, 5)
ax.set_xticks(ticks)
labels = [f"{t}%" for t in ticks]
ax.set_xticklabels(labels)
# Clean up spines
ax.spines["right"].set_visible(False)
ax.spines["top"].set_visible(False)
ax.spines["left"].set_visible(False)
ax.spines["bottom"].set_visible(True)
# Add tick marks
for x in range(0, 51, 5):
ax.plot(
[x, x],
[-0.05, -0.01],
color="black",
transform=ax.get_xaxis_transform(),
clip_on=False,
)
plt.tight_layout()
chart_path = self.charts_dir / "body_fat_percent_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_spirometry_chart(
self, spirometry_df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate spirometry chart with Z-scores.
Args:
spirometry_df: Spirometry DataFrame with parameters
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Coerce numeric columns - handle various column name formats
# Map standard column names to possible variations
column_aliases = {
"Best": ["Best", "best", "BEST"],
"LLN": ["LLN", "lln"],
"Pred.": ["Pred.", "Pred", "pred", "Predicted", "predicted"],
"%Pred.": [
"%Pred.",
"%Pred",
"%pred",
"% Pred.",
"% Pred",
"Pred %",
"Pred%",
],
"ZScore": ["ZScore", "Z-Score", "z-score", "Zscore", "zscore", "Z Score"],
}
# Find and normalize column names
column_mapping = {}
for target_col, possible_names in column_aliases.items():
for col_name in possible_names:
if col_name in spirometry_df.columns:
column_mapping[target_col] = col_name
# Convert to numeric
spirometry_df[col_name] = pd.to_numeric(
spirometry_df[col_name], errors="coerce"
)
break
# If standard columns don't exist, create aliases
for target_col, source_col in column_mapping.items():
if target_col not in spirometry_df.columns and source_col != target_col:
spirometry_df[target_col] = spirometry_df[source_col]
# Select rows of interest
rows_map = {
"Lung Volume": "FVC",
"Lung Power": "FEV1",
"Power/Volume": "FEV1/FVC%",
}
records = []
for label, param in rows_map.items():
# Try exact match first
row = spirometry_df.loc[spirometry_df["Parameters"].str.strip() == param]
if row.empty:
# Try case-insensitive match
row = spirometry_df.loc[
spirometry_df["Parameters"].str.strip().str.upper() == param.upper()
]
if row.empty:
# Try matching without % sign
if "%" in param:
param_no_pct = param.replace("%", "")
row = spirometry_df.loc[
spirometry_df["Parameters"].str.strip() == param_no_pct
]
if row.empty:
print(f"Warning: Could not find parameter '{param}' in spirometry data")
print(f"Available parameters: {spirometry_df['Parameters'].tolist()}")
continue
row = row.iloc[0]
# Get values with fallbacks for column name variations
best_val = row.get("Best", row.get("best", pd.NA))
pct_val = row.get(
"%Pred.", row.get("%Pred", row.get("Pred %", row.get("Pred%", pd.NA)))
)
z_val = row.get("ZScore", row.get("Z-Score", row.get("Zscore", pd.NA)))
records.append(
{
"label": label,
"param": param,
"best": best_val,
"pct": pct_val,
"z": z_val,
}
)
# Validate we have exactly 3 records
if len(records) != 3:
raise ValueError(
f"Expected 3 spirometry parameters (FVC, FEV1, FEV1/FVC%), "
f"but found {len(records)}. Found: {[r['param'] for r in records]}"
)
# Figure setup
fig, axes = plt.subplots(
nrows=3,
ncols=1,
figsize=(11.5, 3.6),
sharex=True,
gridspec_kw={"hspace": 0.65},
)
x_min, x_max = -5, 3
# Segment colors
segments = [
(-5, -4, "#f4a7a7"), # red-ish
(-4, -3, "#f7c49a"), # orange-ish
(-3, -1.7, "#f6e3a3"), # yellow-ish
(-1.7, 3, "#c9f0cc"), # green-ish
]
ticks = np.arange(x_min, x_max + 1, 1)
labels = [str(i) for i in ticks]
# Plot each row
for ax, rec in zip(axes, records):
# Background segments
for a, b, color in segments:
ax.barh(
0, width=b - a, left=a, height=0.6, color=color, edgecolor="none"
)
# LLN and Predicted markers
ax.axvline(0, color="black", lw=1)
# Z-score pointer
if pd.notna(rec["z"]):
trans = mtransforms.blended_transform_factory(
ax.transData, ax.transAxes
)
ax.plot(
float(rec["z"]),
1.2,
marker="v",
markersize=12,
color="dimgray",
transform=trans,
clip_on=False,
)
# Labels and styling
ax.set_title(
rec["label"], loc="left", fontsize=11, fontweight="bold", pad=2
)
ax.set_xlim(x_min, x_max)
ax.set_yticks([])
ax.set_xticks(ticks)
ax.set_xticklabels(labels, fontsize=8)
ax.set_xlabel("")
# Top annotations
axes[0].text(-1.7, 0.45, "LLN", ha="center", va="bottom", fontsize=9)
axes[0].text(0, 0.45, "Predicted", ha="center", va="bottom", fontsize=9)
# Right-side summary boxes
fig.subplots_adjust(right=0.78)
box_ax = fig.add_axes([0.805, 0.06, 0.18, 0.90])
box_ax.axis("off")
def pill(ax, xy, text):
x, y = xy
bbox = FancyBboxPatch(
(x - 0.48, y - 0.09),
0.96,
0.18,
boxstyle="round,pad=0.02,rounding_size=0.08",
ec="#dddddd",
fc="#f3f3f3",
linewidth=1.0,
)
ax.add_patch(bbox)
ax.text(
x,
y + 0.025,
text,
ha="center",
va="center",
fontsize=11,
fontweight="bold",
)
ax.text(
x,
y - 0.055,
"of predicted",
ha="center",
va="center",
fontsize=9,
color="#555555",
)
box_ax.set_xlim(0, 1)
box_ax.set_ylim(0, 1)
# Prepare display strings
right_items = []
for rec in records:
name = (
"FVC"
if rec["param"] == "FVC"
else ("FEV1" if rec["param"] == "FEV1" else "FEV1/FVC")
)
unit = "L" if rec["param"] in ("FVC", "FEV1") else "%"
value_fmt = f"{rec['best']:.2f}{unit}"
pct_fmt = f"{rec['pct']:.1f}%"
right_items.append((name, value_fmt, pct_fmt))
# Sort to match order
order = ["FVC", "FEV1", "FEV1/FVC"]
right_items_sorted = [
next(item for item in right_items if item[0] == k) for k in order
]
ys = [0.82, 0.48, 0.15]
for (name, value_fmt, pct_fmt), y in zip(right_items_sorted, ys):
main_line = f"{name}\n{value_fmt}{pct_fmt}"
pill(box_ax, (0.5, y), main_line)
chart_path = self.charts_dir / "spirometry_chart.png"
plt.savefig(chart_path, dpi=300, bbox_inches="tight")
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_metabolism_chart(
self,
rmr_kcal: float,
weight_kg: float = None,
height_cm: float = None,
age_years: int = None,
sex: str = None,
save_as_base64: bool = True,
) -> str:
"""
Generate metabolism chart (Slow vs Fast Metabolism).
Matches the notebook implementation with ratio-based scale (0.3 to 1.9).
Args:
rmr_kcal: Resting metabolic rate in kcal/day (measured RMR)
weight_kg: Weight in kg (optional, for calculating ratio)
height_cm: Height in cm (optional, for calculating ratio)
age_years: Age in years (optional, for calculating ratio)
sex: Sex ("male" or "female", optional, for calculating ratio)
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
from matplotlib.patches import Rectangle
fig, ax = plt.subplots(figsize=(11.5, 2.5))
# Calculate ratio if we have all required parameters
ratio = None
if all([weight_kg, height_cm, age_years, sex]):
# Mifflin-St Jeor equation
if sex.lower() == "male":
mifflin_rmr = 10 * weight_kg + 6.25 * height_cm - 5 * age_years + 5
elif sex.lower() == "female":
mifflin_rmr = 10 * weight_kg + 6.25 * height_cm - 5 * age_years - 161
else:
mifflin_rmr = None
if mifflin_rmr and mifflin_rmr > 0:
ratio = rmr_kcal / mifflin_rmr
# Bar setup - using ratio scale from 0.3 to 1.9 (as in notebook)
scale_edges = [0.3, 0.7, 0.9, 1.1, 1.3, 1.5, 1.9]
scale_labels = ["Very Slow", "Slow", "Average", "Fast", "Very Fast"]
tick_edges = scale_edges[1:-1] # Remove first and last tick (omit 0.3 and 1.9)
x_start = scale_edges[0]
x_end = scale_edges[-1]
# Make the bar THICKER by increasing bar_height and adjusting y_bar
bar_height = 0.36
y_bar = 0.48
color_before = "#B2FFC8"
color_after = "#ECEDF2"
gray_color = "#606060"
# If we have a ratio, use it; otherwise map rmr_kcal to the scale
if ratio is not None:
highlight_end = min(max(ratio, x_start), x_end)
else:
# Fallback: map rmr_kcal to scale (assuming typical range 1000-3000 kcal/day)
# Map to 0.3-1.9 scale
min_rmr = 1000
max_rmr = 3000
normalized = (rmr_kcal - min_rmr) / (max_rmr - min_rmr)
highlight_end = x_start + normalized * (x_end - x_start)
highlight_end = min(max(highlight_end, x_start), x_end)
# Draw plain rectangle bar (no rounding)
ax.add_patch(
Rectangle(
(x_start, y_bar),
x_end - x_start,
bar_height,
ec="none",
fc=color_after,
lw=0,
)
)
# Highlighted rectangle
if highlight_end > x_start:
ax.add_patch(
Rectangle(
(x_start, y_bar),
highlight_end - x_start,
bar_height,
ec="none",
fc=color_before,
lw=0,
)
)
# kCals label, left-aligned, bold inside green, TEXT COLOR gray
ax.text(
x_start + 0.07,
y_bar + bar_height / 2,
f"{int(round(rmr_kcal))}kCals",
ha="left",
va="center",
color=gray_color,
fontsize=12,
weight="bold",
bbox=dict(boxstyle="round,pad=0.14", ec="none", fc="#B2FFC8", alpha=1.0),
)
# Triangle marker above highlight end, gray
ax.plot(
[highlight_end],
[y_bar + bar_height + 0.08],
marker="v",
markersize=14,
color=gray_color,
clip_on=False,
)
# Draw ticks omit leftmost/rightmost (thicker and below bar), color gray
tick_width = 4.1
tick_bottom = y_bar - 0.07 # further below bar
tick_top = y_bar # at the base of bar
for edge in tick_edges:
ax.plot(
[edge, edge],
[tick_bottom, tick_top],
color=gray_color,
lw=tick_width,
solid_capstyle="butt",
clip_on=False,
zorder=2,
)
# Label locations (place directly under each tick), text color gray
label_y = tick_bottom - 0.08
for label, tick in zip(scale_labels, tick_edges):
ax.text(
tick,
label_y,
label,
ha="center",
va="top",
fontsize=11,
weight="bold",
color=gray_color,
)
# Axis title: bold, with extra gap above the graph
ax.text(
x_start,
y_bar + bar_height + 0.5,
"Slow vs Fast Metabolism",
ha="left",
va="bottom",
fontsize=14,
weight="bold",
)
ax.set_xlim(x_start, x_end)
ax.set_ylim(0, 1)
ax.axis("off")
plt.tight_layout()
chart_path = self.charts_dir / "metabolism_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fuel_source_chart(
self, fat_percentage: float, save_as_base64: bool = True
) -> str:
"""
Generate fuel source chart (Fats vs Carbs).
Matches the notebook implementation with proper tick styling.
Args:
fat_percentage: Fat percentage at rest
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
from matplotlib.patches import FancyBboxPatch
fig, ax = plt.subplots(figsize=(11.5, 2.5))
carb_percentage = 100 - fat_percentage
optimal_point = 75
# Let the bars be a bit thicker as well: increase bar height and y
fats_bar = FancyBboxPatch(
(0, 0.36),
fat_percentage,
0.28,
boxstyle="round,pad=0,rounding_size=0.1",
ec="none",
fc="#FEEAAB",
)
ax.add_patch(fats_bar)
carbs_bar = FancyBboxPatch(
(fat_percentage, 0.36),
carb_percentage,
0.28,
boxstyle="round,pad=0,rounding_size=0.1",
ec="none",
fc="#A7F5FF",
)
ax.add_patch(carbs_bar)
# Style: match font weight/color/size with other chart
label_fontprops = dict(fontsize=12, weight="bold", color="#333333")
ax.text(
fat_percentage / 2,
0.5,
f"Fats\n{fat_percentage:.0f}%",
ha="center",
va="center",
**label_fontprops,
)
ax.text(
fat_percentage + carb_percentage / 2,
0.5,
f"Carbs\n{100 - fat_percentage:.0f}%",
ha="center",
va="center",
**label_fontprops,
)
# Add 'Optimal' label
ax.text(
optimal_point,
0.9,
"Optimal",
ha="center",
va="center",
fontsize=12,
weight="bold",
color="#606060",
)
# Optimal point line
ax.plot([optimal_point, optimal_point], [0.65, 0.8], color="#606060", lw=3)
# Indicator Triangle
ax.plot(fat_percentage, 0.7, "v", markersize=15, color="#606060", clip_on=False)
# Ticks and Labels - matching notebook implementation
positions = [0, 25, 50, 75, 100]
tick_color = "#606060"
for pos in positions:
# Smallest ticks (first and last) are thicker
if pos == 0:
ax.text(
pos + 0.5,
0.15,
str(pos),
ha="center",
va="center",
fontsize=12,
color="#333333",
weight="bold",
)
ax.plot(
[pos, pos],
[0.25, 0.37],
color=tick_color,
lw=14,
solid_capstyle="butt",
)
elif pos == 100:
ax.text(
pos - 0.5,
0.15,
str(pos),
ha="center",
va="center",
fontsize=12,
color="#333333",
weight="bold",
)
ax.plot(
[pos, pos],
[0.25, 0.37],
color=tick_color,
lw=14,
solid_capstyle="butt",
)
else:
ax.text(
pos,
0.15,
str(pos),
ha="center",
va="center",
fontsize=12,
color="#333333",
weight="bold",
)
ax.plot(
[pos, pos],
[0.25, 0.37],
color=tick_color,
lw=8,
solid_capstyle="butt",
)
# Chart Styling - uniform style for title
ax.set_title("Fuel Source", fontsize=14, weight="bold", loc="left", pad=22)
ax.set_xlim(0, 100)
ax.set_ylim(0, 1)
ax.axis("off")
plt.tight_layout()
chart_path = self.charts_dir / "fuel_source_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_vo2_max_table(
self,
data: list[list],
columns: list[str],
vo2_max_value: float = None,
category: str = None,
cell_colors: list[list[str]] = None,
save_as_base64: bool = True,
) -> str:
"""
Generate VO2 Max table as an image with optimized sizing, highlighting the patient's category.
Args:
data: List of rows (each row is a list of values)
columns: List of column headers
vo2_max_value: Patient's VO2 max value (for title and arrow)
category: Category that the patient falls into (e.g., 'Good', 'Excellent')
cell_colors: Optional matrix of cell colors
save_as_base64: If True, return base64 string
Returns:
Base64 string or file path
"""
import io
from matplotlib.patches import FancyArrowPatch, RegularPolygon
# Fixed optimal sizing for VO2 Max table (7 columns, 1 data row)
fig, ax = plt.subplots(figsize=(14, 2.2))
ax.axis("off")
# Create table
table = ax.table(
cellText=data,
colLabels=columns,
cellLoc="center",
loc="center",
bbox=[0, 0, 1, 1],
)
# Style the table
table.auto_set_font_size(False)
table.set_fontsize(11)
table.scale(1, 1.8)
# Header row styling (cyan background)
for i in range(len(columns)):
cell = table[(0, i)]
cell.set_facecolor("#7dd3fc") # cyan-300 equivalent
cell.set_text_props(weight="bold", color="black", fontsize=12)
cell.set_edgecolor("#9ca3af") # gray-400
cell.set_linewidth(1)
# Find the column index for the category (if provided)
category_index = None
if category and category in columns:
category_index = columns.index(category)
# Data row styling
for i in range(len(data[0])):
cell = table[(1, i)]
if i == 0: # Age column
cell.set_facecolor("#a5f3fc") # cyan-200
cell.set_text_props(weight="semibold", color="black", fontsize=11)
else:
cell.set_facecolor("#f3f4f6") # gray-100
cell.set_text_props(color="black", fontsize=10)
# Bold the cell that corresponds to the patient's category
if category_index is not None and i == category_index:
cell.set_text_props(weight="bold", color="black", fontsize=11)
cell.set_edgecolor("#9ca3af") # gray-400
cell.set_linewidth(1)
# Add arrow indicator below the category column
if category_index is not None:
# Calculate position
cell_width = 1.0 / len(columns)
arrow_x = (category_index + 0.5) * cell_width
# Draw arrow pointing up
arrow = FancyArrowPatch(
(arrow_x, -0.15),
(arrow_x, -0.05),
arrowstyle="->",
mutation_scale=20,
linewidth=2,
color="black",
transform=ax.transAxes,
)
ax.add_patch(arrow)
# Add triangle at the top
triangle = RegularPolygon(
(arrow_x, -0.05),
3,
radius=0.02,
orientation=np.pi / 2,
color="black",
transform=ax.transAxes,
)
ax.add_patch(triangle)
# Set title - calculate approximate percentile
if vo2_max_value is not None:
if category == "Superior":
percentile = "100th percentile"
else:
percentile_map = {
"Very Poor": "1st-10th percentile",
"Poor": "10th-20th percentile",
"Fair": "20th-40th percentile",
"Good": "40th-60th percentile",
"Excellent": "60th-80th percentile",
}
percentile = percentile_map.get(category, "N/A")
title = f"VO2 Max - {vo2_max_value:.1f} ({percentile})"
ax.set_title(title, fontsize=14, fontweight="bold", pad=10)
if save_as_base64:
buf = io.BytesIO()
plt.savefig(
buf,
format="png",
bbox_inches="tight",
dpi=300,
facecolor="white",
pad_inches=0.05,
)
plt.close(fig)
buf.seek(0)
return base64.b64encode(buf.read()).decode("utf-8")
else:
output_path = (
self.charts_dir / f"vo2_max_table_{pd.Timestamp.now().timestamp()}.png"
)
plt.savefig(
output_path,
bbox_inches="tight",
dpi=300,
facecolor="white",
pad_inches=0.05,
)
plt.close(fig)
return str(output_path)
def generate_heart_rate_zones_table(
self,
data: list[list],
columns: list[str],
cell_colors: list[list[str]] = None,
save_as_base64: bool = True,
) -> str:
"""
Generate Heart Rate Zones table as an image with optimized sizing.
Args:
data: List of rows (each row is a list of values)
columns: List of column headers (Zone 1-5)
cell_colors: Optional matrix of cell colors (IGNORED - using notebook colors)
save_as_base64: If True, return base64 string
Returns:
Base64 string or file path
"""
import io
# Optimal sizing for HR Zones table (5 columns, 8 rows) - match notebook exactly
fig, ax = plt.subplots(figsize=(12, 8))
ax.axis("off")
# Data comes pre-formatted with newlines from context_generator - use as-is
# No text wrapping needed
# Create table without rowLabels - match notebook exactly
table = ax.table(
cellText=data,
colLabels=columns,
loc="center",
cellLoc="center",
)
# Style the table - match notebook exactly
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1, 3.5) # Increased vertical scale for multi-line text
# Header row styling
for j, label in enumerate(columns):
cell = table[(0, j)]
cell.set_facecolor("#7dd3fc") # cyan-300
cell.set_text_props(weight="bold")
# Row specific styling - match notebook colors exactly
colors = ["#fecaca", "#fecaca", "#fef08a", "#bbf7d0", "#bbf7d0"]
# HR BPM row is at index 2 (0-based in data) -> row 3 in table (0 is header)
for j in range(len(columns)):
cell = table[(3, j)]
cell.set_facecolor(colors[j])
cell.set_text_props(weight="bold")
# Breathing row is at index 7 -> row 8 in table
for j in range(len(columns)):
cell = table[(8, j)]
cell.set_facecolor(colors[j])
cell.set_text_props(weight="bold")
# Add title matching notebook
plt.title(
"Personalized Heart Rate Zones", fontsize=16, fontweight="bold", pad=5
)
plt.tight_layout()
if save_as_base64:
buf = io.BytesIO()
plt.savefig(
buf,
format="png",
bbox_inches="tight",
dpi=300,
facecolor="white",
)
plt.close(fig)
buf.seek(0)
return base64.b64encode(buf.read()).decode("utf-8")
else:
output_path = (
self.charts_dir / f"hr_zones_table_{pd.Timestamp.now().timestamp()}.png"
)
plt.savefig(
output_path,
bbox_inches="tight",
dpi=300,
facecolor="white",
)
plt.close(fig)
return str(output_path)
def generate_resting_heart_rate_table(
self,
data: list[list],
columns: list[str],
rhr_value: float = None,
category: str = None,
cell_colors: list[list[str]] = None,
save_as_base64: bool = True,
) -> str:
"""
Generate Resting Heart Rate table as an image with optimized sizing, highlighting the patient's category.
Args:
data: List of rows (each row is a list of values)
columns: List of column headers
rhr_value: Patient's resting heart rate value in bpm (for title and arrow)
category: Category that the patient falls into (e.g., 'Good', 'Excellent')
cell_colors: Optional matrix of cell colors
save_as_base64: If True, return base64 string
Returns:
Base64 string or file path
"""
import io
from matplotlib.patches import FancyArrowPatch, RegularPolygon
# Optimal sizing for RHR table (8 columns, 1 data row)
fig, ax = plt.subplots(figsize=(16, 2.2))
ax.axis("off")
# Create table
table = ax.table(
cellText=data,
colLabels=columns,
cellLoc="center",
loc="center",
bbox=[0, 0, 1, 1],
)
# Style the table
table.auto_set_font_size(False)
table.set_fontsize(11)
table.scale(1, 1.8)
# Header row styling (cyan background)
for i in range(len(columns)):
cell = table[(0, i)]
cell.set_facecolor("#7dd3fc") # cyan-300 equivalent
cell.set_text_props(weight="bold", color="black", fontsize=12)
cell.set_edgecolor("#9ca3af") # gray-400
cell.set_linewidth(1)
# Find the column index for the category (if provided)
category_index = None
if category and category in columns:
category_index = columns.index(category)
# Data row styling
for i in range(len(data[0])):
cell = table[(1, i)]
if i == 0: # Age column
cell.set_facecolor("#a5f3fc") # cyan-200
cell.set_text_props(weight="semibold", color="black", fontsize=11)
else:
# Highlight the category cell with light green background
if category_index is not None and i == category_index:
cell.set_facecolor("#d1fae5") # green-200 equivalent
cell.set_text_props(weight="bold", color="black", fontsize=11)
else:
cell.set_facecolor("#f3f4f6") # gray-100
cell.set_text_props(color="black", fontsize=10)
cell.set_edgecolor("#9ca3af") # gray-400
cell.set_linewidth(1)
# Add arrow indicator below the category column
if category_index is not None:
# Calculate position
cell_width = 1.0 / len(columns)
arrow_x = (category_index + 0.5) * cell_width
# Draw arrow pointing up
arrow = FancyArrowPatch(
(arrow_x, -0.15),
(arrow_x, -0.05),
arrowstyle="->",
mutation_scale=20,
linewidth=2,
color="black",
transform=ax.transAxes,
)
ax.add_patch(arrow)
# Add triangle at the top
triangle = RegularPolygon(
(arrow_x, -0.05),
3,
radius=0.02,
orientation=np.pi / 2,
color="black",
transform=ax.transAxes,
)
ax.add_patch(triangle)
# Set title
if rhr_value is not None:
title = f"Resting Heart Rate - {rhr_value:.0f}bpm"
ax.set_title(title, fontsize=14, fontweight="bold", pad=10)
if save_as_base64:
buf = io.BytesIO()
plt.savefig(
buf,
format="png",
bbox_inches="tight",
dpi=300,
facecolor="white",
pad_inches=0.05,
)
plt.close(fig)
buf.seek(0)
return base64.b64encode(buf.read()).decode("utf-8")
else:
output_path = (
self.charts_dir / f"rhr_table_{pd.Timestamp.now().timestamp()}.png"
)
plt.savefig(
output_path,
bbox_inches="tight",
dpi=300,
facecolor="white",
pad_inches=0.05,
)
plt.close(fig)
return str(output_path)
def generate_muscle_oxygenation_chart(
self, oxygenation_df: pd.DataFrame, save_as_base64: bool = True
) -> tuple:
"""
Generate comprehensive muscle oxygenation (SmO2) chart with both legs and heart rate.
Args:
oxygenation_df: DataFrame with muscle oxygenation data (Train.Red CSV format)
save_as_base64: If True, return base64 string, else return file path
Returns:
Tuple of (chart_string, metrics_dict) where metrics_dict contains key values
"""
# Data preparation
df_oxy = oxygenation_df.copy()
# Convert columns to numeric
df_oxy["Timestamp (seconds passed)"] = pd.to_numeric(
df_oxy["Timestamp (seconds passed)"], errors="coerce"
)
df_oxy["Left_SmO2"] = pd.to_numeric(df_oxy["SmO2"], errors="coerce")
df_oxy["Right_SmO2"] = pd.to_numeric(df_oxy["SmO2.1"], errors="coerce")
df_oxy["Heart_Rate"] = pd.to_numeric(
df_oxy["Heart Rate (BPM)"], errors="coerce"
)
df_oxy["Lap"] = pd.to_numeric(df_oxy["Lap/Event"], errors="coerce")
# Drop rows with missing timestamps
df_oxy = df_oxy.dropna(subset=["Timestamp (seconds passed)"])
df_oxy = df_oxy.sort_values("Timestamp (seconds passed)").reset_index(drop=True)
# Apply 10-second rolling mean smoothing
time_diffs = df_oxy["Timestamp (seconds passed)"].diff().dropna()
avg_sampling_interval = time_diffs.median()
sampling_freq = 1 / avg_sampling_interval if avg_sampling_interval > 0 else 10
window_samples = int(10 * sampling_freq)
df_oxy["Left_SmO2_smooth"] = (
df_oxy["Left_SmO2"]
.rolling(window=window_samples, center=True, min_periods=1)
.mean()
)
df_oxy["Right_SmO2_smooth"] = (
df_oxy["Right_SmO2"]
.rolling(window=window_samples, center=True, min_periods=1)
.mean()
)
df_oxy["Heart_Rate_smooth"] = (
df_oxy["Heart_Rate"]
.rolling(window=window_samples, center=True, min_periods=1)
.mean()
)
# Identify stage boundaries
lap_changes = df_oxy[df_oxy["Lap"].diff() != 0].copy()
lap_starts = {}
for idx, row in lap_changes.iterrows():
lap_num = int(row["Lap"])
lap_starts[lap_num] = row["Timestamp (seconds passed)"]
warm_up_end = lap_starts.get(1, df_oxy["Timestamp (seconds passed)"].max())
recovery_start = lap_starts.get(7, df_oxy["Timestamp (seconds passed)"].max())
# Calculate recovery percentages
warm_up_last_30_start = warm_up_end - 30
warm_up_mask = (
df_oxy["Timestamp (seconds passed)"] >= warm_up_last_30_start
) & (df_oxy["Timestamp (seconds passed)"] <= warm_up_end)
recovery_end = df_oxy["Timestamp (seconds passed)"].max()
recovery_last_30_start = recovery_end - 30
recovery_mask = (
df_oxy["Timestamp (seconds passed)"] >= recovery_last_30_start
) & (df_oxy["Timestamp (seconds passed)"] <= recovery_end)
left_warmup_avg = df_oxy.loc[warm_up_mask, "Left_SmO2_smooth"].mean()
left_recovery_avg = df_oxy.loc[recovery_mask, "Left_SmO2_smooth"].mean()
left_recovery_pct = round((left_recovery_avg / left_warmup_avg) * 100)
right_warmup_avg = df_oxy.loc[warm_up_mask, "Right_SmO2_smooth"].mean()
right_recovery_avg = df_oxy.loc[recovery_mask, "Right_SmO2_smooth"].mean()
right_recovery_pct = round((right_recovery_avg / right_warmup_avg) * 100)
# Calculate key metrics
active_mask = (df_oxy["Timestamp (seconds passed)"] >= warm_up_end) & (
df_oxy["Timestamp (seconds passed)"] <= recovery_start
)
active_data = df_oxy[active_mask]
left_min = active_data["Left_SmO2_smooth"].min()
left_min_lap = int(
active_data.loc[active_data["Left_SmO2_smooth"].idxmin(), "Lap"]
)
right_min = active_data["Right_SmO2_smooth"].min()
right_min_lap = int(
active_data.loc[active_data["Right_SmO2_smooth"].idxmin(), "Lap"]
)
left_drop = left_warmup_avg - left_min
right_drop = right_warmup_avg - right_min
hr_warmup = df_oxy[df_oxy["Timestamp (seconds passed)"] <= warm_up_end][
"Heart_Rate_smooth"
].mean()
hr_max = active_data["Heart_Rate_smooth"].max()
# Create the plot
fig, ax1 = plt.subplots(figsize=(18, 8))
time = df_oxy["Timestamp (seconds passed)"]
ax1.plot(
time,
df_oxy["Left_SmO2_smooth"],
label=f"Left SmO₂ (Rec {left_recovery_pct}% of warm-up)",
color="#2E86AB",
linewidth=2,
)
ax1.plot(
time,
df_oxy["Right_SmO2_smooth"],
label=f"Right SmO₂ (Rec {right_recovery_pct}% of warm-up)",
color="#A23B72",
linewidth=2,
)
ax1.set_xlabel("Time (seconds)", fontsize=12, fontweight="bold")
ax1.set_ylabel("SmO₂ (%)", fontsize=12, fontweight="bold")
ax1.tick_params(axis="y", labelcolor="black")
ax1.grid(True, alpha=0.3, linestyle="--")
# Add secondary axis for heart rate
ax2 = ax1.twinx()
ax2.plot(
time,
df_oxy["Heart_Rate_smooth"],
label="Heart Rate",
color="red",
linewidth=1.5,
linestyle="--",
alpha=0.7,
)
ax2.set_ylabel("Heart Rate (BPM)", fontsize=12, fontweight="bold", color="red")
ax2.tick_params(axis="y", labelcolor="red")
# Add shaded regions
ax1.axvspan(0, warm_up_end, alpha=0.15, color="blue", label="Warm-up")
active_laps = [1, 2, 3, 4, 5, 6]
colors_active = ["yellow", "orange"] * 3
for i, lap in enumerate(active_laps):
start = lap_starts.get(lap, 0)
end = lap_starts.get(lap + 1, recovery_start) if lap < 6 else recovery_start
ax1.axvspan(start, end, alpha=0.1, color=colors_active[i])
ax1.axvspan(
recovery_start, recovery_end, alpha=0.2, color="gray", label="Recovery"
)
ax1.axvline(
x=recovery_start, color="black", linestyle="-", linewidth=2, alpha=0.7
)
# Add lap labels
for lap in range(1, 7):
start = lap_starts.get(lap, 0)
end = lap_starts.get(lap + 1, recovery_start) if lap < 6 else recovery_start
mid = (start + end) / 2
ax1.text(
mid,
ax1.get_ylim()[1] * 0.97,
f"Lap {lap}",
ha="center",
va="top",
fontsize=10,
fontweight="bold",
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.7),
)
plt.title(
"Train.Red SmO₂ Ramp - Muscle Oxygenation Analysis",
fontsize=16,
fontweight="bold",
pad=20,
)
# Combine legends
lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(
lines1 + lines2,
labels1 + labels2,
loc="upper left",
fontsize=10,
framealpha=0.9,
)
plt.tight_layout()
# Prepare metrics dictionary
metrics = {
"left_baseline_smo2": f"{left_warmup_avg:.1f}%",
"right_baseline_smo2": f"{right_warmup_avg:.1f}%",
"left_minimum_smo2": f"{left_min:.1f}%",
"right_minimum_smo2": f"{right_min:.1f}%",
"left_minimum_lap": f"Lap {left_min_lap}",
"right_minimum_lap": f"Lap {right_min_lap}",
"left_oxygen_drop": f"{left_drop:.1f}%",
"right_oxygen_drop": f"{right_drop:.1f}%",
"left_drop_percentage": f"{(left_drop / left_warmup_avg * 100):.0f}% decrease",
"right_drop_percentage": f"{(right_drop / right_warmup_avg * 100):.0f}% decrease",
"left_recovery_percentage": f"{left_recovery_pct}%",
"right_recovery_percentage": f"{right_recovery_pct}%",
"hr_warmup": f"{hr_warmup:.0f}",
"hr_max": f"{hr_max:.0f}",
"test_duration": f"~{(recovery_start - warm_up_end) / 60:.0f} minutes active test",
"recovery_assessment": "Excellent recovery capacity"
if (left_recovery_pct + right_recovery_pct) / 2 >= 100
else "Good recovery capacity",
}
# Save or return
if save_as_base64:
buf = io.BytesIO()
plt.savefig(buf, format="png", dpi=300, bbox_inches="tight")
plt.close(fig)
buf.seek(0)
chart_str = base64.b64encode(buf.read()).decode("utf-8")
return chart_str, metrics
else:
output_path = self.charts_dir / "muscle_oxygenation_chart.png"
plt.savefig(output_path, dpi=300, bbox_inches="tight")
plt.close(fig)
return str(output_path), metrics