# ds_zagres_ai/app/services/document_service.py
# 2025-05-09 15:41:16 +01:00 | 166 lines | 5.0 KiB | Python

"""
Service for document processing and embedding.
"""
import logging
import os
from typing import List, Dict, Any, Optional

import pinecone

from app.database.db import db
from app.models.document import Document, DocumentChunk
from app.config.config import Config
class DocumentService:
    """Service for document processing and embedding.

    Document metadata is persisted through ``db.session``; the vectors that
    belong to a document's chunks live in the Pinecone index named by
    ``config.PINECONE_INDEX_NAME``.
    """

    # Settings used only when the index has to be created. 768 is the value
    # the service has always used (noted as the sentence-transformers default).
    EMBEDDING_DIMENSION = 768
    EMBEDDING_METRIC = "cosine"

    _logger = logging.getLogger(__name__)

    def __init__(self, config: Optional[Config] = None):
        """
        Initialize the document service.

        Args:
            config: Configuration object. A fresh ``Config()`` is built when
                omitted.
        """
        self.config = config or Config()
        self._initialize_pinecone()

    def _initialize_pinecone(self):
        """Initialize the Pinecone client and bind ``self.index``.

        The index named in the configuration is created first if it is not
        listed yet.
        """
        pinecone.init(
            api_key=self.config.PINECONE_API_KEY,
            environment=self.config.PINECONE_ENVIRONMENT,
        )

        index_name = self.config.PINECONE_INDEX_NAME

        # Check if index exists, create if it doesn't
        if index_name not in pinecone.list_indexes():
            pinecone.create_index(
                name=index_name,
                dimension=self.EMBEDDING_DIMENSION,
                metric=self.EMBEDDING_METRIC,
            )

        self.index = pinecone.Index(index_name)

    def create_document(self, title: str, file_path: str, content_type: str,
                        description: Optional[str], user_id: int) -> Document:
        """
        Create a new document record.

        Args:
            title: Document title.
            file_path: Path to the document file.
            content_type: MIME type of the document.
            description: Optional description of the document.
            user_id: ID of the user who uploaded the document.

        Returns:
            Created document, stored with status ``'pending'``.

        Raises:
            Exception: Whatever the commit raised; the session is rolled
                back before the error is re-raised.
        """
        document = Document(
            title=title,
            file_path=file_path,
            content_type=content_type,
            description=description,
            uploaded_by=user_id,
            status='pending',
        )
        db.session.add(document)
        try:
            db.session.commit()
        except Exception:
            # Leave the session usable for the caller's next operation.
            db.session.rollback()
            raise
        return document

    def process_document(self, document_id: int) -> bool:
        """
        Process a document for embedding.

        The status moves ``'processing'`` -> ``'completed'`` on success and
        is set to ``'error'`` on failure.

        Args:
            document_id: ID of the document to process.

        Returns:
            True if processing was successful, False otherwise (including
            when the document does not exist).
        """
        document = Document.query.get(document_id)
        if document is None:
            return False

        try:
            # Update status to processing
            document.status = 'processing'
            db.session.commit()

            # TODO: Implement document parsing and chunking
            # This will be implemented in the next step

            # Update status to completed
            document.status = 'completed'
            db.session.commit()
            return True
        except Exception as e:
            self._logger.exception(
                "Error processing document %s: %s", document_id, e
            )
            # A failed commit/flush leaves the session unusable until it is
            # rolled back, so do that before recording the error status.
            db.session.rollback()
            try:
                document.status = 'error'
                db.session.commit()
            except Exception:
                # Recording the failure is best-effort; never let it mask
                # the False return the caller relies on.
                db.session.rollback()
                self._logger.exception(
                    "Could not mark document %s as failed", document_id
                )
            return False

    def get_document(self, document_id: int) -> Optional[Document]:
        """
        Get a document by ID.

        Args:
            document_id: ID of the document.

        Returns:
            Document if found, None otherwise.
        """
        return Document.query.get(document_id)

    def get_all_documents(self, user_id: Optional[int] = None) -> List[Document]:
        """
        Get all documents, optionally filtered by user.

        Args:
            user_id: Optional user ID to filter by. ``None`` means no
                filtering; any other value (including ``0``) is applied.

        Returns:
            List of documents, newest ``created_at`` first.
        """
        query = Document.query
        # Compare against None so a falsy-but-valid ID such as 0 still filters.
        if user_id is not None:
            query = query.filter_by(uploaded_by=user_id)
        return query.order_by(Document.created_at.desc()).all()

    def delete_document(self, document_id: int) -> bool:
        """
        Delete a document and its chunks.

        The chunk vectors are removed from Pinecone first, then the document
        row is deleted and committed.

        Args:
            document_id: ID of the document to delete.

        Returns:
            True if deletion was successful, False otherwise (including when
            the document does not exist).
        """
        document = Document.query.get(document_id)
        if document is None:
            return False

        try:
            # Delete document chunks from Pinecone
            chunks = DocumentChunk.query.filter_by(document_id=document_id).all()
            embedding_ids = [
                chunk.embedding_id for chunk in chunks if chunk.embedding_id
            ]
            if embedding_ids:
                self.index.delete(ids=embedding_ids)

            # Delete document from database
            # NOTE(review): if this commit fails the vectors above are
            # already gone; chunk-row removal presumably relies on a cascade
            # configured on the model - confirm.
            db.session.delete(document)
            db.session.commit()
            return True
        except Exception as e:
            self._logger.exception(
                "Error deleting document %s: %s", document_id, e
            )
            db.session.rollback()
            return False