From 932f76b603157d760a6d31b33405febc30da6dd2 Mon Sep 17 00:00:00 2001 From: boladeE Date: Wed, 23 Apr 2025 14:27:15 +0100 Subject: [PATCH] refactor: Transition to SQLAlchemy for database management - Replaced SQLite direct connections with SQLAlchemy ORM for better abstraction and maintainability. - Introduced new database models for 'Analysis' and 'Metadata' with appropriate fields. - Enhanced database initialization and session management. - Updated methods for saving, retrieving, and deleting analysis and metadata records to use SQLAlchemy sessions. --- requirements.txt | 3 +- src/services/database.py | 275 ++++++++++++++++++--------------------- 2 files changed, 130 insertions(+), 148 deletions(-) diff --git a/requirements.txt b/requirements.txt index 36ac719..5653fce 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ pinecone-client==3.0.2 cohere==4.47 groq==0.4.2 python-dotenv==1.0.1 -pydantic==2.6.3 \ No newline at end of file +pydantic==2.6.3 +sqlalchemy==2.0.27 \ No newline at end of file diff --git a/src/services/database.py b/src/services/database.py index 4c468e6..1477200 100644 --- a/src/services/database.py +++ b/src/services/database.py @@ -1,116 +1,98 @@ -import sqlite3 -import json -import logging -from typing import Dict, Any, Optional import os +from datetime import datetime +from typing import Dict, Any, List, Optional +from sqlalchemy import create_engine, Column, String, DateTime, Integer, Boolean, event, text +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker, Session +from sqlalchemy.engine import Engine +import logging +import json + +Base = declarative_base() + +# Enable foreign key support for SQLite +@event.listens_for(Engine, "connect") +def set_sqlite_pragma(dbapi_connection, connection_record): + cursor = dbapi_connection.cursor() + cursor.execute("PRAGMA foreign_keys=ON") + cursor.close() + +class Analysis(Base): + __tablename__ = 'analysis' + + document_id = Column(String, primary_key=True) + summary = Column(String) + issues_and_recommendations = Column(String) + created_at = Column(DateTime, default=datetime.utcnow) + +class Metadata(Base): + __tablename__ = 'metadata' + + document_id = Column(String, primary_key=True) + filename = Column(String) + document_type = Column(String) + description = Column(String, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow) class Database: def __init__(self, db_path: str = "data/app.db"): self.db_path = db_path os.makedirs(os.path.dirname(db_path), exist_ok=True) + # Configure SQLite engine with better defaults + self.engine = create_engine( + f'sqlite:///{db_path}', + connect_args={ + 'check_same_thread': False, # Needed for FastAPI + 'timeout': 30, # Set a reasonable timeout + }, + pool_pre_ping=True, # Check connections before using them + pool_recycle=3600, # Recycle connections after an hour + ) + self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine) self._init_db() def _init_db(self): """Initialize the database with required tables.""" try: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - - # Check if we need to migrate the old schema - cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='analysis'") - table_exists = cursor.fetchone() is not None - - if table_exists: - # Check if we need to migrate - cursor.execute("PRAGMA table_info(analysis)") - columns = [column[1] for column in cursor.fetchall()] - - if 'issues_and_recommendations' not in columns: - # Backup old data - cursor.execute("SELECT document_id, summary, issues, recommendations FROM analysis") - old_data = cursor.fetchall() - - # Drop the old table - cursor.execute("DROP TABLE analysis") - - # Create the new table - cursor.execute(''' - CREATE TABLE analysis ( - document_id TEXT PRIMARY KEY, - summary TEXT, - issues_and_recommendations TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - ''') - - # Migrate old data to new format - for row in old_data: - doc_id, summary, issues, recommendations = row - try: - old_issues = json.loads(issues) if issues else [] - old_recommendations = json.loads(recommendations) if recommendations else [] - - # Combine issues and recommendations - issues_and_recommendations = [] - for i in range(max(len(old_issues), len(old_recommendations))): - issue = old_issues[i]['issue'] if i < len(old_issues) else "Unknown Issue" - recommendation = old_recommendations[i] if i < len(old_recommendations) else "No recommendation provided" - issues_and_recommendations.append({ - "issue": issue, - "recommendation": recommendation - }) - - cursor.execute(''' - INSERT INTO analysis (document_id, summary, issues_and_recommendations) - VALUES (?, ?, ?) - ''', ( - doc_id, - summary, - json.dumps(issues_and_recommendations) - )) - except Exception as e: - logging.error(f"Error migrating data for document {doc_id}: {str(e)}") - else: - # Create the new table if it doesn't exist - cursor.execute(''' - CREATE TABLE IF NOT EXISTS analysis ( - document_id TEXT PRIMARY KEY, - summary TEXT, - issues_and_recommendations TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - ''') - - # Create metadata table - cursor.execute(''' - CREATE TABLE IF NOT EXISTS metadata ( - document_id TEXT PRIMARY KEY, - filename TEXT, - document_type TEXT, - description TEXT, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP - ) - ''') - + Base.metadata.create_all(bind=self.engine) + # Set SQLite-specific optimizations + with self.engine.connect() as conn: + conn.execute(text("PRAGMA journal_mode=WAL")) # Write-Ahead Logging + conn.execute(text("PRAGMA synchronous=NORMAL")) # Better performance + conn.execute(text("PRAGMA cache_size=-2000")) # Use 2MB of memory for cache + conn.execute(text("PRAGMA temp_store=MEMORY")) # Store temp tables in memory conn.commit() except Exception as e: logging.error(f"Error initializing database: {str(e)}") raise + def get_db(self): + """Get a database session.""" + db = self.SessionLocal() + try: + yield db + finally: + db.close() + def save_analysis(self, document_id: str, analysis: Dict[str, Any]): """Save analysis results to the database.""" try: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute(''' - INSERT OR REPLACE INTO analysis (document_id, summary, issues_and_recommendations) - VALUES (?, ?, ?) - ''', ( - document_id, - analysis['summary'], - json.dumps(analysis['issues_and_recommendations']) - )) - conn.commit() + with self.SessionLocal() as session: + # Check if record exists + existing = session.query(Analysis).filter(Analysis.document_id == document_id).first() + if existing: + # Update existing record + existing.summary = analysis['summary'] + existing.issues_and_recommendations = json.dumps(analysis['issues_and_recommendations']) + else: + # Create new record + analysis_record = Analysis( + document_id=document_id, + summary=analysis['summary'], + issues_and_recommendations=json.dumps(analysis['issues_and_recommendations']) + ) + session.add(analysis_record) + session.commit() except Exception as e: logging.error(f"Error saving analysis for document {document_id}: {str(e)}") raise @@ -118,18 +100,15 @@ class Database: def get_analysis(self, document_id: str) -> Dict[str, Any]: """Retrieve analysis results from the database.""" try: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute('SELECT summary, issues_and_recommendations FROM analysis WHERE document_id = ?', (document_id,)) - result = cursor.fetchone() - - if not result: + with self.SessionLocal() as session: + analysis = session.query(Analysis).filter(Analysis.document_id == document_id).first() + if not analysis: raise FileNotFoundError(f"Analysis not found for document {document_id}") return { - 'document_id': document_id, - 'summary': result[0], - 'issues_and_recommendations': json.loads(result[1]) + 'document_id': analysis.document_id, + 'summary': analysis.summary, + 'issues_and_recommendations': json.loads(analysis.issues_and_recommendations) } except Exception as e: logging.error(f"Error retrieving analysis for document {document_id}: {str(e)}") @@ -138,18 +117,24 @@ class Database: def save_metadata(self, document_id: str, metadata: Dict[str, Any]): """Save document metadata to the database.""" try: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute(''' - INSERT OR REPLACE INTO metadata (document_id, filename, document_type, description) - VALUES (?, ?, ?, ?) - ''', ( - document_id, - metadata['filename'], - metadata['document_type'], - metadata.get('description') - )) - conn.commit() + with self.SessionLocal() as session: + # Check if record exists + existing = session.query(Metadata).filter(Metadata.document_id == document_id).first() + if existing: + # Update existing record + existing.filename = metadata['filename'] + existing.document_type = metadata['document_type'] + existing.description = metadata.get('description') + else: + # Create new record + metadata_record = Metadata( + document_id=document_id, + filename=metadata['filename'], + document_type=metadata['document_type'], + description=metadata.get('description') + ) + session.add(metadata_record) + session.commit() except Exception as e: logging.error(f"Error saving metadata for document {document_id}: {str(e)}") raise @@ -157,45 +142,42 @@ class Database: def get_metadata(self, document_id: str) -> Dict[str, Any]: """Retrieve document metadata from the database.""" try: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute('SELECT filename, document_type, description FROM metadata WHERE document_id = ?', (document_id,)) - result = cursor.fetchone() - - if not result: + with self.SessionLocal() as session: + metadata = session.query(Metadata).filter(Metadata.document_id == document_id).first() + if not metadata: raise FileNotFoundError(f"Metadata not found for document {document_id}") return { - 'document_id': document_id, - 'filename': result[0], - 'document_type': result[1], - 'description': result[2] + 'document_id': metadata.document_id, + 'filename': metadata.filename, + 'document_type': metadata.document_type, + 'description': metadata.description } except Exception as e: logging.error(f"Error retrieving metadata for document {document_id}: {str(e)}") raise - def get_all_metadata(self) -> list: + def get_all_metadata(self) -> List[Dict[str, Any]]: """Retrieve metadata for all documents.""" try: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute(''' - SELECT m.document_id, m.filename, m.document_type, m.description, m.created_at, - CASE WHEN a.document_id IS NOT NULL THEN 1 ELSE 0 END as has_analysis - FROM metadata m - LEFT JOIN analysis a ON m.document_id = a.document_id - ORDER BY m.created_at DESC - ''') - results = cursor.fetchall() + with self.SessionLocal() as session: + results = session.query( + Metadata, + Analysis.document_id.isnot(None).label('has_analysis') + ).outerjoin( + Analysis, + Metadata.document_id == Analysis.document_id + ).order_by( + Metadata.created_at.desc() + ).all() return [{ - 'document_id': row[0], - 'filename': row[1], - 'document_type': row[2], - 'description': row[3], - 'upload_date': row[4], - 'status': 'completed' if row[5] == 1 else 'processing' + 'document_id': row[0].document_id, + 'filename': row[0].filename, + 'document_type': row[0].document_type, + 'description': row[0].description, + 'upload_date': row[0].created_at, + 'status': 'completed' if row[1] else 'processing' } for row in results] except Exception as e: logging.error(f"Error retrieving all metadata: {str(e)}") @@ -204,11 +186,10 @@ class Database: def delete_document(self, document_id: str): """Delete a document and its associated data from the database.""" try: - with sqlite3.connect(self.db_path) as conn: - cursor = conn.cursor() - cursor.execute('DELETE FROM analysis WHERE document_id = ?', (document_id,)) - cursor.execute('DELETE FROM metadata WHERE document_id = ?', (document_id,)) - conn.commit() + with self.SessionLocal() as session: + session.query(Analysis).filter(Analysis.document_id == document_id).delete() + session.query(Metadata).filter(Metadata.document_id == document_id).delete() + session.commit() except Exception as e: logging.error(f"Error deleting document {document_id}: {str(e)}") raise \ No newline at end of file