215 lines
6.9 KiB
Python
215 lines
6.9 KiB
Python
"""
|
|
Service for generating and managing embeddings.
|
|
"""
|
|
|
|
import os
|
|
import random
|
|
import pinecone
|
|
import numpy as np
|
|
from typing import List, Dict, Any, Optional, Union
|
|
from sentence_transformers import SentenceTransformer
|
|
|
|
from ai_service.config import config
|
|
|
|
class EmbeddingService:
    """Service for generating and managing embeddings.

    The service runs in one of two modes:

    * mock (the default): embeddings are random vectors, and storing,
      searching and deleting are simulated without touching Pinecone.
    * real: embeddings come from a ``SentenceTransformer`` model and vectors
      are kept in the Pinecone index named by ``config.PINECONE_INDEX_NAME``.

    If the model cannot be loaded, the service falls back to mock mode.
    The Pinecone connection is attempted in both modes; ``self.index`` is
    ``None`` whenever it is unavailable.
    """

    # Width of mock vectors; also used as the index dimension when no model
    # is loaded, so a freshly created index matches the vectors we produce.
    MOCK_EMBEDDING_DIMENSION = 384

    # Upper bound on the number of fabricated hits returned by a mock search.
    MAX_MOCK_RESULTS = 3

    def __init__(self, use_mock=True):  # Default to mock implementation
        """Initialize the embedding service.

        Args:
            use_mock: When True, skip loading the model and simulate all
                embedding and vector-store operations.
        """
        self.use_mock = use_mock
        # Always define the attribute so that a failed model load leaves the
        # instance in a consistent state instead of without ``self.model``.
        self.model = None

        if not self.use_mock:
            # Use a smaller model for testing
            self.model_name = "paraphrase-MiniLM-L3-v2"  # Smaller model than the default
            try:
                self.model = SentenceTransformer(self.model_name)
                print(f"Loaded embedding model: {self.model_name}")
            except Exception as e:
                print(f"Error loading embedding model: {str(e)}")
                self.model = None
                self.use_mock = True
                print("Falling back to mock implementation")
        else:
            print("Using mock embedding implementation")
            self.model_name = "mock-model"

        self._initialize_pinecone()

    def _embedding_dimension(self) -> int:
        """Return the vector width produced by this service.

        Uses the loaded model's reported dimension, or
        ``MOCK_EMBEDDING_DIMENSION`` when no model is loaded.
        """
        if self.model is None:
            return self.MOCK_EMBEDDING_DIMENSION
        return self.model.get_sentence_embedding_dimension()

    def _mock_vector(self) -> List[float]:
        """Return one random vector of ``MOCK_EMBEDDING_DIMENSION`` floats."""
        return [random.random() for _ in range(self.MOCK_EMBEDDING_DIMENSION)]

    def _initialize_pinecone(self):
        """Initialize Pinecone client.

        Sets ``self.index`` to the Pinecone index, creating the index first
        if it is not listed. Leaves ``self.index`` as ``None`` when the API
        key or environment is not configured or when any Pinecone call fails.
        """
        self.index = None

        if not config.PINECONE_API_KEY or not config.PINECONE_ENVIRONMENT:
            print("Warning: Pinecone API key or environment not set. Vector storage will not be available.")
            return

        try:
            pinecone.init(
                api_key=config.PINECONE_API_KEY,
                environment=config.PINECONE_ENVIRONMENT,
            )

            # Check if index exists, create if it doesn't
            if config.PINECONE_INDEX_NAME not in pinecone.list_indexes():
                pinecone.create_index(
                    name=config.PINECONE_INDEX_NAME,
                    # Works without a loaded model too (mock mode).
                    dimension=self._embedding_dimension(),
                    metric="cosine",
                )

            self.index = pinecone.Index(config.PINECONE_INDEX_NAME)
            print(f"Connected to Pinecone index: {config.PINECONE_INDEX_NAME}")
        except Exception as e:
            # Best effort: the service stays usable without vector storage.
            print(f"Error connecting to Pinecone: {str(e)}")
            self.index = None

    def generate_embedding(self, text: str) -> List[float]:
        """
        Generate an embedding for a text.

        Args:
            text: Text to embed.

        Returns:
            Embedding vector. In mock mode this is a random vector that is
            unrelated to ``text``.
        """
        if self.use_mock:
            # Generate a mock embedding vector (384 dimensions for consistency)
            return self._mock_vector()

        return self.model.encode(text).tolist()

    def generate_embeddings(self, texts: List[str]) -> List[List[float]]:
        """
        Generate embeddings for multiple texts.

        Args:
            texts: List of texts to embed.

        Returns:
            List of embedding vectors, one per input text. An empty input
            yields an empty list without invoking the model.
        """
        if not texts:
            return []

        if self.use_mock:
            # Generate mock embedding vectors
            return [self._mock_vector() for _ in texts]

        return self.model.encode(texts).tolist()

    def store_embeddings(self, ids: List[str], embeddings: List[List[float]],
                         metadata: Optional[List[Dict[str, Any]]] = None) -> bool:
        """
        Store embeddings in Pinecone.

        Args:
            ids: List of IDs for the embeddings.
            embeddings: List of embedding vectors, parallel to ``ids``.
            metadata: Optional list of metadata dictionaries, parallel to
                ``ids``. Defaults to an empty dictionary per vector.

        Returns:
            True if storage was successful, False otherwise (mismatched input
            lengths, no index available, or a Pinecone error).
        """
        # Reject mismatched inputs up front: zip() would otherwise silently
        # drop the surplus entries and still report success.
        if len(ids) != len(embeddings):
            print(f"Error: {len(ids)} ids but {len(embeddings)} embeddings given. Embeddings not stored.")
            return False
        if metadata is not None and len(metadata) != len(ids):
            print(f"Error: {len(ids)} ids but {len(metadata)} metadata entries given. Embeddings not stored.")
            return False

        if self.use_mock:
            print(f"Mock: Stored {len(ids)} embeddings")
            return True

        if self.index is None:
            print("Warning: Pinecone index not available. Embeddings not stored.")
            return False

        if metadata is None:
            metadata = [{} for _ in ids]

        # (id, values, metadata) tuples, as passed to upsert() below.
        vectors = list(zip(ids, embeddings, metadata))

        try:
            self.index.upsert(vectors=vectors)
            return True
        except Exception as e:
            print(f"Error storing embeddings in Pinecone: {str(e)}")
            return False

    def search_similar(self, query_embedding: List[float], top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search for similar embeddings in Pinecone.

        Args:
            query_embedding: Query embedding vector.
            top_k: Number of results to return.

        Returns:
            List of dictionaries with ``id``, ``score`` and ``metadata`` keys.
            Mock mode returns at most ``MAX_MOCK_RESULTS`` fabricated items.
            An empty list is returned when the index is unavailable or the
            query fails.
        """
        if self.use_mock:
            # Generate mock search results
            print(f"Mock: Searching for similar embeddings (top_k={top_k})")
            return [
                {
                    'id': f"mock_doc_{i}",
                    'score': 0.9 - (i * 0.1),  # Decreasing similarity scores
                    'metadata': {
                        'document_id': f"mock_doc_{i}",
                        'chunk_index': i,
                        'title': f"Mock Document {i}",
                        'description': f"This is a mock document {i}",
                        'chunk_text': f"This is the content of mock document {i}...",
                    },
                }
                for i in range(min(top_k, self.MAX_MOCK_RESULTS))
            ]

        if self.index is None:
            print("Warning: Pinecone index not available. Search not performed.")
            return []

        try:
            results = self.index.query(
                vector=query_embedding,
                top_k=top_k,
                include_metadata=True,
            )

            return [
                {
                    'id': match['id'],
                    'score': match['score'],
                    'metadata': match.get('metadata', {}),
                }
                for match in results.get('matches', [])
            ]
        except Exception as e:
            print(f"Error searching in Pinecone: {str(e)}")
            return []

    def delete_embeddings(self, ids: List[str]) -> bool:
        """
        Delete embeddings from Pinecone.

        Args:
            ids: List of IDs to delete.

        Returns:
            True if deletion was successful, False otherwise.
        """
        if self.use_mock:
            print(f"Mock: Deleted {len(ids)} embeddings")
            return True

        if self.index is None:
            print("Warning: Pinecone index not available. Deletion not performed.")
            return False

        try:
            self.index.delete(ids=ids)
            return True
        except Exception as e:
            print(f"Error deleting embeddings from Pinecone: {str(e)}")
            return False
|
|
|
|
|
|
# Shared module-level instance, built with the constructor's default
# arguments when this module is first imported.
embedding_service: EmbeddingService = EmbeddingService()
|