Add compiled Python bytecode for report generator and spirometry table extractor services

- Generated bytecode for report_generator.py and spirometry_table_extractor.py
- These changes include the compiled .pyc files in the __pycache__ directory
- The report generator service handles the generation of medical reports from uploaded files
- The spirometry table extractor service extracts data from PDF files and processes it for further analysis
This commit is contained in:
bolade
2025-10-04 10:07:40 +01:00
parent 14dc64234d
commit d66f3fd18b
15 changed files with 482 additions and 3751 deletions
+96 -235
View File
@@ -1,319 +1,180 @@
import base64
"""
Context Generator Service
This service processes all data files and generates context dictionaries for each page
of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data.
"""
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import matplotlib.pyplot as plt
import pandas as pd
class ContextGenerator:
    """Generate context data for report pages.

    The generator holds the Pnoe, patient, spirometry and SECA tables, derives
    metrics from them and assembles one context dictionary per report page.

    Typical use::

        generator = ContextGenerator()
        generator.load_data(pnoe_csv, patient_csv, spirometry_csv, seca_xlsx)
        pages = generator.generate_all_contexts("Moran", graphs)
    """

    # Number of samples in the rolling-mean window used for smoothing.
    _SMOOTHING_WINDOW = 10
    # Conversion factor applied to the SECA weight to report masses in lbs.
    _KG_TO_LBS = 2.20462
    # Columns that receive a "<name>_smoothed" companion column.
    _COLUMNS_TO_SMOOTH = (
        "VO2(ml/min)",
        "VCO2(ml/min)",
        "HR(bpm)",
        "VT(l)",
        "BF(bpm)",
        "VE(l/min)",
        "VO2 Pulse",
        "VO2 Breath",
        "CHO",
        "FAT",
    )

    def __init__(self):
        """Create an empty generator and make sure the charts directory exists."""
        self.pnoe_df = None
        self.patient_df = None
        self.spirometry_df = None
        self.seca_df = None
        self.patient_info = {}
        # Relative to the current working directory.
        self.charts_dir = Path("graphs")
        self.charts_dir.mkdir(exist_ok=True)

    # ------------------------------------------------------------------
    # Loading and preprocessing
    # ------------------------------------------------------------------
    def load_data(
        self,
        pnoe_path: str,
        patient_path: str,
        spirometry_path: str,
        seca_path: Optional[str] = None,
    ):
        """Load all required datasets and preprocess the Pnoe table.

        Args:
            pnoe_path: Semicolon-delimited Pnoe CSV export.
            patient_path: Patient CSV file.
            spirometry_path: Spirometry CSV file.
            seca_path: Optional SECA Excel workbook. When omitted,
                ``seca_df`` is left as it was.
        """
        self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
        self.patient_df = pd.read_csv(patient_path)
        self.spirometry_df = pd.read_csv(spirometry_path)
        if seca_path:
            self.seca_df = pd.read_excel(seca_path)
        self._preprocess_pnoe_data()

    @staticmethod
    def _coerce_numeric(column: pd.Series) -> pd.Series:
        """Return *column* converted to numbers, or unchanged if it cannot be."""
        # Explicit replacement for the deprecated ``errors="ignore"`` mode.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    def _preprocess_pnoe_data(self):
        """Add derived and smoothed columns to the Pnoe table."""
        df = self.pnoe_df.apply(self._coerce_numeric)

        # A zero rate would turn the ratios into +/-inf and poison the
        # rolling means below; treat such samples as missing instead.
        heart_rate = df["HR(bpm)"].mask(df["HR(bpm)"] == 0)
        breath_rate = df["BF(bpm)"].mask(df["BF(bpm)"] == 0)

        df["VO2 Pulse"] = df["VO2(ml/min)"] / heart_rate
        df["VO2 Breath"] = df["VO2(ml/min)"] / breath_rate
        df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
        df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100

        for col in self._COLUMNS_TO_SMOOTH:
            if col in df.columns:
                df[f"{col}_smoothed"] = (
                    df[col]
                    .rolling(window=self._SMOOTHING_WINDOW, min_periods=1)
                    .mean()
                )
        self.pnoe_df = df

    # Name used before the method was renamed; kept so old callers still work.
    _preprocess_data = _preprocess_pnoe_data

    # ------------------------------------------------------------------
    # Patient information
    # ------------------------------------------------------------------
    @staticmethod
    def _as_float(value, default: float = 0.0) -> float:
        """Convert a table cell to ``float``; missing or invalid gives *default*."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return default
        return default if pd.isna(number) else number

    @staticmethod
    def _as_text(value) -> str:
        """Convert a table cell to ``str``; missing (None/NaN) gives ``""``."""
        if value is None or pd.isna(value):
            return ""
        return str(value)

    def extract_patient_info(self, patient_name: str) -> Dict:
        """Extract patient information from the SECA dataset.

        The first row whose ``LastName`` contains *patient_name* (literal,
        case-insensitive substring) is used.

        Args:
            patient_name: Last name, or part of it, to look for.

        Returns:
            ``self.patient_info``. It is replaced only when a matching row is
            found; with no SECA data, a blank name or no match, the previous
            value is returned unchanged.
        """
        # A blank name would be a substring of every row; match nothing instead.
        if self.seca_df is None or not patient_name or not patient_name.strip():
            return self.patient_info

        # regex=False: names such as "O(Neil" must not be parsed as patterns.
        is_match = self.seca_df["LastName"].str.contains(
            patient_name, case=False, na=False, regex=False
        )
        matches = self.seca_df[is_match]
        if matches.empty:
            return self.patient_info

        row = matches.iloc[0]
        first_name = self._as_text(row.get("FirstName"))
        last_name = self._as_text(row.get("LastName"))
        # Assumes SECA "Weight" is in kilograms, as the lbs conversion implies.
        weight_kg = self._as_float(row.get("Weight"))
        fat_pct = self._as_float(row.get("Adult_FMP"))

        self.patient_info = {
            "name": f"{first_name} {last_name}",
            "first_name": first_name,
            "last_name": last_name,
            "age": int(self._as_float(row.get("Age"))),
            "height": self._as_text(row.get("Height")),
            "weight": weight_kg,
            "gender": self._as_text(row.get("Gender")).lower(),
            "fat_percentage": fat_pct,
            "fat_mass_lbs": weight_kg * fat_pct / 100 * self._KG_TO_LBS,
            "lean_mass_lbs": weight_kg * (1 - fat_pct / 100) * self._KG_TO_LBS,
        }
        return self.patient_info

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------
    def calculate_spirometry_metrics(self) -> Dict:
        """Calculate spirometry-related metrics.

        Returns:
            ``<key>_best`` and ``<key>_pred`` entries for each of FVC, FEV1 and
            FEV1/FVC% present in the table (keys ``fvc``, ``fev1``,
            ``fev1_fvc_pct``). Empty when no spirometry table is loaded.
        """
        metrics = {}
        if self.spirometry_df is None:
            return metrics

        # Labels may carry stray whitespace, so compare the stripped text.
        labels = self.spirometry_df["Parameters"].astype(str).str.strip()
        for param in ("FVC", "FEV1", "FEV1/FVC%"):
            rows = self.spirometry_df.loc[labels == param]
            if rows.empty:
                continue
            param_key = param.lower().replace("/", "_").replace("%", "_pct")
            metrics[f"{param_key}_best"] = rows["Best"].values[0]
            metrics[f"{param_key}_pred"] = rows["%Pred."].values[0]
        return metrics

    def calculate_pnoe_metrics(self) -> Dict:
        """Calculate all Pnoe-derived metrics.

        Returns:
            Dict with ``vo2_max``, ``vo2_max_per_kg``, ``peak_vt``,
            ``peak_vt_hr``, ``fat_max_value``, ``fat_max_hr``, ``vt1``,
            ``vt2`` and the five ``zoneN_bpm`` strings.
        """
        df = self.pnoe_df
        metrics = {}

        metrics["vo2_max"] = df["VO2(ml/min)_smoothed"].max()
        # Without a positive weight the per-kg value is undefined, not an error.
        weight = self.patient_info.get("weight") or 0
        metrics["vo2_max_per_kg"] = (
            metrics["vo2_max"] / weight if weight > 0 else float("nan")
        )

        peak_vt_row = df.loc[df["VT(l)_smoothed"].idxmax()]
        metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
        metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]

        fat_max_row = df.loc[df["FAT_smoothed"].idxmax()]
        metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
        metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]

        vt1, vt2 = self._detect_thresholds()
        metrics["vt1"] = vt1
        metrics["vt2"] = vt2
        metrics.update(self._calculate_hr_zones(vt1, vt2, fat_max_row))
        return metrics

    def _threshold_point(self, index_label) -> Dict:
        """Describe the Pnoe sample at *index_label* as a threshold dict."""
        row = self.pnoe_df.loc[index_label]
        return {
            "HeartRate": row["HR(bpm)_smoothed"],
            "Speed": row["Speed"],
            "Time": row["T(sec)"],
        }

    def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Detect VT1 and VT2 thresholds.

        VT1 is the first sample where smoothed CHO exceeds smoothed FAT.
        VT2 is the sample with the largest second difference of smoothed VE
        (a simplified inflection detector).

        Returns:
            ``(vt1, vt2)``; either is ``None`` when it cannot be determined.
        """
        df = self.pnoe_df

        carbs_dominant = df["CHO_smoothed"] > df["FAT_smoothed"]
        crossover_indices = carbs_dominant[carbs_dominant].index
        vt1 = None
        if len(crossover_indices) > 0:
            vt1 = self._threshold_point(crossover_indices[0])

        # With fewer than three usable samples the second difference is all
        # NaN; drop the NaNs first so idxmax is never asked for a maximum
        # that does not exist.
        curvature = df["VE(l/min)_smoothed"].diff().diff().dropna()
        vt2 = None
        if not curvature.empty:
            vt2 = self._threshold_point(curvature.idxmax())

        return vt1, vt2

    def _calculate_hr_zones(
        self,
        vt1: Optional[Dict] = None,
        vt2: Optional[Dict] = None,
        fat_max_row: Optional[pd.Series] = None,
    ) -> Dict:
        """Calculate the five heart-rate zone labels.

        When both thresholds and the fat-max sample are available (and their
        heart rates are not NaN) the zones are anchored on them; otherwise
        they are percentages of ``220 - age``.

        Raises:
            KeyError: In the age-based fallback when ``patient_info`` has no
                ``"age"`` entry.
        """
        anchors = None
        if vt1 and vt2 and fat_max_row is not None:
            anchors = (
                fat_max_row["HR(bpm)_smoothed"],
                vt1["HeartRate"],
                vt2["HeartRate"],
            )
            # int(NaN) raises, so unusable anchors fall back to the age formula.
            if any(pd.isna(hr) for hr in anchors):
                anchors = None

        if anchors is not None:
            fat_max_hr, vt1_hr, vt2_hr = anchors
            bounds = [
                int(fat_max_hr - 15),
                int(fat_max_hr),
                int(vt1_hr),
                int(vt2_hr - 10),
                int(vt2_hr + 10),
            ]
        else:
            max_hr = 220 - self.patient_info["age"]
            bounds = [
                int(max_hr * fraction) for fraction in (0.55, 0.65, 0.75, 0.85, 0.95)
            ]

        # Zone N runs from bounds[N-1] to bounds[N]; the last zone is open-ended.
        zones = {
            f"zone{number}_bpm": f"{low}-{high}bpm"
            for number, (low, high) in enumerate(zip(bounds, bounds[1:]), start=1)
        }
        zones["zone5_bpm"] = f"{bounds[-1]}+bpm"
        return zones

    # ------------------------------------------------------------------
    # Charts
    # ------------------------------------------------------------------
    def generate_charts(self) -> Dict[str, str]:
        """Generate all charts and return base64 encoded versions."""
        return {
            "fuel_utilization_chart": self._create_fuel_chart(),
            "vo2_pulse_chart": self._create_vo2_pulse_chart(),
            "body_composition_chart": self._create_body_comp_chart(),
        }

    def _create_fuel_chart(self) -> str:
        """Create and save the fuel utilization chart; return it as base64."""
        speed_groups = self.pnoe_df.groupby("Speed").mean(numeric_only=True).round(1)
        speed_groups = speed_groups.iloc[1:-1]
        in_range = (speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
        # TODO: plot this; the drawing code was never filled in, so the saved
        # figure is currently blank.
        filtered_data = speed_groups[in_range]  # noqa: F841

        chart_path = self.charts_dir / "fuel_utilization_chart.png"
        figure = plt.figure(figsize=(15, 8))
        try:
            figure.savefig(chart_path, dpi=300)
        finally:
            # Always release the figure, even if saving fails.
            plt.close(figure)
        return self._image_to_base64(chart_path)

    def _create_vo2_pulse_chart(self) -> str:
        """Return the VO2 pulse chart as base64 ("" if the file is absent)."""
        # TODO: chart rendering is not implemented; only an existing file is read.
        return self._image_to_base64(self.charts_dir / "vo2_pulse_chart.png")

    def _create_body_comp_chart(self) -> str:
        """Return the body composition chart as base64 ("" if the file is absent)."""
        # TODO: chart rendering is not implemented; only an existing file is read.
        return self._image_to_base64(self.charts_dir / "body_composition_chart.png")

    def _image_to_base64(self, image_path: Path) -> str:
        """Return the file at *image_path* base64-encoded, or "" if it is missing."""
        try:
            raw = Path(image_path).read_bytes()
        except FileNotFoundError:
            return ""
        return base64.b64encode(raw).decode("utf-8")

    # ------------------------------------------------------------------
    # Page contexts
    # ------------------------------------------------------------------
    def generate_all_contexts(
        self, patient_name: str, graphs: Optional[Dict[str, str]] = None
    ) -> List[Dict]:
        """Main method to generate all page contexts.

        Args:
            patient_name: Last name used to look the patient up in SECA data.
            graphs: Mapping of chart key to encoded image. Missing keys, or
                omitting the mapping entirely, leave the chart slots as "".

        Returns:
            19 context dictionaries, in page order.

        Raises:
            KeyError: When no patient information is available.
        """
        graphs = graphs or {}
        self.extract_patient_info(patient_name)
        if not self.patient_info:
            raise KeyError(f"No patient information found for {patient_name!r}")

        spirometry_metrics = self.calculate_spirometry_metrics()
        pnoe_metrics = self.calculate_pnoe_metrics()

        info = self.patient_info
        # Format the date once so every page shows the same value.
        today = datetime.now().strftime("%B %d, %Y")
        decade = info["age"] // 10 * 10

        fvc_best = self._as_float(spirometry_metrics.get("fvc_best"))
        fev1_percentage = 0
        if fvc_best:
            fev1_percentage = (pnoe_metrics["peak_vt"] / fvc_best) * 100

        # Pages 1-2: cover pages.
        contexts = [
            {"name": info["name"], "surname": info["last_name"], "date": today},
            {"patient_name": info["name"], "test_date": today},
        ]

        # Pages 3-6: only need the patient name and page number.
        contexts.extend(
            {"patient_name": info["name"], "page_number": page}
            for page in range(3, 7)
        )

        # Page 7 - Spirometry
        contexts.append(
            {
                "peak_vt": f"{pnoe_metrics['peak_vt']:.2f}",
                "peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}",
                "fev1_percentage": f"{fev1_percentage:.1f}",
                "lung_analysis_chart": graphs.get("spirometry_chart", ""),
                "respiratory_analysis_chart": graphs.get("respiratory", ""),
            }
        )

        # Page 8 - VO2 Max and Zones
        contexts.append(
            {
                "vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
                "age_range": f"{decade}-{decade + 9}",
                "zone1_bpm": pnoe_metrics.get("zone1_bpm", ""),
                "zone2_bpm": pnoe_metrics.get("zone2_bpm", ""),
                "zone3_bpm": pnoe_metrics.get("zone3_bpm", ""),
                "zone4_bpm": pnoe_metrics.get("zone4_bpm", ""),
                "zone5_bpm": pnoe_metrics.get("zone5_bpm", ""),
                "vo2_pulse_chart": graphs.get("vo2_pulse", ""),
            }
        )

        # Page 9 - fat metabolism
        contexts.append(
            {
                "fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}",
                "fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}",
                "fuel_utilization_chart": graphs.get("fuel_utilization", ""),
                "fat_metabolism_chart": graphs.get("fat_metabolism", ""),
            }
        )

        # Page 10 - body composition
        contexts.append(
            {
                "fat_percentage": f"{info['fat_percentage']:.1f}",
                "fat_mass_lbs": f"{info['fat_mass_lbs']:.1f}",
                "lean_mass_lbs": f"{info['lean_mass_lbs']:.1f}",
                "body_composition_chart": graphs.get("body_composition", ""),
                "body_fat_percent_chart": graphs.get("body_fat_percent", ""),
            }
        )

        # Pages 11-19 share the same layout.
        contexts.extend(
            {
                "patient_name": info["name"],
                "page_number": page,
                "vo2_breath_chart": graphs.get("vo2_breath", ""),
                "recovery_chart": graphs.get("recovery", ""),
            }
            for page in range(11, 20)
        )
        return contexts


# Previous class name; kept so existing callers keep working after the rename.
ReportGenerator = ContextGenerator
# Entry point used by the backend service
def generate_report(
    pnoe_file, patient_file, spirometry_file, seca_file=None, patient_name="Moran"
):
    """Load the given data files and return the page contexts for one patient.

    Args:
        pnoe_file: Path to the Pnoe CSV export.
        patient_file: Path to the patient CSV file.
        spirometry_file: Path to the spirometry CSV file.
        seca_file: Optional path to the SECA Excel workbook.
        patient_name: Last name passed on to ``generate_all_contexts``.

    Returns:
        Whatever ``generate_all_contexts`` returns for ``patient_name``.
    """
    report_builder = ReportGenerator()
    report_builder.load_data(pnoe_file, patient_file, spirometry_file, seca_file)
    page_contexts = report_builder.generate_all_contexts(patient_name)
    return page_contexts
# Example: build the contexts from the sample files when run as a script
if __name__ == "__main__":
    # Positional order matches generate_report: Pnoe, patient, spirometry, SECA.
    sample_files = (
        "data/Pnoe_20250729_1550-Moran_Keirstyn.csv",
        "data/patient_data.csv",
        "data/spirometry_data.csv",
        "data/SECA body comp for all patients.xlsx",
    )
    contexts = generate_report(*sample_files)
    print(f"Generated {len(contexts)} page contexts")