2025-04-21 22:49:29 +01:00
|
|
|
import json
import logging
import os
import sqlite3
from contextlib import contextmanager
from typing import Dict, Any, Optional
|
|
|
|
|
|
|
|
|
|
class Database:
    """SQLite-backed storage for document metadata and analysis results.

    Two tables are maintained: ``analysis`` (summary plus a JSON-encoded
    list of issue/recommendation pairs) and ``metadata`` (filename,
    document type, description). Every public method opens its own
    short-lived connection to ``self.db_path``.
    """

    # Single definition of the current ``analysis`` schema, shared by the
    # fresh-install path and the migration path.
    _CREATE_ANALYSIS_SQL = '''
        CREATE TABLE IF NOT EXISTS analysis (
            document_id TEXT PRIMARY KEY,
            summary TEXT,
            issues_and_recommendations TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    '''

    _CREATE_METADATA_SQL = '''
        CREATE TABLE IF NOT EXISTS metadata (
            document_id TEXT PRIMARY KEY,
            filename TEXT,
            document_type TEXT,
            description TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    '''

    def __init__(self, db_path: str = "data/app.db"):
        """Remember the database path, create its directory, and set up tables.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = db_path
        # dirname() is "" for a bare filename such as "app.db";
        # os.makedirs("") raises, so only create a directory when there is one.
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self._init_db()

    @contextmanager
    def _connection(self):
        """Yield a connection that is committed or rolled back, then closed.

        ``sqlite3.Connection`` used as a context manager only ends the
        transaction; it does not close the connection, so the close is done
        here explicitly.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _init_db(self):
        """Initialize the database with required tables.

        If an ``analysis`` table exists without the
        ``issues_and_recommendations`` column, its rows are migrated to the
        current schema first.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            with self._connection() as conn:
                cursor = conn.cursor()

                # Check if we need to migrate the old schema
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='analysis'")
                table_exists = cursor.fetchone() is not None

                if table_exists:
                    cursor.execute("PRAGMA table_info(analysis)")
                    columns = [column[1] for column in cursor.fetchall()]
                    if 'issues_and_recommendations' not in columns:
                        self._migrate_legacy_analysis(cursor)
                else:
                    cursor.execute(self._CREATE_ANALYSIS_SQL)

                cursor.execute(self._CREATE_METADATA_SQL)
        except Exception as e:
            logging.error(f"Error initializing database: {str(e)}")
            raise

    def _migrate_legacy_analysis(self, cursor):
        """Rebuild ``analysis`` in the current schema from the legacy columns.

        Rows that cannot be converted are logged and skipped.
        """
        # DDL is not wrapped in an implicit transaction by the sqlite3 module,
        # so open one explicitly: if anything below fails before commit, the
        # DROP TABLE is rolled back and the legacy data survives.
        cursor.execute("BEGIN")

        # Backup old data
        cursor.execute("SELECT document_id, summary, issues, recommendations FROM analysis")
        old_data = cursor.fetchall()

        cursor.execute("DROP TABLE analysis")
        cursor.execute(self._CREATE_ANALYSIS_SQL)

        for doc_id, summary, issues, recommendations in old_data:
            try:
                combined = self._combine_legacy(issues, recommendations)
                cursor.execute('''
                    INSERT INTO analysis (document_id, summary, issues_and_recommendations)
                    VALUES (?, ?, ?)
                ''', (doc_id, summary, json.dumps(combined)))
            except Exception as e:
                logging.error(f"Error migrating data for document {doc_id}: {str(e)}")

    @staticmethod
    def _combine_legacy(issues, recommendations):
        """Pair legacy issue and recommendation JSON lists positionally.

        Missing counterparts are filled with "Unknown Issue" or
        "No recommendation provided". An issue entry may be a dict carrying
        an ``'issue'`` key or any other JSON value, which is kept as-is.
        """
        old_issues = json.loads(issues) if issues else []
        old_recommendations = json.loads(recommendations) if recommendations else []

        combined = []
        for i in range(max(len(old_issues), len(old_recommendations))):
            if i < len(old_issues):
                entry = old_issues[i]
                # Tolerate both {"issue": ...} dicts and bare values so the
                # row is not discarded over a shape difference.
                issue = entry.get('issue', "Unknown Issue") if isinstance(entry, dict) else entry
            else:
                issue = "Unknown Issue"
            if i < len(old_recommendations):
                recommendation = old_recommendations[i]
            else:
                recommendation = "No recommendation provided"
            combined.append({"issue": issue, "recommendation": recommendation})
        return combined

    def save_analysis(self, document_id: str, analysis: Dict[str, Any]):
        """Save analysis results to the database, replacing any existing row.

        Args:
            document_id: Primary key of the document.
            analysis: Must contain ``'summary'`` and
                ``'issues_and_recommendations'``; the latter is stored as JSON.

        Raises:
            Exception: Any error (including a missing key) is logged and re-raised.
        """
        try:
            with self._connection() as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO analysis (document_id, summary, issues_and_recommendations)
                    VALUES (?, ?, ?)
                ''', (
                    document_id,
                    analysis['summary'],
                    json.dumps(analysis['issues_and_recommendations']),
                ))
        except Exception as e:
            logging.error(f"Error saving analysis for document {document_id}: {str(e)}")
            raise

    def get_analysis(self, document_id: str) -> Dict[str, Any]:
        """Retrieve analysis results from the database.

        Returns:
            Dict with ``document_id``, ``summary`` and the decoded
            ``issues_and_recommendations`` (an empty list when the stored
            value is NULL or empty).

        Raises:
            FileNotFoundError: No analysis row exists for ``document_id``.
        """
        try:
            with self._connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    'SELECT summary, issues_and_recommendations FROM analysis WHERE document_id = ?',
                    (document_id,),
                )
                result = cursor.fetchone()

                if not result:
                    raise FileNotFoundError(f"Analysis not found for document {document_id}")

                summary, raw_items = result
                return {
                    'document_id': document_id,
                    'summary': summary,
                    # json.loads(None) would raise TypeError; treat NULL as empty.
                    'issues_and_recommendations': json.loads(raw_items) if raw_items else [],
                }
        except Exception as e:
            logging.error(f"Error retrieving analysis for document {document_id}: {str(e)}")
            raise

    def save_metadata(self, document_id: str, metadata: Dict[str, Any]):
        """Save document metadata to the database, replacing any existing row.

        Args:
            document_id: Primary key of the document.
            metadata: Must contain ``'filename'`` and ``'document_type'``;
                ``'description'`` is optional.

        Raises:
            Exception: Any error (including a missing key) is logged and re-raised.
        """
        try:
            with self._connection() as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO metadata (document_id, filename, document_type, description)
                    VALUES (?, ?, ?, ?)
                ''', (
                    document_id,
                    metadata['filename'],
                    metadata['document_type'],
                    metadata.get('description'),
                ))
        except Exception as e:
            logging.error(f"Error saving metadata for document {document_id}: {str(e)}")
            raise

    def get_metadata(self, document_id: str) -> Dict[str, Any]:
        """Retrieve document metadata from the database.

        Returns:
            Dict with ``document_id``, ``filename``, ``document_type`` and
            ``description``.

        Raises:
            FileNotFoundError: No metadata row exists for ``document_id``.
        """
        try:
            with self._connection() as conn:
                cursor = conn.cursor()
                cursor.execute(
                    'SELECT filename, document_type, description FROM metadata WHERE document_id = ?',
                    (document_id,),
                )
                result = cursor.fetchone()

                if not result:
                    raise FileNotFoundError(f"Metadata not found for document {document_id}")

                filename, document_type, description = result
                return {
                    'document_id': document_id,
                    'filename': filename,
                    'document_type': document_type,
                    'description': description,
                }
        except Exception as e:
            logging.error(f"Error retrieving metadata for document {document_id}: {str(e)}")
            raise

    def get_all_metadata(self) -> list:
        """Retrieve metadata for all documents, newest first.

        Returns:
            List of dicts with ``document_id``, ``filename``,
            ``document_type``, ``description``, ``upload_date`` (the row's
            ``created_at``) and ``status``: ``'completed'`` when a matching
            analysis row exists, otherwise ``'processing'``.
        """
        try:
            with self._connection() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT m.document_id, m.filename, m.document_type, m.description, m.created_at,
                           CASE WHEN a.document_id IS NOT NULL THEN 1 ELSE 0 END as has_analysis
                    FROM metadata m
                    LEFT JOIN analysis a ON m.document_id = a.document_id
                    ORDER BY m.created_at DESC
                ''')
                results = cursor.fetchall()

                return [{
                    'document_id': row[0],
                    'filename': row[1],
                    'document_type': row[2],
                    'description': row[3],
                    'upload_date': row[4],
                    'status': 'completed' if row[5] == 1 else 'processing',
                } for row in results]
        except Exception as e:
            logging.error(f"Error retrieving all metadata: {str(e)}")
            raise

    def delete_document(self, document_id: str):
        """Delete a document and its associated data from the database.

        Removes the matching rows from both ``analysis`` and ``metadata``
        in one transaction.
        """
        try:
            with self._connection() as conn:
                conn.execute('DELETE FROM analysis WHERE document_id = ?', (document_id,))
                conn.execute('DELETE FROM metadata WHERE document_id = ?', (document_id,))
        except Exception as e:
            logging.error(f"Error deleting document {document_id}: {str(e)}")
            raise
|