"""
Service for document processing and chunking.

Documents are uploaded to OpenWebUI for RAG processing, and a JSON file on
disk records the local metadata kept for each document.
"""
import base64
import json
import os
import uuid
from typing import Any, Dict, List, Optional

import requests
from langchain_text_splitters import RecursiveCharacterTextSplitter

from ai_service.config import config


class DocumentService:
    """Service for document processing and chunking."""

    def __init__(self):
        """Initialize the document service."""
        self.chunk_size = config.CHUNK_SIZE
        self.chunk_overlap = config.CHUNK_OVERLAP
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
        )

        # OpenWebUI configuration
        self.openwebui_url = config.OPENWEBUI_URL
        self.openwebui_api_key = config.OPENWEBUI_API_KEY

        # Ensure data directory exists
        os.makedirs('ai_service/data', exist_ok=True)

        # For now, we'll store document metadata in a simple JSON file
        self.metadata_file = 'ai_service/data/document_metadata.json'
        self._load_metadata()

    def _build_headers(self) -> Dict[str, str]:
        """
        Build the HTTP headers used for every OpenWebUI request.

        Returns:
            JSON content-type header, plus a bearer ``Authorization`` header
            when an API key is configured.
        """
        headers = {"Content-Type": "application/json"}
        if self.openwebui_api_key:
            headers["Authorization"] = f"Bearer {self.openwebui_api_key}"
        return headers

    def _load_metadata(self):
        """
        Load document metadata from file.

        ``self.documents`` is always left as a dict: it stays empty when the
        file is missing, unreadable, or does not contain a JSON object.
        """
        self.documents = {}
        if not os.path.exists(self.metadata_file):
            return

        try:
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except Exception as e:
            print(f"Error loading document metadata: {e}")
            return

        # Every other method indexes self.documents by document ID, so any
        # JSON value other than an object is rejected here instead of failing
        # later in an unrelated call.
        if isinstance(loaded, dict):
            self.documents = loaded
        else:
            print(
                "Error loading document metadata: expected a JSON object, "
                f"got {type(loaded).__name__}"
            )

    def _save_metadata(self):
        """
        Save document metadata to file.

        The data is serialised to a sibling temporary file first and then
        moved over the real file, so a failure while serialising does not
        truncate the existing metadata. Errors are reported and swallowed.
        """
        tmp_path = f"{self.metadata_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.documents, f, indent=2)
            os.replace(tmp_path, self.metadata_file)
        except Exception as e:
            print(f"Error saving document metadata: {e}")
            # Best-effort cleanup of the partial temp file; it may not exist
            # if the failure happened before it was created.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def process_document(self, content: str, title: str,
                         description: Optional[str] = None,
                         metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Process a document for embedding.

        The document is uploaded to OpenWebUI. If anything in the upload
        fails, the content is split locally instead and the stored record is
        flagged with ``openwebui_upload_failed``.

        Args:
            content: Document content.
            title: Document title.
            description: Optional document description.
            metadata: Optional additional metadata.

        Returns:
            Document ID.
        """
        # Generate a unique ID for the document
        doc_id = str(uuid.uuid4())

        # Fields common to both the uploaded and the fallback record. Keys are
        # added in the same order as before so the saved JSON layout is stable.
        record: Dict[str, Any] = {
            'id': doc_id,
            'title': title,
            'description': description or '',
        }

        # Upload the document to OpenWebUI for RAG processing
        try:
            document_data = {
                "filename": f"{title}.txt",
                "content": base64.b64encode(content.encode('utf-8')).decode('utf-8'),
                "description": description or title,
            }
            response = requests.post(
                f"{self.openwebui_url}/api/knowledge/upload",
                headers=self._build_headers(),
                json=document_data,
                timeout=60,
            )
            response.raise_for_status()

            # Get the OpenWebUI document ID
            openwebui_doc_id = response.json().get('id', '')
        except Exception as e:
            print(f"Error uploading document to OpenWebUI: {e}")

            # Fall back to local processing if OpenWebUI upload fails
            print("Falling back to local document processing")

            # Split the document into chunks for local reference
            chunks = self.text_splitter.split_text(content)
            record['chunk_count'] = len(chunks)
            record['openwebui_upload_failed'] = True
        else:
            record['openwebui_id'] = openwebui_doc_id

        record['metadata'] = metadata or {}

        # Store document metadata and save it to file
        self.documents[doc_id] = record
        self._save_metadata()

        return doc_id

    def get_document(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """
        Get document metadata.

        Args:
            doc_id: Document ID.

        Returns:
            Document metadata if found, None otherwise.
        """
        return self.documents.get(doc_id)

    def get_all_documents(self) -> List[Dict[str, Any]]:
        """
        Get all document metadata.

        Each returned entry is a copy of the stored record. When OpenWebUI
        can be reached and lists a document with the same ID, the copy also
        carries ``openwebui_status`` and ``openwebui_info``.

        Returns:
            List of document metadata.
        """
        # Work on copies: the OpenWebUI fields added below are per-call
        # information and must not leak into self.documents, where the next
        # _save_metadata() would persist them.
        local_documents = [dict(doc) for doc in self.documents.values()]

        # Try to get documents from OpenWebUI as well
        try:
            response = requests.get(
                f"{self.openwebui_url}/api/knowledge",
                headers=self._build_headers(),
                timeout=30,
            )

            if response.status_code == 200:
                # Index remote documents by ID; setdefault keeps the first
                # entry for a repeated ID, like a first-match linear scan.
                remote_by_id: Dict[Any, Any] = {}
                for remote_doc in response.json():
                    remote_by_id.setdefault(remote_doc.get('id'), remote_doc)

                # Update local documents with OpenWebUI information
                for doc in local_documents:
                    if 'openwebui_id' not in doc:
                        continue
                    remote_id = doc['openwebui_id']
                    if remote_id in remote_by_id:
                        doc['openwebui_status'] = 'active'
                        doc['openwebui_info'] = remote_by_id[remote_id]
        except Exception as e:
            print(f"Error getting documents from OpenWebUI: {e}")

        return local_documents

    def delete_document(self, doc_id: str) -> bool:
        """
        Delete a document and its chunks.

        Removal from OpenWebUI is best-effort: a failure there is reported
        but the local metadata is deleted regardless.

        Args:
            doc_id: Document ID.

        Returns:
            True if deletion was successful, False otherwise.
        """
        if doc_id not in self.documents:
            return False

        # Check if document was uploaded to OpenWebUI
        openwebui_id = self.documents[doc_id].get('openwebui_id')

        if openwebui_id:
            try:
                # Delete from OpenWebUI
                response = requests.delete(
                    f"{self.openwebui_url}/api/knowledge/{openwebui_id}",
                    headers=self._build_headers(),
                    timeout=30,
                )

                if response.status_code != 200:
                    print(f"Warning: Failed to delete document from OpenWebUI: {response.text}")
            except Exception as e:
                print(f"Error deleting document from OpenWebUI: {e}")

        # Delete document metadata
        del self.documents[doc_id]

        # Save metadata to file
        self._save_metadata()

        return True

    def search_documents(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search for documents similar to a query.

        Args:
            query: Search query.
            top_k: Number of results to return.

        Returns:
            List of similar document chunks with their metadata.
        """
        # Note: We don't need to implement this method anymore since
        # RAG is handled directly by OpenWebUI when use_rag=True in the model service

        # Return empty results - this is just a placeholder
        # The actual RAG functionality is in the model_service.generate_response method
        return []


# Create a singleton instance
document_service = DocumentService()