Compare commits

...

7 Commits

Author SHA1 Message Date
bolade 0a735d88c8 feat: Refactor report generation to use async methods and improve error handling; enhance spirometry table extraction with better CSV formatting 2025-10-04 10:35:02 +01:00
bolade 358898b7db feat: Enhance context generation and report generation services with improved data handling and structure 2025-10-04 10:25:10 +01:00
bolade d66f3fd18b Add compiled Python bytecode for report generator and spirometry table extractor services
- Generated bytecode for report_generator.py and spirometry_table_extractor.py
- These changes include the compiled .pyc files in the __pycache__ directory
- The report generator service handles the generation of medical reports from uploaded files
- The spirometry table extractor service extracts data from PDF files and processes it for further analysis
2025-10-04 10:07:40 +01:00
bolade 14dc64234d feat: Update patient name extraction and enhance page context generation in PageGenerator 2025-10-03 22:58:20 +01:00
bolade 7a67aac678 feat: Add PageGenerator class for generating report pages with patient data 2025-10-03 22:16:45 +01:00
bolade 11ee6b192f feat: Implement report generator service for medical reports
- Added ReportGeneratorService to handle generation of medical reports from uploaded files.
- Implemented methods for processing Pnoe CSV data, generating graphs, and calculating analysis metrics.
- Integrated Jinja2 for HTML report generation with customizable templates.
- Added functionality to convert HTML content to PDF using Playwright.
- Ensured proper directory structure for saving generated graphs and reports.
2025-10-03 21:41:00 +01:00
bolade 1d8136d6ad Refactor code structure for improved readability and maintainability 2025-10-03 19:19:39 +01:00
52 changed files with 1376 additions and 3452 deletions
+9 -1
View File
@@ -1,3 +1,11 @@
.venv .venv
data/ data/
.env
/graphs
/data
/reports
Binary file not shown.
-807
View File
@@ -1,807 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"id": "b18c1027",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'id': 'gen-1759135172-DIhs7TMuaaVY0h3T2ibV', 'provider': 'Google', 'model': 'google/gemini-2.5-flash-lite', 'object': 'chat.completion', 'created': 1759135172, 'choices': [{'logprobs': None, 'finish_reason': 'stop', 'native_finish_reason': 'STOP', 'index': 0, 'message': {'role': 'assistant', 'content': 'Parameters,Best,LLN,Pred.,%Pred.,ZScore,PRE#1,PRE#2,PRE#3\\nFVC,L,4.24,3.03,3.79,112.0,0.95,4.24,4.17,4.15\\nFEV1,L,3.26,2.53,3.16,103.3,0.28,3.26,3.21,3.14\\nFEV1/FVC%,76.89,72.47,83.78,91.8,-1.05,76.9,77.0,75.7\\nPEF,L/m,684,222,384,178.7,-,444,438,684\\nFEF2575,L/s,2.74,2.15,3.42,80.2,-0.84,2.74,2.68,2.48\\nFEF25,L/s,6.08,-,-,-,6.08,6.0,5.53\\nFEF50,L/s,3.06,-,-,-,3.06,3.1,2.77\\nFEF75,L/s,1.06,0.71,1.41,75.1,-0.72,1.06,1.12,0.94\\nPEFTime,ms,-,-,79,-,79,49,39\\nEvol,mL,-,-,78.0,-,78.0,77.0,197.0\\nFEV6,L,4.22,3.03,3.79,111.4,-,4.22,4.17,4.13', 'refusal': None, 'reasoning': None}}], 'usage': {'prompt_tokens': 1350, 'completion_tokens': 454, 'total_tokens': 1804, 'prompt_tokens_details': {'cached_tokens': 0}, 'completion_tokens_details': {'reasoning_tokens': 0, 'image_tokens': 0}}}\n",
"Content saved to extracted_table.csv\n"
]
}
],
"source": [
"\n",
"import requests\n",
"import json\n",
"import base64\n",
"from pathlib import Path\n",
"\n",
"API_KEY_REF = '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'\n",
"def encode_pdf_to_base64(pdf_path):\n",
" with open(pdf_path, \"rb\") as pdf_file:\n",
" return base64.b64encode(pdf_file.read()).decode('utf-8')\n",
"\n",
"url = \"https://openrouter.ai/api/v1/chat/completions\"\n",
"headers = {\n",
" \"Authorization\": f\"Bearer {API_KEY_REF}\",\n",
" \"Content-Type\": \"application/json\"\n",
"}\n",
"\n",
"# Read and encode the PDF\n",
"pdf_path = \"data/~Moran~K~19910201~Spirometry Exam~20250729~20250729032843.pdf\"\n",
"base64_pdf = encode_pdf_to_base64(pdf_path)\n",
"data_url = f\"data:application/pdf;base64,{base64_pdf}\"\n",
"\n",
"messages = [\n",
" {\n",
" \"role\": \"user\",\n",
" \"content\": [\n",
" {\n",
" \"type\": \"text\",\n",
" \"text\": \"Please extract the Spirometry table from the pdf and return the values in csv format, \"\n",
" \"note that it is the unit of parameter that is beside it and it should not be a column. \"\n",
" \"The '-' Should be treated as empty values.\"\n",
" \"do not add 'csv' at the start or end of the response\"\n",
" },\n",
" {\n",
" \"type\": \"file\",\n",
" \"file\": {\n",
" \"filename\": \"document.pdf\",\n",
" \"file_data\": data_url\n",
" }\n",
" },\n",
" ]\n",
" }\n",
"]\n",
"\n",
"# Optional: Configure PDF processing engine\n",
"# PDF parsing will still work even if the plugin is not explicitly set\n",
"plugins = [\n",
" {\n",
" \"id\": \"file-parser\",\n",
" \"pdf\": {\n",
" \"engine\": \"pdf-text\" # defaults to \"mistral-ocr\". See Pricing above\n",
" }\n",
" }\n",
"]\n",
"\n",
"payload = {\n",
" \"model\": \"google/gemini-2.5-flash-lite\",\n",
" \"messages\": messages,\n",
"}\n",
"\n",
"response = requests.post(url, headers=headers, json=payload)\n",
"# Get the response content\n",
"response_data = response.json()\n",
"print(response_data)\n",
"\n",
"# Extract the content from the response\n",
"if 'choices' in response_data and len(response_data['choices']) > 0:\n",
" content = response_data['choices'][0]['message']['content']\n",
" \n",
" # Save to a CSV file\n",
" output_file = \"extracted_table.csv\"\n",
" with open(output_file, 'w', encoding='utf-8') as f:\n",
" f.write(content)\n",
" \n",
" print(f\"Content saved to {output_file}\")\n",
"else:\n",
" print(\"No content found in response\")"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "56a9d655",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"FVC Best: 4.24, FVC Pred: 112.0\n",
"FEV1 Best: 3.26, FEV1 Pred: 103.3\n",
"FEV1/FVC% Best: 76.89, FEV1/FVC% Pred: 91.8\n"
]
}
],
"source": [
"import pandas as pd\n",
"spirometry_df = pd.read_csv(\"extracted_table.csv\")\n",
"\n",
"fvc_best = spirometry_df.loc[spirometry_df['Parameters'] == 'FVC', 'Best'].values[0]\n",
"fvc_pred = spirometry_df.loc[spirometry_df['Parameters'] == 'FVC', '%Pred.'].values[0]\n",
"\n",
"fev1_best = spirometry_df.loc[spirometry_df['Parameters'] == 'FEV1', 'Best'].values[0]\n",
"fev1_pred = spirometry_df.loc[spirometry_df['Parameters'] == 'FEV1', '%Pred.'].values[0]\n",
"\n",
"fev1_fevc_best = spirometry_df.loc[spirometry_df['Parameters'] == 'FEV1/FVC%', 'Best'].values[0]\n",
"fev1_fevc_pred = spirometry_df.loc[spirometry_df['Parameters'] == 'FEV1/FVC%', '%Pred.'].values[0]\n",
"\n",
"print(f\"FVC Best: {fvc_best}, FVC Pred: {fvc_pred}\")\n",
"print(f\"FEV1 Best: {fev1_best}, FEV1 Pred: {fev1_pred}\")\n",
"print(f\"FEV1/FVC% Best: {fev1_fevc_best}, FEV1/FVC% Pred: {fev1_fevc_pred}\")"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "990f4b4f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Peak VT: 2.75\n",
"HR at Peak VT: 155.0\n"
]
}
],
"source": [
"df = pd.read_csv('data/Pnoe_20250729_1550-Moran_Keirstyn.csv', delimiter=';')\n",
"peak_vt = df['VT(l)'].max()\n",
"max_vt_row = df.loc[df['VT(l)'].idxmax()]\n",
"print(f\"Peak VT: {peak_vt}\")\n",
"hr = max_vt_row['HR(bpm)']\n",
"print(f\"HR at Peak VT: {hr}\")"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "041cbc3d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Peak VT: 2.3770000000000002\n",
"HR at Peak VT: 171.525\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"/tmp/ipykernel_301535/4157056299.py:3: FutureWarning: errors='ignore' is deprecated and will raise in a future version. Use to_numeric without passing `errors` and catch exceptions explicitly instead\n",
" df = df.apply(pd.to_numeric, errors='ignore')\n"
]
}
],
"source": [
"df = pd.read_csv('data/Pnoe_20250729_1550-Moran_Keirstyn.csv', delimiter=';')\n",
"# Convert all columns to numeric where possible, coercing errors to NaN\n",
"df = df.apply(pd.to_numeric, errors='ignore')\n",
"df['VO2 Pulse'] = df['VO2(ml/min)'] / df['HR(bpm)'] # VO2 Pulse in mL/beat\n",
"df['VO2 Breath'] = df['VO2(ml/min)'] / df['BF(bpm)'] # VO2 per Breath in mL/breath\n",
"df['CHO'] = df['EE(kcal/min)'] * df['CARBS(%)']/100\n",
"df['FAT'] = df['EE(kcal/min)'] * df['FAT(%)']/100\n",
"# Smooth key columns using rolling window\n",
"window_size = 10\n",
"\n",
"# List of columns to smooth\n",
"columns_to_smooth = ['VO2(ml/min)', 'VCO2(ml/min)', 'HR(bpm)', 'VT(l)', 'BF(bpm)', 'VE(l/min)', 'VO2 Pulse', 'VO2 Breath', 'CHO', 'FAT']\n",
"\n",
"# Apply smoothing to each column\n",
"for col in columns_to_smooth:\n",
" if col in df.columns:\n",
" df[f'{col}_smoothed'] = df[col].rolling(window=window_size).mean()\n",
" \n",
"peak_vt = df['VT(l)_smoothed'].max()\n",
"max_vt_row = df.loc[df['VT(l)_smoothed'].idxmax()]\n",
"print(f\"Peak VT: {peak_vt}\")\n",
"hr = max_vt_row['HR(bpm)_smoothed']\n",
"print(f\"HR at Peak VT: {hr}\")"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "de7cadd1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Percent FEV: 72.91411042944786\n"
]
}
],
"source": [
"percent_fev = (peak_vt / fev1_best) * 100\n",
"print(f\"Percent FEV: {percent_fev}\")"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "cb972ed3",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>MeasurementDate</th>\n",
" <th>Comment</th>\n",
" <th>ExternalDeviceId</th>\n",
" <th>ExternalPatientId</th>\n",
" <th>FirstName</th>\n",
" <th>LastName</th>\n",
" <th>BirthDate</th>\n",
" <th>Age</th>\n",
" <th>Ethnicity</th>\n",
" <th>Gender</th>\n",
" <th>...</th>\n",
" <th>Child_XC</th>\n",
" <th>Child_XC_Unit</th>\n",
" <th>Child_BIVA_ZRh</th>\n",
" <th>Child_BIVA_ZXcH</th>\n",
" <th>Child_PhA</th>\n",
" <th>Child_PhA_Unit</th>\n",
" <th>Child_REE_Kcal</th>\n",
" <th>Child_REE_MJ</th>\n",
" <th>Child_TEE_Kcal</th>\n",
" <th>Child_TEE_MJ</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>13</th>\n",
" <td>2025-07-29T18:58:54.0000000Z</td>\n",
" <td>NaN</td>\n",
" <td>10000001583275_0055003f5631501320313557</td>\n",
" <td>KM6479696509</td>\n",
" <td>Keirstyn</td>\n",
" <td>Moran</td>\n",
" <td>1991-02-01T00:00:00.0000000Z</td>\n",
" <td>34</td>\n",
" <td>Caucasian</td>\n",
" <td>Female</td>\n",
" <td>...</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" <td>NaN</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>1 rows × 147 columns</p>\n",
"</div>"
],
"text/plain": [
" MeasurementDate Comment \\\n",
"13 2025-07-29T18:58:54.0000000Z NaN \n",
"\n",
" ExternalDeviceId ExternalPatientId FirstName \\\n",
"13 10000001583275_0055003f5631501320313557 KM6479696509 Keirstyn \n",
"\n",
" LastName BirthDate Age Ethnicity Gender ... \\\n",
"13 Moran 1991-02-01T00:00:00.0000000Z 34 Caucasian Female ... \n",
"\n",
" Child_XC Child_XC_Unit Child_BIVA_ZRh Child_BIVA_ZXcH Child_PhA \\\n",
"13 NaN NaN NaN NaN NaN \n",
"\n",
" Child_PhA_Unit Child_REE_Kcal Child_REE_MJ Child_TEE_Kcal Child_TEE_MJ \n",
"13 NaN NaN NaN NaN NaN \n",
"\n",
"[1 rows x 147 columns]"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"personal_df = pd.read_excel('data/SECA body comp for all patients.xlsx')\n",
"\n",
"keirstyn_data = personal_df[personal_df['LastName'].str.contains('Moran', case=False, na=False)]\n",
"keirstyn_data"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "98d9295a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"VO2 Max: 47.906290322580645\n"
]
}
],
"source": [
"v02_max = df['VO2(ml/min)_smoothed'].max()\n",
"weight = keirstyn_data['Weight'].iloc[0]\n",
"print(f\"VO2 Max: {v02_max/weight}\")"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "cdfeb309",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"==================================================\n",
"Optimal Fat Burning Zone (highest fat:carb ratio):\n",
"Time: 164.0 seconds\n",
"Fat burn rate: 3.894 kcal/min\n",
"Carb burn rate: 1.575 kcal/min\n",
"Fat:Carb ratio: 2.47\n",
"Heart Rate: 96.7 bpm\n",
"VO2: 1147.9 ml/min\n"
]
}
],
"source": [
"# Find the point where fat burning is highest and carb burning is lowest\n",
"# Using the smoothed data for more stable results\n",
"fat_burn_max_idx = df['FAT_smoothed'].idxmax()\n",
"carb_burn_min_idx = df['CHO_smoothed'].idxmin()\n",
"\n",
"# # Get the data at maximum fat burning point\n",
"# max_fat_row = df.loc[fat_burn_max_idx]\n",
"# print(f\"Maximum Fat Burning Point:\")\n",
"# print(f\"Time: {max_fat_row['T(sec)']} seconds\")\n",
"# print(f\"Fat burn rate: {max_fat_row['FAT_smoothed']:.3f} kcal/min\")\n",
"# print(f\"Carb burn rate: {max_fat_row['CHO_smoothed']:.3f} kcal/min\")\n",
"# print(f\"Heart Rate: {max_fat_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
"# print(f\"VO2: {max_fat_row['VO2(ml/min)_smoothed']:.1f} ml/min\")\n",
"\n",
"# print(\"\\n\" + \"=\"*50)\n",
"\n",
"# # Get the data at minimum carb burning point\n",
"# min_carb_row = df.loc[carb_burn_min_idx]\n",
"# print(f\"Minimum Carbohydrate Burning Point:\")\n",
"# print(f\"Time: {min_carb_row['T(sec)']} seconds\")\n",
"# print(f\"Fat burn rate: {min_carb_row['FAT_smoothed']:.3f} kcal/min\")\n",
"# print(f\"Carb burn rate: {min_carb_row['CHO_smoothed']:.3f} kcal/min\")\n",
"# print(f\"Heart Rate: {min_carb_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
"# print(f\"VO2: {min_carb_row['VO2(ml/min)_smoothed']:.1f} ml/min\")\n",
"\n",
"print(\"\\n\" + \"=\"*50)\n",
"\n",
"# Find the optimal fat burning zone (highest fat:carb ratio)\n",
"df['fat_carb_ratio'] = df['FAT_smoothed'] / (df['CHO_smoothed'] + 0.00000001) # Add small value to avoid division by zero\n",
"optimal_fat_idx = df['fat_carb_ratio'].idxmax()\n",
"optimal_row = df.loc[optimal_fat_idx]\n",
"\n",
"print(f\"Optimal Fat Burning Zone (highest fat:carb ratio):\")\n",
"print(f\"Time: {optimal_row['T(sec)']} seconds\")\n",
"print(f\"Fat burn rate: {optimal_row['FAT_smoothed']:.3f} kcal/min\")\n",
"print(f\"Carb burn rate: {optimal_row['CHO_smoothed']:.3f} kcal/min\")\n",
"print(f\"Fat:Carb ratio: {optimal_row['fat_carb_ratio']:.2f}\")\n",
"print(f\"Heart Rate: {optimal_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
"print(f\"VO2: {optimal_row['VO2(ml/min)_smoothed']:.1f} ml/min\")"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "4420cfea",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Found 2 intersections at indices: [18, 47]\n",
"\n",
"Last intersection at index 47:\n",
"Time: 251.0 seconds\n",
"Fat burn rate: 3.040 kcal/min\n",
"Carb burn rate: 3.166 kcal/min\n",
"Heart Rate: 100.5 bpm\n",
"VO2: 1283.0 ml/min\n"
]
}
],
"source": [
"# Find intersections where FAT_smoothed and CHO_smoothed cross each other\n",
"intersections = []\n",
"\n",
"for i in range(1, len(df)):\n",
" # Check if there's a crossover between consecutive points\n",
" prev_fat = df.iloc[i-1]['FAT_smoothed']\n",
" prev_cho = df.iloc[i-1]['CHO_smoothed']\n",
" curr_fat = df.iloc[i]['FAT_smoothed']\n",
" curr_cho = df.iloc[i]['CHO_smoothed']\n",
" \n",
" # Skip if any values are NaN\n",
" if pd.isna(prev_fat) or pd.isna(prev_cho) or pd.isna(curr_fat) or pd.isna(curr_cho):\n",
" continue\n",
" \n",
" # Check if lines cross (fat was above/below cho and now it's below/above)\n",
" if ((prev_fat > prev_cho and curr_fat < curr_cho) or \n",
" (prev_fat < prev_cho and curr_fat > curr_cho)):\n",
" intersections.append(i)\n",
"\n",
"print(f\"Found {len(intersections)} intersections at indices: {intersections}\")\n",
"\n",
"if intersections:\n",
" # Get the last intersection\n",
" last_intersection_idx = intersections[-1]\n",
" last_intersection_row = df.iloc[last_intersection_idx]\n",
" \n",
" print(f\"\\nLast intersection at index {last_intersection_idx}:\")\n",
" print(f\"Time: {last_intersection_row['T(sec)']} seconds\")\n",
" print(f\"Fat burn rate: {last_intersection_row['FAT_smoothed']:.3f} kcal/min\")\n",
" print(f\"Carb burn rate: {last_intersection_row['CHO_smoothed']:.3f} kcal/min\")\n",
" print(f\"Heart Rate: {last_intersection_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
" print(f\"VO2: {last_intersection_row['VO2(ml/min)_smoothed']:.1f} ml/min\")\n",
"else:\n",
" print(\"No intersections found between FAT and CHO curves\")"
]
},
{
"cell_type": "code",
"execution_count": 37,
"id": "62803668",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"VT1: {'HeartRate': 100.5, 'Speed': 4.0, 'Time': 251.0}\n",
"VT2: {'HeartRate': 189.71300000000002, 'Speed': 7.5, 'Time': 1524.0}\n"
]
}
],
"source": [
"def detect_vt1(df, fat_col=\"FAT_smoothed\", carb_col=\"CHO_smoothed\"):\n",
" \"\"\"\n",
" Detect VT1 as the first index where carb burn > fat burn and remains higher.\n",
" \"\"\"\n",
" condition = df[carb_col] > df[fat_col]\n",
" crossover_indices = condition[condition].index\n",
"\n",
" if len(crossover_indices) == 0:\n",
" return None # No crossover found\n",
" \n",
" # Find first crossover where carbs remain higher for the rest\n",
" for idx in crossover_indices:\n",
" if all(df.loc[idx:][carb_col] > df.loc[idx:][fat_col]):\n",
" return idx\n",
" return None\n",
"\n",
"\n",
"def detect_vt2(df, vent_col=\"VE(l/min)_smoothed\", bf_col=\"BF(bpm)_smoothed\", smooth_window=5):\n",
" \"\"\"\n",
" Detect VT2 using slope/inflection method.\n",
" Works with either Ventilation (VE) or Breathing Frequency (Bf).\n",
" \"\"\"\n",
" col = vent_col if vent_col in df.columns else bf_col\n",
" \n",
" # Use already smoothed data\n",
" smoothed_col = col\n",
" \n",
" # Compute slope (first derivative)\n",
" df[\"slope\"] = df[smoothed_col].diff()\n",
" \n",
" # Detect inflection: largest change in slope (second derivative peak)\n",
" df[\"second_derivative\"] = df[\"slope\"].diff()\n",
" inflection_idx = df[\"second_derivative\"].idxmax()\n",
" \n",
" return inflection_idx\n",
"\n",
"\n",
"def analyze_thresholds(df_input):\n",
" # Use the existing dataframe\n",
" df_copy = df_input.copy()\n",
" \n",
" # --- Detect VT1 ---\n",
" vt1_idx = detect_vt1(df_copy)\n",
" vt1 = None\n",
" if vt1_idx is not None:\n",
" vt1 = {\n",
" \"HeartRate\": df_copy.loc[vt1_idx, \"HR(bpm)_smoothed\"],\n",
" \"Speed\": df_copy.loc[vt1_idx, \"Speed\"],\n",
" \"Time\": df_copy.loc[vt1_idx, \"T(sec)\"]\n",
" }\n",
" \n",
" # --- Detect VT2 ---\n",
" vt2_idx = detect_vt2(df_copy)\n",
" vt2 = None\n",
" if vt2_idx is not None:\n",
" vt2 = {\n",
" \"HeartRate\": df_copy.loc[vt2_idx, \"HR(bpm)_smoothed\"],\n",
" \"Speed\": df_copy.loc[vt2_idx, \"Speed\"],\n",
" \"Time\": df_copy.loc[vt2_idx, \"T(sec)\"]\n",
" }\n",
" \n",
" return vt1, vt2\n",
"\n",
"\n",
"vt1, vt2 = analyze_thresholds(df)\n",
"print(\"VT1:\", vt1)\n",
"print(\"VT2:\", vt2)\n"
]
},
{
"cell_type": "code",
"execution_count": 40,
"id": "07593b56",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Zone 1 (Active Recovery): 81.7 - 96.7 bpm\n",
"Zone 2 (Aerobic Base): 96.7 - 100.5 bpm\n",
"Zone 3 (Aerobic): 100.5 - 179.7 bpm\n",
"Zone 4 (Lactate Threshold): 179.7 - 199.7 bpm\n",
"Zone 5 (VO2 Max): 199.7+ bpm\n"
]
}
],
"source": [
"zone_1_start = optimal_row['HR(bpm)_smoothed'] - 15\n",
"zone_2_start = optimal_row['HR(bpm)_smoothed']\n",
"zone_3_start = vt1\n",
"zone_4_start = vt2['HeartRate'] - 10\n",
"zone_5_start = vt2['HeartRate'] + 10\n",
"\n",
"zone_1_end = zone_2_start\n",
"zone_2_end = vt1['HeartRate']\n",
"zone_3_end = zone_4_start\n",
"zone_4_end = zone_5_start\n",
"\n",
"print(f\"Zone 1 (Active Recovery): {zone_1_start:.1f} - {zone_1_end:.1f} bpm\")\n",
"print(f\"Zone 2 (Aerobic Base): {zone_2_start:.1f} - {zone_2_end:.1f} bpm\")\n",
"print(f\"Zone 3 (Aerobic): {zone_3_start['HeartRate']:.1f} - {zone_3_end:.1f} bpm\")\n",
"print(f\"Zone 4 (Lactate Threshold): {zone_4_start:.1f} - {zone_4_end:.1f} bpm\")\n",
"print(f\"Zone 5 (VO2 Max): {zone_5_start:.1f}+ bpm\")"
]
},
{
"cell_type": "code",
"execution_count": 60,
"id": "c90415b2",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"VO2 Max detected at index 202:\n",
"Time: 985.0 seconds\n",
"VO2 Breath: 58.2 ml/breath\n",
"VO2: 2167.8 ml/min\n",
"VO2 per kg: 38.8 ml/kg/min\n",
"Heart Rate: 170.5 bpm\n",
"Speed: 6.0 km/h\n",
"VO2 Breath Slope: -0.02\n"
]
}
],
"source": [
"# Calculate the slope of VO2 Breath (first derivative)\n",
"df['vo2_breath_slope'] = df['VO2 Breath_smoothed'].diff()\n",
"\n",
"# Find points where slope is consistently zero or negative\n",
"# We'll use a rolling window to check for consistent negative/zero slope\n",
"window = len(df) // 3 # Number of consecutive points to check\n",
"\n",
"# Calculate rolling mean of slope to smooth out noise\n",
"df['vo2_breath_slope_smoothed'] = df['vo2_breath_slope'].rolling(window=window).mean()\n",
"\n",
"# Find where slope becomes consistently zero or negative\n",
"mask = df['vo2_breath_slope_smoothed'] <= 0\n",
"consistent_negative_indices = mask[mask].index\n",
"\n",
"if len(consistent_negative_indices) > 0:\n",
" # Find the first point where slope becomes consistently negative/zero\n",
" vo2_max_idx = consistent_negative_indices[0]\n",
" vo2_max_row = df.loc[vo2_max_idx]\n",
" \n",
" print(f\"VO2 Max detected at index {vo2_max_idx}:\")\n",
" print(f\"Time: {vo2_max_row['T(sec)']} seconds\")\n",
" print(f\"VO2 Breath: {vo2_max_row['VO2 Breath_smoothed']:.1f} ml/breath\")\n",
" print(f\"VO2: {vo2_max_row['VO2(ml/min)_smoothed']:.1f} ml/min\")\n",
" print(f\"VO2 per kg: {vo2_max_row['VO2(ml/min)_smoothed']/weight:.1f} ml/kg/min\")\n",
" print(f\"Heart Rate: {vo2_max_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
" print(f\"Speed: {vo2_max_row['Speed']} km/h\")\n",
" print(f\"VO2 Breath Slope: {vo2_max_row['vo2_breath_slope_smoothed']:.2f}\")\n",
"else:\n",
" # If no consistent negative slope found, use the maximum VO2 Breath value\n",
" vo2_max_idx = df['VO2 Breath_smoothed'].idxmax()\n",
" vo2_max_row = df.loc[vo2_max_idx]\n",
" \n",
" print(f\"No consistent negative slope found. Using peak VO2 Breath at index {vo2_max_idx}:\")\n",
" print(f\"Time: {vo2_max_row['T(sec)']} seconds\")\n",
" print(f\"VO2 Breath: {vo2_max_row['VO2 Breath_smoothed']:.1f} ml/breath\")\n",
" print(f\"VO2: {vo2_max_row['VO2(ml/min)_smoothed']:.1f} ml/min\")\n",
" print(f\"VO2 per kg: {vo2_max_row['VO2(ml/min)_smoothed']/weight:.1f} ml/kg/min\")\n",
" print(f\"Heart Rate: {vo2_max_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
" print(f\"Speed: {vo2_max_row['Speed']} km/h\")"
]
},
{
"cell_type": "code",
"execution_count": 66,
"id": "c3b2cc59",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"VO2 Pulse and HR slopes diverge consistently starting at index 89:\n",
"Time: 485.0 seconds\n",
"VO2 Pulse (smoothed): 13.91\n",
"Heart Rate (smoothed): 136.2 bpm\n",
"VO2 Pulse Slope: 0.672\n",
"HR Slope: 1.000\n",
"Slope Difference: 1.006\n",
"VO2: 1897.8 ml/min\n",
"Speed: 4.5 km/h\n",
"Threshold used: 0.615\n"
]
}
],
"source": [
"# Calculate slopes for both VO2 Pulse and HR\n",
"df['vo2_pulse_slope'] = df['VO2 Pulse_smoothed'].diff()\n",
"df['hr_slope'] = df['HR(bpm)_smoothed'].diff()\n",
"\n",
"# Calculate the difference between the slopes\n",
"df['slope_difference'] = abs(df['vo2_pulse_slope'] - df['hr_slope'])\n",
"\n",
"# Find where the slope difference becomes consistently large (slopes diverge)\n",
"# Use a rolling window to smooth out noise\n",
"window_size = len(df) // 5 # Adjust window size as needed\n",
"df['slope_difference_smoothed'] = df['slope_difference'].rolling(window=window_size).mean()\n",
"\n",
"# Find the threshold - we'll use the 75th percentile of slope differences as threshold\n",
"threshold = df['slope_difference_smoothed'].quantile(0.75)\n",
"\n",
"# Find points where slope difference exceeds threshold\n",
"divergence_mask = df['slope_difference_smoothed'] > threshold\n",
"divergence_indices = divergence_mask[divergence_mask].index\n",
"\n",
"if len(divergence_indices) > 0:\n",
" # Find the first sustained divergence point\n",
" min_consecutive_points = 5\n",
" consistent_divergence_idx = None\n",
" \n",
" for start_idx in divergence_indices:\n",
" # Check if divergence is sustained for consecutive points\n",
" consecutive_count = 0\n",
" for j in range(start_idx, min(start_idx + min_consecutive_points, len(df))):\n",
" if j in divergence_indices:\n",
" consecutive_count += 1\n",
" else:\n",
" break\n",
" \n",
" if consecutive_count >= min_consecutive_points:\n",
" consistent_divergence_idx = start_idx\n",
" break\n",
" \n",
" if consistent_divergence_idx is not None:\n",
" divergence_row = df.iloc[consistent_divergence_idx]\n",
" \n",
" print(f\"VO2 Pulse and HR slopes diverge consistently starting at index {consistent_divergence_idx}:\")\n",
" print(f\"Time: {divergence_row['T(sec)']} seconds\")\n",
" print(f\"VO2 Pulse (smoothed): {divergence_row['VO2 Pulse_smoothed']:.2f}\")\n",
" print(f\"Heart Rate (smoothed): {divergence_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
" print(f\"VO2 Pulse Slope: {divergence_row['vo2_pulse_slope']:.3f}\")\n",
" print(f\"HR Slope: {divergence_row['hr_slope']:.3f}\")\n",
" print(f\"Slope Difference: {divergence_row['slope_difference_smoothed']:.3f}\")\n",
" print(f\"VO2: {divergence_row['VO2(ml/min)_smoothed']:.1f} ml/min\")\n",
" print(f\"Speed: {divergence_row['Speed']} km/h\")\n",
" print(f\"Threshold used: {threshold:.3f}\")\n",
" else:\n",
" print(f\"No sustained divergence found. Threshold: {threshold:.3f}\")\n",
" # Show the point with maximum slope difference instead\n",
" max_diff_idx = df['slope_difference_smoothed'].idxmax()\n",
" max_diff_row = df.iloc[max_diff_idx]\n",
" \n",
" print(f\"\\nPoint with maximum slope difference at index {max_diff_idx}:\")\n",
" print(f\"Time: {max_diff_row['T(sec)']} seconds\")\n",
" print(f\"VO2 Pulse (smoothed): {max_diff_row['VO2 Pulse_smoothed']:.2f}\")\n",
" print(f\"Heart Rate (smoothed): {max_diff_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
" print(f\"Slope Difference: {max_diff_row['slope_difference_smoothed']:.3f}\")\n",
"else:\n",
" print(\"No significant slope divergence found between VO2 Pulse and HR\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "672d68f3",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Maximum FAT_smoothed occurs at index 30:\n",
"Heart Rate (smoothed): 96.7 bpm\n",
"FAT (smoothed): 3.894 kcal/min\n"
]
}
],
"source": [
"max_fat_smoothed_idx = df['FAT_smoothed'].idxmax()\n",
"max_fat_smoothed_row = df.loc[max_fat_smoothed_idx]\n",
"max_heart_rate = 220 - keirstyn_data['Age'].iloc[0]\n",
"\n",
"print(f\"Maximum FAT_smoothed occurs at index {max_fat_smoothed_idx}:\")\n",
"print(f\"Heart Rate (smoothed): {max_fat_smoothed_row['HR(bpm)_smoothed']:.1f} bpm\")\n",
"print(f\"FAT (smoothed): {max_fat_smoothed_row['FAT_smoothed']:.3f} kcal/min\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fe3b7605",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "report_generation",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.3"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
Binary file not shown.
+197
View File
@@ -0,0 +1,197 @@
"""
FastAPI application for medical report generation.
This API provides a single endpoint that accepts all required files
and patient information, then generates a comprehensive medical report.
"""
import shutil
import tempfile
from pathlib import Path
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel
from services.report_generator import ReportGeneratorService
app = FastAPI(
title="Medical Report Generation API",
description="API for generating medical performance reports with analysis and graphs",
version="2.0.0",
)
# Define output directories
GRAPHS_DIR = Path("graphs")
GRAPHS_DIR.mkdir(exist_ok=True)
REPORTS_DIR = Path("reports")
REPORTS_DIR.mkdir(exist_ok=True)
# Initialize report generator service
report_service = ReportGeneratorService(
template_dir="app/report_gen",
graphs_dir=str(GRAPHS_DIR),
reports_dir=str(REPORTS_DIR),
)
class ReportResponse(BaseModel):
message: str
report_path: str
graphs_generated: list
analysis_data: dict
@app.get("/")
async def root():
"""Root endpoint with API information"""
return {
"message": "Medical Report Generation API",
"version": "2.0.0",
"endpoints": {
"generate_report": "POST /generate-report",
"download_report": "GET /download-report/{filename}",
"health": "GET /health",
},
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "report-generation-api"}
@app.post("/generate-report", response_model=ReportResponse)
async def generate_report(
patient_name: str = Form(..., description="Patient name"),
age: int = Form(..., description="Patient age"),
height: str = Form(..., description="Patient height (e.g., 5'4\")"),
weight: str = Form(..., description="Patient weight (e.g., 123lbs)"),
focus: str = Form(default="Endurance", description="Training focus"),
session_id: str = Form(default="default", description="Session ID"),
spirometry_pdf: UploadFile = File(..., description="Spirometry PDF file"),
pnoe_csv: UploadFile = File(..., description="Pnoe CSV file"),
seca_excel: UploadFile = File(..., description="SECA Excel file"),
):
"""
Generate a comprehensive medical report from uploaded files.
This endpoint accepts all required files and patient information,
processes the data, generates graphs, and returns a PDF report.
Args:
spirometry_pdf: Spirometry PDF file
pnoe_csv: Pnoe CSV data file
seca_excel: SECA body composition Excel file
patient_name: Name of the patient
age: Patient age
height: Patient height
weight: Patient weight
focus: Training focus (default: Endurance)
session_id: Session identifier (default: default)
Returns:
ReportResponse with report path, graphs generated, and analysis data
"""
# Validate file types
if not spirometry_pdf.filename.endswith(".pdf"):
raise HTTPException(status_code=400, detail="Spirometry file must be a PDF")
if not pnoe_csv.filename.endswith(".csv"):
raise HTTPException(status_code=400, detail="Pnoe file must be a CSV")
if not seca_excel.filename.endswith((".xlsx", ".xls")):
raise HTTPException(
status_code=400, detail="SECA file must be an Excel file (.xlsx or .xls)"
)
# Create temporary directory for uploaded files
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Save uploaded files temporarily
spirometry_path = temp_path / f"spirometry_{spirometry_pdf.filename}"
pnoe_path = temp_path / f"pnoe_{pnoe_csv.filename}"
seca_path = temp_path / f"seca_{seca_excel.filename}"
try:
# Write files
with open(spirometry_path, "wb") as f:
shutil.copyfileobj(spirometry_pdf.file, f)
with open(pnoe_path, "wb") as f:
shutil.copyfileobj(pnoe_csv.file, f)
with open(seca_path, "wb") as f:
shutil.copyfileobj(seca_excel.file, f)
# Prepare patient information
patient_info = {
"patient_name": patient_name,
"age": age,
"height": height,
"weight": weight,
"focus": focus,
"session_id": session_id,
}
# Generate report using the service
result = await report_service.generate_report(
spirometry_pdf_path=str(spirometry_path),
pnoe_csv_path=str(pnoe_path),
seca_excel_path=str(seca_path),
patient_info=patient_info,
)
return ReportResponse(
message="Report generated successfully",
report_path=result["report_path"],
graphs_generated=result["graphs_generated"],
analysis_data=result["analysis_data"],
)
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"ERROR: {error_details}") # This will show in terminal
raise HTTPException(
status_code=500,
detail=f"Error generating report: {str(e)}\n{error_details}",
)
finally:
# Close file handles
spirometry_pdf.file.close()
pnoe_csv.file.close()
seca_excel.file.close()
@app.get("/download-report/{filename}")
async def download_report(filename: str):
"""
Download a generated report.
Args:
filename: Name of the report file
Returns:
PDF file
"""
file_path = REPORTS_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="Report not found")
return FileResponse(
path=file_path,
media_type="application/pdf",
filename=filename,
)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
@@ -26,7 +26,7 @@
<!-- Name and Date Section --> <!-- Name and Date Section -->
<div class="text-right mt-16"> <div class="text-right mt-16">
<h2 class="text-4xl font-bold tracking-wider mb-2"> <h2 class="text-4xl font-bold tracking-wider mb-2">
{{ name|upper }} {{ first_name|upper }}
</h2> </h2>
<h2 class="text-4xl font-bold tracking-wider mb-6"> <h2 class="text-4xl font-bold tracking-wider mb-6">
{{ surname|upper }} {{ surname|upper }}
Binary file not shown.
+280
View File
@@ -0,0 +1,280 @@
"""
Context Generator Service
This service processes all data files and generates context dictionaries for each page
of the medical report. It performs analysis on Pnoe, Spirometry, and SECA data.
"""
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import pandas as pd
class ContextGenerator:
"""Generate context data for report pages"""
def __init__(self):
self.pnoe_df = None
self.spirometry_df = None
self.seca_df = None
self.patient_info = {}
def load_data(
self,
pnoe_path: str,
spirometry_path: str,
seca_path: str,
):
"""Load all required datasets"""
self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
self.spirometry_df = pd.read_csv(spirometry_path)
self.seca_df = pd.read_excel(seca_path)
self._preprocess_pnoe_data()
def _preprocess_pnoe_data(self):
"""Apply preprocessing steps to Pnoe data"""
# Convert numeric columns
for col in self.pnoe_df.columns:
try:
self.pnoe_df[col] = pd.to_numeric(self.pnoe_df[col])
except (ValueError, TypeError):
pass
self.pnoe_df["VO2 Pulse"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
)
self.pnoe_df["VO2 Breath"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
)
self.pnoe_df["CHO"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
)
self.pnoe_df["FAT"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
)
window_size = 10
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in self.pnoe_df.columns:
self.pnoe_df[f"{col}_smoothed"] = (
self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
def extract_patient_info(self, patient_name: str) -> Dict:
"""Extract patient information from SECA dataset"""
if self.seca_df is not None:
patient_data = self.seca_df[
self.seca_df["LastName"].str.contains(
patient_name, case=False, na=False
)
]
if not patient_data.empty:
row = patient_data.iloc[0]
weight_kg = float(row.get("Weight", 0))
fat_pct = float(row.get("Adult_FMP", 0))
self.patient_info = {
"name": f"{row.get('FirstName', '')} {row.get('LastName', '')}",
"first_name": row.get("FirstName", ""),
"last_name": row.get("LastName", ""),
"age": int(row.get("Age", 0)),
"height": f"{row.get('Height', '')}",
"weight": weight_kg,
"gender": row.get("Gender", "").lower(),
"fat_percentage": fat_pct,
"fat_mass_lbs": weight_kg * fat_pct / 100 * 2.20462,
"lean_mass_lbs": weight_kg * (1 - fat_pct / 100) * 2.20462,
}
return self.patient_info
def calculate_spirometry_metrics(self) -> Dict:
"""Calculate spirometry-related metrics"""
metrics = {}
for param in ["FVC", "FEV1", "FEV1/FVC%"]:
row = self.spirometry_df.loc[
self.spirometry_df["Parameters"].str.strip() == param
]
if not row.empty:
param_key = param.lower().replace("/", "_").replace("%", "_pct")
metrics[f"{param_key}_best"] = row["Best"].values[0]
metrics[f"{param_key}_pred"] = row["%Pred."].values[0]
return metrics
def calculate_pnoe_metrics(self) -> Dict:
"""Calculate all Pnoe-derived metrics"""
metrics = {}
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]
peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax()
peak_vt_row = self.pnoe_df.loc[peak_vt_idx]
metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx]
metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]
vt1, vt2 = self._detect_thresholds()
metrics["vt1"] = vt1
metrics["vt2"] = vt2
zones = self._calculate_hr_zones(vt1, vt2, fat_max_row)
metrics.update(zones)
return metrics
def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Detect VT1 and VT2 thresholds"""
condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
crossover_indices = condition[condition].index
vt1 = None
if len(crossover_indices) > 0:
vt1_idx = crossover_indices[0]
vt1_row = self.pnoe_df.loc[vt1_idx]
vt1 = {
"HeartRate": vt1_row["HR(bpm)_smoothed"],
"Speed": vt1_row["Speed"],
"Time": vt1_row["T(sec)"],
}
ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff()
second_derivative = ve_slope.diff()
vt2_idx = second_derivative.idxmax()
vt2 = None
if pd.notna(vt2_idx):
vt2_row = self.pnoe_df.loc[vt2_idx]
vt2 = {
"HeartRate": vt2_row["HR(bpm)_smoothed"],
"Speed": vt2_row["Speed"],
"Time": vt2_row["T(sec)"],
}
return vt1, vt2
def _calculate_hr_zones(
self, vt1: Optional[Dict], vt2: Optional[Dict], fat_max_row: pd.Series
) -> Dict:
"""Calculate heart rate zones based on thresholds"""
zones = {}
if vt1 and vt2:
zone_1_start = fat_max_row["HR(bpm)_smoothed"] - 15
zone_2_start = fat_max_row["HR(bpm)_smoothed"]
zone_3_start = vt1["HeartRate"]
zone_4_start = vt2["HeartRate"] - 10
zone_5_start = vt2["HeartRate"] + 10
zones["zone1_bpm"] = f"{int(zone_1_start)}-{int(zone_2_start)}bpm"
zones["zone2_bpm"] = f"{int(zone_2_start)}-{int(vt1['HeartRate'])}bpm"
zones["zone3_bpm"] = f"{int(zone_3_start)}-{int(zone_4_start)}bpm"
zones["zone4_bpm"] = f"{int(zone_4_start)}-{int(zone_5_start)}bpm"
zones["zone5_bpm"] = f"{int(zone_5_start)}+bpm"
else:
max_hr = 220 - self.patient_info["age"]
zones["zone1_bpm"] = f"{int(max_hr * 0.55)}-{int(max_hr * 0.65)}bpm"
zones["zone2_bpm"] = f"{int(max_hr * 0.65)}-{int(max_hr * 0.75)}bpm"
zones["zone3_bpm"] = f"{int(max_hr * 0.75)}-{int(max_hr * 0.85)}bpm"
zones["zone4_bpm"] = f"{int(max_hr * 0.85)}-{int(max_hr * 0.95)}bpm"
zones["zone5_bpm"] = f"{int(max_hr * 0.95)}+bpm"
return zones
def generate_all_contexts(
self, patient_name: str, graphs: Dict[str, str]
) -> List[Dict]:
"""Main method to generate all page contexts"""
self.extract_patient_info(patient_name)
spirometry_metrics = self.calculate_spirometry_metrics()
pnoe_metrics = self.calculate_pnoe_metrics()
contexts = []
contexts.append(
{
"name": self.patient_info["name"],
"surname": self.patient_info["last_name"],
"date": datetime.now().strftime("%B %d, %Y"),
}
)
contexts.append(
{
"patient_name": self.patient_info["name"],
"test_date": datetime.now().strftime("%B %d, %Y"),
}
)
for i in range(4):
contexts.append(
{"patient_name": self.patient_info["name"], "page_number": i + 3}
)
fev1_percentage = 0
if spirometry_metrics.get("fvc_best"):
fev1_percentage = (
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
) * 100
contexts.append(
{
"peak_vt": f"{pnoe_metrics['peak_vt']:.2f}",
"peak_vt_bpm": f"{int(pnoe_metrics['peak_vt_hr'])}",
"fev1_percentage": f"{fev1_percentage:.1f}",
"lung_analysis_chart": graphs.get("spirometry_chart", ""),
"respiratory_analysis_chart": graphs.get("respiratory", ""),
}
)
contexts.append(
{
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
"zone1_bpm": pnoe_metrics.get("zone1_bpm", ""),
"zone2_bpm": pnoe_metrics.get("zone2_bpm", ""),
"zone3_bpm": pnoe_metrics.get("zone3_bpm", ""),
"zone4_bpm": pnoe_metrics.get("zone4_bpm", ""),
"zone5_bpm": pnoe_metrics.get("zone5_bpm", ""),
"vo2_pulse_chart": graphs.get("vo2_pulse", ""),
}
)
contexts.append(
{
"fat_max_value": f"{pnoe_metrics['fat_max_value']:.2f}",
"fat_max_hr": f"{int(pnoe_metrics['fat_max_hr'])}",
"fuel_utilization_chart": graphs.get("fuel_utilization", ""),
"fat_metabolism_chart": graphs.get("fat_metabolism", ""),
}
)
contexts.append(
{
"fat_percentage": f"{self.patient_info['fat_percentage']:.1f}",
"fat_mass_lbs": f"{self.patient_info['fat_mass_lbs']:.1f}",
"lean_mass_lbs": f"{self.patient_info['lean_mass_lbs']:.1f}",
"body_composition_chart": graphs.get("body_composition", ""),
"body_fat_percent_chart": graphs.get("body_fat_percent", ""),
}
)
for i in range(9):
contexts.append(
{
"patient_name": self.patient_info["name"],
"page_number": i + 11,
"vo2_breath_chart": graphs.get("vo2_breath", ""),
"recovery_chart": graphs.get("recovery", ""),
}
)
return contexts
@@ -1,7 +1,16 @@
"""
Graph Generator Service
This service generates all the charts and visualizations required for the medical report.
Based on the analysis notebooks in services_dfdf/.
"""
import base64 import base64
from pathlib import Path from pathlib import Path
from typing import Dict
import matplotlib
matplotlib.use("Agg") # Use non-interactive backend
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms import matplotlib.transforms as mtransforms
import numpy as np import numpy as np
@@ -11,13 +20,28 @@ from matplotlib.patches import FancyBboxPatch
class GraphGenerator: class GraphGenerator:
"""Generate all charts for medical reports"""
def __init__(self, charts_dir: str = "graphs"): def __init__(self, charts_dir: str = "graphs"):
"""Initialize the GraphGenerator with output directory for charts""" """
Initialize the graph generator.
Args:
charts_dir: Directory to save generated charts
"""
self.charts_dir = Path(charts_dir) self.charts_dir = Path(charts_dir)
self.charts_dir.mkdir(exist_ok=True) self.charts_dir.mkdir(exist_ok=True)
def _image_to_base64(self, image_path: Path) -> str: def _image_to_base64(self, image_path: Path) -> str:
"""Convert image to base64 string""" """
Convert image file to base64 string.
Args:
image_path: Path to image file
Returns:
Base64 encoded string
"""
try: try:
with open(image_path, "rb") as image_file: with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8") return base64.b64encode(image_file.read()).decode("utf-8")
@@ -25,27 +49,35 @@ class GraphGenerator:
return "" return ""
def generate_respiratory_chart( def generate_respiratory_chart(
self, df: pd.DataFrame, save_as_base64: bool = False self, df: pd.DataFrame, save_as_base64: bool = True
) -> str: ) -> str:
"""Generate respiratory chart showing VT and Speed over time""" """
# Get phase times for background regions Generate respiratory chart (VT and Speed over time).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE") first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist() phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5)) plt.figure(figsize=(18, 5))
ax1 = plt.subplot() ax1 = plt.subplot()
# Plot VT with step-like appearance # Plot VT
sns.lineplot(data=df, x="T(sec)", y="VT(l)_smoothed", label="VT (L)") sns.lineplot(data=df, x="T(sec)", y="VT(l)_smoothed", label="VT (L)")
ax1.set_xlabel("Time (sec)") ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VT (L)") ax1.set_ylabel("VT (L)")
ax1.grid(True, alpha=0.1) ax1.grid(True, alpha=0.1)
ax1.set_ylim(0, min(8, df["VT(l)_smoothed"].max())) ax1.set_ylim(0, min(8, df["VT(l)_smoothed"].max()))
# Plot speed as step function on secondary y-axis # Plot speed on secondary y-axis
ax2 = ax1.twinx() ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
line2 = sns.lineplot( sns.lineplot(
data=df, data=df,
x="T(sec)", x="T(sec)",
y="Speed", y="Speed",
@@ -58,11 +90,9 @@ class GraphGenerator:
ax2.set_ylabel("Speed") ax2.set_ylabel("Speed")
ax2.set_ylim(0, min(30, df["Speed"].max()) + 1) ax2.set_ylim(0, min(30, df["Speed"].max()) + 1)
# Remove default legends first # Combine legends
ax1.get_legend().remove() ax1.get_legend().remove()
ax2.get_legend().remove() ax2.get_legend().remove()
# Combine legends from both axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels() lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left") ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
@@ -81,12 +111,23 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fuel_utilization_chart( def generate_fuel_utilization_chart(
self, df: pd.DataFrame, save_as_base64: bool = False self, df: pd.DataFrame, save_as_base64: bool = True
) -> str: ) -> str:
"""Generate fuel utilization chart with stacked bars showing fat vs carbs""" """
# Group by speed and calculate mean for numeric columns only Generate fuel utilization chart (CHO vs FAT by stage).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Group by speed and calculate mean
speed_groups = df.groupby("Speed").mean(numeric_only=True).round(1) speed_groups = df.groupby("Speed").mean(numeric_only=True).round(1)
speed_groups = speed_groups.iloc[1:-1] speed_groups = speed_groups.iloc[1:-1]
# Filter data
filtered_data = speed_groups[ filtered_data = speed_groups[
(speed_groups.index >= 3.5) & (speed_groups.index <= 7.5) (speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
] ]
@@ -94,19 +135,24 @@ class GraphGenerator:
plt.figure(figsize=(15, 8)) plt.figure(figsize=(15, 8))
plt.style.use("default") plt.style.use("default")
# Create stage labels and positions
stage_labels = [f"Stage {i}" for i in range(1, len(filtered_data) + 1)] stage_labels = [f"Stage {i}" for i in range(1, len(filtered_data) + 1)]
x_positions = np.arange(len(filtered_data)) x_positions = np.arange(len(filtered_data))
# Calculate fat and carbs energy expenditure from percentages # Calculate fat and carbs energy expenditure
fat_ee = filtered_data["EE(kcal/min)"] * filtered_data["FAT(%)"] / 100 fat_ee = filtered_data["EE(kcal/min)"] * filtered_data["FAT(%)"] / 100
carbs_ee = filtered_data["EE(kcal/min)"] * filtered_data["CARBS(%)"] / 100 carbs_ee = filtered_data["EE(kcal/min)"] * filtered_data["CARBS(%)"] / 100
# Create the main axis for the stacked bars
ax1 = plt.gca() ax1 = plt.gca()
# Create stacked bar chart with colors # Create stacked bar chart
ax1.bar(x_positions, fat_ee, color="#1f77b4", alpha=0.8, width=0.6, label="Fat") ax1.bar(
x_positions,
fat_ee,
color="#1f77b4",
alpha=0.8,
width=0.6,
label="Fat",
)
ax1.bar( ax1.bar(
x_positions, x_positions,
carbs_ee, carbs_ee,
@@ -117,16 +163,15 @@ class GraphGenerator:
label="Carbs", label="Carbs",
) )
# Set labels and formatting for primary axis
ax1.set_xlabel("", fontsize=12) ax1.set_xlabel("", fontsize=12)
ax1.set_ylabel("Fuel (kcal/min)", fontsize=12) ax1.set_ylabel("Fuel (kcal/min)", fontsize=12)
ax1.set_ylim(0, 20) ax1.set_ylim(0, 20)
# Add individual values on each bar segment # Add values on bars
for i, (fat_val, carb_val, total_val) in enumerate( for i, (fat_val, carb_val, total_val) in enumerate(
zip(fat_ee, carbs_ee, filtered_data["EE(kcal/min)"]) zip(fat_ee, carbs_ee, filtered_data["EE(kcal/min)"])
): ):
if fat_val > 0.3: # Fat value if fat_val > 0.3:
ax1.text( ax1.text(
i, i,
fat_val / 2, fat_val / 2,
@@ -137,7 +182,7 @@ class GraphGenerator:
fontweight="bold", fontweight="bold",
color="white", color="white",
) )
if carb_val > 0.3: # Carbs value if carb_val > 0.3:
ax1.text( ax1.text(
i, i,
fat_val + carb_val / 2, fat_val + carb_val / 2,
@@ -148,7 +193,6 @@ class GraphGenerator:
fontweight="bold", fontweight="bold",
color="white", color="white",
) )
# Total EE
ax1.text( ax1.text(
i, i,
total_val + 0.5, total_val + 0.5,
@@ -160,7 +204,7 @@ class GraphGenerator:
color="black", color="black",
) )
# Add speed labels below x-axis # Add speed labels
for i, speed in enumerate(filtered_data.index): for i, speed in enumerate(filtered_data.index):
ax1.text(i, -1.5, f"{speed:.1f} mph", ha="center", va="top", fontsize=9) ax1.text(i, -1.5, f"{speed:.1f} mph", ha="center", va="top", fontsize=9)
ax1.text( ax1.text(
@@ -175,8 +219,6 @@ class GraphGenerator:
# Create secondary y-axis for heart rate # Create secondary y-axis for heart rate
ax2 = ax1.twinx() ax2 = ax1.twinx()
# Plot heart rate line
ax2.plot( ax2.plot(
x_positions, x_positions,
filtered_data["HR(bpm)"], filtered_data["HR(bpm)"],
@@ -187,12 +229,11 @@ class GraphGenerator:
label="Heart Rate", label="Heart Rate",
) )
# Set heart rate axis formatting
ax2.set_ylabel("Heart Rate (bpm)", fontsize=12, color="red") ax2.set_ylabel("Heart Rate (bpm)", fontsize=12, color="red")
ax2.tick_params(axis="y", labelcolor="red") ax2.tick_params(axis="y", labelcolor="red")
ax2.set_ylim(0, 220) ax2.set_ylim(0, 220)
# Add HR values above the points # Add HR values
for i, hr in enumerate(filtered_data["HR(bpm)"]): for i, hr in enumerate(filtered_data["HR(bpm)"]):
ax2.text( ax2.text(
i, i,
@@ -205,7 +246,6 @@ class GraphGenerator:
color="red", color="red",
) )
# Set x-axis formatting
ax1.set_xticks(x_positions) ax1.set_xticks(x_positions)
ax1.set_xticklabels(stage_labels, fontsize=11) ax1.set_xticklabels(stage_labels, fontsize=11)
@@ -221,11 +261,9 @@ class GraphGenerator:
shadow=True, shadow=True,
) )
# Add grid
ax1.grid(True, alpha=0.3, linestyle="-", linewidth=0.5) ax1.grid(True, alpha=0.3, linestyle="-", linewidth=0.5)
ax1.set_axisbelow(True) ax1.set_axisbelow(True)
# Adjust layout
plt.tight_layout() plt.tight_layout()
plt.subplots_adjust(bottom=0.1, top=0.9) plt.subplots_adjust(bottom=0.1, top=0.9)
@@ -236,9 +274,18 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_vo2_pulse_chart( def generate_vo2_pulse_chart(
self, df: pd.DataFrame, save_as_base64: bool = False self, df: pd.DataFrame, save_as_base64: bool = True
) -> str: ) -> str:
"""Generate VO2 Pulse chart with heart rate and speed""" """
Generate VO2 Pulse chart with HR and Speed.
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE") first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist() phase_times = first_unique_phase["T(sec)"].tolist()
@@ -292,12 +339,14 @@ class GraphGenerator:
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
# Remove default legends first # Combine legends
for ax in [ax1, ax2, ax3]: if ax1.get_legend():
if ax.get_legend(): ax1.get_legend().remove()
ax.get_legend().remove() if ax2.get_legend():
ax2.get_legend().remove()
if ax3.get_legend():
ax3.get_legend().remove()
# Combine legends from all axes
lines1, labels1 = ax1.get_legend_handles_labels() lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels()
lines3, labels3 = ax3.get_legend_handles_labels() lines3, labels3 = ax3.get_legend_handles_labels()
@@ -319,16 +368,24 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_vo2_breath_chart( def generate_vo2_breath_chart(
self, df: pd.DataFrame, save_as_base64: bool = False self, df: pd.DataFrame, save_as_base64: bool = True
) -> str: ) -> str:
"""Generate VO2 per Breath chart""" """
Generate VO2 per Breath chart.
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE") first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist() phase_times = first_unique_phase["T(sec)"].tolist()
plt.figure(figsize=(18, 5)) plt.figure(figsize=(18, 5))
ax1 = plt.subplot() ax1 = plt.subplot()
# Plot VO2 per Breath
sns.lineplot( sns.lineplot(
data=df, data=df,
x="T(sec)", x="T(sec)",
@@ -340,7 +397,7 @@ class GraphGenerator:
ax1.set_ylim(0, df["VO2 Breath_smoothed"].max() + 1) ax1.set_ylim(0, df["VO2 Breath_smoothed"].max() + 1)
ax1.grid(True, alpha=0.1) ax1.grid(True, alpha=0.1)
# Plot speed as step function on secondary y-axis # Plot speed on secondary y-axis
ax2 = ax1.twinx() ax2 = ax1.twinx()
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
sns.lineplot( sns.lineplot(
@@ -356,11 +413,9 @@ class GraphGenerator:
ax2.set_ylim(0, df["Speed"].max() + 1) ax2.set_ylim(0, df["Speed"].max() + 1)
ax2.set_ylabel("Speed") ax2.set_ylabel("Speed")
# Remove default legends first # Combine legends
ax1.get_legend().remove() ax1.get_legend().remove()
ax2.get_legend().remove() ax2.get_legend().remove()
# Combine legends from both axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels() lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left") ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
@@ -379,9 +434,18 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_fat_metabolism_chart( def generate_fat_metabolism_chart(
self, df: pd.DataFrame, save_as_base64: bool = False self, df: pd.DataFrame, save_as_base64: bool = True
) -> str: ) -> str:
"""Generate CHO and FAT metabolism chart""" """
Generate fat metabolism chart (CHO vs FAT over time).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE") first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist() phase_times = first_unique_phase["T(sec)"].tolist()
@@ -391,7 +455,7 @@ class GraphGenerator:
# Plot CHO # Plot CHO
sns.lineplot(data=df, x="T(sec)", y="CHO_smoothed", label="CHO (kcal/min)") sns.lineplot(data=df, x="T(sec)", y="CHO_smoothed", label="CHO (kcal/min)")
ax1.set_xlabel("Time (sec)") ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("CHO (kcal/min)") ax1.set_ylabel("CHO (g/min)")
ax1.grid(True, alpha=0.1) ax1.grid(True, alpha=0.1)
# Plot FAT on secondary y-axis # Plot FAT on secondary y-axis
@@ -408,11 +472,9 @@ class GraphGenerator:
ax2.set_ylabel("FAT (kcal/min)") ax2.set_ylabel("FAT (kcal/min)")
ax2.set_ylim(0, 15) ax2.set_ylim(0, 15)
# Remove default legends first # Combine legends
ax1.get_legend().remove() ax1.get_legend().remove()
ax2.get_legend().remove() ax2.get_legend().remove()
# Combine legends from both axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels() lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels()
ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left") ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper left")
@@ -431,9 +493,18 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_recovery_chart( def generate_recovery_chart(
self, df: pd.DataFrame, save_as_base64: bool = False self, df: pd.DataFrame, save_as_base64: bool = True
) -> str: ) -> str:
"""Generate recovery chart with VCO2, HR, and BF""" """
Generate recovery chart (VCO2, HR, and BF).
Args:
df: Processed DataFrame with smoothed columns
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
first_unique_phase = df.drop_duplicates(subset="PHASE") first_unique_phase = df.drop_duplicates(subset="PHASE")
phase_times = first_unique_phase["T(sec)"].tolist() phase_times = first_unique_phase["T(sec)"].tolist()
@@ -449,7 +520,7 @@ class GraphGenerator:
color="blue", color="blue",
) )
ax1.set_xlabel("Time (sec)") ax1.set_xlabel("Time (sec)")
ax1.set_ylabel("VCO2 (ml/min)") ax1.set_ylabel("VO2 Pulse (mL/beat)")
ax1.set_ylim(0, df["VCO2(ml/min)"].max()) ax1.set_ylim(0, df["VCO2(ml/min)"].max())
ax1.grid(True, alpha=0.1) ax1.grid(True, alpha=0.1)
@@ -468,7 +539,7 @@ class GraphGenerator:
ax2.set_ylim(df["HR(bpm)_smoothed"].min(), df["HR(bpm)_smoothed"].max() + 1) ax2.set_ylim(df["HR(bpm)_smoothed"].min(), df["HR(bpm)_smoothed"].max() + 1)
ax2.tick_params(axis="y", labelcolor="red") ax2.tick_params(axis="y", labelcolor="red")
# Create third y-axis for breathing frequency # Create third y-axis for BF
ax3 = ax1.twinx() ax3 = ax1.twinx()
ax3.spines["right"].set_position(("outward", 60)) ax3.spines["right"].set_position(("outward", 60))
sns.lineplot( sns.lineplot(
@@ -485,12 +556,14 @@ class GraphGenerator:
ax3.set_ylim(0, df["BF(bpm)_smoothed"].max() + 1) ax3.set_ylim(0, df["BF(bpm)_smoothed"].max() + 1)
ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200)) ax1.set_xticks(np.arange(0, df["T(sec)"].max() + 200, 200))
# Remove default legends first # Combine legends
for ax in [ax1, ax2, ax3]: if ax1.get_legend():
if ax.get_legend(): ax1.get_legend().remove()
ax.get_legend().remove() if ax2.get_legend():
ax2.get_legend().remove()
if ax3.get_legend():
ax3.get_legend().remove()
# Combine legends from all axes in the top left
lines1, labels1 = ax1.get_legend_handles_labels() lines1, labels1 = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels() lines2, labels2 = ax2.get_legend_handles_labels()
lines3, labels3 = ax3.get_legend_handles_labels() lines3, labels3 = ax3.get_legend_handles_labels()
@@ -511,129 +584,41 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_body_fat_percentage_chart(
self,
gender: str,
age: int,
body_fat_percentage: float,
save_as_base64: bool = False,
) -> str:
"""Generate body fat percentage chart with ranges"""
# Define the segments with muted colors
segments = [
("#F8A8A8", 0, 15), # Muted Red/Salmon: 0% to 15%
("#FFEECC", 15, 5), # Pale Yellow/Cream: 15% to 20%
("#D0F0C0", 20, 15), # Pale Green/Mint: 20% to 35%
("#FFEECC", 35, 5), # Pale Yellow/Cream: 35% to 40%
("#F8A8A8", 40, 10), # Muted Red/Salmon: 40% to 50%
]
# Determine age group
if 20 <= age <= 39:
age_group = "20-39"
elif 40 <= age <= 59:
age_group = "40-59"
elif 60 <= age <= 79:
age_group = "60-79"
else:
age_group = "N/A"
demographic = f"{age_group}\n({gender[0].upper()})"
fig, ax = plt.subplots(figsize=(10, 2))
# Create the Segmented Bar
for color, start, length in segments:
ax.barh(
y=0,
width=length,
left=start,
height=1,
color=color,
edgecolor="black",
linewidth=0.5,
)
# Add the Indicator (Triangle)
ax.plot(
body_fat_percentage,
1.05,
marker="v",
color="black",
markersize=10,
clip_on=False,
transform=ax.get_xaxis_transform(),
)
# Set Axis Properties and Labels
ax.set_xlim(0, 50)
ax.set_xticks(range(0, 51, 5))
ax.set_yticks([])
ax.text(
-0.05,
0,
demographic,
transform=ax.get_yaxis_transform(),
va="center",
ha="right",
fontsize=12,
)
ax.set_xlim(0, 50)
ticks = range(0, 51, 5)
ax.set_xticks(ticks)
labels = [f"{t}%" for t in ticks]
ax.set_xticklabels(labels)
# Clean up spines and add small ticks
ax.spines["right"].set_visible(False)
ax.spines["top"].set_visible(False)
ax.spines["left"].set_visible(False)
ax.spines["bottom"].set_visible(True)
for x in range(0, 51, 5):
ax.plot(
[x, x],
[-0.05, -0.01],
color="black",
transform=ax.get_xaxis_transform(),
clip_on=False,
)
plt.tight_layout()
chart_path = self.charts_dir / "body_fat_percentage_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_body_composition_chart( def generate_body_composition_chart(
self, fat_mass_lbs: float, lean_mass_lbs: float, save_as_base64: bool = False self, fat_mass_lbs: float, lean_mass_lbs: float, save_as_base64: bool = True
) -> str: ) -> str:
"""Generate donut chart for body composition""" """
Generate body composition donut chart.
Args:
fat_mass_lbs: Fat mass in pounds
lean_mass_lbs: Lean mass in pounds
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Calculate percentages # Calculate percentages
total_weight = fat_mass_lbs + lean_mass_lbs total_weight = fat_mass_lbs + lean_mass_lbs
fat_percentage = (fat_mass_lbs / total_weight) * 100 fat_percentage = (fat_mass_lbs / total_weight) * 100
lean_percentage = (lean_mass_lbs / total_weight) * 100 lean_percentage = (lean_mass_lbs / total_weight) * 100
# Data for the chart
sizes = [fat_percentage, lean_percentage] sizes = [fat_percentage, lean_percentage]
colors = ["#fde3ac", "#ff9966"] # Light yellow/tan and orange colors = ["#fde3ac", "#ff9966"]
plt.figure(figsize=(8, 8)) plt.figure(figsize=(8, 8))
# Create the donut chart without labels first # Create donut chart
wedges, texts, autotexts = plt.pie( plt.pie(
sizes, sizes,
autopct="", # Remove auto percentages autopct="",
startangle=90, startangle=90,
wedgeprops=dict(width=0.5, edgecolor="w"), wedgeprops=dict(width=0.5, edgecolor="w"),
colors=colors, colors=colors,
labels=["", ""], labels=["", ""],
) # Remove default labels )
# Add custom text annotations positioned manually # Add custom text annotations
plt.text( plt.text(
-1, -1,
1, 1,
@@ -656,8 +641,7 @@ class GraphGenerator:
bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8), bbox=dict(boxstyle="round,pad=0.3", facecolor="white", alpha=0.8),
) )
# Set the title plt.axis("equal")
plt.axis("equal") # Equal aspect ratio ensures that pie is drawn as a circle
chart_path = self.charts_dir / "body_composition_chart.png" chart_path = self.charts_dir / "body_composition_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=600) plt.savefig(chart_path, bbox_inches="tight", dpi=600)
@@ -665,16 +649,142 @@ class GraphGenerator:
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_spirometry_chart( def generate_body_fat_percent_chart(
self, spirometry_df: pd.DataFrame, save_as_base64: bool = False self,
fat_percentage: float,
age: int,
gender: str,
save_as_base64: bool = True,
) -> str: ) -> str:
"""Generate spirometry chart with Z-scores and ranges""" """
Generate body fat percentage chart.
Args:
fat_percentage: Body fat percentage
age: Patient age
gender: Patient gender ('male' or 'female')
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Determine age group
if 20 <= age <= 39:
age_group = "20-39"
elif 40 <= age <= 59:
age_group = "40-59"
elif 60 <= age <= 79:
age_group = "60-79"
else:
age_group = "20-39" # Default
demographic = f"{age_group}\n({gender[0].upper()})"
# Define segments based on gender (female example)
if gender.lower() == "female":
segments = [
("#F8A8A8", 0, 15), # Muted Red: 0% to 15%
("#FFEECC", 15, 5), # Pale Yellow: 15% to 20%
("#D0F0C0", 20, 15), # Pale Green: 20% to 35%
("#FFEECC", 35, 5), # Pale Yellow: 35% to 40%
("#F8A8A8", 40, 10), # Muted Red: 40% to 50%
]
else: # male
segments = [
("#F8A8A8", 0, 5), # Muted Red: 0% to 5%
("#FFEECC", 5, 5), # Pale Yellow: 5% to 10%
("#D0F0C0", 10, 10), # Pale Green: 10% to 20%
("#FFEECC", 20, 5), # Pale Yellow: 20% to 25%
("#F8A8A8", 25, 25), # Muted Red: 25% to 50%
]
fig, ax = plt.subplots(figsize=(10, 2))
# Create the segmented bar
for color, start, length in segments:
ax.barh(
y=0,
width=length,
left=start,
height=1,
color=color,
edgecolor="black",
linewidth=0.5,
)
# Add the indicator (triangle)
ax.plot(
fat_percentage,
1.05,
marker="v",
color="black",
markersize=10,
clip_on=False,
transform=ax.get_xaxis_transform(),
)
# Set axis properties
ax.set_xlim(0, 50)
ax.set_xticks(range(0, 51, 5))
ax.set_yticks([])
ax.text(
-0.05,
0,
demographic,
transform=ax.get_yaxis_transform(),
va="center",
ha="right",
fontsize=12,
)
ticks = range(0, 51, 5)
ax.set_xticks(ticks)
labels = [f"{t}%" for t in ticks]
ax.set_xticklabels(labels)
# Clean up spines
ax.spines["right"].set_visible(False)
ax.spines["top"].set_visible(False)
ax.spines["left"].set_visible(False)
ax.spines["bottom"].set_visible(True)
# Add tick marks
for x in range(0, 51, 5):
ax.plot(
[x, x],
[-0.05, -0.01],
color="black",
transform=ax.get_xaxis_transform(),
clip_on=False,
)
plt.tight_layout()
chart_path = self.charts_dir / "body_fat_percent_chart.png"
plt.savefig(chart_path, bbox_inches="tight", dpi=300)
plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_spirometry_chart(
self, spirometry_df: pd.DataFrame, save_as_base64: bool = True
) -> str:
"""
Generate spirometry chart with Z-scores.
Args:
spirometry_df: Spirometry DataFrame with parameters
save_as_base64: If True, return base64 string, else return file path
Returns:
Base64 string or file path
"""
# Coerce numeric columns # Coerce numeric columns
for col in ["Best", "LLN", "Pred.", "%Pred.", "ZScore"]: for col in ["Best", "LLN", "Pred.", "%Pred.", "ZScore"]:
if col in spirometry_df.columns: if col in spirometry_df.columns:
spirometry_df[col] = pd.to_numeric(spirometry_df[col], errors="coerce") spirometry_df[col] = pd.to_numeric(spirometry_df[col], errors="coerce")
# Select rows of interest and prepare display values # Select rows of interest
rows_map = { rows_map = {
"Lung Volume": "FVC", "Lung Volume": "FVC",
"Lung Power": "FEV1", "Lung Power": "FEV1",
@@ -707,7 +817,7 @@ class GraphGenerator:
) )
x_min, x_max = -5, 3 x_min, x_max = -5, 3
# Segment colors: red -> orange -> yellow -> green # Segment colors
segments = [ segments = [
(-5, -4, "#f4a7a7"), # red-ish (-5, -4, "#f4a7a7"), # red-ish
(-4, -3, "#f7c49a"), # orange-ish (-4, -3, "#f7c49a"), # orange-ish
@@ -726,10 +836,10 @@ class GraphGenerator:
0, width=b - a, left=a, height=0.6, color=color, edgecolor="none" 0, width=b - a, left=a, height=0.6, color=color, edgecolor="none"
) )
# LLN (-1) and Predicted (0) markers # LLN and Predicted markers
ax.axvline(0, color="black", lw=1) ax.axvline(0, color="black", lw=1)
# Z-score pointer (downward triangle) at top of each panel # Z-score pointer
if pd.notna(rec["z"]): if pd.notna(rec["z"]):
trans = mtransforms.blended_transform_factory( trans = mtransforms.blended_transform_factory(
ax.transData, ax.transAxes ax.transData, ax.transAxes
@@ -744,7 +854,7 @@ class GraphGenerator:
clip_on=False, clip_on=False,
) )
# Labels, ticks, and styling # Labels and styling
ax.set_title( ax.set_title(
rec["label"], loc="left", fontsize=11, fontweight="bold", pad=2 rec["label"], loc="left", fontsize=11, fontweight="bold", pad=2
) )
@@ -760,15 +870,11 @@ class GraphGenerator:
# Right-side summary boxes # Right-side summary boxes
fig.subplots_adjust(right=0.78) fig.subplots_adjust(right=0.78)
box_ax = fig.add_axes( box_ax = fig.add_axes([0.805, 0.06, 0.18, 0.90])
[0.805, 0.06, 0.18, 0.90]
) # [left, bottom, width, height]
box_ax.axis("off") box_ax.axis("off")
# Helper to draw a pill-shaped text box
def pill(ax, xy, text): def pill(ax, xy, text):
x, y = xy x, y = xy
# Draw rounded rectangle background
bbox = FancyBboxPatch( bbox = FancyBboxPatch(
(x - 0.48, y - 0.09), (x - 0.48, y - 0.09),
0.96, 0.96,
@@ -801,7 +907,7 @@ class GraphGenerator:
box_ax.set_xlim(0, 1) box_ax.set_xlim(0, 1)
box_ax.set_ylim(0, 1) box_ax.set_ylim(0, 1)
# Prepare display strings and positions (top to bottom) # Prepare display strings
right_items = [] right_items = []
for rec in records: for rec in records:
name = ( name = (
@@ -814,7 +920,7 @@ class GraphGenerator:
pct_fmt = f"{rec['pct']:.1f}%" pct_fmt = f"{rec['pct']:.1f}%"
right_items.append((name, value_fmt, pct_fmt)) right_items.append((name, value_fmt, pct_fmt))
# Sort to match image order on the right (FVC, FEV1, FEV1/FVC) # Sort to match order
order = ["FVC", "FEV1", "FEV1/FVC"] order = ["FVC", "FEV1", "FEV1/FVC"]
right_items_sorted = [ right_items_sorted = [
next(item for item in right_items if item[0] == k) for k in order next(item for item in right_items if item[0] == k) for k in order
@@ -830,113 +936,3 @@ class GraphGenerator:
plt.close() plt.close()
return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path) return self._image_to_base64(chart_path) if save_as_base64 else str(chart_path)
def generate_all_charts(
self,
pnoe_df: pd.DataFrame,
spirometry_df: pd.DataFrame,
patient_data: Dict,
save_as_base64: bool = False,
) -> Dict[str, str]:
"""Generate all charts at once and return dictionary of paths/base64 strings"""
charts = {}
# Generate physiological charts
charts["respiratory"] = self.generate_respiratory_chart(pnoe_df, save_as_base64)
charts["fuel_utilization_chart"] = self.generate_fuel_utilization_chart(
pnoe_df, save_as_base64
)
charts["vo2_pulse_chart"] = self.generate_vo2_pulse_chart(
pnoe_df, save_as_base64
)
charts["vo2_breath_chart"] = self.generate_vo2_breath_chart(
pnoe_df, save_as_base64
)
charts["fat_metabolism_chart"] = self.generate_fat_metabolism_chart(
pnoe_df, save_as_base64
)
charts["recovery_chart"] = self.generate_recovery_chart(pnoe_df, save_as_base64)
# Generate body composition charts
if (
"gender" in patient_data
and "age" in patient_data
and "fat_percentage" in patient_data
):
charts["body_fat_percentage_chart"] = (
self.generate_body_fat_percentage_chart(
patient_data["gender"],
patient_data["age"],
patient_data["fat_percentage"],
save_as_base64,
)
)
if "fat_mass_lbs" in patient_data and "lean_mass_lbs" in patient_data:
charts["body_composition_chart"] = self.generate_body_composition_chart(
patient_data["fat_mass_lbs"],
patient_data["lean_mass_lbs"],
save_as_base64,
)
# Generate spirometry chart
charts["spirometry_chart"] = self.generate_spirometry_chart(
spirometry_df, save_as_base64
)
return charts
# Example usage
if __name__ == "__main__":
# Initialize graph generator
generator = GraphGenerator()
# Load sample data (you would pass your actual dataframes)
pnoe_df = pd.read_csv("data/Pnoe_20250729_1550-Moran_Keirstyn.csv", delimiter=";")
spirometry_df = pd.read_csv("data/spirometry_data.csv")
# Preprocess pnoe data (same as in your notebook)
pnoe_df = pnoe_df.apply(pd.to_numeric, errors="ignore")
pnoe_df["VO2 Pulse"] = pnoe_df["VO2(ml/min)"] / pnoe_df["HR(bpm)"]
pnoe_df["VO2 Breath"] = pnoe_df["VO2(ml/min)"] / pnoe_df["BF(bpm)"]
pnoe_df["CHO"] = pnoe_df["EE(kcal/min)"] * pnoe_df["CARBS(%)"] / 100
pnoe_df["FAT"] = pnoe_df["EE(kcal/min)"] * pnoe_df["FAT(%)"] / 100
# Apply smoothing
window_size = 10
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in pnoe_df.columns:
pnoe_df[f"{col}_smoothed"] = (
pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
# Patient data
patient_data = {
"gender": "female",
"age": 25,
"fat_percentage": 22.4,
"fat_mass_lbs": 27.6,
"lean_mass_lbs": 95.4,
}
# Generate all charts
charts = generator.generate_all_charts(
pnoe_df, spirometry_df, patient_data, save_as_base64=True
)
print(f"Generated {len(charts)} charts:")
for chart_name in charts.keys():
print(f"- {chart_name}")
+416
View File
@@ -0,0 +1,416 @@
"""
Report Generator Service
This service handles the generation of medical reports from uploaded files.
It processes data, generates graphs, and creates PDF reports.
"""
from pathlib import Path
from typing import Any, Dict, List
import pandas as pd
from jinja2 import Environment, FileSystemLoader
from playwright.async_api import async_playwright
from services.context_generator import ContextGenerator
from services.graph_generator import GraphGenerator
from services.spirometry_table_extractor import extract_spirometry_table_from_pdf
class ReportGeneratorService:
"""Service for generating medical performance reports"""
def __init__(
self,
template_dir: str = "app/report_gen",
graphs_dir: str = "graphs",
reports_dir: str = "reports",
data_dir: str = "data",
):
"""
Initialize the report generator service.
Args:
template_dir: Directory containing Jinja2 templates
graphs_dir: Directory to save generated graphs
reports_dir: Directory to save generated reports
data_dir: Directory to store extracted/processed data
"""
self.template_dir = template_dir
self.graphs_dir = Path(graphs_dir)
self.reports_dir = Path(reports_dir)
self.data_dir = Path(data_dir)
self.graph_generator = GraphGenerator(charts_dir=str(self.graphs_dir))
self.context_generator = ContextGenerator()
self.env = Environment(loader=FileSystemLoader(template_dir))
# Ensure directories exist
self.graphs_dir.mkdir(exist_ok=True)
self.reports_dir.mkdir(exist_ok=True)
self.data_dir.mkdir(exist_ok=True)
def process_pnoe_data(self, pnoe_csv_path: str) -> pd.DataFrame:
"""
Load and process Pnoe CSV data.
Args:
pnoe_csv_path: Path to Pnoe CSV file
Returns:
Processed DataFrame with smoothed columns
"""
# Load data
df = pd.read_csv(pnoe_csv_path, delimiter=";")
# Convert numeric columns (updated approach)
for col in df.columns:
try:
df[col] = pd.to_numeric(df[col])
except (ValueError, TypeError):
pass # Keep as-is if not numeric
# Calculate derived columns
df["VO2 Pulse"] = df["VO2(ml/min)"] / df["HR(bpm)"]
df["VO2 Breath"] = df["VO2(ml/min)"] / df["BF(bpm)"]
df["CHO"] = df["EE(kcal/min)"] * df["CARBS(%)"] / 100
df["FAT"] = df["EE(kcal/min)"] * df["FAT(%)"] / 100
# Smooth columns
window_size = 10
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in df.columns:
df[f"{col}_smoothed"] = (
df[col].rolling(window=window_size, min_periods=1).mean()
)
return df
def generate_graphs(self, df: pd.DataFrame) -> List[Dict[str, str]]:
"""
Generate all required graphs from processed data.
Args:
df: Processed DataFrame with smoothed columns
Returns:
List of dictionaries containing graph names and paths
"""
graphs_generated = []
# List of graphs to generate
graph_methods = [
("respiratory", self.graph_generator.generate_respiratory_chart),
("fuel_utilization", self.graph_generator.generate_fuel_utilization_chart),
("vo2_pulse", self.graph_generator.generate_vo2_pulse_chart),
("vo2_breath", self.graph_generator.generate_vo2_breath_chart),
("fat_metabolism", self.graph_generator.generate_fat_metabolism_chart),
("recovery", self.graph_generator.generate_recovery_chart),
]
for name, method in graph_methods:
try:
path = method(df, save_as_base64=False)
graphs_generated.append({"name": name, "path": str(path)})
except Exception as e:
print(f"Warning: Could not generate {name} chart: {e}")
return graphs_generated
def calculate_analysis_metrics(self, df: pd.DataFrame) -> Dict[str, Any]:
"""
Calculate basic analysis metrics from processed data.
Args:
df: Processed DataFrame with smoothed columns
Returns:
Dictionary containing analysis metrics
"""
return {
"vo2_max": float(df["VO2(ml/min)_smoothed"].max())
if "VO2(ml/min)_smoothed" in df.columns
else 0,
"peak_vt": float(df["VT(l)_smoothed"].max())
if "VT(l)_smoothed" in df.columns
else 0,
"max_hr": float(df["HR(bpm)_smoothed"].max())
if "HR(bpm)_smoothed" in df.columns
else 0,
}
def generate_html(
self, patient_info: Dict[str, Any], context_list: List[Dict[str, Any]]
) -> str:
"""
Generate HTML content for the report.
Args:
patient_info: Dictionary containing patient information
(patient_name, age, height, weight, focus)
context_list: List of context dictionaries for each page
Returns:
Complete HTML document as string
"""
html_pages = []
# Header context
header_context = {
"patient_name": patient_info.get("patient_name", ""),
"age": patient_info.get("age", ""),
"height": patient_info.get("height", ""),
"weight": patient_info.get("weight", ""),
"focus": patient_info.get("focus", "Endurance"),
}
# Footer context
footer_context = [
{
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": i + 1,
}
for i in range(len(context_list))
]
# Render header
header_html = self.env.get_template("header.html").render(header_context)
# Render footers
footer_html_list = [
self.env.get_template("footer.html").render(context)
for context in footer_context
]
# Render pages
for i, context in enumerate(context_list):
template = self.env.get_template(f"page_{i + 1}.html").render(context)
if (i + 1) > 2:
full_html = f"""
<div class="page flex flex-col justify-between">
<div>
{header_html}
</div>
<main class="flex-grow p-4">
{template}
</main>
<div class="border-t text-center text-sm text-gray-600">
{footer_html_list[i]}
</div>
</div>
"""
html_pages.append(full_html)
else:
html_pages.append(template)
# Combine with page breaks
final_html = "<div class='page-break'></div>".join(html_pages)
# Wrap in full HTML document
html_doc = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
<style>
html, body {{
height: 100%;
margin: 0;
padding: 0;
}}
.page-break {{ page-break-after: always; }}
.page {{
height: 100vh;
min-height: 100vh;
display: flex;
flex-direction: column;
}}
.page main {{
flex: 1;
overflow: hidden;
}}
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
img {{
max-height: 300px;
}}
.chart-large {{
max-height: 500px !important;
}}
</style>
</head>
<body class="m-0 p-0">
{final_html}
</body>
</html>
"""
return html_doc
async def html_to_pdf(self, html_content: str, pdf_path: str) -> None:
"""
Convert HTML content to PDF file.
Args:
html_content: HTML content as string
pdf_path: Path where PDF should be saved
"""
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
await page.set_content(html_content)
await page.pdf(path=pdf_path, format="A4", print_background=True)
await browser.close()
async def generate_report(
self,
spirometry_pdf_path: str,
pnoe_csv_path: str,
seca_excel_path: str,
patient_info: Dict[str, Any],
output_filename: str = None,
) -> Dict[str, Any]:
"""
Generate complete medical report from uploaded files.
This follows the complete workflow:
1. Extract spirometry data from PDF
2. Store all data in data directory
3. Generate all graphs
4. Generate context for each page
5. Generate final HTML and PDF report
Args:
spirometry_pdf_path: Path to Spirometry PDF file
pnoe_csv_path: Path to Pnoe CSV file
seca_excel_path: Path to SECA Excel file
patient_info: Dictionary containing patient information
output_filename: Optional custom output filename
Returns:
Dictionary containing report path, graphs generated, and analysis data
"""
# Step 1: Extract spirometry table from PDF
print("Step 1: Extracting spirometry data from PDF...")
spirometry_csv_path = extract_spirometry_table_from_pdf(
spirometry_pdf_path, output_dir=str(self.data_dir)
)
print(f"Spirometry data saved to: {spirometry_csv_path}")
# Step 2: Process Pnoe data
print("Step 2: Processing Pnoe data...")
df = self.process_pnoe_data(pnoe_csv_path)
# Step 3: Generate all graphs
print("Step 3: Generating graphs...")
graphs_generated = self.generate_graphs(df)
# Create graph dictionary with base64 encoded images
graphs_dict = {}
for graph in graphs_generated:
# Read the graph file and convert to base64
graph_path = Path(graph["path"])
if graph_path.exists():
import base64
with open(graph_path, "rb") as f:
graphs_dict[graph["name"]] = base64.b64encode(f.read()).decode(
"utf-8"
)
# Also generate body composition charts
# Extract patient data for these charts
patient_name = patient_info.get("patient_name", "").split()[-1] # Get last name
# Load SECA data to get body composition info
seca_df = pd.read_excel(seca_excel_path)
patient_data = seca_df[
seca_df["LastName"].str.contains(patient_name, case=False, na=False)
]
if not patient_data.empty:
row = patient_data.iloc[0]
weight_kg = float(row.get("Weight", 0))
fat_pct = float(row.get("Adult_FMP", 0))
age = int(row.get("Age", patient_info.get("age", 25)))
gender = row.get("Gender", "female").lower()
fat_mass_lbs = weight_kg * fat_pct / 100 * 2.20462
lean_mass_lbs = weight_kg * (1 - fat_pct / 100) * 2.20462
# Generate body composition chart
body_comp_b64 = self.graph_generator.generate_body_composition_chart(
fat_mass_lbs, lean_mass_lbs, save_as_base64=True
)
graphs_dict["body_composition"] = body_comp_b64
# Generate body fat percent chart
body_fat_b64 = self.graph_generator.generate_body_fat_percent_chart(
fat_pct, age, gender, save_as_base64=True
)
graphs_dict["body_fat_percent"] = body_fat_b64
# Generate spirometry chart
print("Step 4: Generating spirometry chart...")
try:
spirometry_df = pd.read_csv(spirometry_csv_path)
print(f"Spirometry data loaded: {len(spirometry_df)} rows")
spirometry_chart_b64 = self.graph_generator.generate_spirometry_chart(
spirometry_df, save_as_base64=True
)
graphs_dict["spirometry_chart"] = spirometry_chart_b64
except Exception as e:
print(f"Warning: Could not generate spirometry chart: {e}")
graphs_dict["spirometry_chart"] = ""
# Step 5: Generate context for all pages
print("Step 5: Generating page contexts...")
self.context_generator.load_data(
pnoe_csv_path, str(spirometry_csv_path), seca_excel_path
)
context_list = self.context_generator.generate_all_contexts(
patient_name, graphs_dict
)
# Step 5: Calculate analysis metrics
analysis_data = self.calculate_analysis_metrics(df)
analysis_data["graphs_count"] = len(graphs_generated)
# Step 6: Generate HTML
html_content = self.generate_html(patient_info, context_list)
# Step 7: Generate PDF
if output_filename is None:
patient_name_full = patient_info.get("patient_name", "Unknown")
session_id = patient_info.get("session_id", "default")
output_filename = (
f"report_{patient_name_full.replace(' ', '_')}_{session_id}.pdf"
)
report_path = self.reports_dir / output_filename
print(f"Generating PDF report at {report_path}")
await self.html_to_pdf(html_content, str(report_path))
return {
"report_path": str(report_path),
"graphs_generated": graphs_generated,
"analysis_data": analysis_data,
}
+139
View File
@@ -0,0 +1,139 @@
import base64
import os
import requests
from dotenv import load_dotenv
load_dotenv()
API_KEY_REF = os.getenv("OPENROUTER_API_KEY")
def encode_pdf_to_base64(pdf_path):
with open(pdf_path, "rb") as pdf_file:
return base64.b64encode(pdf_file.read()).decode("utf-8")
def extract_spirometry_table_from_pdf(pdf_path, output_dir="data"):
"""
Extract spirometry table from PDF using AI and save as clean CSV.
Args:
pdf_path: Path to the spirometry PDF file
output_dir: Directory to save the extracted CSV
Returns:
Path to the saved CSV file
"""
import csv
import re
from pathlib import Path
url = "https://openrouter.ai/api/v1/chat/completions"
headers = {
"Authorization": f"Bearer {API_KEY_REF}",
"Content-Type": "application/json",
}
# Read and encode the PDF
base64_pdf = encode_pdf_to_base64(pdf_path)
data_url = f"data:application/pdf;base64,{base64_pdf}"
messages = [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Please extract the Spirometry table from the pdf and return ONLY the values in CSV format. "
"The CSV should have these columns: Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
"Rules:\n"
"1. Include ONLY the data rows (FVC, FEV1, FEV1/FVC%, etc.)\n"
"2. Do NOT include units in the data (units are part of parameter name)\n"
"3. Use empty string for missing values (not '-' or 'N/A')\n"
"4. Do NOT add 'csv' markers or code blocks\n"
"5. First line should be the header\n"
"Example format:\n"
"Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
"FVC,4.50,4.75,3.20,4.80,99,-0.10",
},
{
"type": "file",
"file": {"filename": "document.pdf", "file_data": data_url},
},
],
}
]
payload = {
"model": "google/gemini-2.5-flash-lite",
"messages": messages,
}
response = requests.post(url, headers=headers, json=payload)
response_data = response.json()
if "choices" in response_data and len(response_data["choices"]) > 0:
content = response_data["choices"][0]["message"]["content"]
# Clean the content - remove markdown code blocks if present
content = re.sub(r"```csv\n?", "", content)
content = re.sub(r"```\n?", "", content)
content = content.strip()
# Parse and validate CSV
lines = content.split("\n")
if not lines:
raise ValueError("No data extracted from PDF")
# Ensure output directory exists
output_path = Path(output_dir)
output_path.mkdir(exist_ok=True)
output_file = output_path / "extracted_spirometry_table.csv"
# Write cleaned CSV with proper formatting
with open(output_file, "w", encoding="utf-8", newline="") as f:
# Parse the first line as header
header_line = lines[0].strip()
if "," in header_line:
header = [col.strip() for col in header_line.split(",")]
else:
# Default header if not provided
header = [
"Parameters",
"Pre",
"Best",
"LLN",
"Pred.",
"%Pred.",
"ZScore",
]
writer = csv.writer(f)
writer.writerow(header)
# Process data rows
for line in lines[1:]:
line = line.strip()
if not line:
continue
# Split by comma and clean each field
fields = [field.strip() for field in line.split(",")]
# Ensure we have the right number of fields
if len(fields) < len(header):
# Pad with empty strings
fields.extend([""] * (len(header) - len(fields)))
elif len(fields) > len(header):
# Take only the first N fields
fields = fields[: len(header)]
# Replace '-' or 'N/A' with empty string
fields = ["" if f in ["-", "N/A", "n/a", "NA"] else f for f in fields]
writer.writerow(fields)
return str(output_file)
else:
error_msg = response_data.get("error", {}).get("message", "Unknown error")
raise Exception(f"No content found in response: {error_msg}")
-343
View File
@@ -1,343 +0,0 @@
import base64
def image_to_base64(image_path):
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
except FileNotFoundError:
print(f"Warning: Image not found at {image_path}")
return ""
### Defining Page Contexts ###
page_1_context = {
"name": "John Doe",
"surname": "Moran",
"date": "July 29, 2025",
}
page_2_context = {
"content": "This is page 2 content",
}
page_3_context = {
"patient_name": "Keirstyn Moran",
}
page_4_context = {
"body_composition_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/body_composition_chart.png"
),
"body_fat_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/body_fat_percent_chart.png"
),
}
page_5_context = {
"metabolism_chart": "",
"fuel_source_chart": "",
"resting_calories": 1540,
"neat_calories": 310,
"weight_loss_calories": 1725,
"weight_loss_rate": "1lb/week",
"total_calories": 3575,
}
page_6_context = {
"patient_name": "Keirstyn Moran",
"age": "34",
"height": "5'4\"",
"weight": "123lbs",
"focus": "Endurance",
"deficit_calories": "1725KCals",
"deficit_protein": "120g Protein",
"deficit_carbs": "155g Carbs",
"deficit_fat": "69g Fat",
"deficit_fiber": "25g Fibre",
"refeed_weekday_calories": "1615KCals",
"refeed_weekday_protein": "120g Protein",
"refeed_weekday_carbs": "142g Carbs",
"refeed_weekday_fat": "63g Fat",
"refeed_weekday_fiber": "24g Fibre",
"refeed_weekend_calories": "2000KCals",
"refeed_weekend_protein": "120g Protein",
"refeed_weekend_carbs": "190g Carbs",
"refeed_weekend_fat": "84g Fat",
"refeed_weekend_fiber": "30g Fibre",
"protein_percentage": "28%",
"carbs_percentage": "36%",
"fats_percentage": "36%",
"page_number": "6",
}
page_7_context = {
"indication": "No Respiratory Capacity Limitation",
"peak_vt": 3.2,
"peak_vt_bpm": 198,
"peak_vt_zone": 3,
"fev1_percentage": 85,
"lung_analysis_chart": image_to_base64("/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/spirometry_chart.png"),
"respiratory_analysis_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/respiratory.png"
),
}
page_8_context = {
"vo2_max_value": "49.5",
"vo2_max_percentile": "100th percentile",
"age_range": "30-39",
"very_poor_range": "19.0-24.1",
"poor_range": "24.1-28.2",
"fair_range": "28.2-32.2",
"good_range": "32.2-35.7",
"excellent_range": "35.7-45.8",
"superior_range": "45.8+",
"zone1_percentage": "55-65% of Max Heart Rate",
"zone2_percentage": "65-75% of Max Heart Rate",
"zone3_percentage": "80-85% of Max Heart Rate",
"zone4_percentage": "85-88% of Max Heart Rate",
"zone5_percentage": "90% of Max Heart Rate",
"zone1_bpm": "81-96bpm",
"zone2_bpm": "96-100bpm",
"zone3_bpm": "100-178bpm",
"zone4_bpm": "178-188bpm",
"zone5_bpm": "188-198bpm",
"zone1_speed": "3.5mph",
"zone2_speed": "3.5-4.0mph",
"zone3_speed": "4.0-6.5mph",
"zone4_speed": "6.5-7.0mph",
"zone5_speed": "7.0-8.0mph",
"zone1_incline": "2% Incline",
"zone2_incline": "2% Incline",
"zone3_incline": "2% Incline",
"zone4_incline": "2% Incline",
"zone5_incline": "2% Incline",
"zone1_pace": "10:39min/km Pace",
"zone2_pace": "10:39-9:19min/km Pace",
"zone3_pace": "9:19-5:44min/km Pace",
"zone4_pace": "5:44-5:20min/km Pace",
"zone5_pace": "5:20-4:40min/km Pace",
"zone1_calories": "4.4kcals/minute",
"zone2_calories": "5.9kcals/minute",
"zone3_calories": "9.4kcals/minute",
"zone4_calories": "12.5kcals/minute",
"zone5_calories": "12.8kcals/minute",
"zone1_carb": "Avg: 0.4g/min Carb Utilization",
"zone2_carb": "Avg: 0.6g/min Carb Utilization",
"zone3_carb": "Avg: 1.9g/min Carb Utilization",
"zone4_carb": "Avg: 2.9g/min Carb Utilization",
"zone5_carb": "Avg: 3.1g/min Carb Utilization",
"zone1_breaths": "Avg: 27 breaths",
"zone2_breaths": "Avg: 28 breaths",
"zone3_breaths": "Avg: 31 breaths",
"zone4_breaths": "Avg: 42 breaths",
"zone5_breaths": "Avg: 51 breaths",
"zone1_breath_range": "Ideal Range: 15-20 breaths",
"zone2_breath_range": "Ideal Range: 20-25 breaths",
"zone3_breath_range": "Ideal Range: 25-30 breaths",
"zone4_breath_range": "Ideal Range: 30-35 breaths",
"zone5_breath_range": "Ideal Range: 40+ breaths",
}
page_9_context = {
"fuel_utilization_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/fuel_utilization_chart.png"
),
}
page_10_context = {
"vo2_pulse_drop_bpm": "180 bpm",
"vo2_pulse_drop_zone": "Zone 4",
"vo2_pulse_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/vo2_pulse_chart.png"
),
"vo2_breath_drop_bpm": "173 bpm",
"vo2_breath_drop_zone": "Zone 3",
"vo2_breath_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/vo2_breath_chart.png"
),
}
page_11_context = {
"fat_max_optimal": "*Optimal 10-12Kcals/minute",
"fat_max_value": "3.8Kcals/min",
"fat_max_heart_rate": "49% of Max Heart Rate",
"fat_max_bpm": "97 bpm",
"crossover_bpm": "100bpm",
"crossover_heart_rate": "51% of Max Heart Rate",
"fat_metabolism_note": "100bpm at a speed of 4.0mph and incline of 2%",
"fat_metabolism_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/fat_metabolism_chart.png"
),
"cardiac_recovery_time": "(1 minute)",
"cardiac_recovery_percentage": "33%",
"metabolic_recovery_time": "(2 minute)",
"metabolic_recovery_percentage": "65%",
"breath_recovery_time": "(2.5 minute)",
"breath_recovery_percentage": "76%",
"recovery_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/recovery_chart.png"
),
"resting_heart_rate": "53bpm",
"hr_age_range": "26-35",
"hr_poor": "82bpm +",
"hr_below_avg": "75-81bpm",
"hr_average": "71-74bpm",
"hr_above_avg": "66-70bpm",
"hr_good": "62-65bpm",
"hr_excellent": "55-61bpm",
"hr_athlete": "44-54bpm",
}
page_12_context = {
}
page_13_context = {
"patient_name": "Keirstyn Moran",
"age": "34",
"height": "5'4\"",
"weight": "123lbs",
"focus": "Endurance",
"zone2_frequency": "3-4x/week",
"zone2_duration": "40+ minutes",
"zone2_hr_range": "96-110bpm",
"zone2_speed": "3.5-4.0mph",
"zone2_incline": "2% Incline",
"zone3_frequency": "1-2x/week",
"zone3_duration": "10-20 minutes",
"zone3_hr_range": "100-178bpm",
"zone3_speed": "4.0-6.5mph",
"zone3_incline": "2% Incline",
"zone3_target_hr": "140bpm",
"zone3_recovery_speed": "3.5mph",
"zone3_recovery_incline": "2% Incline",
"zone1_hr_range": "81-96bpm",
"zone1_duration": "4-8 minutes",
"zone3_repeats": "2-3 times",
"short_sets": "8-10",
"short_duration": "10-30 seconds",
"short_zone": "5",
"short_rpe": "10",
"short_recovery": "20-60 seconds",
"medium_sets": "6-8",
"medium_duration": "30-90 seconds",
"medium_zone": "4",
"medium_rpe": "8-9",
"medium_recovery": "30-90 seconds",
"long_sets": "4-6",
"long_duration": "5-10 minutes",
"long_zone": "3/4",
"long_rpe": "7-8",
"long_recovery": "2.5-5 minutes",
"tempo_sets": "2-3",
"tempo_duration": "10-20 minutes",
"tempo_zone": "3",
"tempo_rpe": "6-7",
"tempo_recovery": "4-8 minutes",
"cardio_sets": "1",
"cardio_duration": ">40 minutes",
"cardio_zone": "2",
"cardio_rpe": "4-5",
"cardio_recovery": "N/A",
"week1_mon_zone": "Zone 2",
"week1_mon_duration": "45 mins",
"week1_tue_zone": "Zone 2",
"week1_tue_duration": "45 mins",
"week1_wed_zone": "Zone 3",
"week1_wed_duration1": "10mins On",
"week1_wed_duration2": "8mins Rest",
"week1_wed_sets": "x2",
"week1_thu_content": "",
"week1_fri_zone": "Zone 2",
"week1_fri_duration": "45 mins",
"week1_sat_content": "",
"week1_sun_content": "",
"week2_mon_zone": "Zone 2",
"week2_mon_duration": "50 mins",
"week2_tue_zone": "Zone 2",
"week2_tue_duration": "50 mins",
"week2_wed_zone": "Zone 3",
"week2_wed_duration1": "10mins On",
"week2_wed_duration2": "6mins Rest",
"week2_wed_sets": "x2",
"week2_thu_content": "",
"week2_fri_zone": "Zone 2",
"week2_fri_duration": "50 mins",
"week2_sat_content": "",
"week2_sun_content": "",
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": "13",
}
page_14_context = {
"patient_name": "Keirstyn Moran",
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": "14",
}
page_15_context = {
"patient_name": "Keirstyn Moran",
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": "15",
}
page_16_context = {
"patient_name": "Keirstyn Moran",
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": "16",
}
page_17_context = {
"patient_name": "Keirstyn Moran",
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": "17",
}
page_18_context = {
"body_fat_percentage_chart": image_to_base64(
"/home/oluwasanmi/Documents/Work/MKD/report_generation/graphs/fat_percent_master_chart.png"
),
}
page_19_context = {
"patient_name": "Keirstyn Moran",
"contact_email": "info@ishplabs.com",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": "19",
}
context_list = [
page_1_context,
page_2_context,
page_3_context,
page_4_context,
page_5_context,
page_6_context,
page_7_context,
page_8_context,
page_9_context,
page_10_context,
page_11_context,
page_12_context,
page_13_context,
page_14_context,
page_15_context,
page_16_context,
page_17_context,
page_18_context,
page_19_context,
]
-319
View File
@@ -1,319 +0,0 @@
import base64
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import matplotlib.pyplot as plt
import pandas as pd
class ReportGenerator:
def __init__(self):
self.pnoe_df = None
self.patient_df = None
self.spirometry_df = None
self.seca_df = None
self.patient_info = {}
self.charts_dir = Path("graphs")
self.charts_dir.mkdir(exist_ok=True)
def load_data(
self,
pnoe_path: str,
patient_path: str,
spirometry_path: str,
seca_path: str = None,
):
"""Load all required datasets"""
self.pnoe_df = pd.read_csv(pnoe_path, delimiter=";")
self.patient_df = pd.read_csv(patient_path)
self.spirometry_df = pd.read_csv(spirometry_path)
if seca_path:
self.seca_df = pd.read_excel(seca_path)
# Apply preprocessing
self._preprocess_data()
def _preprocess_data(self):
"""Apply preprocessing steps from your notebook"""
# Convert to numeric
self.pnoe_df = self.pnoe_df.apply(pd.to_numeric, errors="ignore")
# Calculate derived columns
self.pnoe_df["VO2 Pulse"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["HR(bpm)"]
)
self.pnoe_df["VO2 Breath"] = (
self.pnoe_df["VO2(ml/min)"] / self.pnoe_df["BF(bpm)"]
)
self.pnoe_df["CHO"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["CARBS(%)"] / 100
)
self.pnoe_df["FAT"] = (
self.pnoe_df["EE(kcal/min)"] * self.pnoe_df["FAT(%)"] / 100
)
# Apply smoothing
window_size = 10
columns_to_smooth = [
"VO2(ml/min)",
"VCO2(ml/min)",
"HR(bpm)",
"VT(l)",
"BF(bpm)",
"VE(l/min)",
"VO2 Pulse",
"VO2 Breath",
"CHO",
"FAT",
]
for col in columns_to_smooth:
if col in self.pnoe_df.columns:
self.pnoe_df[f"{col}_smoothed"] = (
self.pnoe_df[col].rolling(window=window_size, min_periods=1).mean()
)
def extract_patient_info(self, last_name: str) -> Dict:
"""Extract patient information from datasets"""
if self.seca_df is not None:
patient_data = self.seca_df[
self.seca_df["LastName"].str.contains(last_name, case=False, na=False)
]
if not patient_data.empty:
row = patient_data.iloc[0]
self.patient_info = {
"name": f"{row.get('FirstName', '')} {last_name}",
"age": int(row.get("Age", 0)),
"height": f"{row.get('Height', '')}",
"weight": float(row.get("Weight", 0)),
"gender": row.get("Gender", "").lower(),
"fat_percentage": float(row.get("Adult_FMP", 0)),
}
return self.patient_info
def calculate_spirometry_metrics(self) -> Dict:
"""Calculate spirometry-related metrics"""
metrics = {}
# Extract key spirometry values
for param in ["FVC", "FEV1", "FEV1/FVC%"]:
row = self.spirometry_df.loc[self.spirometry_df["Parameters"] == param]
if not row.empty:
metrics[
f"{param.lower().replace('/', '_').replace('%', '_pct')}_best"
] = row["Best"].values[0]
metrics[
f"{param.lower().replace('/', '_').replace('%', '_pct')}_pred"
] = row["%Pred."].values[0]
return metrics
def calculate_pnoe_metrics(self) -> Dict:
"""Calculate all Pnoe-derived metrics"""
metrics = {}
# Basic metrics
metrics["vo2_max"] = self.pnoe_df["VO2(ml/min)_smoothed"].max()
metrics["vo2_max_per_kg"] = metrics["vo2_max"] / self.patient_info["weight"]
# Peak VT
peak_vt_idx = self.pnoe_df["VT(l)_smoothed"].idxmax()
peak_vt_row = self.pnoe_df.loc[peak_vt_idx]
metrics["peak_vt"] = peak_vt_row["VT(l)_smoothed"]
metrics["peak_vt_hr"] = peak_vt_row["HR(bpm)_smoothed"]
# Fat burning metrics
fat_max_idx = self.pnoe_df["FAT_smoothed"].idxmax()
fat_max_row = self.pnoe_df.loc[fat_max_idx]
metrics["fat_max_value"] = fat_max_row["FAT_smoothed"]
metrics["fat_max_hr"] = fat_max_row["HR(bpm)_smoothed"]
# Calculate zones (simplified from your logic)
metrics.update(self._calculate_hr_zones())
# VT1/VT2 detection
vt1, vt2 = self._detect_thresholds()
metrics["vt1"] = vt1
metrics["vt2"] = vt2
return metrics
def _detect_thresholds(self) -> Tuple[Optional[Dict], Optional[Dict]]:
"""Detect VT1 and VT2 thresholds"""
# VT1: First crossover where carbs > fat
condition = self.pnoe_df["CHO_smoothed"] > self.pnoe_df["FAT_smoothed"]
crossover_indices = condition[condition].index
vt1 = None
if len(crossover_indices) > 0:
vt1_idx = crossover_indices[0]
vt1_row = self.pnoe_df.loc[vt1_idx]
vt1 = {
"HeartRate": vt1_row["HR(bpm)_smoothed"],
"Speed": vt1_row["Speed"],
"Time": vt1_row["T(sec)"],
}
# VT2: Ventilation inflection (simplified)
ve_slope = self.pnoe_df["VE(l/min)_smoothed"].diff()
second_derivative = ve_slope.diff()
vt2_idx = second_derivative.idxmax()
vt2 = None
if pd.notna(vt2_idx):
vt2_row = self.pnoe_df.loc[vt2_idx]
vt2 = {
"HeartRate": vt2_row["HR(bpm)_smoothed"],
"Speed": vt2_row["Speed"],
"Time": vt2_row["T(sec)"],
}
return vt1, vt2
def _calculate_hr_zones(self) -> Dict:
"""Calculate heart rate zones"""
max_hr = 220 - self.patient_info["age"]
# Simplified zone calculation - you can make this more sophisticated
zones = {
"zone1_bpm": f"{int(max_hr * 0.55)}-{int(max_hr * 0.65)}bpm",
"zone2_bpm": f"{int(max_hr * 0.65)}-{int(max_hr * 0.75)}bpm",
"zone3_bpm": f"{int(max_hr * 0.75)}-{int(max_hr * 0.85)}bpm",
"zone4_bpm": f"{int(max_hr * 0.85)}-{int(max_hr * 0.95)}bpm",
"zone5_bpm": f"{int(max_hr * 0.95)}+bpm",
}
return zones
def generate_charts(self) -> Dict[str, str]:
"""Generate all charts and return base64 encoded versions"""
charts = {}
# Generate fuel utilization chart
charts["fuel_utilization_chart"] = self._create_fuel_chart()
# Generate VO2 pulse chart
charts["vo2_pulse_chart"] = self._create_vo2_pulse_chart()
# Generate body composition chart
charts["body_composition_chart"] = self._create_body_comp_chart()
# Add more chart generation methods...
return charts
def _create_fuel_chart(self) -> str:
"""Create and save fuel utilization chart"""
# Use your existing chart code but make it dynamic
speed_groups = self.pnoe_df.groupby("Speed").mean(numeric_only=True).round(1)
speed_groups = speed_groups.iloc[1:-1]
filtered_data = speed_groups[
(speed_groups.index >= 3.5) & (speed_groups.index <= 7.5)
]
plt.figure(figsize=(15, 8))
# ... your chart code here ...
chart_path = self.charts_dir / "fuel_utilization_chart.png"
plt.savefig(chart_path, dpi=300)
plt.close()
return self._image_to_base64(chart_path)
def _create_vo2_pulse_chart(self) -> str:
"""Create VO2 pulse chart"""
# Your VO2 pulse chart code here
chart_path = self.charts_dir / "vo2_pulse_chart.png"
# ... chart generation code ...
return self._image_to_base64(chart_path)
def _create_body_comp_chart(self) -> str:
"""Create body composition chart"""
# Your body composition chart code here
chart_path = self.charts_dir / "body_composition_chart.png"
# ... chart generation code ...
return self._image_to_base64(chart_path)
def _image_to_base64(self, image_path: Path) -> str:
"""Convert image to base64"""
try:
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode("utf-8")
except FileNotFoundError:
return ""
def generate_all_contexts(self, last_name: str = "Moran") -> List[Dict]:
"""Main method to generate all page contexts"""
# Extract patient info
self.extract_patient_info(last_name)
# Calculate metrics
spirometry_metrics = self.calculate_spirometry_metrics()
pnoe_metrics = self.calculate_pnoe_metrics()
# Generate charts
charts = self.generate_charts()
# Build contexts for each page
contexts = []
# Page 1
contexts.append(
{
"name": self.patient_info["name"],
"surname": last_name,
"date": "July 29, 2025",
}
)
# Page 2-6 (add as needed)
for i in range(5):
contexts.append({})
# Page 7 - Spirometry
contexts.append(
{
"peak_vt": pnoe_metrics["peak_vt"],
"peak_vt_bpm": pnoe_metrics["peak_vt_hr"],
"fev1_percentage": (
pnoe_metrics["peak_vt"] / spirometry_metrics["fvc_best"]
)
* 100,
"lung_analysis_chart": charts.get("spirometry_chart", ""),
"respiratory_analysis_chart": charts.get("respiratory_chart", ""),
}
)
# Page 8 - VO2 Max and Zones
contexts.append(
{
"vo2_max_value": f"{pnoe_metrics['vo2_max_per_kg']:.1f}",
"age_range": f"{self.patient_info['age'] // 10 * 10}-{self.patient_info['age'] // 10 * 10 + 9}",
**pnoe_metrics, # Include all zone calculations
}
)
# Continue for all pages...
# Add remaining pages as needed
return contexts
# Usage for backend service
def generate_report(
pnoe_file, patient_file, spirometry_file, seca_file=None, patient_name="Moran"
):
"""Main function for backend service"""
generator = ReportGenerator()
generator.load_data(pnoe_file, patient_file, spirometry_file, seca_file)
return generator.generate_all_contexts(patient_name)
# Example usage
if __name__ == "__main__":
contexts = generate_report(
"data/Pnoe_20250729_1550-Moran_Keirstyn.csv",
"data/patient_data.csv",
"data/spirometry_data.csv",
"data/SECA body comp for all patients.xlsx",
)
print(f"Generated {len(contexts)} page contexts")
-12
View File
@@ -1,12 +0,0 @@
Parameters,Best,LLN,Pred.,%Pred.,ZScore,PRE#1,PRE#2,PRE#3
FVC,L,4.24,3.03,3.79,112.0,0.95,4.24,4.17,4.15
FEV1,L,3.26,2.53,3.16,103.3,0.28,3.26,3.21,3.14
FEV1/FVC%,76.89,72.47,83.78,91.8,-1.05,76.9,77.0,75.7
PEF,L/m,684,222,384,178.7,-,444,438,684
FEF2575,L/s,2.74,2.15,3.42,80.2,-0.84,2.74,2.68,2.48
FEF25,L/s,6.08,-,-,-,6.08,6.0,5.53
FEF50,L/s,3.06,-,-,-,3.06,3.1,2.77
FEF75,L/s,1.06,0.71,1.41,75.1,-0.72,1.06,1.12,0.94
PEFTime,ms,-,-,79,-,79,49,39
Evol,mL,-,-,78.0,-,78.0,77.0,197.0
FEV6,L,4.22,3.03,3.79,111.4,-,4.22,4.17,4.13
1 Parameters,Best,LLN,Pred.,%Pred.,ZScore,PRE#1,PRE#2,PRE#3
2 FVC,L,4.24,3.03,3.79,112.0,0.95,4.24,4.17,4.15
3 FEV1,L,3.26,2.53,3.16,103.3,0.28,3.26,3.21,3.14
4 FEV1/FVC%,76.89,72.47,83.78,91.8,-1.05,76.9,77.0,75.7
5 PEF,L/m,684,222,384,178.7,-,444,438,684
6 FEF2575,L/s,2.74,2.15,3.42,80.2,-0.84,2.74,2.68,2.48
7 FEF25,L/s,6.08,-,-,-,6.08,6.0,5.53
8 FEF50,L/s,3.06,-,-,-,3.06,3.1,2.77
9 FEF75,L/s,1.06,0.71,1.41,75.1,-0.72,1.06,1.12,0.94
10 PEFTime,ms,-,-,79,-,79,49,39
11 Evol,mL,-,-,78.0,-,78.0,77.0,197.0
12 FEV6,L,4.22,3.03,3.79,111.4,-,4.22,4.17,4.13
Binary file not shown.

Before

Width:  |  Height:  |  Size: 265 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 287 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 262 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 396 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 188 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 108 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 238 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 292 KiB

-124
View File
@@ -1,124 +0,0 @@
from jinja2 import Environment, FileSystemLoader
from playwright.sync_api import sync_playwright
from context import context_list
env = Environment(loader=FileSystemLoader("report_gen"))
html_pages = []
header_context = {
"patient_name": "Keirstyn Moran",
"age": 34,
"height": "5'4\"",
"weight": "123lbs",
"focus": "Endurance",
}
footer_context = [
{
"contact_email": "info@ishplabs.com ",
"website": "www.ishplabs.com",
"social": "@ishplabs",
"page_number": i + 1,
}
for i in range(len(context_list))
]
header_html = env.get_template("header.html").render(header_context)
footer_html_list = [
env.get_template("footer.html").render(context) for context in footer_context
]
for i, context in enumerate(context_list):
template = env.get_template(f"page_{i + 1}.html").render(context)
if (i + 1) > 2:
full_html = f"""
<div class="page flex flex-col justify-between">
<div>
{header_html}
</div>
<main class="flex-grow p-4">
{template}
</main>
<div class="border-t text-center text-sm text-gray-600">
{footer_html_list[i]}
</div>
</div>
"""
html_pages.append(full_html)
else:
html_pages.append(template)
# Combine with page breaks
final_html = "<div class='page-break'></div>".join(html_pages)
# Wrap in full HTML document
html_doc = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<link href="https://cdn.jsdelivr.net/npm/tailwindcss/dist/tailwind.min.css" rel="stylesheet">
<style>
html, body {{
height: 100%;
margin: 0;
padding: 0;
}}
.page-break {{ page-break-after: always; }}
.page {{
height: 100vh;
min-height: 100vh;
display: flex;
flex-direction: column;
}}
.page main {{
flex: 1;
overflow: hidden;
}}
/* Reset margins and padding everywhere */
* {{
margin: 0;
padding: 0;
box-sizing: border-box;
}}
/* Prevent images from being too large */
img {{
max-height: 300px;
}}
/* Larger images for specific charts */
.chart-large {{
max-height: 500px !important;
}}
</style>
</head>
<body class="m-0 p-0">
{final_html}
</body>
</html>
"""
# Generate PDF
def html_string_to_pdf(html_content, pdf_path):
with sync_playwright() as p:
browser = p.chromium.launch()
page = browser.new_page()
# Set the HTML directly
page.set_content(html_content)
# Export to PDF
page.pdf(path=pdf_path, format="A4", print_background=True)
browser.close()
html_string_to_pdf(html_doc, "multi_page_report.pdf")
# pdfkit.from_string(html_doc, "truth_report.pdf", options=options)
print("✅ PDF generated: multi_page_report.pdf")
Binary file not shown.
-1550
View File
File diff suppressed because one or more lines are too long
+43
View File
@@ -1,5 +1,8 @@
annotated-types==0.7.0
anyio==4.11.0
asttokens==3.0.0 asttokens==3.0.0
brotli==1.1.0 brotli==1.1.0
certifi==2025.8.3
cffi==2.0.0 cffi==2.0.0
chardet==5.2.0 chardet==5.2.0
charset-normalizer==3.4.3 charset-normalizer==3.4.3
@@ -11,24 +14,39 @@ cssselect2==0.8.0
cycler==0.12.1 cycler==0.12.1
debugpy==1.8.17 debugpy==1.8.17
decorator==5.2.1 decorator==5.2.1
dnspython==2.8.0
email-validator==2.3.0
et-xmlfile==2.0.0 et-xmlfile==2.0.0
executing==2.2.1 executing==2.2.1
fastapi==0.118.0
fastapi-cli==0.0.13
fastapi-cloud-cli==0.3.0
fonttools==4.60.0 fonttools==4.60.0
greenlet==3.2.4
h11==0.16.0
httpcore==1.0.9
httptools==0.6.4
httpx==0.28.1
idna==3.10
ipykernel==6.30.1 ipykernel==6.30.1
ipython==9.5.0 ipython==9.5.0
ipython-pygments-lexers==1.1.1 ipython-pygments-lexers==1.1.1
itsdangerous==2.2.0
jedi==0.19.2 jedi==0.19.2
jinja2==3.1.6 jinja2==3.1.6
jupyter-client==8.6.3 jupyter-client==8.6.3
jupyter-core==5.8.1 jupyter-core==5.8.1
kiwisolver==1.4.9 kiwisolver==1.4.9
markdown-it-py==4.0.0
markupsafe==3.0.2 markupsafe==3.0.2
matplotlib==3.10.6 matplotlib==3.10.6
matplotlib-inline==0.1.7 matplotlib-inline==0.1.7
mdurl==0.1.2
nest-asyncio==1.6.0 nest-asyncio==1.6.0
numpy==2.3.3 numpy==2.3.3
opencv-python-headless==4.11.0.86 opencv-python-headless==4.11.0.86
openpyxl==3.1.5 openpyxl==3.1.5
orjson==3.11.3
packaging==25.0 packaging==25.0
pandas==2.3.2 pandas==2.3.2
pango==0.0.1 pango==0.0.1
@@ -38,12 +56,18 @@ pdfminer-six==20250506
pexpect==4.9.0 pexpect==4.9.0
pillow==11.3.0 pillow==11.3.0
platformdirs==4.4.0 platformdirs==4.4.0
playwright==1.55.0
prompt-toolkit==3.0.52 prompt-toolkit==3.0.52
psutil==7.1.0 psutil==7.1.0
ptyprocess==0.7.0 ptyprocess==0.7.0
pure-eval==0.2.3 pure-eval==0.2.3
pycparser==2.23 pycparser==2.23
pydantic==2.11.9
pydantic-core==2.33.2
pydantic-extra-types==2.10.5
pydantic-settings==2.11.0
pydyf==0.11.0 pydyf==0.11.0
pyee==13.0.0
pygments==2.19.2 pygments==2.19.2
pymupdf==1.26.4 pymupdf==1.26.4
pyparsing==3.2.5 pyparsing==3.2.5
@@ -51,17 +75,36 @@ pypdf==5.9.0
pypdfium2==4.30.0 pypdfium2==4.30.0
pyphen==0.17.2 pyphen==0.17.2
python-dateutil==2.9.0.post0 python-dateutil==2.9.0.post0
python-dotenv==1.1.1
python-multipart==0.0.20
pytz==2025.2 pytz==2025.2
pyyaml==6.0.3
pyzmq==27.1.0 pyzmq==27.1.0
rich==14.1.0
rich-toolkit==0.15.1
rignore==0.7.0
seaborn==0.13.2 seaborn==0.13.2
sentry-sdk==2.39.0
shellingham==1.5.4
six==1.17.0 six==1.17.0
sniffio==1.3.1
stack-data==0.6.3 stack-data==0.6.3
starlette==0.48.0
tabulate==0.9.0 tabulate==0.9.0
tinycss2==1.4.0 tinycss2==1.4.0
tinyhtml5==2.0.0 tinyhtml5==2.0.0
tornado==6.5.2 tornado==6.5.2
traitlets==5.14.3 traitlets==5.14.3
typer==0.19.2
typing-extensions==4.15.0
typing-inspection==0.4.2
tzdata==2025.2 tzdata==2025.2
ujson==5.11.0
urllib3==2.5.0
uvicorn==0.37.0
uvloop==0.21.0
watchfiles==1.1.0
wcwidth==0.2.14 wcwidth==0.2.14
webencodings==0.5.1 webencodings==0.5.1
websockets==15.0.1
zopfli==0.2.3.post1 zopfli==0.2.3.post1