Files
ds_zagres_ai/ai_service/embeddings/document_service.py
T
2025-05-09 16:47:30 +01:00

262 lines
8.2 KiB
Python

"""
Service for document processing and chunking.
"""
import os
import json
import uuid
import requests
import base64
from typing import List, Dict, Any, Optional
from langchain_text_splitters import RecursiveCharacterTextSplitter
from ai_service.config import config
class DocumentService:
    """Service for document processing and chunking.

    Documents are uploaded to OpenWebUI for RAG processing. A small metadata
    record per document is kept in memory (``self.documents``, keyed by the
    locally generated document ID) and mirrored to a JSON file on disk.
    """

    def __init__(self):
        """Initialize the document service.

        Reads chunking and OpenWebUI settings from ``config``, makes sure the
        data directory exists and loads any previously saved metadata.
        """
        self.chunk_size = config.CHUNK_SIZE
        self.chunk_overlap = config.CHUNK_OVERLAP
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
        )

        # OpenWebUI configuration
        self.openwebui_url = config.OPENWEBUI_URL
        self.openwebui_api_key = config.OPENWEBUI_API_KEY

        # Ensure data directory exists
        os.makedirs('ai_service/data', exist_ok=True)

        # For now, we'll store document metadata in a simple JSON file
        self.metadata_file = 'ai_service/data/document_metadata.json'
        self._load_metadata()

    def _build_headers(self) -> Dict[str, str]:
        """Return the HTTP headers used for every OpenWebUI request.

        The ``Authorization`` header is only added when an API key is set.
        """
        headers = {"Content-Type": "application/json"}
        if self.openwebui_api_key:
            headers["Authorization"] = f"Bearer {self.openwebui_api_key}"
        return headers

    def _load_metadata(self):
        """Load document metadata from file.

        ``self.documents`` is always left as a dict: a missing, unreadable or
        malformed file (including valid JSON that is not an object) results
        in an empty mapping instead of propagating an error.
        """
        self.documents = {}
        try:
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # No metadata saved yet; start empty without reporting an error.
            return
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error loading document metadata: {str(e)}")
            return

        if isinstance(loaded, dict):
            self.documents = loaded
        else:
            # Every other method treats self.documents as a mapping, so a
            # list/scalar payload would only fail later and less clearly.
            print("Error loading document metadata: expected a JSON object")

    def _save_metadata(self):
        """Save document metadata to file.

        The data is serialized before anything is written, and the result is
        written to a temporary sibling file that then replaces the real one.
        A serialization or write failure therefore cannot truncate or corrupt
        the previously saved metadata. Failures are reported, not raised.
        """
        tmp_path = f"{self.metadata_file}.tmp"
        try:
            payload = json.dumps(self.documents, indent=2)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_path, self.metadata_file)
        except (OSError, TypeError, ValueError) as e:
            # TypeError/ValueError: non-serializable or circular metadata.
            print(f"Error saving document metadata: {str(e)}")

    def process_document(self, content: str, title: str,
                         description: Optional[str] = None,
                         metadata: Optional[Dict[str, Any]] = None) -> str:
        """
        Process a document for embedding.

        The document is uploaded to OpenWebUI. If the upload fails for any
        reason, the document is split locally instead and the stored record
        is flagged with ``openwebui_upload_failed``.

        Args:
            content: Document content.
            title: Document title.
            description: Optional document description.
            metadata: Optional additional metadata.

        Returns:
            Document ID.
        """
        # Generate a unique ID for the document
        doc_id = str(uuid.uuid4())
        record: Dict[str, Any] = {
            'id': doc_id,
            'title': title,
            'description': description or '',
        }

        try:
            # Upload the document to OpenWebUI for RAG processing
            document_data = {
                "filename": f"{title}.txt",
                "content": base64.b64encode(content.encode('utf-8')).decode('utf-8'),
                "description": description or title
            }
            response = requests.post(
                f"{self.openwebui_url}/api/knowledge/upload",
                headers=self._build_headers(),
                json=document_data,
                timeout=60
            )
            response.raise_for_status()
            result = response.json()
            # Remember the OpenWebUI document ID so it can be deleted later
            record['openwebui_id'] = result.get('id', '')
        except Exception as e:
            # Deliberately broad: any upload problem (network, HTTP status,
            # unexpected response shape) triggers the local fallback.
            print(f"Error uploading document to OpenWebUI: {str(e)}")
            print("Falling back to local document processing")
            # Split the document into chunks for local reference
            chunks = self.text_splitter.split_text(content)
            record['chunk_count'] = len(chunks)
            record['openwebui_upload_failed'] = True

        record['metadata'] = metadata or {}
        self.documents[doc_id] = record
        self._save_metadata()
        return doc_id

    def get_document(self, doc_id: str) -> Optional[Dict[str, Any]]:
        """
        Get document metadata.

        Args:
            doc_id: Document ID.

        Returns:
            Document metadata if found, None otherwise.
        """
        return self.documents.get(doc_id)

    @staticmethod
    def _annotate_with_openwebui(local_documents: List[Any],
                                 openwebui_docs: Any) -> None:
        """Add OpenWebUI status/info to local records with a remote match.

        ``local_documents`` is modified in place. Entries that are not dicts
        are skipped on both sides.
        """
        # Index remote documents by id once; the first occurrence of an id
        # wins, matching a first-match linear scan.
        remote_by_id: Dict[Any, Dict[str, Any]] = {}
        for remote in openwebui_docs:
            if isinstance(remote, dict):
                remote_by_id.setdefault(remote.get('id'), remote)

        for doc in local_documents:
            if not isinstance(doc, dict) or 'openwebui_id' not in doc:
                continue
            match = remote_by_id.get(doc['openwebui_id'])
            if match is not None:
                doc['openwebui_status'] = 'active'
                doc['openwebui_info'] = match

    def get_all_documents(self) -> List[Dict[str, Any]]:
        """
        Get all document metadata.

        Records are returned as shallow copies, so the OpenWebUI status and
        info added here are never written back into the stored metadata (and
        hence never persisted to the metadata file). If OpenWebUI cannot be
        queried, the plain local records are returned.

        Returns:
            List of document metadata.
        """
        # Copy the local records so annotations stay out of self.documents
        local_documents = [
            dict(doc) if isinstance(doc, dict) else doc
            for doc in self.documents.values()
        ]

        # Try to get documents from OpenWebUI as well
        try:
            response = requests.get(
                f"{self.openwebui_url}/api/knowledge",
                headers=self._build_headers(),
                timeout=30
            )
            if response.status_code == 200:
                self._annotate_with_openwebui(local_documents, response.json())
        except Exception as e:
            # Deliberately broad: remote enrichment is best-effort only.
            print(f"Error getting documents from OpenWebUI: {str(e)}")

        return local_documents

    def delete_document(self, doc_id: str) -> bool:
        """
        Delete a document and its chunks.

        The remote copy in OpenWebUI is removed on a best-effort basis: a
        failed remote deletion is reported but does not prevent the local
        metadata from being removed.

        Args:
            doc_id: Document ID.

        Returns:
            True if deletion was successful, False otherwise.
        """
        if doc_id not in self.documents:
            return False

        # Only documents that were uploaded carry an OpenWebUI ID
        openwebui_id = self.documents[doc_id].get('openwebui_id')
        if openwebui_id:
            try:
                response = requests.delete(
                    f"{self.openwebui_url}/api/knowledge/{openwebui_id}",
                    headers=self._build_headers(),
                    timeout=30
                )
                if response.status_code != 200:
                    print(f"Warning: Failed to delete document from OpenWebUI: {response.text}")
            except Exception as e:
                # Deliberately broad: never block local deletion.
                print(f"Error deleting document from OpenWebUI: {str(e)}")

        # Delete document metadata and persist the change
        del self.documents[doc_id]
        self._save_metadata()
        return True

    def search_documents(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search for documents similar to a query.

        Placeholder: RAG is handled directly by OpenWebUI when use_rag=True
        in the model service (model_service.generate_response), so nothing
        is searched here.

        Args:
            query: Search query (unused).
            top_k: Number of results to return (unused).

        Returns:
            Always an empty list.
        """
        return []
# Create a singleton instance.
# NOTE: this runs at import time. DocumentService.__init__ reads settings from
# `config`, creates the 'ai_service/data' directory if it is missing, and
# loads any saved metadata from the JSON metadata file.
document_service = DocumentService()