"""FastAPI service that ingests tender documents and analyses them for compliance.

Uploaded files are written to ``Config.UPLOAD_FOLDER`` just long enough to
extract their text, analysed with Groq chat completions, and indexed in the
vector store. Full analysis results are kept in an in-process dictionary.
"""

import contextlib
import io
import json
import logging
import os
import re
import uuid
from datetime import datetime
from typing import List, Optional, Tuple

import groq
from docx import Document
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from PyPDF2 import PdfReader

from .config import Config
from .embeddings import EmbeddingGenerator
from .vector_stores import VectorStore
from .compliance_loader import ComplianceLoader

logger = logging.getLogger(__name__)

app = FastAPI(title="Mini SpecsComply Pro")

# Initialize components
embeddings = EmbeddingGenerator()
vector_store = VectorStore()
compliance_loader = ComplianceLoader()

# Load compliance standards on startup
compliance_loader.load_compliance_standards()

# Initialize clients
groq_client = groq.Client(api_key=Config.GROQ_API_KEY)

# In-memory storage for analysis results, keyed by document id.
# NOTE(review): never pruned and lost on restart.
analysis_storage = {}

# A line that opens a numbered item ("1.", "12.", ...).
_NUMBERED_LINE_RE = re.compile(r"\d+\.")
# Bullet carrying the "what is missing" details of the current issue. Accepts
# "*", "-" or "•" bullets, "What's"/"What is" and optional bold markers,
# because the prompt asks for "What is missing or incorrect".
_DETAIL_LINE_RE = re.compile(
    r"[*\-•]\s*\**What(?:'s|’s| is) missing or incorrect\**\s*:\**\s*(.*)",
    re.IGNORECASE,
)
# Bullet that proves the current numbered item is an issue even without details.
_FIELD_LINE_RE = re.compile(
    r"[*\-•]\s*\**(?:Severity|Location)\**\s*:",
    re.IGNORECASE,
)
# Bullet prefixes that are issue attributes rather than issues themselves.
_ATTRIBUTE_PREFIXES = ("Violated", "What", "Severity", "Location")
# Keywords used by the last-resort issue scan.
_ISSUE_KEYWORDS = ("missing", "violation", "non-compliant", "issue")


def _new_upload_path(filename: Optional[str]) -> Tuple[str, str]:
    """Validate the extension of *filename* and reserve a storage path.

    Returns:
        ``(doc_id, file_path)`` where ``doc_id`` is a fresh UUID4 string and
        ``file_path`` lives inside ``Config.UPLOAD_FOLDER``.

    Raises:
        HTTPException: 400 when the extension is not in
            ``Config.ALLOWED_EXTENSIONS`` (including a missing filename).
    """
    # A missing filename yields an empty extension, which is rejected below
    # instead of crashing os.path.splitext with a TypeError.
    ext = os.path.splitext(filename or "")[1].lower()
    if ext not in Config.ALLOWED_EXTENSIONS:
        raise HTTPException(400, "Unsupported file type")

    os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
    doc_id = str(uuid.uuid4())
    return doc_id, os.path.join(Config.UPLOAD_FOLDER, f"{doc_id}{ext}")


def _remove_if_exists(path: str) -> None:
    """Delete *path*, ignoring the case where it was never created."""
    with contextlib.suppress(FileNotFoundError):
        os.remove(path)


def save_document(file: UploadFile) -> Tuple[str, str]:
    """Persist an uploaded file to the upload folder.

    Returns:
        ``(doc_id, file_path)`` for the stored file.

    Raises:
        HTTPException: 400 when the file type is not allowed.
    """
    doc_id, file_path = _new_upload_path(file.filename)
    with open(file_path, "wb") as f:
        f.write(file.file.read())
    return doc_id, file_path


def extract_text(file_path: str) -> str:
    """Extract text from a ``.docx``, ``.pdf`` or ``.txt`` file.

    Returns:
        The extracted text; an empty string when the file holds no text.

    Raises:
        HTTPException: 400 for unsupported extensions or any parsing failure.
    """
    try:
        if file_path.endswith('.docx'):
            doc = Document(file_path)
            return '\n'.join(para.text for para in doc.paragraphs if para.text)
        if file_path.endswith('.pdf'):
            with open(file_path, 'rb') as f:
                reader = PdfReader(f)
                pages_text = []
                for page in reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        pages_text.append(page_text)
            return '\n'.join(pages_text)
        if file_path.endswith('.txt'):
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        raise ValueError("Unsupported file type")
    except Exception as e:
        raise HTTPException(
            status_code=400,
            detail=f"Failed to extract text: {str(e)}"
        ) from e


def _build_parsing_prompt(text: str) -> str:
    """Return the prompt asking Groq to classify and outline the document."""
    return f"""Extract key sections from this document and identify what type of tender document this appears to be:

DOCUMENT TO ANALYZE:
{text[:3000]}...

Please provide:
1. Document type (e.g., tender response, technical proposal, etc.)
2. Key sections found
3. Main requirements mentioned
4. Document structure analysis

Be concise but thorough."""


def _build_compliance_prompt(text: str, compliance_context: str) -> str:
    """Return the prompt asking Groq for a structured compliance review."""
    return f"""You are a compliance expert analyzing tender documents.

COMPLIANCE STANDARDS TO CHECK AGAINST:
{compliance_context[:4000]}

DOCUMENT TO ANALYZE:
{text[:4000]}

Please analyze this document for compliance issues and provide a structured response:

1. COMPLIANCE SUMMARY: Overall compliance status (Compliant/Non-Compliant/Partial)

2. SPECIFIC ISSUES: List specific compliance violations found, including:
   - Which standard is violated
   - What is missing or incorrect
   - Severity (Critical/High/Medium/Low)
   - Specific location in document if possible

3. REQUIREMENTS CHECK: Verify if the document meets requirements from:
   - Tender specifications
   - Supplier qualification requirements
   - Form of tender requirements
   - Confidentiality agreement requirements

4. RECOMMENDATIONS: Specific actions to fix each issue

5. MISSING ELEMENTS: What key elements are completely missing

Please be detailed and specific in your analysis. Focus on actionable feedback."""


def _message_content(response) -> str:
    """Return the text of the first choice of a chat completion, or ``""``."""
    if not response or not response.choices:
        return ""
    message = response.choices[0].message
    return (message.content if message else "") or ""


def _issue_title(line: str) -> Optional[str]:
    """Return the title carried by a numbered line, or ``None`` if too short."""
    if '**' in line:
        # "1. **Title** ..." -> text between the first pair of bold markers.
        title = line.split('**')[1].strip()
    else:
        # "1. Title" -> everything after the number.
        title = line.split('.', 1)[1].replace('*', '').strip()
    return title if len(title) > 3 else None


def _extract_structured_issues(compliance_analysis: str) -> List[str]:
    """Collect issues from the numbered list in the compliance analysis.

    A numbered line sets the current issue title. The issue is recorded as
    ``"title: details"`` when a "what is missing" bullet follows, or as the
    bare title when only a Severity/Location bullet follows.
    """
    issues: List[str] = []
    # Text before the first ':' of every recorded issue, used to avoid
    # recording a bare title that is already present with details.
    recorded_prefixes = set()
    current_issue = None

    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        if _NUMBERED_LINE_RE.match(line):
            title = _issue_title(line)
            if title:
                current_issue = title
            continue

        if current_issue is None:
            continue

        detail_match = _DETAIL_LINE_RE.match(line)
        if detail_match:
            details = detail_match.group(1).strip()
            if len(details) > 10:
                full_issue = f"{current_issue}: {details}"
                issues.append(full_issue)
                recorded_prefixes.add(full_issue.split(':')[0])
                current_issue = None
        elif _FIELD_LINE_RE.match(line):
            if current_issue not in recorded_prefixes:
                issues.append(current_issue)
                recorded_prefixes.add(current_issue.split(':')[0])
                current_issue = None

    return issues


def _extract_bullet_issues(compliance_analysis: str) -> List[str]:
    """Fallback extraction: treat free-standing bullet lines as issues."""
    issues = []
    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        if not line.startswith(('- ', '• ', '* ')):
            continue
        clean_issue = line[2:].strip()
        if len(clean_issue) > 10 and not clean_issue.startswith(_ATTRIBUTE_PREFIXES):
            issues.append(clean_issue)
    return issues


def _unique_issues(issues: List[str]) -> List[str]:
    """Drop duplicates and entries of 10 characters or fewer, keeping order."""
    seen = set()
    unique = []
    for issue in issues:
        if issue and len(str(issue)) > 10 and issue not in seen:
            seen.add(issue)
            unique.append(str(issue))
    return unique


def _keyword_issues(compliance_analysis: str) -> List[str]:
    """Last-resort extraction: lines that mention an issue-like keyword."""
    matches = []
    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        lowered = line.lower()
        if len(line) > 15 and any(keyword in lowered for keyword in _ISSUE_KEYWORDS):
            matches.append(line)
    return matches


def analyze_compliance(text: str) -> dict:
    """Enhanced compliance analysis using Groq.

    Runs two chat completions (document outline, then compliance review),
    extracts a list of issues from the review and ranks them through
    ``embeddings.rerank_issues``.

    Returns:
        A dict with keys ``document_analysis``, ``compliance_analysis``,
        ``issues``, ``total_issues`` and ``timestamp``. Any failure is logged
        and reported through a placeholder result instead of an exception.
    """
    try:
        # Get compliance context
        compliance_context = compliance_loader.get_compliance_context()

        # Document parsing and section extraction with Groq
        parsing_response = groq_client.chat.completions.create(
            messages=[{"role": "user", "content": _build_parsing_prompt(text)}],
            model=Config.GROQ_MODEL,
            temperature=0.1
        )
        document_analysis = _message_content(parsing_response)

        # Comprehensive compliance analysis with Groq
        compliance_response = groq_client.chat.completions.create(
            messages=[{
                "role": "user",
                "content": _build_compliance_prompt(text, compliance_context),
            }],
            model=Config.GROQ_MODEL,
            temperature=0.1,
            max_tokens=4000
        )
        compliance_analysis = _message_content(compliance_response)

        # Structured parsing first; plain bullet lines only if that finds nothing.
        issues_list = []
        if compliance_analysis:
            issues_list = (
                _extract_structured_issues(compliance_analysis)
                or _extract_bullet_issues(compliance_analysis)
            )
        unique_issues = _unique_issues(issues_list)

        # Rerank issues by importance (the reranker lives behind
        # embeddings.rerank_issues; its backend is not visible from here).
        ranked_issues = []
        if unique_issues:
            try:
                ranked_issues = embeddings.rerank_issues(
                    issues=unique_issues,
                    query="Most critical compliance violations and missing requirements",
                    top_n=min(10, len(unique_issues))
                )
            except Exception as e:
                # Ranking is best-effort: keep the first 10 issues unranked.
                logger.warning("Reranking failed: %s", e)
                ranked_issues = unique_issues[:10]

        if not ranked_issues:
            # Emergency fallback: scan the raw analysis for issue-like lines.
            fallback_issues = _keyword_issues(compliance_analysis)
            ranked_issues = fallback_issues[:5] if fallback_issues else ["No specific issues identified"]

        # NOTE(review): the placeholder entry above is counted in
        # total_issues; kept as-is because callers may rely on it.
        return {
            "document_analysis": document_analysis,
            "compliance_analysis": compliance_analysis,
            "issues": ranked_issues,
            "total_issues": len(ranked_issues),
            "timestamp": datetime.now().isoformat()
        }

    except Exception:
        logger.exception("Error in analyze_compliance")
        # Return a safe fallback response
        return {
            "document_analysis": "Error occurred during document analysis",
            "compliance_analysis": "Error occurred during compliance analysis",
            "issues": ["Analysis failed due to technical error"],
            "total_issues": 1,
            "timestamp": datetime.now().isoformat()
        }


def _truncate(s: str, max_length: int = 30000) -> str:
    """Cap *s* at *max_length* characters, marking the cut with ``...``."""
    if not s:
        return ""
    return s[:max_length] + "..." if len(s) > max_length else s


def prepare_metadata_for_pinecone(analysis: dict, filename: str) -> dict:
    """Prepare metadata for Pinecone by converting complex objects to strings.

    Args:
        analysis: Result dict as produced by ``analyze_compliance``.
        filename: Original upload name; ``"unknown"`` is stored when falsy.

    Returns:
        A flat dict of string values suitable as vector-store metadata.
    """
    # Drop None/blank issues and flatten the rest into one string.
    issues = analysis.get("issues", [])
    clean_issues = [
        str(issue) for issue in issues or []
        if issue is not None and str(issue).strip()
    ]
    issues_str = " | ".join(clean_issues)

    # Get analysis fields with fallbacks
    document_analysis = analysis.get("document_analysis", "") or ""
    compliance_analysis = analysis.get("compliance_analysis", "") or ""

    # Long strings are truncated to stay within metadata size limits.
    return {
        "filename": filename or "unknown",
        "upload_time": datetime.now().isoformat(),
        "status": "analyzed",
        "total_issues": str(analysis.get("total_issues", 0)),
        "timestamp": analysis.get("timestamp", datetime.now().isoformat()),
        "issues_summary": _truncate(issues_str),
        "document_type": _truncate(document_analysis[:500]),
        "compliance_summary": _truncate(compliance_analysis[:1000])
    }


async def _ingest_upload(
    file: UploadFile,
    extra_metadata: Optional[dict] = None,
) -> Tuple[str, dict]:
    """Store, analyse and index one uploaded file.

    Args:
        file: The uploaded document.
        extra_metadata: Entries merged over the generated vector-store metadata.

    Returns:
        ``(doc_id, analysis)`` for the newly indexed document.

    Raises:
        HTTPException: 400 for an unsupported type, an extraction failure or
            a document without text.
    """
    doc_id, file_path = _new_upload_path(file.filename)
    try:
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())
        logger.info("Extracting text from %s", file_path)
        text = extract_text(file_path)
    finally:
        # The temp file is only needed for extraction; remove it on every
        # path, including the HTTPException ones.
        _remove_if_exists(file_path)

    # Validate extracted text
    if not text or not text.strip():
        raise HTTPException(400, "Could not extract any text from the uploaded file")

    logger.info("Generating embeddings for document %s", doc_id)
    embedding = embeddings.generate_embeddings(text)

    logger.info("Analyzing compliance for document %s", doc_id)
    analysis = analyze_compliance(text)

    pinecone_metadata = prepare_metadata_for_pinecone(analysis, file.filename)
    if extra_metadata:
        pinecone_metadata.update(extra_metadata)

    logger.info("Upserting document %s to vector store", doc_id)
    vector_store.upsert_document(
        doc_id=doc_id,
        embedding=embedding,
        metadata=pinecone_metadata
    )

    # Cache the full analysis only once the document is indexed, so a failed
    # upsert does not leave an unreachable entry behind.
    analysis_storage[doc_id] = analysis
    return doc_id, analysis


@app.get("/")
async def root():
    """Report that the API is running."""
    return {"message": "Mini SpecsComply Pro API", "status": "running"}


@app.get("/document/{doc_id}/analysis")
async def get_analysis(doc_id: str):
    """Get detailed analysis for a specific document.

    Raises:
        HTTPException: 404 when the vector store has no such document.
    """
    doc = vector_store.get_document(doc_id)
    if not doc:
        raise HTTPException(404, "Document not found")

    # Full analysis comes from the in-memory store; empty if not cached here.
    full_analysis = analysis_storage.get(doc_id, {})

    return JSONResponse({
        "document_id": doc_id,
        "metadata": doc.metadata,
        "analysis": full_analysis
    })


@app.post("/upload-document")
async def upload_document(file: UploadFile = File(...)):
    """Upload and process a document - returns only basic info, not full analysis.

    Raises:
        HTTPException: 400 for invalid uploads, 500 for any other failure.
    """
    try:
        doc_id, analysis = await _ingest_upload(file)
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error in upload_document")
        raise HTTPException(500, f"Document processing failed: {str(e)}") from e

    # Return only basic info - NOT the full analysis
    return JSONResponse({
        "document_id": doc_id,
        "status": "success",
        "message": "Document processed and analyzed successfully",
        "filename": file.filename,
        "total_issues": analysis.get("total_issues", 0),
        "timestamp": analysis.get("timestamp", datetime.now().isoformat())
    })


@app.post("/document/{doc_id}/resubmit")
async def resubmit_document(doc_id: str, file: UploadFile = File(...)):
    """Resubmit a document for re-analysis.

    The new version is stored under a fresh id whose metadata links back to
    ``doc_id``.

    Raises:
        HTTPException: 404 when the original is unknown, 400 for invalid
            uploads, 500 for any other failure.
    """
    try:
        # Verify original exists
        original = vector_store.get_document(doc_id)
        if not original:
            raise HTTPException(404, "Original document not found")

        new_doc_id, analysis = await _ingest_upload(
            file,
            extra_metadata={"original_id": doc_id, "status": "resubmitted"},
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error in resubmit_document")
        raise HTTPException(500, str(e)) from e

    # Return basic info
    return JSONResponse({
        "document_id": new_doc_id,
        "original_id": doc_id,
        "status": "success",
        "message": "Document resubmitted and analyzed successfully",
        "filename": file.filename,
        "total_issues": analysis.get("total_issues", 0),
        "timestamp": analysis.get("timestamp", datetime.now().isoformat())
    })


@app.get("/compliance-standards")
async def get_compliance_standards():
    """Get list of loaded compliance standards."""
    return JSONResponse({
        "standards": [
            {
                "key": key,
                "filename": data["filename"],
                "sections_count": len(data["sections"])
            }
            for key, data in compliance_loader.compliance_docs.items()
        ]
    })


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)