Files

497 lines
18 KiB
Python
Raw Permalink Normal View History

2025-07-11 22:29:45 +01:00
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from typing import Optional
import os
import uuid
2025-07-14 23:41:31 +01:00
from docx import Document
from PyPDF2 import PdfReader
import io
2025-07-11 22:29:45 +01:00
from datetime import datetime
from .config import Config
from .embeddings import EmbeddingGenerator
from .vector_stores import VectorStore
2025-07-14 23:41:31 +01:00
from .compliance_loader import ComplianceLoader
2025-07-11 22:29:45 +01:00
import groq
2025-07-14 23:41:31 +01:00
import json
2025-07-11 22:29:45 +01:00
# FastAPI application object; the route decorators below register against it.
app = FastAPI(title="Mini SpecsComply Pro")

# Shared service components, created once at import time.
embeddings = EmbeddingGenerator()
vector_store = VectorStore()
compliance_loader = ComplianceLoader()

# Load the compliance standards eagerly so they are available to the first
# request.
compliance_loader.load_compliance_standards()

# Groq API client used for the LLM calls in analyze_compliance().
groq_client = groq.Client(api_key=Config.GROQ_API_KEY)

# Full analysis results keyed by document id. This is a plain in-process dict,
# so its contents do not survive a restart.
analysis_storage = {}
2025-07-11 22:29:45 +01:00
def save_document(file: UploadFile) -> tuple:
    """Persist an uploaded file under ``Config.UPLOAD_FOLDER``.

    The file is stored as ``<uuid4><ext>`` where ``ext`` is the lower-cased
    extension of the client-supplied filename.

    Args:
        file: The incoming upload.

    Returns:
        A ``(doc_id, file_path)`` tuple: the generated document id and the
        path the bytes were written to.

    Raises:
        HTTPException: 400 when the extension is not listed in
            ``Config.ALLOWED_EXTENSIONS``.
    """
    # The filename is client-supplied and may be absent; fall back to "" so a
    # missing name is handled by the extension check instead of crashing
    # os.path.splitext with a TypeError.
    ext = os.path.splitext(file.filename or "")[1].lower()
    if ext not in Config.ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Unsupported file type")

    # Only touch the filesystem once the upload has been accepted.
    os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
    doc_id = str(uuid.uuid4())
    file_path = os.path.join(Config.UPLOAD_FOLDER, f"{doc_id}{ext}")
    with open(file_path, "wb") as f:
        f.write(file.file.read())
    return doc_id, file_path
def extract_text(file_path: str) -> str:
    """Return the plain text of a ``.docx``, ``.pdf`` or ``.txt`` file.

    The format is chosen from the path suffix. Empty paragraphs (docx) and
    pages without extractable text (pdf) are skipped; the remaining pieces are
    joined with newlines. An empty document yields ``""``.

    Raises:
        HTTPException: 400 for an unsupported suffix or for any error raised
            while opening or parsing the file.
    """
    try:
        if file_path.endswith('.docx'):
            non_empty = []
            for para in Document(file_path).paragraphs:
                if para.text:
                    non_empty.append(para.text)
            return '\n'.join(non_empty)

        if file_path.endswith('.pdf'):
            with open(file_path, 'rb') as pdf_file:
                chunks = []
                for page in PdfReader(pdf_file).pages:
                    extracted = page.extract_text()
                    if extracted:
                        chunks.append(extracted)
                return '\n'.join(chunks)

        if file_path.endswith('.txt'):
            with open(file_path, 'r', encoding='utf-8') as txt_file:
                return txt_file.read() or ""

        raise ValueError("Unsupported file type")
    except Exception as e:
        # Every failure, including the ValueError above, is reported to the
        # client as a 400 carrying the underlying message.
        raise HTTPException(
            status_code=400,
            detail=f"Failed to extract text: {str(e)}"
        )
2025-07-11 22:29:45 +01:00
# Line prefix that introduces the detail text of a structured issue.
_DETAIL_PREFIX = "* What's missing or incorrect:"
# Line prefixes that close a structured issue that had no usable detail line.
_ISSUE_END_PREFIXES = ('* Severity:', '* Location:')
# Two-character bullet markers recognised by the fallback parser.
_BULLET_PREFIXES = ('- ', '• ', '* ')
# Bullet lines starting with these words are field labels, not issues.
_FIELD_LABELS = ('Violated', 'What', 'Severity', 'Location')
# Keywords used by the last-resort line scan.
_ISSUE_KEYWORDS = ('missing', 'violation', 'non-compliant', 'issue')


def _build_parsing_prompt(text: str) -> str:
    """Build the prompt asking the model to classify and outline the document."""
    return f"""Extract key sections from this document and identify what type of tender document this appears to be:
DOCUMENT TO ANALYZE:
{text[:3000]}...
Please provide:
1. Document type (e.g., tender response, technical proposal, etc.)
2. Key sections found
3. Main requirements mentioned
4. Document structure analysis
Be concise but thorough."""


def _build_compliance_prompt(compliance_context: str, text: str) -> str:
    """Build the prompt asking the model for a structured compliance review."""
    return f"""You are a compliance expert analyzing tender documents.
COMPLIANCE STANDARDS TO CHECK AGAINST:
{compliance_context[:4000]}
DOCUMENT TO ANALYZE:
{text[:4000]}
Please analyze this document for compliance issues and provide a structured response:
1. COMPLIANCE SUMMARY: Overall compliance status (Compliant/Non-Compliant/Partial)
2. SPECIFIC ISSUES: List specific compliance violations found, including:
- Which standard is violated
- What is missing or incorrect
- Severity (Critical/High/Medium/Low)
- Specific location in document if possible
3. REQUIREMENTS CHECK: Verify if the document meets requirements from:
- Tender specifications
- Supplier qualification requirements
- Form of tender requirements
- Confidentiality agreement requirements
4. RECOMMENDATIONS: Specific actions to fix each issue
5. MISSING ELEMENTS: What key elements are completely missing
Please be detailed and specific in your analysis. Focus on actionable feedback."""


def _first_choice_content(response) -> str:
    """Return the text of the first completion choice, or "" if there is none."""
    if not response or not response.choices:
        return ""
    message = response.choices[0].message
    if message and message.content:
        return message.content
    return ""


def _is_numbered_item(line: str) -> bool:
    """Tell whether *line* starts with a list number such as ``3.`` or ``12.``."""
    number, dot, _ = line.partition('.')
    # Cap the digit count so a sentence starting with a year ("2024. ...") is
    # not mistaken for a list item.
    return bool(dot) and number.isascii() and number.isdigit() and len(number) <= 3


def _numbered_item_title(line: str) -> str:
    """Extract the title of a numbered line, preferring ``**bold**`` text."""
    if '**' in line:
        # '**' being present guarantees split() yields at least two parts.
        return line.split('**')[1].strip()
    return line.split('.', 1)[1].replace('*', '').strip()


def _parse_structured_issues(compliance_analysis: str) -> list:
    """Collect issues from numbered items followed by labelled detail bullets."""
    issues = []
    current_issue = None
    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        if _is_numbered_item(line):
            title = _numbered_item_title(line)
            if len(title) > 3:
                current_issue = title
        elif current_issue and line.startswith(_DETAIL_PREFIX):
            details = line[len(_DETAIL_PREFIX):].strip()
            if len(details) > 10:
                # Combine title and details so the issue reads on its own.
                issues.append(f"{current_issue}: {details}")
                current_issue = None
        elif current_issue and line.startswith(_ISSUE_END_PREFIXES):
            # No usable detail line was seen; record the bare title once.
            if current_issue not in {issue.split(':')[0] for issue in issues}:
                issues.append(current_issue)
            current_issue = None
    return issues


def _parse_bullet_issues(compliance_analysis: str) -> list:
    """Fallback parser: treat sufficiently long bullet lines as issues."""
    issues = []
    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        # The previous check included startswith(''), which is true for every
        # string, so every line was treated as a bullet and lost its first two
        # characters. '• ' is presumably the marker that was intended there.
        if not line.startswith(_BULLET_PREFIXES):
            continue
        candidate = line[2:].strip()
        if len(candidate) > 10 and not candidate.startswith(_FIELD_LABELS):
            issues.append(candidate)
    return issues


def _dedupe_issues(issues: list) -> list:
    """Drop duplicates and too-short entries while keeping first-seen order."""
    seen = set()
    unique = []
    for issue in issues:
        issue = str(issue)
        if len(issue) > 10 and issue not in seen:
            seen.add(issue)
            unique.append(issue)
    return unique


def _keyword_fallback_issues(compliance_analysis: str) -> list:
    """Last resort: return up to five lines that mention an issue keyword."""
    hits = []
    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        lowered = line.lower()
        if len(line) > 15 and any(word in lowered for word in _ISSUE_KEYWORDS):
            hits.append(line)
    return hits[:5]


def analyze_compliance(text: str) -> dict:
    """Run the two-step Groq compliance analysis on a document's text.

    The first model call summarises the document type and structure. The
    second checks the text against the loaded compliance standards. Issues are
    parsed from the second answer, de-duplicated and reranked through
    ``embeddings.rerank_issues`` (at most ten are kept).

    Args:
        text: Extracted document text. Only a leading slice is sent to the
            model (3000 characters for parsing, 4000 for compliance).

    Returns:
        A dict with the keys ``document_analysis``, ``compliance_analysis``,
        ``issues``, ``total_issues`` and ``timestamp``. Any exception is
        caught and reported as a fallback dict with a single generic issue,
        so this function does not raise.
    """
    try:
        compliance_context = compliance_loader.get_compliance_context()

        # Step 1: document parsing and section extraction.
        parsing_response = groq_client.chat.completions.create(
            messages=[{"role": "user", "content": _build_parsing_prompt(text)}],
            model=Config.GROQ_MODEL,
            temperature=0.1
        )
        document_analysis = _first_choice_content(parsing_response)

        # Step 2: compliance review against the loaded standards.
        compliance_response = groq_client.chat.completions.create(
            messages=[{
                "role": "user",
                "content": _build_compliance_prompt(compliance_context, text),
            }],
            model=Config.GROQ_MODEL,
            temperature=0.1,
            max_tokens=4000
        )
        compliance_analysis = _first_choice_content(compliance_response)

        # Prefer the structured parse; fall back to plain bullet lines.
        issues_list = (
            _parse_structured_issues(compliance_analysis)
            or _parse_bullet_issues(compliance_analysis)
        )
        unique_issues = _dedupe_issues(issues_list)

        ranked_issues = []
        if unique_issues:
            try:
                ranked_issues = embeddings.rerank_issues(
                    issues=unique_issues,
                    query="Most critical compliance violations and missing requirements",
                    top_n=min(10, len(unique_issues))
                )
            except Exception as e:
                # Reranking is best-effort; keep the parse order instead.
                print(f"Reranking failed: {e}")
                ranked_issues = unique_issues[:10]

        if not ranked_issues:
            ranked_issues = (
                _keyword_fallback_issues(compliance_analysis)
                or ["No specific issues identified"]
            )

        return {
            "document_analysis": document_analysis,
            "compliance_analysis": compliance_analysis,
            "issues": ranked_issues,
            "total_issues": len(ranked_issues),
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        print(f"Error in analyze_compliance: {e}")
        import traceback
        traceback.print_exc()
        # Return a safe fallback response so callers always get the same shape.
        return {
            "document_analysis": "Error occurred during document analysis",
            "compliance_analysis": "Error occurred during compliance analysis",
            "issues": ["Analysis failed due to technical error"],
            "total_issues": 1,
            "timestamp": datetime.now().isoformat()
        }
def prepare_metadata_for_pinecone(analysis: dict, filename: str) -> dict:
    """Flatten an analysis result into string-only metadata for the vector store.

    Args:
        analysis: Result dict as produced by ``analyze_compliance``; missing
            or ``None`` fields are replaced by empty strings.
        filename: Original upload name; ``"unknown"`` is used when it is empty.

    Returns:
        A dict of string values: the issues are joined with ``" | "``, the
        issue count is stringified, and the two analysis texts are cut to 500
        and 1000 characters respectively.
    """
    def _clip(value: str, limit: int = 30000) -> str:
        # Cap very long strings, marking the cut with an ellipsis.
        if not value:
            return ""
        if len(value) > limit:
            return value[:limit] + "..."
        return value

    raw_issues = analysis.get("issues", [])
    if raw_issues:
        # Skip None and blank entries; stringify everything else.
        issues_str = " | ".join(
            str(item)
            for item in raw_issues
            if item is not None and str(item).strip()
        )
    else:
        issues_str = ""

    doc_text = analysis.get("document_analysis", "") or ""
    compliance_text = analysis.get("compliance_analysis", "") or ""

    metadata = {}
    metadata["filename"] = filename or "unknown"
    metadata["upload_time"] = datetime.now().isoformat()
    metadata["status"] = "analyzed"
    metadata["total_issues"] = str(analysis.get("total_issues", 0))
    metadata["timestamp"] = analysis.get("timestamp", datetime.now().isoformat())
    metadata["issues_summary"] = _clip(issues_str)
    metadata["document_type"] = _clip(doc_text[:500])
    metadata["compliance_summary"] = _clip(compliance_text[:1000])
    return metadata
2025-07-14 23:41:31 +01:00
@app.get("/")
async def root():
    """Report the service name and that it is running."""
    status_payload = dict(message="Mini SpecsComply Pro API", status="running")
    return status_payload
@app.get("/document/{doc_id}/analysis")
async def get_analysis(doc_id: str):
    """Return the stored metadata and full analysis for one document.

    Raises:
        HTTPException: 404 when the vector store has no such document.
    """
    stored_doc = vector_store.get_document(doc_id)
    if stored_doc:
        payload = {
            "document_id": doc_id,
            "metadata": stored_doc.metadata,
            # The full analysis is only in the in-memory store, so it may be
            # empty even though the document exists.
            "analysis": analysis_storage.get(doc_id, {}),
        }
        return JSONResponse(payload)
    raise HTTPException(404, "Document not found")
2025-07-11 22:29:45 +01:00
@app.post("/upload-document")
async def upload_document(file: UploadFile = File(...)):
    """Upload, analyse and index a document.

    The upload is written to a temporary file, its text is extracted,
    embedded and analysed, and the result is stored both in
    ``analysis_storage`` and in the vector store. The response carries only
    basic information, not the full analysis.

    Raises:
        HTTPException: 400 for an unsupported extension or when no text could
            be extracted; 500 for any other processing failure.
    """
    file_path = None
    try:
        # The filename is client-supplied and may be absent; treat that as
        # "no extension" rather than letting splitext raise a TypeError.
        ext = os.path.splitext(file.filename or "")[1].lower()
        if ext not in Config.ALLOWED_EXTENSIONS:
            raise HTTPException(400, "Unsupported file type")

        # Save the file temporarily.
        doc_id = str(uuid.uuid4())
        os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
        file_path = os.path.join(Config.UPLOAD_FOLDER, f"{doc_id}{ext}")
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        print(f"Extracting text from {file_path}")
        text = extract_text(file_path)
        if not text or not text.strip():
            raise HTTPException(400, "Could not extract any text from the uploaded file")

        print(f"Generating embeddings for document {doc_id}")
        embedding = embeddings.generate_embeddings(text)

        print(f"Analyzing compliance for document {doc_id}")
        analysis = analyze_compliance(text)

        # Keep the full analysis in memory; the vector store only receives a
        # flattened summary.
        print(f"Storing analysis for document {doc_id}")
        analysis_storage[doc_id] = analysis

        print(f"Preparing metadata for document {doc_id}")
        pinecone_metadata = prepare_metadata_for_pinecone(analysis, file.filename)

        print(f"Upserting document {doc_id} to vector store")
        vector_store.upsert_document(
            doc_id=doc_id,
            embedding=embedding,
            metadata=pinecone_metadata
        )

        # Return only basic info - NOT the full analysis.
        return JSONResponse({
            "document_id": doc_id,
            "status": "success",
            "message": "Document processed and analyzed successfully",
            "filename": file.filename,
            "total_issues": analysis.get("total_issues", 0),
            "timestamp": analysis.get("timestamp", datetime.now().isoformat())
        })
    except HTTPException:
        raise
    except Exception as e:
        print(f"Error in upload_document: {e}")
        print(f"Error type: {type(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(500, f"Document processing failed: {str(e)}") from e
    finally:
        # Remove the temp file on every exit path. The HTTPException path
        # (e.g. empty extracted text) used to leave it behind.
        if file_path and os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                # Cleanup is best-effort and must not mask the real outcome.
                print(f"Could not remove temp file {file_path}: {cleanup_error}")
2025-07-11 22:29:45 +01:00
@app.post("/document/{doc_id}/resubmit")
async def resubmit_document(doc_id: str, file: UploadFile = File(...)):
    """Analyse a new version of an existing document.

    The new version is stored under a fresh document id; its metadata records
    the original id and the status ``"resubmitted"``.

    Raises:
        HTTPException: 404 when the original document is unknown; 400 for an
            unsupported extension or when no text could be extracted; 500
            for any other processing failure.
    """
    # Initialised up front so the cleanup below needs no locals() lookup.
    file_path = None
    try:
        # Verify the original exists.
        original = vector_store.get_document(doc_id)
        if not original:
            raise HTTPException(404, "Original document not found")

        # The filename is client-supplied and may be absent; treat that as
        # "no extension" rather than letting splitext raise a TypeError.
        ext = os.path.splitext(file.filename or "")[1].lower()
        if ext not in Config.ALLOWED_EXTENSIONS:
            raise HTTPException(400, "Unsupported file type")

        new_doc_id = str(uuid.uuid4())
        os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
        file_path = os.path.join(Config.UPLOAD_FOLDER, f"{new_doc_id}{ext}")
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        text = extract_text(file_path)
        if not text or not text.strip():
            raise HTTPException(400, "Could not extract any text from the uploaded file")

        embedding = embeddings.generate_embeddings(text)

        # Analyse the new version and keep the full result in memory.
        analysis = analyze_compliance(text)
        analysis_storage[new_doc_id] = analysis

        # Link the new version back to the document it replaces.
        pinecone_metadata = prepare_metadata_for_pinecone(analysis, file.filename)
        pinecone_metadata["original_id"] = doc_id
        pinecone_metadata["status"] = "resubmitted"

        vector_store.upsert_document(
            doc_id=new_doc_id,
            embedding=embedding,
            metadata=pinecone_metadata
        )

        return JSONResponse({
            "document_id": new_doc_id,
            "original_id": doc_id,
            "status": "success",
            "message": "Document resubmitted and analyzed successfully",
            "filename": file.filename,
            "total_issues": analysis.get("total_issues", 0),
            "timestamp": analysis.get("timestamp", datetime.now().isoformat())
        })
    except HTTPException:
        raise
    except Exception as e:
        # Log before converting to a 500 so the failure is visible server-side.
        print(f"Error in resubmit_document: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(500, str(e)) from e
    finally:
        # Remove the temp file on every exit path. The HTTPException path
        # (e.g. empty extracted text) used to leave it behind.
        if file_path and os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                # Cleanup is best-effort and must not mask the real outcome.
                print(f"Could not remove temp file {file_path}: {cleanup_error}")
2025-07-14 23:41:31 +01:00
@app.get("/compliance-standards")
async def get_compliance_standards():
    """List each loaded compliance standard with its filename and section count."""
    standards = []
    for key, data in compliance_loader.compliance_docs.items():
        entry = {
            "key": key,
            "filename": data["filename"],
            "sections_count": len(data["sections"]),
        }
        standards.append(entry)
    return JSONResponse({"standards": standards})
2025-07-11 22:29:45 +01:00
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the module is run as a
    # script.
    import uvicorn

    # Listen on all interfaces, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")