feat: Enhance document processing and analysis features

- Added support for processing DOCX files and extracting content.
- Updated database schema to include a combined 'issues_and_recommendations' field.
- Improved error handling during document uploads and analysis.
- Modified the analysis display to show issues and recommendations in a structured format.
- Adjusted API call parameters for better performance and error management.
This commit is contained in:
boladeE
2025-04-22 12:07:11 +01:00
parent b0ec64b883
commit c4145977dd
6 changed files with 352 additions and 129 deletions
+1
View File
@@ -135,6 +135,7 @@ async def get_analysis(request: Request, doc_id: str):
analysis = await document_processor.get_analysis(doc_id)
metadata = database.get_metadata(doc_id)
print(f"analysis: {analysis}")
return templates.TemplateResponse(
"analysis.html",
{
+69 -17
View File
@@ -16,16 +16,70 @@ class Database:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Create analysis table
cursor.execute('''
CREATE TABLE IF NOT EXISTS analysis (
document_id TEXT PRIMARY KEY,
summary TEXT,
issues TEXT,
recommendations TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Check if we need to migrate the old schema
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='analysis'")
table_exists = cursor.fetchone() is not None
if table_exists:
# Check if we need to migrate
cursor.execute("PRAGMA table_info(analysis)")
columns = [column[1] for column in cursor.fetchall()]
if 'issues_and_recommendations' not in columns:
# Backup old data
cursor.execute("SELECT document_id, summary, issues, recommendations FROM analysis")
old_data = cursor.fetchall()
# Drop the old table
cursor.execute("DROP TABLE analysis")
# Create the new table
cursor.execute('''
CREATE TABLE analysis (
document_id TEXT PRIMARY KEY,
summary TEXT,
issues_and_recommendations TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Migrate old data to new format
for row in old_data:
doc_id, summary, issues, recommendations = row
try:
old_issues = json.loads(issues) if issues else []
old_recommendations = json.loads(recommendations) if recommendations else []
# Combine issues and recommendations
issues_and_recommendations = []
for i in range(max(len(old_issues), len(old_recommendations))):
issue = old_issues[i]['issue'] if i < len(old_issues) else "Unknown Issue"
recommendation = old_recommendations[i] if i < len(old_recommendations) else "No recommendation provided"
issues_and_recommendations.append({
"issue": issue,
"recommendation": recommendation
})
cursor.execute('''
INSERT INTO analysis (document_id, summary, issues_and_recommendations)
VALUES (?, ?, ?)
''', (
doc_id,
summary,
json.dumps(issues_and_recommendations)
))
except Exception as e:
logging.error(f"Error migrating data for document {doc_id}: {str(e)}")
else:
# Create the new table if it doesn't exist
cursor.execute('''
CREATE TABLE IF NOT EXISTS analysis (
document_id TEXT PRIMARY KEY,
summary TEXT,
issues_and_recommendations TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create metadata table
cursor.execute('''
@@ -49,13 +103,12 @@ class Database:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO analysis (document_id, summary, issues, recommendations)
VALUES (?, ?, ?, ?)
INSERT OR REPLACE INTO analysis (document_id, summary, issues_and_recommendations)
VALUES (?, ?, ?)
''', (
document_id,
analysis['summary'],
json.dumps(analysis['issues']),
json.dumps(analysis['recommendations'])
json.dumps(analysis['issues_and_recommendations'])
))
conn.commit()
except Exception as e:
@@ -67,7 +120,7 @@ class Database:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT summary, issues, recommendations FROM analysis WHERE document_id = ?', (document_id,))
cursor.execute('SELECT summary, issues_and_recommendations FROM analysis WHERE document_id = ?', (document_id,))
result = cursor.fetchone()
if not result:
@@ -76,8 +129,7 @@ class Database:
return {
'document_id': document_id,
'summary': result[0],
'issues': json.loads(result[1]),
'recommendations': json.loads(result[2])
'issues_and_recommendations': json.loads(result[1])
}
except Exception as e:
logging.error(f"Error retrieving analysis for document {document_id}: {str(e)}")
+102 -92
View File
@@ -19,15 +19,16 @@ class DocumentProcessor:
self.database = Database()
async def process_document(self, doc_id: str, file_path: str, document_type: str, is_resubmission: bool = False):
try:
# Read document content with error handling for encoding
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError:
# Try with a different encoding if UTF-8 fails
with open(file_path, 'r', encoding='latin-1') as f:
content = f.read()
import docx
doc = docx.Document(file_path)
content = "\n".join([para.text for para in doc.paragraphs])
except Exception as e:
logging.error(f"Error reading Word document: {str(e)}")
content = ""
logging.info(f"Processing document {doc_id} with content length: {len(content)}")
@@ -54,7 +55,7 @@ class DocumentProcessor:
"content": content
}
],
"max_tokens": 4000
"max_tokens": 1000
}
# Make the API call with error handling
@@ -84,21 +85,22 @@ class DocumentProcessor:
logging.error(f"Error calling DeepSeek API: {str(e)}")
summary = "Document analysis could not be completed due to API connection issues."
# Process with DeepSeek for deep reasoning using URL
# Process with DeepSeek for issues and recommendations
deepseek_payload = {
"model": "deepseek-chat",
"messages": [
{
"role": "system",
"content": "You are an expert in document compliance analysis. Analyze the following document for compliance issues and provide detailed feedback."
"content": f"You are an expert in document compliance analysis for this type of document: {document_type}. Analyze the following document for compliance issues and provide detailed feedback."
},
{
"role": "user",
"content": f"""Analyze this type of document {document_type} for compliance issues and provide detailed feedback:\n\n{content}
and these are the main sections of the document:\n\n{summary}"""
and these are the main sections of the document:\n\n{summary}..
Return the issues and recommendations in a structured format: 'Issue: <issue>. Recommendation: <recommendation>.'"""
}
],
"max_tokens": 4000
"max_tokens": 1000
}
# Make the API call with error handling
@@ -113,39 +115,26 @@ class DocumentProcessor:
# Check if the response is successful
if deepseek_response.status_code != 200:
logging.error(f"DeepSeek API error: {deepseek_response.status_code} - {deepseek_response.text}")
# Use a fallback for issues if the API call fails
issues = ["Document analysis could not be completed due to API limitations."]
# Use a fallback for issues and recommendations if the API call fails
issues_and_recommendations = [{"issue": "Document analysis could not be completed due to API limitations.", "recommendation": "Please try again later."}]
else:
# Try to parse the JSON response
try:
deepseek_result = deepseek_response.json()
issues = self._extract_issues(deepseek_result['choices'][0]['message']['content'])
issues_and_recommendations = self._extract_issues_and_recommendations(deepseek_result['choices'][0]['message']['content'])
except (json.JSONDecodeError, KeyError) as e:
logging.error(f"Error parsing DeepSeek response: {str(e)}")
logging.error(f"Response text: {deepseek_response.text}")
issues = ["Document analysis could not be completed due to parsing errors."]
issues_and_recommendations = [{"issue": "Document analysis could not be completed due to parsing errors.", "recommendation": "Please try again later."}]
except requests.exceptions.RequestException as e:
logging.error(f"Error calling DeepSeek API: {str(e)}")
issues = ["Document analysis could not be completed due to API connection issues."]
# Use Cohere reranker to prioritize issues
try:
reranked_issues = self.cohere_client.rerank(
query="Compliance issues in technical document",
documents=issues,
model=config.COHERE_RERANKER_MODEL
)
except Exception as e:
logging.error(f"Error using Cohere reranker: {str(e)}")
# Create a simple reranked issues list if Cohere fails
reranked_issues = [type('obj', (object,), {'document': issue, 'index': i}) for i, issue in enumerate(issues)]
issues_and_recommendations = [{"issue": "Document analysis could not be completed due to API connection issues.", "recommendation": "Please try again later."}]
# Store analysis results
analysis = {
"document_id": doc_id,
"summary": summary,
"issues": self._format_issues(reranked_issues),
"recommendations": self._generate_recommendations(reranked_issues)
"issues_and_recommendations": issues_and_recommendations
}
# Save analysis to database
@@ -176,73 +165,94 @@ class DocumentProcessor:
async def get_analysis(self, doc_id: str) -> Dict[str, Any]:
return self.database.get_analysis(doc_id)
def _extract_issues(self, deepseek_response: str) -> List[str]:
# Simple extraction of issues from DeepSeek's response
# In a real implementation, this would be more sophisticated
print(deepseek_response)
return [issue.strip() for issue in re.split(r'\d+\.', deepseek_response) if issue.strip()]
def _extract_issues_and_recommendations(self, deepseek_response: str) -> List[Dict[str, str]]:
# Extract issues and recommendations from DeepSeek's response
def _format_issues(self, reranked_issues) -> List[Dict[str, Any]]:
return [
{
"issue": issue[0] if isinstance(issue, tuple) else issue.document,
"severity": "high" if i < 3 else "medium" if i < 6 else "low",
"rank": i + 1
}
for i, issue in enumerate(reranked_issues)
]
issues_and_recommendations = []
def _generate_recommendations(self, reranked_issues) -> List[str]:
# Generate specific recommendations for each issue
recommendations = []
print(f"Generating recommendations for {reranked_issues} issues")
# Extract the results from the RerankResponse object
results = reranked_issues.results if hasattr(reranked_issues, 'results') else reranked_issues
# Split the response into lines
lines = deepseek_response.split('\n')
for issue in results[:5]: # Focus on top 5 issues
recommendation_payload = {
"model": "deepseek-chat",
"messages": [
{
"role": "system",
"content": "You are an expert in document compliance. Provide specific, actionable recommendations to fix compliance issues."
},
{
"role": "user",
"content": f"Provide a specific, actionable recommendation to fix this compliance issue: {issue}"
}
],
"max_tokens": 1000
}
current_issue = None
current_recommendation = None
for i, line in enumerate(lines):
line = line.strip()
# Make the API call with error handling
try:
recommendation_response = requests.post(
self.deepseek_url,
json=recommendation_payload,
headers=self.deepseek_headers,
timeout=60 # Add timeout
)
# Check if this line contains an issue
if '**Issue:**' in line:
# If we already have an issue and recommendation, add them to the list
if current_issue and current_recommendation:
issues_and_recommendations.append({
'issue': current_issue,
'recommendation': current_recommendation
})
# Check if the response is successful
if recommendation_response.status_code != 200:
logging.error(f"DeepSeek API error: {recommendation_response.status_code} - {recommendation_response.text}")
recommendations.append("Recommendation could not be generated due to API limitations.")
else:
# Try to parse the JSON response
try:
recommendation_result = recommendation_response.json()
recommendations.append(recommendation_result['choices'][0]['message']['content'])
except (json.JSONDecodeError, KeyError) as e:
logging.error(f"Error parsing DeepSeek response: {str(e)}")
logging.error(f"Response text: {recommendation_response.text}")
recommendations.append("Recommendation could not be generated due to parsing errors.")
except requests.exceptions.RequestException as e:
logging.error(f"Error calling DeepSeek API: {str(e)}")
recommendations.append("Recommendation could not be generated due to API connection issues.")
return recommendations
# Extract the issue text
issue_text = line.split('**Issue:**')[1].strip()
current_issue = issue_text
current_recommendation = None
# Check if this line contains a recommendation
elif '**Recommendation:**' in line:
# Extract the recommendation text
recommendation_text = line.split('**Recommendation:**')[1].strip()
current_recommendation = recommendation_text
# If we're at the last line and have both issue and recommendation, add them
elif i == len(lines) - 1 and current_issue and current_recommendation:
issues_and_recommendations.append({
'issue': current_issue,
'recommendation': current_recommendation
})
# If we still have an issue and recommendation at the end, add them
if current_issue and current_recommendation:
issues_and_recommendations.append({
'issue': current_issue,
'recommendation': current_recommendation
})
# If no issues were found, try an alternative approach
if not issues_and_recommendations:
# Look for numbered issues in the format "1. **Issue:** ... **Recommendation:** ..."
for line in lines:
if line.strip().startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')):
if '**Issue:**' in line and '**Recommendation:**' in line:
parts = line.split('**Recommendation:**')
if len(parts) == 2:
issue_part = parts[0]
recommendation_part = parts[1]
# Clean up the issue text
issue = issue_part.split('**Issue:**')[1].strip()
# Clean up the recommendation text
recommendation = recommendation_part.strip()
issues_and_recommendations.append({'issue': issue, 'recommendation': recommendation})
# If still no issues found, try one more approach
if not issues_and_recommendations:
# Look for any line containing both Issue and Recommendation
for line in lines:
if 'Issue:' in line and 'Recommendation:' in line:
parts = line.split('Recommendation:')
if len(parts) == 2:
issue_part = parts[0]
recommendation_part = parts[1]
# Clean up the issue text
issue = issue_part.replace('Issue:', '').strip()
# Clean up the recommendation text
recommendation = recommendation_part.strip()
issues_and_recommendations.append({'issue': issue, 'recommendation': recommendation})
print(f"issues_and_recommendations: {issues_and_recommendations}")
return issues_and_recommendations
def _store_document(self, doc_id: str, file_path: str):
# save document to vector store
self.vector_store.add_document(doc_id, file_path)
+10 -19
View File
@@ -30,25 +30,16 @@
</div>
</div>
<h5 class="card-title">Compliance Issues</h5>
<div class="accordion" id="issuesAccordion">
{% for issue in analysis.issues %}
<div class="accordion-item">
<h2 class="accordion-header" id="heading{{ loop.index }}">
<button class="accordion-button {% if not loop.first %}collapsed{% endif %}" type="button" data-bs-toggle="collapse" data-bs-target="#collapse{{ loop.index }}">
<span class="badge bg-{{ 'danger' if issue.severity == 'high' else 'warning' if issue.severity == 'medium' else 'info' }} me-2">
{{ issue.severity|title }}
</span>
{{ issue.issue }}
</button>
</h2>
<div id="collapse{{ loop.index }}" class="accordion-collapse collapse {% if loop.first %}show{% endif %}" data-bs-parent="#issuesAccordion">
<div class="accordion-body">
<p><strong>Rank:</strong> {{ issue.rank }}</p>
<p><strong>Recommendation:</strong></p>
<div class="alert alert-info markdown-body">
{{ analysis.recommendations[loop.index0]|markdown|safe }}
</div>
<h5 class="card-title">Issues and Recommendations</h5>
<div class="list-group mb-3">
{% for item in analysis.issues_and_recommendations %}
<div class="list-group-item">
<div class="mb-2">
<strong>Issue:</strong>
<p class="mb-3">{{ item.issue }}</p>
<strong>Recommendation:</strong>
<div class="alert alert-info markdown-body mt-2">
{{ item.recommendation|markdown|safe }}
</div>
</div>
</div>
+1 -1
View File
@@ -27,7 +27,7 @@
<div class="mb-3">
<label for="documentFile" class="form-label">Document File</label>
<input class="form-control" type="file" id="documentFile" name="file" required>
<div class="form-text">Supported formats: PDF, DOCX, TXT, MD</div>
<div class="form-text">Supported formats: DOCX</div>
</div>
<div class="d-grid">