update functions

This commit is contained in:
Ayomide
2025-07-14 23:41:31 +01:00
parent 0b5a7218b0
commit 97a3b710c3
7 changed files with 580 additions and 75 deletions
+7
View File
@@ -29,6 +29,13 @@ data/
*.feather
*.pkl
*.pickle
*.pdf
*.docx
*.xlsx
data/uploads/
*.pdf
*.docx
*.xlsx
# Vector database files
*.faiss
+95
View File
@@ -0,0 +1,95 @@
import os
from docx import Document
from typing import Dict, List
from .config import Config
from .embeddings import EmbeddingGenerator
class ComplianceLoader:
    """Load tender compliance documents and expose them for matching and prompting.

    ``compliance_docs`` maps a normalised document key to a dict with the
    keys ``filename``, ``content`` and ``sections``.  ``compliance_embeddings``
    maps the same key to whatever ``EmbeddingGenerator.generate_embeddings``
    returned for the document text.
    """

    # Substrings that mark a line as the start of a new section.
    _SECTION_KEYWORDS = ('requirement', 'specification', 'must', 'shall')

    def __init__(self):
        self.embedding_generator = EmbeddingGenerator()
        self.compliance_docs = {}
        self.compliance_embeddings = {}

    def load_compliance_standards(self, data_folder: str = "data/"):
        """Load all compliance documents and generate embeddings.

        Files from the built-in list that do not exist under ``data_folder``
        are skipped.  A failure while reading or embedding one file is
        printed and does not stop the remaining files from loading.
        """
        compliance_files = [
            "Invitation to Tender.docx",
            "Tender Specifications.docx",
            "Bill of Quantities.docx",
            "Scope of Work.docx",
            # NOTE(review): "SQualification" looks like a typo - confirm
            # against the actual file name on disk before changing it.
            "Supplier SQualification requirements.docx",
            "form of tender.docx",
            "confidentiality agreement.docx",
            "Project1-FEED CONTRACTOR-MUL-E000-PR-LST-000.docx"
        ]
        for filename in compliance_files:
            file_path = os.path.join(data_folder, filename)
            if not os.path.exists(file_path):
                continue
            try:
                # Extract text from compliance document
                doc = Document(file_path)
                text = '\n'.join(para.text for para in doc.paragraphs)
                # Store text and generate embedding
                doc_key = filename.replace('.docx', '').replace(' ', '_').lower()
                self.compliance_docs[doc_key] = {
                    'filename': filename,
                    'content': text,
                    'sections': self._extract_sections(text)
                }
                # Generate embedding for similarity search
                self.compliance_embeddings[doc_key] = self.embedding_generator.generate_embeddings(text)
                print(f"Loaded compliance standard: {filename}")
            except Exception as e:
                # Best effort: one unreadable document must not block the rest.
                print(f"Error loading {filename}: {str(e)}")

    def _extract_sections(self, text: str) -> List[str]:
        """Split ``text`` into sections.

        A non-empty line starts a new section when it is all upper-case,
        ends with ``:`` or contains one of the section keywords.  Other
        non-empty lines are appended to the current section; blank lines
        are dropped.
        """
        sections = []
        current_section = []
        for raw_line in text.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            lowered = line.lower()
            starts_section = (
                line.isupper()
                or line.endswith(':')
                or any(keyword in lowered for keyword in self._SECTION_KEYWORDS)
            )
            if starts_section:
                if current_section:
                    sections.append('\n'.join(current_section))
                current_section = [line]
            else:
                current_section.append(line)
        if current_section:
            sections.append('\n'.join(current_section))
        return sections

    @staticmethod
    def _cosine_similarity(first, second) -> float:
        """Return the cosine similarity of two vectors.

        Returns 0.0 when either vector is missing, empty, of a different
        length than the other, or has zero magnitude.
        """
        if first is None or second is None:
            return 0.0
        if len(first) == 0 or len(first) != len(second):
            return 0.0
        dot = sum(x * y for x, y in zip(first, second))
        magnitude = (sum(x * x for x in first) * sum(y * y for y in second)) ** 0.5
        return dot / magnitude if magnitude else 0.0

    def get_relevant_standards(self, document_embedding: List[float], threshold: float = 0.7) -> List[Dict]:
        """Find relevant compliance standards for a document.

        Each loaded standard is scored by cosine similarity against
        ``document_embedding``.  Standards scoring below ``threshold`` are
        dropped and the rest are returned most-similar first.  Every entry
        carries ``standard``, ``filename``, ``content``, ``sections`` and
        the computed ``similarity``.
        """
        relevant_standards = []
        for doc_key, compliance_embedding in self.compliance_embeddings.items():
            similarity = self._cosine_similarity(document_embedding, compliance_embedding)
            if similarity < threshold:
                continue
            doc_data = self.compliance_docs[doc_key]
            relevant_standards.append({
                'standard': doc_key,
                'filename': doc_data['filename'],
                'content': doc_data['content'],
                'sections': doc_data['sections'],
                'similarity': similarity
            })
        relevant_standards.sort(key=lambda entry: entry['similarity'], reverse=True)
        return relevant_standards

    def get_compliance_context(self) -> str:
        """Get formatted compliance context for LLM prompts.

        Emits a header followed by one block per loaded document holding
        its file name and the first 1000 characters of its content.
        """
        parts = ["COMPLIANCE STANDARDS:\n\n"]
        for doc_data in self.compliance_docs.values():
            parts.append(f"=== {doc_data['filename']} ===\n")
            parts.append(f"{doc_data['content'][:1000]}...\n\n")
        return ''.join(parts)
+3 -2
View File
@@ -12,17 +12,18 @@ class Config:
# Groq
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
GROQ_MODEL = "mixtral-8x7b-32768"
GROQ_MODEL = "llama3-70b-8192"
# Claude
CLAUDE_API_KEY = os.getenv("CLAUDE_API_KEY")
CLAUDE_MODEL = "claude-3-5-sonnet-20240620"
CLAUDE_MODEL = "claude-3-5-sonnet-20241022"
# Vector Store
VECTOR_STORE_TYPE = "pinecone"
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_INDEX = "scp-docs"
PINECONE_ENV = "gcp-starter"
EMBEDDING_DIMENSION = 1024
# Document Processing
MAX_DOC_SIZE = 10 * 1024 * 1024 # 10MB
+6 -2
View File
@@ -10,15 +10,19 @@ class EmbeddingGenerator:
response = self.client.embed(
texts=[text],
model=Config.EMBED_MODEL,
input_type="document"
input_type="search_document"
)
return response.embeddings[0]
def rerank_issues(self, issues: list, query: str, top_n: int = 5):
    """Return up to ``top_n`` entries of ``issues`` ordered by relevance to ``query``.

    An empty ``issues`` list short-circuits to ``[]`` without calling the
    rerank service.  ``top_n`` is clamped to ``len(issues)`` so the service
    is never asked for more results than there are documents.

    The returned items are the caller's original entries, looked up via
    each result's ``index``; the result's ``document`` field is not used
    because it is not guaranteed to be populated or to be a plain string.
    """
    # Handle empty issues list
    if not issues:
        return []
    response = self.client.rerank(
        query=query,
        documents=issues,
        top_n=min(top_n, len(issues)),
        model=Config.RERANK_MODEL
    )
    return [issues[result.index] for result in response.results]
+409 -68
View File
@@ -3,20 +3,32 @@ from fastapi.responses import JSONResponse
from typing import Optional
import os
import uuid
from docx import Document
from PyPDF2 import PdfReader
import io
from datetime import datetime
from .config import Config
from .embeddings import EmbeddingGenerator
from .vector_stores import VectorStore
from .compliance_loader import ComplianceLoader
import groq
import anthropic
import json
# FastAPI application object; the route decorators in this module register on it.
app = FastAPI(title="Mini SpecsComply Pro")
# Initialize components
# NOTE(review): these objects are built at import time, so importing this
# module presumably requires the related credentials to be configured - confirm.
embeddings = EmbeddingGenerator()
vector_store = VectorStore()
compliance_loader = ComplianceLoader()
# Load compliance standards on startup
# Runs once at import, using the loader's default data folder.
compliance_loader.load_compliance_standards()
# Initialize clients
groq_client = groq.Client(api_key=Config.GROQ_API_KEY)
claude_client = anthropic.Anthropic(api_key=Config.CLAUDE_API_KEY)
# In-memory storage for analysis results
# Plain module-level dict keyed by document id: it is local to this process
# and its contents are gone once the process exits.
analysis_storage = {}
def save_document(file: UploadFile) -> str:
@@ -35,87 +47,368 @@ def save_document(file: UploadFile) -> str:
def extract_text(file_path: str) -> str:
    """Extract text from a ``.docx``, ``.pdf`` or ``.txt`` file.

    The file type is chosen from the extension, compared case-insensitively.
    Empty paragraphs and pages without extractable text are skipped, and
    the remaining pieces are joined with newlines.  A file with no text
    yields ``""``.

    Raises:
        HTTPException: status 400 when the extension is unsupported or the
            file cannot be read or parsed.
    """
    try:
        lowered_path = file_path.lower()
        if lowered_path.endswith('.docx'):
            doc = Document(file_path)
            return '\n'.join(para.text for para in doc.paragraphs if para.text)
        if lowered_path.endswith('.pdf'):
            with open(file_path, 'rb') as f:
                reader = PdfReader(f)
                # extract_text() may yield nothing for a page; skip those.
                pages_text = [page.extract_text() for page in reader.pages]
                return '\n'.join(page_text for page_text in pages_text if page_text)
        if lowered_path.endswith('.txt'):
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        raise ValueError("Unsupported file type")
    except Exception as e:
        # Every failure is reported to the client as a 400; the original
        # exception is kept as the cause for debugging.
        raise HTTPException(
            status_code=400,
            detail=f"Failed to extract text: {str(e)}"
        ) from e
# Prefix of the detail line that follows a numbered issue in the model output.
_ISSUE_DETAILS_PREFIX = '* What\'s missing or incorrect:'
# Lines that close a numbered issue when no usable detail line was seen.
_ISSUE_CLOSING_PREFIXES = ('* Severity:', '* Location:')
# Bullet markers recognised by the fallback extraction (each two characters wide).
_BULLET_PREFIXES = ('- ', '• ', '* ')
# Bullet lines starting with these labels are field lines, not issues.
_FIELD_LABELS = ('Violated', 'What', 'Severity', 'Location')
# Words that mark a line as issue-like in the last-resort scan.
_ISSUE_MARKERS = ('missing', 'violation', 'non-compliant', 'issue')


def _first_choice_text(response) -> str:
    """Return the message content of the first completion choice, or ""."""
    if not response or not response.choices:
        return ""
    message = response.choices[0].message
    if message and message.content:
        return message.content
    return ""


def _build_parsing_prompt(text: str) -> str:
    """Build the document-type / structure prompt from the first 3000 characters."""
    return f"""Extract key sections from this document and identify what type of tender document this appears to be:
DOCUMENT TO ANALYZE:
{text[:3000]}...
Please provide:
1. Document type (e.g., tender response, technical proposal, etc.)
2. Key sections found
3. Main requirements mentioned
4. Document structure analysis
Be concise but thorough."""


def _build_compliance_prompt(text: str, compliance_context: str) -> str:
    """Build the compliance prompt; context and document are each cut to 4000 characters."""
    return f"""You are a compliance expert analyzing tender documents.
COMPLIANCE STANDARDS TO CHECK AGAINST:
{compliance_context[:4000]}
DOCUMENT TO ANALYZE:
{text[:4000]}
Please analyze this document for compliance issues and provide a structured response:
1. COMPLIANCE SUMMARY: Overall compliance status (Compliant/Non-Compliant/Partial)
2. SPECIFIC ISSUES: List specific compliance violations found, including:
- Which standard is violated
- What is missing or incorrect
- Severity (Critical/High/Medium/Low)
- Specific location in document if possible
3. REQUIREMENTS CHECK: Verify if the document meets requirements from:
- Tender specifications
- Supplier qualification requirements
- Form of tender requirements
- Confidentiality agreement requirements
4. RECOMMENDATIONS: Specific actions to fix each issue
5. MISSING ELEMENTS: What key elements are completely missing
Please be detailed and specific in your analysis. Focus on actionable feedback."""


def _parse_compliance_issues(compliance_analysis: str) -> list:
    """Extract a de-duplicated, order-preserving list of issue strings."""
    issues = []
    current_issue = None
    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        number, dot, remainder = line.partition('.')
        if dot and number.isdigit():
            # Numbered heading: prefer the text between ** markers,
            # otherwise take everything after the number.
            if '**' in line:
                title = line.split('**')[1].strip()
            else:
                title = remainder.strip().replace('*', '').strip()
            if len(title) > 3:
                current_issue = title
        elif current_issue and line.startswith(_ISSUE_DETAILS_PREFIX):
            details = line.replace(_ISSUE_DETAILS_PREFIX, '').strip()
            if len(details) > 10:
                # Combine issue title with details for better context
                issues.append(f"{current_issue}: {details}")
                current_issue = None
        elif current_issue and line.startswith(_ISSUE_CLOSING_PREFIXES):
            # No detail line was usable: keep the bare title unless an
            # entry with the same title is already recorded.
            if current_issue not in [issue.split(':')[0] for issue in issues]:
                issues.append(current_issue)
            current_issue = None

    if not issues:
        # Fallback: treat bullet lines as issues.  Only genuine bullet
        # markers qualify, so ordinary prose lines are not truncated.
        for raw_line in compliance_analysis.split('\n'):
            line = raw_line.strip()
            if not line.startswith(_BULLET_PREFIXES):
                continue
            candidate = line[2:].strip()
            if len(candidate) > 10 and not candidate.startswith(_FIELD_LABELS):
                issues.append(candidate)

    # dict.fromkeys removes duplicates while keeping first-seen order.
    return [issue for issue in dict.fromkeys(issues) if len(issue) > 10]


def _scan_for_issue_lines(compliance_analysis: str, limit: int = 5) -> list:
    """Last-resort scan: return up to ``limit`` lines that mention an issue marker."""
    matches = []
    for raw_line in compliance_analysis.split('\n'):
        line = raw_line.strip()
        lowered = line.lower()
        if len(line) > 15 and any(marker in lowered for marker in _ISSUE_MARKERS):
            matches.append(line)
    return matches[:limit]


def analyze_compliance(text: str) -> dict:
    """Enhanced compliance analysis using Groq.

    Makes two chat-completion calls: one describing the document and one
    checking it against the loaded compliance context.  Issues are parsed
    from the second answer, reranked through ``embeddings.rerank_issues``
    (falling back to the first ten parsed issues if reranking raises), and
    returned with both raw answers.

    Returns:
        dict with ``document_analysis``, ``compliance_analysis``,
        ``issues``, ``total_issues`` and ``timestamp``.  When nothing could
        be extracted, ``issues`` holds one placeholder message and
        ``total_issues`` is 0.  On any unexpected error a fixed fallback
        dict is returned instead of raising.
    """
    try:
        # Get compliance context
        compliance_context = compliance_loader.get_compliance_context()

        # Document parsing and section extraction with Groq
        parsing_response = groq_client.chat.completions.create(
            messages=[{"role": "user", "content": _build_parsing_prompt(text)}],
            model=Config.GROQ_MODEL,
            temperature=0.1
        )
        document_analysis = _first_choice_text(parsing_response)

        # Comprehensive compliance analysis with Groq
        compliance_response = groq_client.chat.completions.create(
            messages=[{"role": "user", "content": _build_compliance_prompt(text, compliance_context)}],
            model=Config.GROQ_MODEL,
            temperature=0.1,
            max_tokens=4000
        )
        compliance_analysis = _first_choice_text(compliance_response)

        unique_issues = _parse_compliance_issues(compliance_analysis)

        # Rerank issues by importance using Cohere
        ranked_issues = []
        if unique_issues:
            try:
                ranked_issues = embeddings.rerank_issues(
                    issues=unique_issues,
                    query="Most critical compliance violations and missing requirements",
                    top_n=min(10, len(unique_issues))
                )
            except Exception as e:
                print(f"Reranking failed: {e}")
                ranked_issues = unique_issues[:10]  # Fallback to first 10 issues

        if not ranked_issues:
            ranked_issues = _scan_for_issue_lines(compliance_analysis)

        # Count before substituting the placeholder so that "nothing found"
        # is not reported as one issue.
        total_issues = len(ranked_issues)
        if not ranked_issues:
            ranked_issues = ["No specific issues identified"]

        return {
            "document_analysis": document_analysis,
            "compliance_analysis": compliance_analysis,
            "issues": ranked_issues,
            "total_issues": total_issues,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        print(f"Error in analyze_compliance: {e}")
        import traceback
        traceback.print_exc()
        # Return a safe fallback response
        return {
            "document_analysis": "Error occurred during document analysis",
            "compliance_analysis": "Error occurred during compliance analysis",
            "issues": ["Analysis failed due to technical error"],
            "total_issues": 1,
            "timestamp": datetime.now().isoformat()
        }
def prepare_metadata_for_pinecone(analysis: dict, filename: str) -> dict:
    """Prepare metadata for Pinecone by converting complex objects to strings.

    Args:
        analysis: result dict as produced by ``analyze_compliance``; missing
            or ``None`` fields fall back to empty values.
        filename: original upload name; an empty value becomes ``"unknown"``.

    Returns:
        A flat dict of strings: ``filename``, ``upload_time``, ``status``,
        ``total_issues``, ``timestamp``, ``issues_summary`` (issues joined
        with ``" | "``), ``document_type`` (first 500 characters of the
        document analysis) and ``compliance_summary`` (first 1000
        characters of the compliance analysis).
    """
    def truncate_string(s: str, max_length: int = 30000) -> str:
        # Truncate long strings to avoid Pinecone limits
        if not s:
            return ""
        return s[:max_length] + "..." if len(s) > max_length else s

    # Drop None and blank entries and make sure every item is a string.
    issues = analysis.get("issues") or []
    clean_issues = [str(issue) for issue in issues if issue is not None and str(issue).strip()]
    issues_str = " | ".join(clean_issues)

    # Get analysis fields with fallbacks
    document_analysis = analysis.get("document_analysis", "") or ""
    compliance_analysis = analysis.get("compliance_analysis", "") or ""

    return {
        "filename": filename or "unknown",
        "upload_time": datetime.now().isoformat(),
        "status": "analyzed",
        "total_issues": str(analysis.get("total_issues", 0)),
        "timestamp": analysis.get("timestamp", datetime.now().isoformat()),
        "issues_summary": truncate_string(issues_str),
        "document_type": truncate_string(document_analysis[:500]),
        "compliance_summary": truncate_string(compliance_analysis[:1000])
    }
# NOTE(review): a second handler with this same name and the same
# POST /upload-document route is defined further down in this file.
# Confirm which one is meant to stay; routes are presumably matched in
# registration order, which would make this earlier one take precedence.
@app.post("/upload-document")
async def upload_document(file: UploadFile = File(...)):
    """Save an upload, index its embedding, analyse it and return the full analysis."""
    try:
        doc_id, file_path = save_document(file)
        text = extract_text(file_path)
        embedding = embeddings.generate_embeddings(text)
        # Store in vector DB
        # The record is written with status "pending" before the analysis runs.
        vector_store.upsert_document(
            doc_id=doc_id,
            embedding=embedding,
            metadata={
                "filename": file.filename,
                "upload_time": datetime.now().isoformat(),
                "status": "pending"
            }
        )
        # Start analysis
        analysis = analyze_compliance(text)
        return JSONResponse({
            "document_id": doc_id,
            "status": "analysis_complete",
            "analysis": analysis
        })
    except Exception as e:
        # Every failure surfaces as a 500, including the 400 HTTPException
        # that extract_text raises for unreadable or unsupported files.
        raise HTTPException(500, str(e))
@app.get("/")
async def root():
    """Return a small status payload showing the API is running."""
    payload = {"message": "Mini SpecsComply Pro API", "status": "running"}
    return payload
@app.get("/document/{doc_id}/analysis")
async def get_analysis(doc_id: str):
    """Get detailed analysis for a specific document.

    The document must exist in the vector store, otherwise a 404 is raised.
    The analysis comes from the in-memory ``analysis_storage``; if nothing
    is stored for ``doc_id`` an empty dict is returned for that field.
    """
    doc = vector_store.get_document(doc_id)
    if not doc:
        raise HTTPException(404, "Document not found")
    # Get full analysis from storage
    full_analysis = analysis_storage.get(doc_id, {})
    return JSONResponse({
        "document_id": doc_id,
        "metadata": doc.metadata,
        "analysis": full_analysis
    })
@app.post("/upload-document")
async def upload_document(file: UploadFile = File(...)):
    """Upload and process a document - returns only basic info, not full analysis.

    The upload is written to a temporary file under ``Config.UPLOAD_FOLDER``,
    its text is extracted, embedded and analysed, and the result is stored
    in the vector store and in ``analysis_storage``.  The temporary file is
    removed on every exit path.

    Raises:
        HTTPException: 400 for an unsupported extension or when no text can
            be extracted; 500 for any other processing failure.
    """
    file_path = None
    try:
        # Validate file extension (an upload without a name has none).
        ext = os.path.splitext(file.filename or "")[1].lower()
        if ext not in Config.ALLOWED_EXTENSIONS:
            raise HTTPException(400, "Unsupported file type")

        # Save the file temporarily
        doc_id = str(uuid.uuid4())
        # Ensure upload directory exists
        os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
        file_path = os.path.join(Config.UPLOAD_FOLDER, f"{doc_id}{ext}")
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        # Process the file
        print(f"Extracting text from {file_path}")
        text = extract_text(file_path)
        # Validate extracted text
        if not text or not text.strip():
            raise HTTPException(400, "Could not extract any text from the uploaded file")

        print(f"Generating embeddings for document {doc_id}")
        embedding = embeddings.generate_embeddings(text)

        # Perform compliance analysis
        print(f"Analyzing compliance for document {doc_id}")
        analysis = analyze_compliance(text)

        # Prepare Pinecone-compatible metadata
        print(f"Preparing metadata for document {doc_id}")
        pinecone_metadata = prepare_metadata_for_pinecone(analysis, file.filename)

        # Store in vector DB with simplified metadata
        print(f"Upserting document {doc_id} to vector store")
        vector_store.upsert_document(
            doc_id=doc_id,
            embedding=embedding,
            metadata=pinecone_metadata
        )

        # Cache the full analysis only after the upsert succeeded, so a
        # failed upload leaves no orphaned entry behind.
        print(f"Storing analysis for document {doc_id}")
        analysis_storage[doc_id] = analysis

        # Return only basic info - NOT the full analysis
        return JSONResponse({
            "document_id": doc_id,
            "status": "success",
            "message": "Document processed and analyzed successfully",
            "filename": file.filename,
            "total_issues": analysis.get("total_issues", 0),
            "timestamp": analysis.get("timestamp", datetime.now().isoformat())
        })
    except HTTPException:
        raise
    except Exception as e:
        print(f"Error in upload_document: {e}")
        print(f"Error type: {type(e)}")
        import traceback
        traceback.print_exc()
        raise HTTPException(500, f"Document processing failed: {str(e)}")
    finally:
        # Clean up the temp file on success, on client errors (400) and on
        # server errors alike.
        if file_path and os.path.exists(file_path):
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                print(f"Could not remove temp file {file_path}: {cleanup_error}")
@app.post("/document/{doc_id}/resubmit")
async def resubmit_document(doc_id: str, file: UploadFile = File(...)):
"""Resubmit a document for re-analysis"""
try:
# Verify original exists
original = vector_store.get_document(doc_id)
@@ -123,33 +416,81 @@ async def resubmit_document(doc_id: str, file: UploadFile = File(...)):
raise HTTPException(404, "Original document not found")
# Process new version
new_doc_id, file_path = save_document(file)
ext = os.path.splitext(file.filename)[1].lower()
if ext not in Config.ALLOWED_EXTENSIONS:
raise HTTPException(400, "Unsupported file type")
new_doc_id = str(uuid.uuid4())
file_path = os.path.join(Config.UPLOAD_FOLDER, f"{new_doc_id}{ext}")
os.makedirs(Config.UPLOAD_FOLDER, exist_ok=True)
with open(file_path, "wb") as buffer:
buffer.write(await file.read())
text = extract_text(file_path)
# Validate extracted text
if not text or not text.strip():
raise HTTPException(400, "Could not extract any text from the uploaded file")
embedding = embeddings.generate_embeddings(text)
# Analyze new version
analysis = analyze_compliance(text)
# Store full analysis in memory/cache
analysis_storage[new_doc_id] = analysis
# Prepare Pinecone-compatible metadata
pinecone_metadata = prepare_metadata_for_pinecone(analysis, file.filename)
pinecone_metadata["original_id"] = doc_id
pinecone_metadata["status"] = "resubmitted"
# Store new version
vector_store.upsert_document(
doc_id=new_doc_id,
embedding=embedding,
metadata={
"filename": file.filename,
"upload_time": datetime.now().isoformat(),
"status": "resubmitted",
"original_id": doc_id
}
metadata=pinecone_metadata
)
# Analyze new version
analysis = analyze_compliance(text)
# Clean up temp file
os.remove(file_path)
# Return basic info
return JSONResponse({
"document_id": new_doc_id,
"status": "analysis_complete",
"analysis": analysis
"original_id": doc_id,
"status": "success",
"message": "Document resubmitted and analyzed successfully",
"filename": file.filename,
"total_issues": analysis.get("total_issues", 0),
"timestamp": analysis.get("timestamp", datetime.now().isoformat())
})
except HTTPException:
raise
except Exception as e:
if 'file_path' in locals() and os.path.exists(file_path):
os.remove(file_path)
raise HTTPException(500, str(e))
@app.get("/compliance-standards")
async def get_compliance_standards():
    """List each loaded compliance standard with its key, file name and section count."""
    standards = []
    for key, data in compliance_loader.compliance_docs.items():
        entry = {
            "key": key,
            "filename": data["filename"],
            "sections_count": len(data["sections"])
        }
        standards.append(entry)
    return JSONResponse({"standards": standards})
# Start a uvicorn server for the app when this module is run as a script.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed for direct execution.
    import uvicorn
    # Host 0.0.0.0 listens on all network interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
+19
View File
@@ -0,0 +1,19 @@
import anthropic
import os
from dotenv import load_dotenv
# Smoke test: load CLAUDE_API_KEY from the environment / .env file and send
# one minimal request to confirm the key and model name are accepted.
load_dotenv()
api_key = os.getenv("CLAUDE_API_KEY")
client = anthropic.Anthropic(api_key=api_key)
# Only the last four characters are shown so the secret itself never ends
# up in terminal scrollback or captured logs.
print("API Key loaded:", f"...{api_key[-4:]}" if api_key else "NOT FOUND")
# Test the API
try:
    client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=100,
        messages=[{"role": "user", "content": "Hello"}]
    )
    print("API test successful!")
except Exception as e:
    print(f"API test failed: {e}")
+39 -1
View File
@@ -1,12 +1,50 @@
from .config import Config
from pinecone import Pinecone
from pinecone import Pinecone, ServerlessSpec
from typing import List, Optional
import time
class VectorStore:
def __init__(self):
    """Connect to the configured Pinecone index, creating it if it does not exist.

    Creation is attempted in the AWS region first and falls back to GCP if
    that raises.  After creating an index the constructor polls its status
    until it reports ready (for up to 60 seconds) instead of assuming a
    fixed delay is enough.  Nothing is set up when
    ``Config.VECTOR_STORE_TYPE`` is not ``"pinecone"``.
    """
    if Config.VECTOR_STORE_TYPE == "pinecone":
        self.pc = Pinecone(api_key=Config.PINECONE_API_KEY)
        # Free tier supported regions
        FREE_TIER_SUPPORTED_REGIONS = {
            'aws': 'us-east-1',
            'gcp': 'us-central1'
        }
        # Keep the index dimension in step with the configured embedding
        # size; 1024 is the Cohere embed-english-v3.0 dimension.
        dimension = getattr(Config, "EMBEDDING_DIMENSION", 1024)

        def create_index_in(cloud: str):
            self.pc.create_index(
                name=Config.PINECONE_INDEX,
                dimension=dimension,
                metric="cosine",
                spec=ServerlessSpec(
                    cloud=cloud,
                    region=FREE_TIER_SUPPORTED_REGIONS[cloud]
                )
            )

        # Check if index exists
        if Config.PINECONE_INDEX not in self.pc.list_indexes().names():
            print(f"Creating new Pinecone index: {Config.PINECONE_INDEX}")
            try:
                # First try AWS free tier region
                create_index_in("aws")
            except Exception as e:
                print(f"AWS region failed, trying GCP: {str(e)}")
                # Fallback to GCP if AWS fails
                create_index_in("gcp")
            # Wait for index to initialize
            deadline = time.monotonic() + 60
            while not self.pc.describe_index(Config.PINECONE_INDEX).status['ready']:
                if time.monotonic() >= deadline:
                    # Proceed anyway, matching the earlier best-effort
                    # behaviour, but make the condition visible.
                    print(f"Pinecone index {Config.PINECONE_INDEX} not ready after 60s; continuing")
                    break
                time.sleep(1)
        self.index = self.pc.Index(Config.PINECONE_INDEX)
def upsert_document(self, doc_id: str, embedding: List[float], metadata: dict):