Files
Michael Ikehi c1a894ad50 feat(feedback): Add content improvement feedback system
Frontend (frontend/app.js):

- Add textarea for improvement feedback

- Add submit button with loading state

- Handle API response and display improved content

Backend (backend/copywriter.py):

- Add improve_copy() method using Cohere API

- Integrate retry mechanism for API calls

Backend (backend/main.py):

- Add /improve-content POST endpoint

- Implement error handling and return improved content with metadata

Testing:

- Verified feedback submission flow

- Confirmed improved content generation

- Tested error scenarios and loading states
2025-04-21 17:32:33 +01:00

420 lines
15 KiB
Python

"""
Vector store module for the Marketing Assistant AI.
Uses FAISS for efficient storage and retrieval of content embeddings.
"""
import os
import json
import pickle
import faiss
import numpy as np
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
from loguru import logger
from datetime import datetime
import config
from embeddings import embeddings_manager
class VectorStore:
    """Manages vector database operations for content retrieval.

    Vectors live in a FAISS index and the matching documents (text plus
    metadata) live in the ``self.metadata`` list. Position ``i`` in the list
    describes vector ``i`` in the index, so a document ID is simply that
    shared position. Both are persisted under ``config.VECTOR_DB_PATH``.
    """

    # Dimension used when a brand-new index is created (Cohere embeddings).
    DEFAULT_DIMENSION = 1024

    def __init__(self):
        """Initialize the VectorStore with FAISS index.

        Creates the storage directory if needed, loads (or creates) the index
        and, when the index is empty, tries to seed it with sample campaigns.
        """
        self.store_path = Path(config.VECTOR_DB_PATH)
        # parents=True: a nested path whose parents do not exist yet must not
        # make construction fail.
        self.store_path.mkdir(parents=True, exist_ok=True)
        self.index_path = self.store_path / "faiss_index.bin"
        self.metadata_path = self.store_path / "metadata.pkl"
        self.dimension = None
        self.index = None
        self.metadata = []
        self._load_or_create_index()
        logger.info("VectorStore initialized successfully")
        # Check if the index is empty and load sample data if needed
        if self.index.ntotal == 0:
            logger.warning("Vector store is empty. Loading sample data...")
            self._load_sample_data()

    def _load_or_create_index(self) -> None:
        """Load existing index or create new one if it doesn't exist.

        Raises:
            Exception: Any error from reading/creating the index is logged
                and re-raised.
        """
        try:
            if self.index_path.exists() and self.metadata_path.exists():
                # Load existing index and metadata
                self.index = faiss.read_index(str(self.index_path))
                # SECURITY: pickle executes arbitrary code on load. This file
                # must only ever be one written by _save_index(); never point
                # VECTOR_DB_PATH at untrusted data.
                with open(self.metadata_path, 'rb') as f:
                    self.metadata = pickle.load(f)
                self.dimension = self.index.d
                logger.info(f"Loaded existing vector index with {self.index.ntotal} vectors")
                if len(self.metadata) != self.index.ntotal:
                    # IDs are shared positions, so a mismatch means lookups
                    # may hit the wrong document; surface it loudly.
                    logger.warning(
                        f"Metadata entries ({len(self.metadata)}) do not match "
                        f"index vectors ({self.index.ntotal})"
                    )
            else:
                self.dimension = self.DEFAULT_DIMENSION
                self.index = faiss.IndexFlatL2(self.dimension)
                self.metadata = []
                logger.info(f"Created new vector index with dimension {self.dimension}")
                # Save the empty index and metadata
                self._save_index()
        except Exception as e:
            logger.error(f"Error loading or creating index: {str(e)}")
            raise

    def _save_index(self) -> None:
        """Save the index and metadata to disk.

        Raises:
            Exception: Any write error is logged and re-raised.
        """
        try:
            faiss.write_index(self.index, str(self.index_path))
            with open(self.metadata_path, 'wb') as f:
                pickle.dump(self.metadata, f)
            logger.debug("Saved vector index and metadata")
        except Exception as e:
            logger.error(f"Error saving index: {str(e)}")
            raise

    @staticmethod
    def _build_entries(
        texts: List[str],
        metadata_list: List[Dict[str, Any]],
        start_id: int
    ) -> List[Dict[str, Any]]:
        """Build the stored metadata entries for a batch of documents.

        Each entry is a shallow copy of the caller's dict (the caller's
        objects are never mutated) extended with ``timestamp``,
        ``document_id`` and ``text``. IDs are consecutive from ``start_id``.
        """
        timestamp = datetime.now().isoformat()
        entries = []
        for offset, (text, meta) in enumerate(zip(texts, metadata_list)):
            entry = dict(meta)
            entry['timestamp'] = timestamp
            entry['document_id'] = start_id + offset
            entry['text'] = text
            entries.append(entry)
        return entries

    async def add_documents(
        self,
        texts: List[str],
        metadata_list: Optional[List[Dict[str, Any]]] = None
    ) -> List[int]:
        """
        Add documents to the vector store.

        Args:
            texts: List of text documents to add
            metadata_list: List of metadata dictionaries for each document

        Returns:
            List of document IDs (vector indices)

        Raises:
            ValueError: If texts and metadata lengths differ, if the number
                of embeddings does not match the number of texts, or if the
                embedding dimension differs from a non-empty index.
        """
        try:
            if not texts:
                logger.warning("No texts provided to add to vector store")
                return []
            if metadata_list is None:
                metadata_list = [{} for _ in texts]
            if len(texts) != len(metadata_list):
                raise ValueError("Number of texts and metadata entries must match")

            # Generate embeddings
            embeddings = await embeddings_manager.get_embeddings(texts)
            embeddings = np.asarray(embeddings, dtype=np.float32)
            if embeddings.ndim != 2 or embeddings.shape[0] != len(texts):
                # Adding a different number of vectors than metadata entries
                # would shift every later document ID.
                raise ValueError(
                    f"Expected {len(texts)} embeddings, got array of shape {embeddings.shape}"
                )

            # Check if embeddings match our dimension
            if embeddings.shape[1] != self.dimension:
                logger.warning(f"Embedding dimension mismatch: expected {self.dimension}, got {embeddings.shape[1]}")
                # If we have no documents yet, we can adapt to the new dimension
                if self.index.ntotal == 0:
                    self.dimension = embeddings.shape[1]
                    self.index = faiss.IndexFlatL2(self.dimension)
                    logger.info(f"Adapted to new dimension: {self.dimension}")
                else:
                    raise ValueError(f"Embedding dimension mismatch: expected {self.dimension}, got {embeddings.shape[1]}")

            # The ID of each new document is the position its vector will
            # occupy in the index.
            start_idx = self.index.ntotal
            entries = self._build_entries(texts, metadata_list, start_idx)

            # Add vectors to index
            self.index.add(embeddings)
            self.metadata.extend(entries)

            # Save updated index
            self._save_index()

            # Return document IDs
            doc_ids = list(range(start_idx, start_idx + len(texts)))
            logger.info(f"Added {len(texts)} documents to vector store")
            return doc_ids
        except Exception as e:
            logger.error(f"Error adding documents to vector store: {str(e)}")
            raise

    def _collect_hits(
        self,
        distances: Any,
        indices: Any,
        filters: Optional[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Turn raw index search output into result dicts.

        Skips positions that have no metadata entry, documents marked as
        deleted, and documents rejected by ``filters``. Order is preserved.
        """
        hits = []
        for rank, raw_idx in enumerate(indices[0]):
            # Plain int keeps the result JSON-serializable (the index
            # returns numpy integers).
            idx = int(raw_idx)
            if idx < 0 or idx >= len(self.metadata):
                continue  # Skip invalid indices
            metadata = self.metadata[idx]
            # Soft-deleted documents still have a vector in the index.
            if metadata.get('deleted', False):
                continue
            # Apply filters if any
            if filters and not self._matches_filters(metadata, filters):
                continue
            hits.append({
                'document_id': idx,
                'text': metadata.get('text', ''),
                'metadata': {k: v for k, v in metadata.items() if k != 'text'},
                'distance': float(distances[0][rank])
            })
        return hits

    async def search(
        self,
        query: str,
        top_k: int = 5,
        filters: Optional[Dict[str, Any]] = None,
        rerank: bool = True
    ) -> List[Dict[str, Any]]:
        """
        Search for similar documents.

        Args:
            query: The search query
            top_k: Number of results to return
            filters: Dictionary of metadata filters
            rerank: Whether to use Cohere's reranking

        Returns:
            List of result dictionaries with document content and metadata.
            Documents marked as deleted are never returned.
        """
        try:
            logger.info(f"Searching vector store with query: {query[:50]}... (top_k={top_k})")
            if top_k <= 0:
                logger.warning(f"Non-positive top_k ({top_k}), no results to return")
                return []
            if self.index.ntotal == 0:
                logger.warning("Empty vector store, no results to return")
                return []
            logger.info(f"Vector store contains {self.index.ntotal} documents")

            # Generate query embedding
            query_embedding = await embeddings_manager.get_query_embedding(query)
            query_embedding = query_embedding.reshape(1, -1).astype(np.float32)

            # First pass: find more candidates than needed for reranking
            search_k = top_k * 3 if rerank else top_k
            search_k = min(search_k, self.index.ntotal)  # Don't request more than we have
            distances, indices = self.index.search(query_embedding, search_k)

            # NOTE: filtering happens after the nearest-neighbour cut, so a
            # restrictive filter can yield fewer than top_k results.
            results = self._collect_hits(distances, indices, filters)

            # Apply reranking if requested
            if rerank and results:
                texts = [r['text'] for r in results]
                reranked = await embeddings_manager.rerank_results(query, texts, top_n=top_k)
                # Map reranked results back to our original results
                reranked_results = []
                for item in reranked:
                    orig_idx = item['index']
                    if 0 <= orig_idx < len(results):
                        reranked_results.append({
                            **results[orig_idx],
                            'relevance_score': item['relevance_score']
                        })
                results = reranked_results
            else:
                # Just take the top_k results
                results = results[:top_k]

            logger.info(f"Found {len(results)} matching documents for query")
            return results
        except Exception as e:
            logger.error(f"Error searching vector store: {str(e)}")
            raise

    def _matches_filters(self, metadata: Dict[str, Any], filters: Dict[str, Any]) -> bool:
        """Check if metadata matches the specified filters.

        A filter value that is a list matches when the metadata value is one
        of its items; any other filter value must be equal to the metadata
        value. A key missing from the metadata never matches.
        """
        for key, value in filters.items():
            if key not in metadata:
                return False
            if isinstance(value, list):
                # Check if metadata value is in the list
                if metadata[key] not in value:
                    return False
            elif metadata[key] != value:
                return False
        return True

    async def delete_document(self, document_id: int) -> bool:
        """
        Delete a document from the vector store.

        This is a soft delete: the entry is flagged in the metadata and its
        vector stays in the index.

        Args:
            document_id: ID of the document to delete

        Returns:
            Boolean indicating success
        """
        try:
            if document_id < 0 or document_id >= len(self.metadata):
                logger.warning(f"Invalid document ID: {document_id}")
                return False
            # Removing the vector would shift the position (and therefore
            # the ID) of every later document, so only mark it as deleted.
            self.metadata[document_id]['deleted'] = True
            # Save updated metadata
            self._save_index()
            logger.info(f"Marked document {document_id} as deleted")
            return True
        except Exception as e:
            logger.error(f"Error deleting document: {str(e)}")
            raise

    async def get_document(self, document_id: int) -> Optional[Dict[str, Any]]:
        """
        Retrieve a document by ID.

        Args:
            document_id: ID of the document to retrieve

        Returns:
            Document with metadata or None if not found (unknown ID or
            marked as deleted)
        """
        try:
            if document_id < 0 or document_id >= len(self.metadata):
                logger.warning(f"Invalid document ID: {document_id}")
                return None
            metadata = self.metadata[document_id]
            # Check if document is marked as deleted
            if metadata.get('deleted', False):
                logger.warning(f"Document {document_id} is marked as deleted")
                return None
            return {
                'document_id': document_id,
                'text': metadata.get('text', ''),
                'metadata': {k: v for k, v in metadata.items() if k not in ('text', 'deleted')}
            }
        except Exception as e:
            logger.error(f"Error retrieving document: {str(e)}")
            raise

    async def update_document(self, document_id: int, text: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
        """
        Update a document in the vector store.

        Args:
            document_id: ID of the document to update
            text: New document text
            metadata: New metadata (will be merged with existing)

        Returns:
            Boolean indicating success

        Raises:
            ValueError: If the new embedding's dimension differs from the
                index dimension.
        """
        try:
            if document_id < 0 or document_id >= len(self.metadata):
                logger.warning(f"Invalid document ID: {document_id}")
                return False
            # Get existing metadata
            existing_metadata = self.metadata[document_id]
            # Check if document is marked as deleted
            if existing_metadata.get('deleted', False):
                logger.warning(f"Cannot update deleted document {document_id}")
                return False
            if document_id >= self.index.ntotal:
                logger.warning(f"Document {document_id} has no vector in the index")
                return False

            # Generate new embedding
            embeddings = await embeddings_manager.get_embeddings([text])
            new_vector = np.asarray(embeddings, dtype=np.float32).reshape(1, -1)
            if new_vector.shape[1] != self.dimension:
                raise ValueError(f"Embedding dimension mismatch: expected {self.dimension}, got {new_vector.shape[1]}")

            # A flat index has no in-place update: read every stored vector
            # back, swap the one row, and rebuild the index in the same order
            # so all document IDs keep their positions.
            vectors = self.index.reconstruct_n(0, self.index.ntotal)
            vectors[document_id] = new_vector[0]
            self.index.reset()
            self.index.add(vectors)

            # Update metadata
            if metadata:
                existing_metadata.update(metadata)
            existing_metadata['text'] = text
            existing_metadata['updated_at'] = datetime.now().isoformat()

            # Save updated index
            self._save_index()
            logger.info(f"Updated document {document_id}")
            return True
        except Exception as e:
            logger.error(f"Error updating document: {str(e)}")
            raise

    def _run_coroutine_blocking(self, coro: Any) -> Any:
        """Run ``coro`` to completion from synchronous code and return its result.

        With no event loop running in this thread, a private loop is used and
        closed afterwards; it is deliberately NOT installed as the thread's
        current loop, so no closed loop is left behind for later code.
        If a loop is already running here, the coroutine is executed on a
        worker thread with its own loop, because a second loop cannot be
        started in the same thread.
        """
        import asyncio
        from concurrent.futures import ThreadPoolExecutor

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            try:
                return loop.run_until_complete(coro)
            finally:
                loop.close()

        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()

    def _load_sample_data(self) -> None:
        """Load sample data from past campaigns into the vector store.

        Reads every ``*.json`` file in ``<config.DATA_DIR>/past_campaigns``.
        This is best-effort: all errors are logged and never propagated.
        """
        try:
            # Path to past campaigns directory
            campaigns_dir = Path(config.DATA_DIR) / "past_campaigns"
            if not campaigns_dir.exists() or not campaigns_dir.is_dir():
                logger.warning(f"Past campaigns directory not found: {campaigns_dir}")
                return

            # Find all JSON files in the directory
            campaign_files = list(campaigns_dir.glob("*.json"))
            if not campaign_files:
                logger.warning("No campaign files found in past_campaigns directory")
                return

            # Load and process each campaign file
            texts = []
            metadata_list = []
            for file_path in campaign_files:
                try:
                    # Explicit encoding: the platform default is not always UTF-8.
                    with open(file_path, 'r', encoding='utf-8') as f:
                        campaign_data = json.load(f)
                    # Only JSON objects carrying a 'content' field are usable.
                    if not isinstance(campaign_data, dict) or 'content' not in campaign_data:
                        continue
                    campaign_meta = campaign_data.get('metadata') or {}
                    # Create metadata entry
                    metadata = {
                        'content_type': campaign_data.get('content_type', 'unknown'),
                        'campaign_name': campaign_meta.get('campaign_name', file_path.stem),
                        'source': 'past_campaign',
                        'file_path': str(file_path)
                    }
                    # Add performance metrics if available
                    if 'performance_metrics' in campaign_meta:
                        metadata['performance_metrics'] = campaign_meta['performance_metrics']
                    # Append both together so the two lists stay aligned.
                    texts.append(campaign_data['content'])
                    metadata_list.append(metadata)
                    logger.debug(f"Loaded campaign from {file_path.name}")
                except Exception as e:
                    logger.error(f"Error loading campaign file {file_path}: {str(e)}")
                    continue

            if not texts:
                logger.warning("No valid campaign content found in files")
                return

            # Add documents to vector store
            doc_ids = self._run_coroutine_blocking(self.add_documents(texts, metadata_list))
            logger.info(f"Added {len(doc_ids)} past campaigns to vector store")
        except Exception as e:
            logger.error(f"Error loading sample data: {str(e)}")
# Create a singleton instance.
# NOTE: this runs at import time. Constructing VectorStore creates the store
# directory, loads or creates the on-disk index, and attempts to load sample
# campaign data when the index is empty.
vector_store = VectorStore()