feat: Refactor report generation to use async methods and improve error handling; enhance spirometry table extraction with better CSV formatting
This commit is contained in:
@@ -13,7 +13,21 @@ def encode_pdf_to_base64(pdf_path):
|
||||
return base64.b64encode(pdf_file.read()).decode("utf-8")
|
||||
|
||||
|
||||
def extract_spirometry_table_from_pdf(pdf_path):
|
||||
def extract_spirometry_table_from_pdf(pdf_path, output_dir="data"):
|
||||
"""
|
||||
Extract spirometry table from PDF using AI and save as clean CSV.
|
||||
|
||||
Args:
|
||||
pdf_path: Path to the spirometry PDF file
|
||||
output_dir: Directory to save the extracted CSV
|
||||
|
||||
Returns:
|
||||
Path to the saved CSV file
|
||||
"""
|
||||
import csv
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
url = "https://openrouter.ai/api/v1/chat/completions"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {API_KEY_REF}",
|
||||
@@ -30,10 +44,17 @@ def extract_spirometry_table_from_pdf(pdf_path):
|
||||
"content": [
|
||||
{
|
||||
"type": "text",
|
||||
"text": "Please extract the Spirometry table from the pdf and return the values in csv format, "
|
||||
"note that it is the unit of parameter that is beside it and it should not be a column. "
|
||||
"The '-' Should be treated as empty values."
|
||||
"do not add 'csv' at the start or end of the response",
|
||||
"text": "Please extract the Spirometry table from the pdf and return ONLY the values in CSV format. "
|
||||
"The CSV should have these columns: Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
|
||||
"Rules:\n"
|
||||
"1. Include ONLY the data rows (FVC, FEV1, FEV1/FVC%, etc.)\n"
|
||||
"2. Do NOT include units in the data (units are part of parameter name)\n"
|
||||
"3. Use empty string for missing values (not '-' or 'N/A')\n"
|
||||
"4. Do NOT add 'csv' markers or code blocks\n"
|
||||
"5. First line should be the header\n"
|
||||
"Example format:\n"
|
||||
"Parameters,Pre,Best,LLN,Pred.,%Pred.,ZScore\n"
|
||||
"FVC,4.50,4.75,3.20,4.80,99,-0.10",
|
||||
},
|
||||
{
|
||||
"type": "file",
|
||||
@@ -54,11 +75,65 @@ def extract_spirometry_table_from_pdf(pdf_path):
|
||||
if "choices" in response_data and len(response_data["choices"]) > 0:
|
||||
content = response_data["choices"][0]["message"]["content"]
|
||||
|
||||
# Save to a CSV file
|
||||
output_file = "extracted_spirometry_table.csv"
|
||||
with open(output_file, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
# Clean the content - remove markdown code blocks if present
|
||||
content = re.sub(r"```csv\n?", "", content)
|
||||
content = re.sub(r"```\n?", "", content)
|
||||
content = content.strip()
|
||||
|
||||
return f"Extracted table saved to {output_file}"
|
||||
# Parse and validate CSV
|
||||
lines = content.split("\n")
|
||||
if not lines:
|
||||
raise ValueError("No data extracted from PDF")
|
||||
|
||||
# Ensure output directory exists
|
||||
output_path = Path(output_dir)
|
||||
output_path.mkdir(exist_ok=True)
|
||||
output_file = output_path / "extracted_spirometry_table.csv"
|
||||
|
||||
# Write cleaned CSV with proper formatting
|
||||
with open(output_file, "w", encoding="utf-8", newline="") as f:
|
||||
# Parse the first line as header
|
||||
header_line = lines[0].strip()
|
||||
if "," in header_line:
|
||||
header = [col.strip() for col in header_line.split(",")]
|
||||
else:
|
||||
# Default header if not provided
|
||||
header = [
|
||||
"Parameters",
|
||||
"Pre",
|
||||
"Best",
|
||||
"LLN",
|
||||
"Pred.",
|
||||
"%Pred.",
|
||||
"ZScore",
|
||||
]
|
||||
|
||||
writer = csv.writer(f)
|
||||
writer.writerow(header)
|
||||
|
||||
# Process data rows
|
||||
for line in lines[1:]:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
# Split by comma and clean each field
|
||||
fields = [field.strip() for field in line.split(",")]
|
||||
|
||||
# Ensure we have the right number of fields
|
||||
if len(fields) < len(header):
|
||||
# Pad with empty strings
|
||||
fields.extend([""] * (len(header) - len(fields)))
|
||||
elif len(fields) > len(header):
|
||||
# Take only the first N fields
|
||||
fields = fields[: len(header)]
|
||||
|
||||
# Replace '-' or 'N/A' with empty string
|
||||
fields = ["" if f in ["-", "N/A", "n/a", "NA"] else f for f in fields]
|
||||
|
||||
writer.writerow(fields)
|
||||
|
||||
return str(output_file)
|
||||
else:
|
||||
return "No content found in response"
|
||||
error_msg = response_data.get("error", {}).get("message", "Unknown error")
|
||||
raise Exception(f"No content found in response: {error_msg}")
|
||||
|
||||
Reference in New Issue
Block a user