# File: bio-performx/app/services/report_generator.py
# Snapshot: 2025-11-28 12:11:00 +01:00 (621 lines, 23 KiB, Python)

"""
Report Generator Service
This service handles the generation of medical reports from uploaded files.
It processes data, generates graphs, and creates PDF reports.
"""
import base64
import re
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional

import pandas as pd
from jinja2 import Environment, FileSystemLoader
from playwright.async_api import async_playwright

from services.context_generator import ContextGenerator
from services.graph_generator import GraphGenerator
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
class ReportGeneratorService:
    """Service for generating medical performance reports.

    The service loads Pnoe CSV data, renders charts through ``GraphGenerator``,
    builds per-page template contexts through ``ContextGenerator``, renders the
    Jinja2 page templates to one HTML document and prints it to PDF with
    Playwright/Chromium.
    """

    # Conversion factor used for every kg <-> lbs conversion in this class.
    _LBS_PER_KG = 2.20462

    # Pnoe columns that get a rolling-mean "<column>_smoothed" companion.
    _COLUMNS_TO_SMOOTH = (
        "VO2(ml/min)",
        "VCO2(ml/min)",
        "HR(bpm)",
        "VT(l)",
        "BF(bpm)",
        "VE(l/min)",
        "VO2 Pulse",
        "VO2 Breath",
        "CHO",
        "FAT",
    )

    def __init__(
        self,
        template_dir: str = "app/report_gen",
        graphs_dir: str = "graphs",
        reports_dir: str = "reports",
        data_dir: str = "data",
    ):
        """
        Initialize the report generator service.

        Args:
            template_dir: Directory containing Jinja2 templates
            graphs_dir: Directory to save generated graphs
            reports_dir: Directory to save generated reports
            data_dir: Directory to store extracted/processed data
        """
        self.template_dir = template_dir
        self.graphs_dir = Path(graphs_dir)
        self.reports_dir = Path(reports_dir)
        self.data_dir = Path(data_dir)
        self.graph_generator = GraphGenerator(charts_dir=str(self.graphs_dir))
        self.context_generator = ContextGenerator()
        self.env = Environment(loader=FileSystemLoader(template_dir))

        # Ensure output directories exist. parents=True lets callers pass
        # nested paths such as "out/graphs" without pre-creating "out".
        for directory in (self.graphs_dir, self.reports_dir, self.data_dir):
            directory.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _encode_file_b64(path) -> str:
        """Return the bytes of the file at *path* as a base64 text string."""
        with open(path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    def _load_static_image_b64(self, image_path: Path, description: str) -> str:
        """Base64-encode a static image, or return "" (with a warning) if it
        is missing or unreadable."""
        if not image_path.exists():
            label = description[:1].upper() + description[1:]
            print(f"Warning: {label} not found at {image_path}")
            return ""
        try:
            return self._encode_file_b64(image_path)
        except OSError as e:
            print(f"Warning: Could not load {description}: {e}")
            return ""

    @staticmethod
    def _coerce_number(value: Any, default: Any, label: str) -> Any:
        """Return *value* unchanged if numeric, else parse it as a float
        (a "%" sign is tolerated); fall back to *default* with a warning."""
        if isinstance(value, (int, float)):
            return value
        try:
            return float(str(value).replace("%", "").strip())
        except ValueError:
            print(f"Warning: Could not parse {label} '{value}', using default {default}")
            return default

    @staticmethod
    def _parse_weight_kg(weight: Any) -> float:
        """Parse a weight such as "70", "70 kg" or "154 lbs" into kilograms.

        Unit detection and unit stripping are both case-insensitive, so
        "150 LBS" and "150 lb" are treated as pounds. A value without a unit
        is assumed to be kilograms. Unparseable input yields 0.0 with a
        warning.
        """
        weight_str = str(weight)
        lowered = weight_str.lower()
        numeric_part = re.sub(r"lbs?|kgs?|\s", "", lowered)
        try:
            weight_value = float(numeric_part)
        except ValueError:
            print(f"Warning: Could not parse weight '{weight_str}', using default 0")
            weight_value = 0.0

        if "lb" in lowered:
            return weight_value / ReportGeneratorService._LBS_PER_KG
        return weight_value  # Already in kg, or assumed kg when no unit given

    @staticmethod
    def _parse_height_cm(height: Any) -> Optional[float]:
        """Parse a height such as 5'4", "165cm", "165" or 165 into centimetres.

        Returns None when *height* is empty/falsy or cannot be parsed.
        Feet/inches input is recognised by the apostrophe after the feet
        value; the closing inch mark and surrounding spaces are optional.
        Anything else is read as a number of centimetres.
        """
        if not height:
            return None
        text = str(height).strip()

        feet_inches = re.match(
            r"(\d+)\s*['\u2019\u2032]\s*(\d+(?:\.\d+)?)?", text
        )
        if feet_inches:
            feet = int(feet_inches.group(1))
            inches = float(feet_inches.group(2) or 0)
            return (feet * 12 + inches) * 2.54

        try:
            # Strip units/whitespace and read the remainder as centimetres.
            return float(re.sub(r"[^\d.]", "", text))
        except ValueError:
            return None

    @staticmethod
    def _build_output_filename(patient_info: Dict[str, Any]) -> str:
        """Build the default PDF filename from patient name and session id.

        Spaces become underscores (as before); path separators are also
        replaced so the name cannot escape the reports directory.
        """
        def safe(value: Any) -> str:
            return re.sub(r"[\\/]", "_", str(value).replace(" ", "_"))

        patient_name_full = patient_info.get("patient_name", "Unknown")
        session_id = patient_info.get("session_id", "default")
        return f"report_{safe(patient_name_full)}_{safe(session_id)}.pdf"

    # ------------------------------------------------------------------
    # Data processing
    # ------------------------------------------------------------------

    def process_pnoe_data(
        self, pnoe_csv_path: str, window_size: int = 10
    ) -> pd.DataFrame:
        """
        Load and process Pnoe CSV data.

        Args:
            pnoe_csv_path: Path to Pnoe CSV file (semicolon-delimited)
            window_size: Rolling-mean window, in rows, used for smoothing

        Returns:
            Processed DataFrame with derived columns ("VO2 Pulse",
            "VO2 Breath", "CHO", "FAT") and "<column>_smoothed" columns
        """
        df = pd.read_csv(pnoe_csv_path, delimiter=";")

        # Convert every column that parses cleanly as numeric; leave the
        # others (e.g. text columns) untouched.
        for col in df.columns:
            try:
                df[col] = pd.to_numeric(df[col])
            except (ValueError, TypeError):
                pass

        # Derived columns. A zero heart rate / breathing frequency would give
        # +/-inf and contaminate the rolling mean, so mask zeros to NaN in the
        # denominator only (the source columns are left as read).
        heart_rate = df["HR(bpm)"]
        breath_freq = df["BF(bpm)"]
        df["VO2 Pulse"] = df["VO2(ml/min)"] / heart_rate.where(heart_rate != 0)
        df["VO2 Breath"] = df["VO2(ml/min)"] / breath_freq.where(breath_freq != 0)
        df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
        df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100

        # Smooth whichever of the known columns are present.
        for col in self._COLUMNS_TO_SMOOTH:
            if col in df.columns:
                df[f"{col}_smoothed"] = (
                    df[col].rolling(window=window_size, min_periods=1).mean()
                )
        return df

    def generate_graphs(self, df: pd.DataFrame) -> List[Dict[str, str]]:
        """
        Generate all required graphs from processed data.

        Chart generation is best-effort: a chart that raises is reported with
        a warning and left out of the result.

        Args:
            df: Processed DataFrame with smoothed columns

        Returns:
            List of dictionaries containing graph names and paths
        """
        graph_methods = [
            ("respiratory", self.graph_generator.generate_respiratory_chart),
            ("fuel_utilization", self.graph_generator.generate_fuel_utilization_chart),
            ("vo2_pulse", self.graph_generator.generate_vo2_pulse_chart),
            ("vo2_breath", self.graph_generator.generate_vo2_breath_chart),
            ("fat_metabolism", self.graph_generator.generate_fat_metabolism_chart),
            ("recovery", self.graph_generator.generate_recovery_chart),
        ]

        graphs_generated = []
        for name, method in graph_methods:
            try:
                path = method(df, save_as_base64=False)
                graphs_generated.append({"name": name, "path": str(path)})
            except Exception as e:
                print(f"Warning: Could not generate {name} chart: {e}")
        return graphs_generated

    def calculate_analysis_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Calculate basic analysis metrics from processed data.

        Args:
            df: Processed DataFrame with smoothed columns

        Returns:
            Dictionary with "vo2_max", "peak_vt" and "max_hr": the maximum of
            the matching smoothed column, or 0 when that column is absent
        """
        def column_peak(column: str) -> Any:
            if column in df.columns:
                return float(df[column].max())
            return 0

        return {
            "vo2_max": column_peak("VO2(ml/min)_smoothed"),
            "peak_vt": column_peak("VT(l)_smoothed"),
            "max_hr": column_peak("HR(bpm)_smoothed"),
        }

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def generate_html(
        self,
        patient_info: Dict[str, Any],
        contexts: Dict[str, Dict[str, Any]],
        report_type: str = "full",
    ) -> str:
        """
        Generate HTML content for the report.

        Args:
            patient_info: Dictionary containing patient information
                (patient_name, age, height, weight, focus)
            contexts: Dictionary with keys 'page_1', 'page_2', etc. (plus
                'page_19_20_minimal' for the combined minimal page), each
                containing the context data for that page template
            report_type: Type of report to generate ("full" or "minimal");
                any value other than "minimal" produces the full report

        Returns:
            Complete HTML document as string
        """
        header_context = {
            "patient_name": patient_info.get("patient_name", ""),
            "age": patient_info.get("age", ""),
            "height": patient_info.get("height", ""),
            "weight": patient_info.get("weight", ""),
            "focus": patient_info.get("focus", "Endurance"),
        }

        # Each entry: (original_page_num, template_name, page_num_in_minimal).
        if report_type == "minimal":
            # Minimal report uses original pages 1, 2, 4, 5, 6, 16, 17 and a
            # combined 19+20 page, renumbered 1-8.
            page_mapping = [
                (1, "page_1.html", 1),
                (2, "page_2_minimal.html", 2),
                (4, "page_4.html", 3),
                (5, "page_5_minimal.html", 4),
                (6, "page_6.html", 5),
                (16, "page_16.html", 6),
                (17, "page_17_minimal.html", 7),
                (19, "page_19_20_minimal.html", 8),  # Combined page
            ]
        else:
            # Full report: all pages 1-20
            page_mapping = [(i, f"page_{i}.html", i) for i in range(1, 21)]

        header_html = self.env.get_template("header.html").render(header_context)
        footer_template = self.env.get_template("footer.html")

        html_pages = []
        for idx, (original_page_num, template_name, minimal_page_num) in enumerate(
            page_mapping
        ):
            # The combined minimal page has its own context key.
            if template_name == "page_19_20_minimal.html":
                page_key = "page_19_20_minimal"
            else:
                page_key = f"page_{original_page_num}"
            page_html = self.env.get_template(template_name).render(
                contexts.get(page_key, {})
            )

            # The first two pages of the report are emitted without header
            # and footer, for both report types.
            # NOTE(review): an earlier comment said a minimal report should
            # only skip the header/footer on page 1 - confirm which is
            # intended before changing this threshold.
            page_num_in_report = (
                minimal_page_num if report_type == "minimal" else original_page_num
            )
            if page_num_in_report <= 2:
                html_pages.append(page_html)
                continue

            footer_html = footer_template.render(
                {
                    "contact_email": "info@ishplabs.com",
                    "website": "www.ishplabs.com",
                    "social": "@ishplabs",
                    "page_number": idx + 1,
                }
            )
            html_pages.append(
                f"""
                <div class="page flex flex-col justify-between">
                    <div>
                        {header_html}
                    </div>
                    <main class="flex-grow p-4">
                        {page_html}
                    </main>
                    <div class="border-t text-center text-sm text-gray-600">
                        {footer_html}
                    </div>
                </div>
                """
            )

        # Combine with page breaks
        final_html = "<div class='page-break'></div>".join(html_pages)

        # Wrap in full HTML document
        html_doc = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="utf-8">
            <link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
            <style>
                html, body {{
                    height: 100%;
                    margin: 0;
                    padding: 0;
                }}
                .page-break {{ page-break-after: always; }}
                .page {{
                    height: 100vh;
                    min-height: 100vh;
                    display: flex;
                    flex-direction: column;
                }}
                .page main {{
                    flex: 1;
                    overflow: hidden;
                }}
                * {{
                    margin: 0;
                    padding: 0;
                    box-sizing: border-box;
                }}
                img {{
                    max-height: 300px;
                }}
                .chart-large {{
                    max-height: 500px !important;
                }}
                .table-image {{
                    max-height: none !important;
                    width: auto !important;
                    max-width: 100% !important;
                    height: auto !important;
                    object-fit: contain;
                }}
            </style>
        </head>
        <body class="m-0 p-0">
            {final_html}
        </body>
        </html>
        """
        return html_doc

    async def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
        """
        Convert HTML content to PDF file.

        Args:
            html_content: HTML content as string
            pdf_path: Path where PDF should be saved
        """
        async with async_playwright() as p:
            browser = await p.chromium.launch()
            try:
                page = await browser.new_page()
                await page.set_content(html_content)
                await page.pdf(path=pdf_path, format="A4", print_background=True)
            finally:
                # Close the browser even when rendering fails.
                await browser.close()

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    async def generate_report(
        self,
        spirometry_pdf_path: str,
        pnoe_csv_path: str,
        patient_info: Dict[str, Any],
        output_filename: Optional[str] = None,
        metric_overrides: Optional[Dict[str, Any]] = None,
        oxygenation_csv_path: Optional[str] = None,
        report_type: str = "full",
    ) -> Dict[str, Any]:
        """
        Generate complete medical report from uploaded files.

        Workflow:
            1. Extract spirometry data from PDF into the data directory
            2. Process Pnoe data
            3. Generate the Pnoe and body-composition graphs
            4. Generate spirometry, TSI (optional), metabolism and
               fuel-source charts
            5. Generate context for each page
            6. Calculate analysis metrics
            7. Generate final HTML
            8. Generate the PDF report

        Individual chart failures are logged as warnings and leave an empty
        string in the graph dictionary; they do not abort the report.

        Args:
            spirometry_pdf_path: Path to Spirometry PDF file
            pnoe_csv_path: Path to Pnoe CSV file
            patient_info: Dictionary containing patient information. Keys read
                here: patient_name, first_name, last_name, age, gender,
                height, weight, fat_percentage, next_testing_date, session_id
                (plus focus, read by generate_html)
            output_filename: Optional custom output filename; defaults to
                "report_<patient_name>_<session_id>.pdf"
            metric_overrides: Optional overrides forwarded to
                ContextGenerator.generate_all_contexts
            oxygenation_csv_path: Optional oxygenation CSV; when given, a TSI
                chart is generated and the path is passed to the context
                generator
            report_type: "full" or "minimal"; forwarded to the context
                generator and to generate_html

        Returns:
            Dictionary containing report path ("report_path"), graphs
            generated ("graphs_generated") and analysis data
            ("analysis_data")
        """
        # Step 1: Extract spirometry table from PDF
        print("Step 1: Extracting spirometry data from PDF...")
        spirometry_csv_path = extract_spirometry_table_from_pdf(
            spirometry_pdf_path, output_dir=str(self.data_dir)
        )
        print(f"Spirometry data saved to: {spirometry_csv_path}")

        # Step 2: Process Pnoe data
        print("Step 2: Processing Pnoe data...")
        df = self.process_pnoe_data(pnoe_csv_path)

        # Step 3: Generate all graphs and collect them as base64 strings
        print("Step 3: Generating graphs...")
        graphs_generated = self.generate_graphs(df)
        graphs_dict: Dict[str, Any] = {}
        for graph in graphs_generated:
            graph_path = Path(graph["path"])
            if graph_path.exists():
                graphs_dict[graph["name"]] = self._encode_file_b64(graph_path)

        # Body composition inputs come straight from patient info
        # (no SECA file needed).
        fat_pct = self._coerce_number(
            patient_info.get("fat_percentage", 0), 0, "fat percentage"
        )
        age = patient_info.get("age", 25)
        gender_raw = patient_info.get("gender", "female")
        gender = str("female" if gender_raw is None else gender_raw).lower()
        weight_kg = self._parse_weight_kg(patient_info.get("weight", "0"))

        # Fat and lean mass in pounds
        fat_mass_lbs = weight_kg * fat_pct / 100 * self._LBS_PER_KG
        lean_mass_lbs = weight_kg * (1 - fat_pct / 100) * self._LBS_PER_KG

        # Body composition chart (saved as a file, then base64-encoded)
        try:
            body_comp_path = self.graph_generator.generate_body_composition_chart(
                fat_mass_lbs, lean_mass_lbs, save_as_base64=False
            )
            graphs_generated.append(
                {"name": "body_composition", "path": str(body_comp_path)}
            )
            graphs_dict["body_composition"] = self._encode_file_b64(body_comp_path)
        except Exception as e:
            print(f"Warning: Could not generate body composition chart: {e}")
            graphs_dict["body_composition"] = ""

        # Body fat percent chart (saved as a file, then base64-encoded)
        try:
            body_fat_path = self.graph_generator.generate_body_fat_percent_chart(
                fat_pct, age, gender, save_as_base64=False
            )
            graphs_generated.append(
                {"name": "body_fat_percent", "path": str(body_fat_path)}
            )
            graphs_dict["body_fat_percent"] = self._encode_file_b64(body_fat_path)
        except Exception as e:
            print(f"Warning: Could not generate body fat percent chart: {e}")
            graphs_dict["body_fat_percent"] = ""

        # Static images: body fat master chart (page 18) and fuelling
        # analysis flowchart (page 10)
        graphs_dict["body_fat_percentage_master_chart"] = self._load_static_image_b64(
            Path("app/body_fat_percentage_master_chart.png"),
            "body fat percentage master chart",
        )
        graphs_dict["fuelling_analysis_flowchart"] = self._load_static_image_b64(
            Path("app/estimated_carb_storage.png"),
            "fuelling analysis flowchart",
        )

        # Step 4: Spirometry chart
        print("Step 4: Generating spirometry chart...")
        try:
            spirometry_df = pd.read_csv(spirometry_csv_path)
            print(f"Spirometry data loaded: {len(spirometry_df)} rows")
            print(f"Spirometry columns: {spirometry_df.columns.tolist()}")
            if "Parameters" in spirometry_df.columns:
                print(f"Available parameters: {spirometry_df['Parameters'].tolist()}")
            graphs_dict["spirometry_chart"] = (
                self.graph_generator.generate_spirometry_chart(
                    spirometry_df, save_as_base64=True
                )
            )
            print("Spirometry chart generated successfully")
        except Exception as e:
            print(f"Warning: Could not generate spirometry chart: {e}")
            print(f"Error details: {traceback.format_exc()}")
            graphs_dict["spirometry_chart"] = ""

        # Step 4.5: TSI chart, only when an oxygenation CSV is provided
        if oxygenation_csv_path:
            print("Step 4.5: Generating TSI chart...")
            try:
                oxygenation_df = pd.read_csv(oxygenation_csv_path)
                graphs_dict["tsi_chart"] = self.graph_generator.generate_tsi_chart(
                    oxygenation_df, save_as_base64=True
                )
            except Exception as e:
                print(f"Warning: Could not generate TSI chart: {e}")
                graphs_dict["tsi_chart"] = ""

        # Step 4.6: Metabolism and fuel source charts for page 5
        print("Step 4.6: Generating metabolism and fuel source charts...")
        try:
            # A throwaway context generator is used only to compute RMR and
            # fuel source from the Pnoe data.
            temp_context_gen = ContextGenerator()
            temp_context_gen.load_data(pnoe_csv_path, str(spirometry_csv_path), None)
            temp_context_gen.patient_info = {
                "name": patient_info.get("first_name", ""),
                "last_name": patient_info.get("last_name", ""),
                "age": patient_info.get("age", 25),
                "weight": weight_kg,
                "fat_percentage": fat_pct,
                "gender": gender,
            }
            rmr_metrics = temp_context_gen.calculate_rmr_and_fuel_source()

            # None when height is missing or unparseable
            height_cm = self._parse_height_cm(patient_info.get("height", ""))

            graphs_dict["metabolism_chart"] = (
                self.graph_generator.generate_metabolism_chart(
                    rmr_metrics["rmr_kcal"],
                    weight_kg=weight_kg,
                    height_cm=height_cm,
                    age_years=patient_info.get("age", None),
                    sex=gender,
                    save_as_base64=True,
                )
            )
            graphs_dict["fuel_source_chart"] = (
                self.graph_generator.generate_fuel_source_chart(
                    rmr_metrics["rest_fat_percentage"], save_as_base64=True
                )
            )
        except Exception as e:
            # Either failure blanks both charts.
            print(f"Warning: Could not generate metabolism/fuel source charts: {e}")
            graphs_dict["metabolism_chart"] = ""
            graphs_dict["fuel_source_chart"] = ""

        # Step 5: Generate context for all pages
        print("Step 5: Generating page contexts...")
        patient_name = patient_info.get("patient_name", "")
        self.context_generator.load_data(
            pnoe_csv_path,
            str(spirometry_csv_path),
            None,  # No SECA file
            oxygenation_csv_path,  # Pass oxygenation CSV path
        )
        # Set patient info manually
        self.context_generator.patient_info = {
            "name": patient_info.get("first_name", ""),
            "last_name": patient_info.get("last_name", ""),
            "age": patient_info.get("age", 25),
            "weight": weight_kg,
            "fat_percentage": fat_pct,
            "gender": gender,
            "next_testing_date": patient_info.get(
                "next_testing_date", "Contact us for scheduling"
            ),
        }
        contexts = self.context_generator.generate_all_contexts(
            patient_name,
            graphs_dict,
            metric_overrides=metric_overrides,
            graph_generator=self.graph_generator,
            report_type=report_type,
        )

        # Step 6: Calculate analysis metrics
        analysis_data = self.calculate_analysis_metrics(df)
        analysis_data["graphs_count"] = len(graphs_generated)

        # Step 7: Generate HTML
        html_content = self.generate_html(
            patient_info, contexts, report_type=report_type
        )

        # Step 8: Generate PDF
        if output_filename is None:
            output_filename = self._build_output_filename(patient_info)
        report_path = self.reports_dir / output_filename
        print(f"Generating PDF report at {report_path}")
        await self.html_to_pdf(html_content, str(report_path))

        return {
            "report_path": str(report_path),
            "graphs_generated": graphs_generated,
            "analysis_data": analysis_data,
        }