Files
ds_task_scp_bolade/src/services/database.py
T

162 lines
6.6 KiB
Python
Raw Normal View History

import sqlite3
import json
import logging
from typing import Dict, Any, Optional
import os
class Database:
def __init__(self, db_path: str = "data/app.db"):
self.db_path = db_path
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self._init_db()
def _init_db(self):
"""Initialize the database with required tables."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Create analysis table
cursor.execute('''
CREATE TABLE IF NOT EXISTS analysis (
document_id TEXT PRIMARY KEY,
summary TEXT,
issues TEXT,
recommendations TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create metadata table
cursor.execute('''
CREATE TABLE IF NOT EXISTS metadata (
document_id TEXT PRIMARY KEY,
filename TEXT,
document_type TEXT,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
except Exception as e:
logging.error(f"Error initializing database: {str(e)}")
raise
def save_analysis(self, document_id: str, analysis: Dict[str, Any]):
"""Save analysis results to the database."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO analysis (document_id, summary, issues, recommendations)
VALUES (?, ?, ?, ?)
''', (
document_id,
analysis['summary'],
json.dumps(analysis['issues']),
json.dumps(analysis['recommendations'])
))
conn.commit()
except Exception as e:
logging.error(f"Error saving analysis for document {document_id}: {str(e)}")
raise
def get_analysis(self, document_id: str) -> Dict[str, Any]:
"""Retrieve analysis results from the database."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT summary, issues, recommendations FROM analysis WHERE document_id = ?', (document_id,))
result = cursor.fetchone()
if not result:
raise FileNotFoundError(f"Analysis not found for document {document_id}")
return {
'document_id': document_id,
'summary': result[0],
'issues': json.loads(result[1]),
'recommendations': json.loads(result[2])
}
except Exception as e:
logging.error(f"Error retrieving analysis for document {document_id}: {str(e)}")
raise
def save_metadata(self, document_id: str, metadata: Dict[str, Any]):
"""Save document metadata to the database."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO metadata (document_id, filename, document_type, description)
VALUES (?, ?, ?, ?)
''', (
document_id,
metadata['filename'],
metadata['document_type'],
metadata.get('description')
))
conn.commit()
except Exception as e:
logging.error(f"Error saving metadata for document {document_id}: {str(e)}")
raise
def get_metadata(self, document_id: str) -> Dict[str, Any]:
"""Retrieve document metadata from the database."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('SELECT filename, document_type, description FROM metadata WHERE document_id = ?', (document_id,))
result = cursor.fetchone()
if not result:
raise FileNotFoundError(f"Metadata not found for document {document_id}")
return {
'document_id': document_id,
'filename': result[0],
'document_type': result[1],
'description': result[2]
}
except Exception as e:
logging.error(f"Error retrieving metadata for document {document_id}: {str(e)}")
raise
def get_all_metadata(self) -> list:
"""Retrieve metadata for all documents."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT m.document_id, m.filename, m.document_type, m.description, m.created_at,
CASE WHEN a.document_id IS NOT NULL THEN 1 ELSE 0 END as has_analysis
FROM metadata m
LEFT JOIN analysis a ON m.document_id = a.document_id
ORDER BY m.created_at DESC
''')
results = cursor.fetchall()
return [{
'document_id': row[0],
'filename': row[1],
'document_type': row[2],
'description': row[3],
'upload_date': row[4],
'status': 'completed' if row[5] == 1 else 'processing'
} for row in results]
except Exception as e:
logging.error(f"Error retrieving all metadata: {str(e)}")
raise
def delete_document(self, document_id: str):
"""Delete a document and its associated data from the database."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute('DELETE FROM analysis WHERE document_id = ?', (document_id,))
cursor.execute('DELETE FROM metadata WHERE document_id = ?', (document_id,))
conn.commit()
except Exception as e:
logging.error(f"Error deleting document {document_id}: {str(e)}")
raise