497 lines
18 KiB
Python
497 lines
18 KiB
Python
from fastapi import FastAPI, UploadFile, File, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from typing import Optional
|
|
import os
|
|
import uuid
|
|
from docx import Document
|
|
from PyPDF2 import PdfReader
|
|
import io
|
|
from datetime import datetime
|
|
from .config import Config
|
|
from .embeddings import EmbeddingGenerator
|
|
from .vector_stores import VectorStore
|
|
from .compliance_loader import ComplianceLoader
|
|
import groq
|
|
import json
|
|
|
|
# FastAPI application object; the route decorators in this module register on it.
app = FastAPI(title="Mini SpecsComply Pro")

# Shared service objects, created once when the module is imported.
embeddings = EmbeddingGenerator()
vector_store = VectorStore()
compliance_loader = ComplianceLoader()

# Load the compliance standards at import time, before any request is served.
compliance_loader.load_compliance_standards()

# Groq API client, keyed from configuration.
groq_client = groq.Client(api_key=Config.GROQ_API_KEY)

# doc_id -> full analysis dict. Process-local: contents do not survive a restart.
analysis_storage = {}
|
|
|
|
|
|
def save_document(file: UploadFile) -> tuple:
    """Persist an uploaded file under ``Config.UPLOAD_FOLDER``.

    The file is stored as ``<uuid4><ext>`` where ``ext`` is the lower-cased
    extension of the client-supplied filename.

    Args:
        file: The uploaded file to store.

    Returns:
        A ``(doc_id, file_path)`` tuple of strings: the generated document id
        and the path the content was written to.

    Raises:
        HTTPException: 400 if the extension is not in
            ``Config.ALLOWED_EXTENSIONS`` (including a missing filename).
    """
    # A missing filename yields an empty extension, which is rejected below
    # instead of crashing inside os.path.splitext(None).
    ext = os.path.splitext(file.filename or "")[1].lower()
    if ext not in Config.ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Unsupported file type")

    # Only touch the filesystem once the upload is known to be acceptable.
    os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
    doc_id = str(uuid.uuid4())
    file_path = os.path.join(Config.UPLOAD_FOLDER, f"{doc_id}{ext}")
    with open(file_path, "wb") as f:
        f.write(file.file.read())

    return doc_id, file_path
|
|
|
|
|
|
def extract_text(file_path: str) -> str:
    """Extract plain text from a ``.docx``, ``.pdf`` or ``.txt`` file.

    The file type is chosen from the path's extension, compared
    case-insensitively so that e.g. ``REPORT.PDF`` is handled like
    ``report.pdf``.

    Args:
        file_path: Path of the file to read.

    Returns:
        The extracted text. Non-empty paragraphs (docx) or pages (pdf) are
        joined with newlines; an empty string is returned when the file
        yields no text.

    Raises:
        HTTPException: 400 for an unsupported extension or for any error
            raised while opening or parsing the file.
    """
    lowered_path = file_path.lower()
    try:
        if lowered_path.endswith('.docx'):
            doc = Document(file_path)
            return '\n'.join(para.text for para in doc.paragraphs if para.text)

        if lowered_path.endswith('.pdf'):
            with open(file_path, 'rb') as f:
                reader = PdfReader(f)
                # Skip pages for which extract_text() returns nothing.
                page_texts = (page.extract_text() for page in reader.pages)
                return '\n'.join(page_text for page_text in page_texts if page_text)

        if lowered_path.endswith('.txt'):
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()

        raise ValueError("Unsupported file type")

    except Exception as e:
        # Every failure is reported to the client as a 400; the original
        # exception is chained so it still shows up in tracebacks.
        raise HTTPException(
            status_code=400,
            detail=f"Failed to extract text: {str(e)}"
        ) from e
|
|
|
|
|
|
# Bullet prefix that introduces the detail line of an issue in the model output.
# NOTE(review): the prompt asks for "What is missing or incorrect" while this
# parser looks for "* What's missing or incorrect:" - confirm against real
# model output before loosening or tightening the match.
_DETAIL_PREFIX = "* What's missing or incorrect:"


def _build_parsing_prompt(text):
    """Build the prompt that asks the model to classify and outline the document."""
    return f"""Extract key sections from this document and identify what type of tender document this appears to be:

DOCUMENT TO ANALYZE:
{text[:3000]}...

Please provide:
1. Document type (e.g., tender response, technical proposal, etc.)
2. Key sections found
3. Main requirements mentioned
4. Document structure analysis

Be concise but thorough."""


def _build_compliance_prompt(compliance_context, text):
    """Build the prompt that asks the model for a structured compliance review."""
    return f"""You are a compliance expert analyzing tender documents.

COMPLIANCE STANDARDS TO CHECK AGAINST:
{compliance_context[:4000]}

DOCUMENT TO ANALYZE:
{text[:4000]}

Please analyze this document for compliance issues and provide a structured response:

1. COMPLIANCE SUMMARY: Overall compliance status (Compliant/Non-Compliant/Partial)

2. SPECIFIC ISSUES: List specific compliance violations found, including:
- Which standard is violated
- What is missing or incorrect
- Severity (Critical/High/Medium/Low)
- Specific location in document if possible

3. REQUIREMENTS CHECK: Verify if the document meets requirements from:
- Tender specifications
- Supplier qualification requirements
- Form of tender requirements
- Confidentiality agreement requirements

4. RECOMMENDATIONS: Specific actions to fix each issue

5. MISSING ELEMENTS: What key elements are completely missing

Please be detailed and specific in your analysis. Focus on actionable feedback."""


def _first_choice_text(response):
    """Return the message content of the first completion choice, or "" if absent."""
    if response and response.choices:
        message = response.choices[0].message
        if message and message.content:
            return message.content
    return ""


def _split_numbered_heading(line):
    """Return the text after a leading "<digits>." marker, or None if the line has none."""
    number, dot, remainder = line.partition('.')
    if dot and number.isdigit():
        return remainder
    return None


def _extract_structured_issues(analysis):
    """Collect issues from numbered headings and their detail/severity/location bullets."""
    issues = []
    current_issue = None

    for raw_line in analysis.split('\n'):
        line = raw_line.strip()
        remainder = _split_numbered_heading(line)

        if remainder is not None:
            # Prefer the bold title ("1. **Title** ..."); otherwise use
            # everything after the number with markdown asterisks removed.
            if '**' in line:
                title = line.split('**')[1].strip()
            else:
                title = remainder.strip().replace('*', '').strip()
            # Very short titles are treated as noise and do not replace the
            # heading currently being tracked.
            if len(title) > 3:
                current_issue = title

        elif current_issue and line.startswith(_DETAIL_PREFIX):
            details = line.replace(_DETAIL_PREFIX, '').strip()
            if len(details) > 10:
                issues.append(f"{current_issue}: {details}")
                current_issue = None

        elif current_issue and line.startswith(('* Severity:', '* Location:')):
            # No usable detail line was seen for this heading: record the bare
            # title unless an entry with that title already exists.
            recorded_titles = [issue.split(':')[0] for issue in issues]
            if current_issue not in recorded_titles:
                issues.append(current_issue)
            current_issue = None

    return issues


def _extract_bulleted_issues(analysis):
    """Collect bullet lines ("- ", "• ", "* ") that are not issue-attribute labels."""
    issues = []
    for raw_line in analysis.split('\n'):
        line = raw_line.strip()
        if line.startswith(('- ', '• ', '* ')):
            candidate = line[2:].strip()
            if len(candidate) > 10 and not candidate.startswith(
                    ('Violated', 'What', 'Severity', 'Location')):
                issues.append(candidate)
    return issues


def _extract_keyword_issues(analysis):
    """Collect lines longer than 15 characters that mention a compliance-problem keyword."""
    keywords = ('missing', 'violation', 'non-compliant', 'issue')
    matches = []
    for raw_line in analysis.split('\n'):
        line = raw_line.strip()
        lowered = line.lower()
        if len(line) > 15 and any(keyword in lowered for keyword in keywords):
            matches.append(line)
    return matches


def _dedupe_issues(issues):
    """Drop entries of 10 characters or fewer and duplicates, keeping first-seen order."""
    return list(dict.fromkeys(str(issue) for issue in issues if len(str(issue)) > 10))


def analyze_compliance(text: str) -> dict:
    """Run the two-step Groq compliance analysis and extract a ranked issue list.

    Steps, in order:
      1. Fetch the compliance context from ``compliance_loader``.
      2. Ask the model to classify/outline the document (first 3000 chars).
      3. Ask the model for a structured compliance review against the first
         4000 chars of the context and of the document.
      4. Parse issues out of the review: numbered headings first, then plain
         bullet lines if that finds nothing.
      5. De-duplicate and rerank via ``embeddings.rerank_issues`` (at most 10);
         if reranking fails, the first 10 unique issues are used.
      6. If there is still nothing, fall back to keyword-matching lines
         (at most 5) or a single "No specific issues identified" entry.

    Args:
        text: Full extracted document text.

    Returns:
        A dict with keys ``document_analysis``, ``compliance_analysis``,
        ``issues``, ``total_issues`` and ``timestamp`` (ISO-8601 string).
        This function does not raise: on any error the traceback is printed
        and a fixed fallback dict with a single "Analysis failed" issue is
        returned instead.
    """
    try:
        compliance_context = compliance_loader.get_compliance_context()

        parsing_response = groq_client.chat.completions.create(
            messages=[{"role": "user", "content": _build_parsing_prompt(text)}],
            model=Config.GROQ_MODEL,
            temperature=0.1
        )
        document_analysis = _first_choice_text(parsing_response)

        compliance_response = groq_client.chat.completions.create(
            messages=[{
                "role": "user",
                "content": _build_compliance_prompt(compliance_context, text),
            }],
            model=Config.GROQ_MODEL,
            temperature=0.1,
            max_tokens=4000
        )
        compliance_analysis = _first_choice_text(compliance_response)

        # Structured parsing first; plain bullets only if that yields nothing.
        issues_list = _extract_structured_issues(compliance_analysis)
        if not issues_list:
            issues_list = _extract_bulleted_issues(compliance_analysis)

        unique_issues = _dedupe_issues(issues_list)

        ranked_issues = []
        if unique_issues:
            try:
                ranked_issues = embeddings.rerank_issues(
                    issues=unique_issues,
                    query="Most critical compliance violations and missing requirements",
                    top_n=min(10, len(unique_issues))
                )
            except Exception as e:
                # Reranking is best-effort; keep the parse order instead.
                print(f"Reranking failed: {e}")
                ranked_issues = unique_issues[:10]

        if not ranked_issues:
            fallback_issues = _extract_keyword_issues(compliance_analysis)
            ranked_issues = fallback_issues[:5] if fallback_issues else ["No specific issues identified"]

        return {
            "document_analysis": document_analysis,
            "compliance_analysis": compliance_analysis,
            "issues": ranked_issues,
            "total_issues": len(ranked_issues),
            "timestamp": datetime.now().isoformat()
        }

    except Exception as e:
        print(f"Error in analyze_compliance: {e}")
        import traceback
        traceback.print_exc()

        # Return a safe fallback response so callers never see an exception.
        return {
            "document_analysis": "Error occurred during document analysis",
            "compliance_analysis": "Error occurred during compliance analysis",
            "issues": ["Analysis failed due to technical error"],
            "total_issues": 1,
            "timestamp": datetime.now().isoformat()
        }
|
|
|
|
|
|
def prepare_metadata_for_pinecone(analysis: dict, filename: str) -> dict:
    """Flatten an analysis result into string-only metadata for the vector store.

    Args:
        analysis: Analysis dict; the keys ``issues``, ``total_issues``,
            ``timestamp``, ``document_analysis`` and ``compliance_analysis``
            are read, each with a fallback when missing or ``None``.
        filename: Original upload filename; ``"unknown"`` is used when empty.

    Returns:
        A dict of strings with keys ``filename``, ``upload_time``, ``status``,
        ``total_issues``, ``timestamp``, ``issues_summary``, ``document_type``
        and ``compliance_summary``.
    """
    def truncate_string(s: str, max_length: int = 30000) -> str:
        """Cap ``s`` at ``max_length`` characters, ending in "..." when cut."""
        if not s:
            return ""
        if len(s) <= max_length:
            return s
        # Reserve room for the ellipsis so the result never exceeds max_length.
        return s[:max(max_length - 3, 0)] + "..."

    # Join the non-empty issues into one " | "-separated string; None and
    # blank entries are dropped and everything else is stringified.
    issues = analysis.get("issues", [])
    if issues:
        clean_issues = [str(issue) for issue in issues if issue is not None and str(issue).strip()]
        issues_str = " | ".join(clean_issues)
    else:
        issues_str = ""

    # "or ''" also covers keys that are present but explicitly None.
    document_analysis = analysis.get("document_analysis", "") or ""
    compliance_analysis = analysis.get("compliance_analysis", "") or ""

    return {
        "filename": filename or "unknown",
        "upload_time": datetime.now().isoformat(),
        "status": "analyzed",
        "total_issues": str(analysis.get("total_issues", 0)),
        "timestamp": analysis.get("timestamp", datetime.now().isoformat()),
        "issues_summary": truncate_string(issues_str),
        # Only the leading part of each free-text analysis is kept as metadata.
        "document_type": document_analysis[:500],
        "compliance_summary": compliance_analysis[:1000]
    }
|
|
|
|
|
|
@app.get("/")
async def root():
    """Return a small status payload identifying the API."""
    payload = {}
    payload["message"] = "Mini SpecsComply Pro API"
    payload["status"] = "running"
    return payload
|
|
|
|
|
|
@app.get("/document/{doc_id}/analysis")
async def get_analysis(doc_id: str):
    """Return the stored metadata and full analysis for one document.

    Responds with 404 when the vector store has no entry for ``doc_id``.
    The ``analysis`` field is an empty object when nothing is held in
    ``analysis_storage`` for that id.
    """
    record = vector_store.get_document(doc_id)
    if record:
        # The full analysis is kept in the in-process store, not the vector DB.
        stored_analysis = analysis_storage.get(doc_id, {})
        body = {
            "document_id": doc_id,
            "metadata": record.metadata,
            "analysis": stored_analysis,
        }
        return JSONResponse(body)

    raise HTTPException(404, "Document not found")
|
|
|
|
|
|
@app.post("/upload-document")
async def upload_document(file: UploadFile = File(...)):
    """Upload and process a document - returns only basic info, not full analysis.

    The upload is written to a temporary file under ``Config.UPLOAD_FOLDER``,
    its text is extracted, embedded and analysed, the full analysis is kept
    in ``analysis_storage`` and a flattened summary is upserted into the
    vector store. The temporary file is removed on every exit path.

    Args:
        file: The uploaded document.

    Returns:
        JSONResponse with ``document_id``, ``status``, ``message``,
        ``filename``, ``total_issues`` and ``timestamp``.

    Raises:
        HTTPException: 400 for an unsupported extension or when no text can
            be extracted; 500 for any other processing failure.
    """
    file_path = None
    try:
        # A missing filename yields an empty extension and is rejected here
        # rather than crashing inside os.path.splitext(None).
        ext = os.path.splitext(file.filename or "")[1].lower()
        if ext not in Config.ALLOWED_EXTENSIONS:
            raise HTTPException(400, "Unsupported file type")

        # Save the file temporarily under a fresh document id.
        doc_id = str(uuid.uuid4())
        os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
        file_path = os.path.join(Config.UPLOAD_FOLDER, f"{doc_id}{ext}")
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        print(f"Extracting text from {file_path}")
        text = extract_text(file_path)
        if not text or not text.strip():
            raise HTTPException(400, "Could not extract any text from the uploaded file")

        print(f"Generating embeddings for document {doc_id}")
        embedding = embeddings.generate_embeddings(text)

        print(f"Analyzing compliance for document {doc_id}")
        analysis = analyze_compliance(text)

        # The full analysis stays in process memory; only a flattened summary
        # goes to the vector store.
        print(f"Storing analysis for document {doc_id}")
        analysis_storage[doc_id] = analysis

        print(f"Preparing metadata for document {doc_id}")
        pinecone_metadata = prepare_metadata_for_pinecone(analysis, file.filename)

        print(f"Upserting document {doc_id} to vector store")
        vector_store.upsert_document(
            doc_id=doc_id,
            embedding=embedding,
            metadata=pinecone_metadata
        )

        # Return only basic info - NOT the full analysis
        return JSONResponse({
            "document_id": doc_id,
            "status": "success",
            "message": "Document processed and analyzed successfully",
            "filename": file.filename,
            "total_issues": analysis.get("total_issues", 0),
            "timestamp": analysis.get("timestamp", datetime.now().isoformat())
        })

    except HTTPException:
        raise
    except Exception as e:
        print(f"Error in upload_document: {e}")
        print(f"Error type: {type(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(500, f"Document processing failed: {str(e)}") from e
    finally:
        # Runs for success, HTTPException and unexpected errors alike, so the
        # temp file is never left behind. Cleanup is best-effort: a failed
        # delete is logged but does not change the response.
        if file_path and os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                print(f"Could not remove temp file {file_path}: {cleanup_error}")
|
|
|
|
|
|
@app.post("/document/{doc_id}/resubmit")
async def resubmit_document(doc_id: str, file: UploadFile = File(...)):
    """Resubmit a document for re-analysis.

    The new upload is processed like a first-time upload but stored under a
    fresh document id, with ``original_id`` pointing at ``doc_id`` and
    ``status`` set to ``"resubmitted"`` in the vector-store metadata. The
    temporary file is removed on every exit path.

    Args:
        doc_id: Id of the previously uploaded document being replaced.
        file: The new version of the document.

    Returns:
        JSONResponse with ``document_id`` (the new id), ``original_id``,
        ``status``, ``message``, ``filename``, ``total_issues`` and
        ``timestamp``.

    Raises:
        HTTPException: 404 if ``doc_id`` is not in the vector store; 400 for
            an unsupported extension or when no text can be extracted; 500
            for any other processing failure.
    """
    file_path = None
    try:
        # Verify original exists
        original = vector_store.get_document(doc_id)
        if not original:
            raise HTTPException(404, "Original document not found")

        # A missing filename yields an empty extension and is rejected here
        # rather than crashing inside os.path.splitext(None).
        ext = os.path.splitext(file.filename or "")[1].lower()
        if ext not in Config.ALLOWED_EXTENSIONS:
            raise HTTPException(400, "Unsupported file type")

        new_doc_id = str(uuid.uuid4())
        os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
        file_path = os.path.join(Config.UPLOAD_FOLDER, f"{new_doc_id}{ext}")
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        text = extract_text(file_path)
        if not text or not text.strip():
            raise HTTPException(400, "Could not extract any text from the uploaded file")

        embedding = embeddings.generate_embeddings(text)
        analysis = analyze_compliance(text)

        # The full analysis stays in process memory under the new id.
        analysis_storage[new_doc_id] = analysis

        # Flattened metadata, linked back to the document being replaced.
        pinecone_metadata = prepare_metadata_for_pinecone(analysis, file.filename)
        pinecone_metadata["original_id"] = doc_id
        pinecone_metadata["status"] = "resubmitted"

        vector_store.upsert_document(
            doc_id=new_doc_id,
            embedding=embedding,
            metadata=pinecone_metadata
        )

        return JSONResponse({
            "document_id": new_doc_id,
            "original_id": doc_id,
            "status": "success",
            "message": "Document resubmitted and analyzed successfully",
            "filename": file.filename,
            "total_issues": analysis.get("total_issues", 0),
            "timestamp": analysis.get("timestamp", datetime.now().isoformat())
        })

    except HTTPException:
        raise
    except Exception as e:
        # Log before converting to a 500, matching upload_document.
        print(f"Error in resubmit_document: {e}")
        import traceback
        traceback.print_exc()
        raise HTTPException(500, str(e)) from e
    finally:
        # Runs for success, HTTPException and unexpected errors alike, so the
        # temp file is never left behind. Cleanup is best-effort: a failed
        # delete is logged but does not change the response.
        if file_path and os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                print(f"Could not remove temp file {file_path}: {cleanup_error}")
|
|
|
|
|
|
@app.get("/compliance-standards")
async def get_compliance_standards():
    """List the loaded compliance standards.

    Each entry carries the standard's key, its source filename and the number
    of sections held for it in ``compliance_loader.compliance_docs``.
    """
    summaries = []
    for standard_key, standard in compliance_loader.compliance_docs.items():
        entry = {
            "key": standard_key,
            "filename": standard["filename"],
            "sections_count": len(standard["sections"]),
        }
        summaries.append(entry)

    return JSONResponse({"standards": summaries})
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8000.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)
|