"""
Service for document processing and embedding.
"""

import logging
import os
from typing import List, Dict, Any, Optional

import pinecone

from app.database.db import db
from app.models.document import Document, DocumentChunk
from app.config.config import Config

logger = logging.getLogger(__name__)


class DocumentService:
    """Service for document processing and embedding.

    Wraps the database operations on ``Document`` records and the Pinecone
    index that stores the embeddings of their chunks.
    """

    # Dimension used when the Pinecone index has to be created. The original
    # code hard-coded 768 (commented as the default for
    # sentence-transformers); subclasses may override it.
    EMBEDDING_DIMENSION = 768

    # Maximum number of ids sent in one Pinecone delete-by-id request.
    PINECONE_DELETE_BATCH_SIZE = 1000

    def __init__(self, config: Optional[Config] = None):
        """
        Initialize the document service.

        Args:
            config: Configuration object. A fresh ``Config()`` is created
                when omitted.
        """
        self.config = config if config is not None else Config()
        self._initialize_pinecone()

    def _initialize_pinecone(self):
        """Initialize the Pinecone client and bind ``self.index``.

        Creates the configured index (cosine metric) when it does not
        exist yet.
        """
        pinecone.init(
            api_key=self.config.PINECONE_API_KEY,
            environment=self.config.PINECONE_ENVIRONMENT,
        )

        index_name = self.config.PINECONE_INDEX_NAME

        # Check if index exists, create if it doesn't
        if index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=index_name,
                dimension=self.EMBEDDING_DIMENSION,
                metric="cosine",
            )

        self.index = pinecone.Index(index_name)

    def create_document(self, title: str, file_path: str, content_type: str,
                        description: Optional[str], user_id: int) -> Document:
        """
        Create a new document record.

        The record is stored with status ``'pending'``.

        Args:
            title: Document title.
            file_path: Path to the document file.
            content_type: MIME type of the document.
            description: Optional description of the document.
            user_id: ID of the user who uploaded the document.

        Returns:
            Created document.

        Raises:
            Exception: Whatever the database commit raised; the session is
                rolled back before the exception propagates.
        """
        document = Document(
            title=title,
            file_path=file_path,
            content_type=content_type,
            description=description,
            uploaded_by=user_id,
            status='pending',
        )

        db.session.add(document)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the caller's next operation.
            db.session.rollback()
            raise

        return document

    def process_document(self, document_id: int) -> bool:
        """
        Process a document for embedding.

        Moves the document through the ``'processing'`` and ``'completed'``
        statuses; on failure the status is set to ``'error'``.

        Args:
            document_id: ID of the document to process.

        Returns:
            True if processing was successful, False otherwise (including
            when no document with that ID exists).
        """
        document = Document.query.get(document_id)
        if document is None:
            return False

        try:
            # Update status to processing
            document.status = 'processing'
            db.session.commit()

            # TODO: Implement document parsing and chunking
            # This will be implemented in the next step

            # Update status to completed
            document.status = 'completed'
            db.session.commit()

            return True
        except Exception as exc:
            logger.exception(
                "Error processing document %s: %s", document_id, exc
            )

            # The failure may have come from a commit, which leaves the
            # session unusable until it is rolled back.
            db.session.rollback()

            try:
                # Update status to error
                document.status = 'error'
                db.session.commit()
            except Exception:
                # Recording the failure is best-effort; keep the documented
                # contract of returning False instead of raising.
                logger.exception(
                    "Could not mark document %s as failed", document_id
                )
                db.session.rollback()

            return False

    def get_document(self, document_id: int) -> Optional[Document]:
        """
        Get a document by ID.

        Args:
            document_id: ID of the document.

        Returns:
            Document if found, None otherwise.
        """
        return Document.query.get(document_id)

    def get_all_documents(self, user_id: Optional[int] = None) -> List[Document]:
        """
        Get all documents, optionally filtered by user.

        Results are ordered by ``created_at``, newest first.

        Args:
            user_id: Optional user ID to filter by. Only ``None`` disables
                the filter, so a user ID of ``0`` is filtered like any other.

        Returns:
            List of documents.
        """
        query = Document.query

        if user_id is not None:
            query = query.filter_by(uploaded_by=user_id)

        return query.order_by(Document.created_at.desc()).all()

    def delete_document(self, document_id: int) -> bool:
        """
        Delete a document and its chunks.

        The chunk embeddings are removed from Pinecone first, then the
        document row is deleted from the database. If the database step
        fails, the embeddings already removed are not restored.

        Args:
            document_id: ID of the document to delete.

        Returns:
            True if deletion was successful, False otherwise (including
            when no document with that ID exists).
        """
        document = Document.query.get(document_id)
        if document is None:
            return False

        try:
            # Delete document chunks from Pinecone
            chunks = DocumentChunk.query.filter_by(document_id=document_id).all()
            embedding_ids = [
                chunk.embedding_id for chunk in chunks if chunk.embedding_id
            ]

            # Send the ids in bounded batches so documents with many chunks
            # do not exceed the per-request id limit.
            batch_size = self.PINECONE_DELETE_BATCH_SIZE
            for start in range(0, len(embedding_ids), batch_size):
                self.index.delete(ids=embedding_ids[start:start + batch_size])

            # Delete document from database
            db.session.delete(document)
            db.session.commit()

            return True
        except Exception as exc:
            db.session.rollback()
            logger.exception(
                "Error deleting document %s: %s", document_id, exc
            )
            return False