Files
ds_task_scp_bolade/src/services/database.py
T
boladeE 932f76b603 refactor: Transition to SQLAlchemy for database management
- Replaced SQLite direct connections with SQLAlchemy ORM for better abstraction and maintainability.
- Introduced new database models for 'Analysis' and 'Metadata' with appropriate fields.
- Enhanced database initialization and session management.
- Updated methods for saving, retrieving, and deleting analysis and metadata records to use SQLAlchemy sessions.
2025-04-23 14:27:15 +01:00

195 lines
8.2 KiB
Python

import json
import logging
import os
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional

from sqlalchemy import create_engine, Column, String, DateTime, Integer, Boolean, event, text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
# Declarative base class that the ORM models in this module inherit from;
# Base.metadata collects their table definitions.
Base = declarative_base()
# Enable foreign key support for SQLite
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """Issue ``PRAGMA foreign_keys=ON`` on every newly opened DBAPI connection.

    The listener is registered on the ``Engine`` class, so it runs for each
    new connection made by any engine in the process.

    Args:
        dbapi_connection: The raw DBAPI connection that was just opened.
        connection_record: Pool bookkeeping record for the connection (unused).
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA foreign_keys=ON")
    finally:
        # Always release the cursor, even when the PRAGMA itself fails.
        cursor.close()
class Analysis(Base):
    """ORM model for the ``analysis`` table: one analysis result per document."""

    __tablename__ = 'analysis'

    # Identifier of the analysed document; primary key of the table.
    document_id = Column(String, primary_key=True)
    # Summary text of the analysis.
    summary = Column(String)
    # Issues and recommendations, stored as text (the Database class in this
    # module writes it with json.dumps and reads it back with json.loads).
    issues_and_recommendations = Column(String)
    # Row creation time as a naive UTC datetime. Produces the same value as the
    # deprecated datetime.utcnow() (deprecated since Python 3.12).
    created_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )
class Metadata(Base):
    """ORM model for the ``metadata`` table: descriptive data for one document."""

    __tablename__ = 'metadata'

    # Identifier of the document; primary key of the table.
    document_id = Column(String, primary_key=True)
    # File name recorded for the document.
    filename = Column(String)
    # Document type label recorded for the document.
    document_type = Column(String)
    # Optional free-text description; may be NULL.
    description = Column(String, nullable=True)
    # Row creation time as a naive UTC datetime. Produces the same value as the
    # deprecated datetime.utcnow() (deprecated since Python 3.12).
    created_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )
class Database:
    """SQLite-backed persistence for document metadata and analysis results.

    Each public method opens its own short-lived session from
    ``self.SessionLocal``, logs any failure through the root logger and
    re-raises the original exception.
    """

    def __init__(self, db_path: str = "data/app.db"):
        """Create the engine, the session factory and the tables.

        Args:
            db_path: Path of the SQLite database file. Its parent directory is
                created when missing; a bare file name (no directory part) is
                accepted as well.
        """
        self.db_path = db_path
        self._ensure_parent_dir(db_path)
        # Configure SQLite engine with better defaults
        self.engine = create_engine(
            f'sqlite:///{db_path}',
            connect_args={
                'check_same_thread': False,  # Needed for FastAPI
                'timeout': 30,  # Set a reasonable timeout
            },
            pool_pre_ping=True,  # Check connections before using them
            pool_recycle=3600,  # Recycle connections after an hour
        )
        # Must be registered before the first connection is opened (which
        # happens in _init_db) so that every connection gets the tuning.
        event.listen(self.engine, "connect", self._apply_connection_pragmas)
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        self._init_db()

    @staticmethod
    def _ensure_parent_dir(db_path: str) -> None:
        """Create the directory that will contain ``db_path`` if it has one."""
        parent = os.path.dirname(db_path)
        # dirname() is "" for a bare file name, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    @staticmethod
    def _apply_connection_pragmas(dbapi_connection, connection_record):
        """Apply the per-connection SQLite tuning to a freshly opened connection.

        These settings are scoped to a single connection in SQLite, so they
        are issued for every new connection rather than once at start-up.
        """
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute("PRAGMA synchronous=NORMAL")  # Better performance
            cursor.execute("PRAGMA cache_size=-2000")  # Use 2MB of memory for cache
            cursor.execute("PRAGMA temp_store=MEMORY")  # Store temp tables in memory
        finally:
            cursor.close()

    def _init_db(self):
        """Initialize the database with required tables.

        Raises:
            Exception: Whatever table creation or the PRAGMA raised, after
                logging it.
        """
        try:
            Base.metadata.create_all(bind=self.engine)
            with self.engine.connect() as conn:
                # WAL mode is stored in the database file, so setting it once
                # here is enough.
                conn.execute(text("PRAGMA journal_mode=WAL"))  # Write-Ahead Logging
                conn.commit()
        except Exception as e:
            logging.error(f"Error initializing database: {str(e)}")
            raise

    def get_db(self):
        """Yield a database session and close it once the consumer is done."""
        db = self.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def _upsert(self, model, document_id: str, values: Dict[str, Any]) -> None:
        """Update the ``model`` row keyed by ``document_id`` or insert a new one.

        Args:
            model: ORM class with a ``document_id`` primary key.
            document_id: Key of the row to update or create.
            values: Column name to value mapping to write onto the row.
        """
        with self.SessionLocal() as session:
            record = session.query(model).filter(model.document_id == document_id).first()
            if record is None:
                session.add(model(document_id=document_id, **values))
            else:
                for field, value in values.items():
                    setattr(record, field, value)
            session.commit()

    def save_analysis(self, document_id: str, analysis: Dict[str, Any]):
        """Save analysis results to the database, replacing any earlier result.

        Args:
            document_id: Identifier of the analysed document.
            analysis: Mapping with the keys ``'summary'`` and
                ``'issues_and_recommendations'``; the latter is stored as JSON.

        Raises:
            Exception: Any error (missing key, serialization or database
                failure) is logged and re-raised unchanged.
        """
        try:
            self._upsert(Analysis, document_id, {
                'summary': analysis['summary'],
                'issues_and_recommendations': json.dumps(analysis['issues_and_recommendations']),
            })
        except Exception as e:
            logging.error(f"Error saving analysis for document {document_id}: {str(e)}")
            raise

    def get_analysis(self, document_id: str) -> Dict[str, Any]:
        """Retrieve analysis results from the database.

        Returns:
            A dict with ``'document_id'``, ``'summary'`` and the JSON-decoded
            ``'issues_and_recommendations'``.

        Raises:
            FileNotFoundError: No analysis row exists for ``document_id``.
            Exception: Any other error is logged and re-raised unchanged.
        """
        try:
            with self.SessionLocal() as session:
                record = session.query(Analysis).filter(Analysis.document_id == document_id).first()
                if not record:
                    raise FileNotFoundError(f"Analysis not found for document {document_id}")
                return {
                    'document_id': record.document_id,
                    'summary': record.summary,
                    'issues_and_recommendations': json.loads(record.issues_and_recommendations),
                }
        except Exception as e:
            logging.error(f"Error retrieving analysis for document {document_id}: {str(e)}")
            raise

    def save_metadata(self, document_id: str, metadata: Dict[str, Any]):
        """Save document metadata to the database, replacing any earlier values.

        Args:
            document_id: Identifier of the document.
            metadata: Mapping with the keys ``'filename'`` and
                ``'document_type'``; ``'description'`` is optional.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            self._upsert(Metadata, document_id, {
                'filename': metadata['filename'],
                'document_type': metadata['document_type'],
                'description': metadata.get('description'),
            })
        except Exception as e:
            logging.error(f"Error saving metadata for document {document_id}: {str(e)}")
            raise

    def get_metadata(self, document_id: str) -> Dict[str, Any]:
        """Retrieve document metadata from the database.

        Returns:
            A dict with ``'document_id'``, ``'filename'``, ``'document_type'``
            and ``'description'``.

        Raises:
            FileNotFoundError: No metadata row exists for ``document_id``.
            Exception: Any other error is logged and re-raised unchanged.
        """
        try:
            with self.SessionLocal() as session:
                record = session.query(Metadata).filter(Metadata.document_id == document_id).first()
                if not record:
                    raise FileNotFoundError(f"Metadata not found for document {document_id}")
                return {
                    'document_id': record.document_id,
                    'filename': record.filename,
                    'document_type': record.document_type,
                    'description': record.description,
                }
        except Exception as e:
            logging.error(f"Error retrieving metadata for document {document_id}: {str(e)}")
            raise

    def get_all_metadata(self) -> List[Dict[str, Any]]:
        """Retrieve metadata for all documents, newest first.

        Returns:
            One dict per metadata row with ``'document_id'``, ``'filename'``,
            ``'document_type'``, ``'description'``, ``'upload_date'`` (the
            row's ``created_at``) and ``'status'``: ``'completed'`` when an
            analysis row exists for the document, otherwise ``'processing'``.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            with self.SessionLocal() as session:
                # The outer join keeps documents without an analysis; for them
                # the joined document_id is NULL and has_analysis is false.
                rows = session.query(
                    Metadata,
                    Analysis.document_id.isnot(None).label('has_analysis')
                ).outerjoin(
                    Analysis,
                    Metadata.document_id == Analysis.document_id
                ).order_by(
                    Metadata.created_at.desc()
                ).all()
                return [{
                    'document_id': meta.document_id,
                    'filename': meta.filename,
                    'document_type': meta.document_type,
                    'description': meta.description,
                    'upload_date': meta.created_at,
                    'status': 'completed' if has_analysis else 'processing',
                } for meta, has_analysis in rows]
        except Exception as e:
            logging.error(f"Error retrieving all metadata: {str(e)}")
            raise

    def delete_document(self, document_id: str):
        """Delete a document and its associated data from the database.

        Removes the analysis row and the metadata row for ``document_id`` in a
        single commit. Deleting an unknown id is not an error.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            with self.SessionLocal() as session:
                session.query(Analysis).filter(Analysis.document_id == document_id).delete()
                session.query(Metadata).filter(Metadata.document_id == document_id).delete()
                session.commit()
        except Exception as e:
            logging.error(f"Error deleting document {document_id}: {str(e)}")
            raise