Add initial project structure with configuration, utilities, and API endpoints

This commit is contained in:
2025-02-07 19:24:57 +06:00
parent 480f6f06c2
commit 87e7b99daa
21 changed files with 513 additions and 159 deletions
+7
View File
@@ -0,0 +1,7 @@
API_KEY=your_api_key
OPENAI_API_KEY=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
DATABASE_URL=your_database_url
SECRET_KEY=your_secret_key
COHERE_API_KEY="AvFmArWCS6HtYD1Aa5vpFoAObjxYYK3JcO75pGcT"
GROQ_API_KEY = "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"
GROQ_MODEL_NAME = "llama-3.3-70b-versatile"
+11
View File
@@ -0,0 +1,11 @@
venv
datasets
data_processing
experiments
index
models
chroma_index
src/marketing_assistant_ai/__pycache__
src/marketing_assistant_ai/chroma_manager.pyc
src/marketing_assistant_ai/llm_manager.pyc
src/marketing_assistant_ai/=3.35.0
+27
View File
@@ -0,0 +1,27 @@
To use this code:
1. Create a `.env` file in the root directory with your configuration:
```env
API_KEY=your_api_key
OPENAI_API_KEY=your_openai_api_key
DATABASE_URL=your_database_url
SECRET_KEY=your_secret_key
```
2. Install the requirements:
```bash
pip install -r requirements.txt
```
3. Run the API:
```bash
uvicorn api.endpoints:app --reload
```
This code provides a foundation for:
- Processing and analyzing CRM data
- Generating marketing content using AI
- Answering NLP and Timeline Therapy questions
- Securing the application
- Validating data
- Exposing functionality through API endpoints
+35
View File
@@ -0,0 +1,35 @@
import os
import sys
"""
├── config/
│ ├── __init__.py
│ └── settings.py
├── data_processing/
│ ├── __init__.py
│ ├── crm_processor.py
│ ├── content_processor.py
│ └── training_data_processor.py
├── models/
│ ├── __init__.py
│ ├── marketing_assistant.py
│ ├── qa_bot.py
│ └── practitioner_assistant.py
├── utils/
│ ├── __init__.py
│ ├── data_validator.py
│ └── security.py
├── api/
│ ├── __init__.py
│ └── endpoints.py
└── requirements.txt
"""
View File
+35
View File
@@ -0,0 +1,35 @@
# api/endpoints.py
from fastapi import FastAPI, HTTPException, Depends
from typing import Dict
from models.marketing_assistant import MarketingAssistant
from models.qa_bot import NLPQABot
from config.settings import settings
app = FastAPI()
marketing_assistant = MarketingAssistant(api_key=settings.OPENAI_API_KEY)
qa_bot = NLPQABot()
@app.post("/generate-marketing-content/")
async def generate_marketing_content(
content_type: str,
parameters: Dict
) -> Dict:
try:
content = marketing_assistant.generate_content(
content_type=content_type,
**parameters
)
return {"content": content}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/answer-question/")
async def answer_question(
question: str,
context: str
) -> Dict:
try:
answer = qa_bot.answer_question(question, context)
return {"answer": answer}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
+4
View File
@@ -0,0 +1,4 @@
# config/__init__.py
from .settings import Settings
settings = Settings()
+14
View File
@@ -0,0 +1,14 @@
# config/settings.py
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
API_KEY: str
OPENAI_API_KEY: str
DATABASE_URL: str
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
class Config:
env_file = ".env"
+11
View File
@@ -0,0 +1,11 @@
python-dotenv
fastapi
uvicorn
langchain_community
langchain_chroma
langchain_core
llama-index
chromadb
langchain_groq
python-multipart
pydantic>=2.0
View File
+44 -26
View File
@@ -1,55 +1,73 @@
from typing import List, Optional
from langchain_core.documents import Document
from langchain_chroma import Chroma
import uuid import uuid
import chromadb import os
from config import CHROMA_PATH, COLLECTION_NAME, MODEL_NAME from typing import List
from langchain_huggingface import HuggingFaceEmbeddings from langchain_chroma import Chroma
from langchain_core.documents import Document
from config import settings
from utils import CustomEmbeddings
class ChromaManager: class ChromaManager:
def __init__(self): def __init__(self):
self.embed_model = HuggingFaceEmbeddings(
model_name=MODEL_NAME,
encode_kwargs={'normalize_embeddings': True}
)
self.vector_store = Chroma( self.vector_store = Chroma(
collection_name=COLLECTION_NAME, collection_name=settings.COLLECTION_NAME,
persist_directory=str(CHROMA_PATH), persist_directory=settings.CHROMA_PATH,
embedding_function=self.embed_model embedding_function=CustomEmbeddings(settings.MODEL_NAME)
) )
def get_collection_info(self): def get_collection_info(self):
index_size = 0
if os.path.exists(settings.CHROMA_PATH):
for dirpath, _, filenames in os.walk(settings.CHROMA_PATH):
for f in filenames:
fp = os.path.join(dirpath, f)
index_size += os.path.getsize(fp)
return { return {
"collection_name": settings.COLLECTION_NAME,
"document_count": self.vector_store._collection.count(), "document_count": self.vector_store._collection.count(),
"collection_name": COLLECTION_NAME "index_size": f"{index_size/1024/1024:.2f} MB"
} }
def add_documents(self, documents: List[Document]): def add_documents(self, documents: List[Document]):
try: try:
ids = [str(uuid.uuid4()) for _ in documents] ids = [str(uuid.uuid4()) for _ in documents]
texts = [doc.page_content for doc in documents] self.vector_store.add_documents(documents, ids=ids)
metadatas = [doc.metadata for doc in documents]
self.vector_store.add_texts(
texts=texts,
metadatas=metadatas,
ids=ids
)
return ids return ids
except Exception as e: except Exception as e:
raise ValueError(f"Error adding documents: {str(e)}") raise RuntimeError(f"Error adding documents: {str(e)}")
def delete_document(self, doc_id: str): def delete_document(self, doc_id: str):
try: try:
self.vector_store._collection.delete(ids=[doc_id]) self.vector_store._collection.delete(ids=[doc_id])
return True return True
except Exception as e: except Exception as e:
raise ValueError(f"Delete error: {str(e)}") raise RuntimeError(f"Delete error: {str(e)}")
def get_all_files_metadata(self):
try:
print(len(self.vector_store.get()["ids"]))
for x in range(len(self.vector_store.get()["ids"])):
# print(db.get()["metadatas"][x])
doc = self.vector_store.get()["metadatas"][x]
source = doc["source"]
print(source)
return self.vector_store.get()
except Exception as e:
raise RuntimeError(f"Error retrieving files metadata: {str(e)}")
def update_document(self, doc_id: str, new_content: str, metadata: dict): def update_document(self, doc_id: str, new_content: str, metadata: dict):
try: try:
new_doc = Document(page_content=new_content, metadata=metadata)
self.delete_document(doc_id) self.delete_document(doc_id)
new_doc = Document(page_content=new_content, metadata=metadata)
return self.add_documents([new_doc])[0] return self.add_documents([new_doc])[0]
except Exception as e: except Exception as e:
raise ValueError(f"Update error: {str(e)}") raise RuntimeError(f"Update error: {str(e)}")
# if __name__ == "__main__":
# chroma_manager = ChromaManager()
# #chroma_manager.create_collection()
# #chroma_manager.add_documents([Document(page_content="Test document", metadata={"source": "test"})])
# print(chroma_manager.get_all_files_metadata())
# print(chroma_manager.get_collection_info())
+12 -17
View File
@@ -1,20 +1,15 @@
import os import os
from pathlib import Path
# Base directory class Settings:
BASE_DIR = Path(__file__).parent.parent MODEL_NAME = "BAAI/bge-large-en-v1.5"
RERANKER_NAME = "BAAI/bge-reranker-large"
GROQ_MODEL = "llama-3.3-70b-versatile"
#DOCS_PATH = "/home/kowshik/work/ds_tjc/datasets/Client-Assets"
DOCS_PATH = "/home/kowshik/work/ds_tjc/datasets/marketing_data"
CHROMA_PATH = "/home/kowshik/work/ds_tjc/chroma_index"
COLLECTION_NAME = "marketing_docs"
API_KEY = "4BkwTtVd5VwhTiFDdG3NfzgATrCq7aD8AjnvWNeivirTntHgRvL6Xe84ULHcVTLB"
SERVER_URL = "https://ma.rommelcorral.com"
GROQ_API_KEY = "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"
# Configuration settings = Settings()
MODEL_NAME = "BAAI/bge-large-en-v1.5"
RERANKER_NAME = "BAAI/bge-reranker-base"
GROQ_MODEL = "llama-3.3-70b-versatile"
DOCS_PATH = BASE_DIR / "client_assets"
CHROMA_PATH = BASE_DIR / "chroma_index"
COLLECTION_NAME = "marketing_docs"
# Create directories if they don't exist
DOCS_PATH.mkdir(exist_ok=True)
CHROMA_PATH.mkdir(exist_ok=True)
# Groq API Key (Set through environment variable)
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
+58 -62
View File
@@ -1,87 +1,83 @@
from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse
from schemas import DocumentUpload, DocumentUpdate, DocumentDelete, QueryRequest, ResponseSchema from typing import List
from utils import process_uploaded_files, save_upload_file import base64
from langchain_core.documents import Document
from chroma_manager import ChromaManager from chroma_manager import ChromaManager
from rag import RAGSystem from rag import RAGSystem
from config import DOCS_PATH from schemas import (
import uuid DocumentUpload,
QueryRequest,
app = FastAPI(title="Marketing Assistant AI") DocumentResponse,
CollectionInfo,
# CORS Configuration UpdateDocumentRequest
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
) )
from utils import save_uploaded_file
# Initialize components app = FastAPI()
chroma_manager = ChromaManager() chroma_manager = ChromaManager()
rag_system = RAGSystem() rag_system = RAGSystem()
@app.post("/upload/", response_model=ResponseSchema) @app.post("/upload/")
async def upload_document(file: UploadFile = File(...)): async def upload_document(file: UploadFile = File(...), file_category: str = Form(...)):
try: try:
# Save file if file_category not in ["email", "books", "article", "social"]:
filename = f"{uuid.uuid4()}_{file.filename}" raise HTTPException(status_code=400, detail="Invalid file category")
save_upload_file(file, filename)
# Process and add to Chroma content = await file.read()
documents = process_uploaded_files() filepath = save_uploaded_file(content, file.filename)
chroma_manager.add_documents(documents)
return { document = Document(
"result": "Documents processed successfully", page_content=str(content), # Convert bytes to string representation
"collection_info": chroma_manager.get_collection_info() metadata={"source": filepath, "filename": file.filename}
} )
doc_id = chroma_manager.add_documents([document])[0]
return JSONResponse(
content={"message": "Document uploaded successfully", "doc_id": doc_id},
status_code=201
)
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.put("/update/", response_model=ResponseSchema) @app.post("/query/")
async def update_document(update_data: DocumentUpdate): async def process_query(query: QueryRequest):
try:
response = rag_system.get_response(query.question)
return {"response": response}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/documents/{doc_id}")
async def delete_document(doc_id: str):
try:
success = chroma_manager.delete_document(doc_id)
if success:
return {"message": "Document deleted successfully"}
raise HTTPException(status_code=404, detail="Document not found")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.put("/documents/{doc_id}")
async def update_document(doc_id: str, update_data: UpdateDocumentRequest):
try: try:
new_id = chroma_manager.update_document( new_id = chroma_manager.update_document(
update_data.doc_id, doc_id,
update_data.new_content, update_data.new_content,
update_data.metadata update_data.metadata
) )
return { return {"message": "Document updated", "new_doc_id": new_id}
"result": f"Document updated with ID: {new_id}",
"collection_info": chroma_manager.get_collection_info()
}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.delete("/delete/", response_model=ResponseSchema) @app.get("/collection-info/", response_model=CollectionInfo)
async def delete_document(delete_data: DocumentDelete):
try:
chroma_manager.delete_document(delete_data.doc_id)
return {
"result": "Document deleted successfully",
"collection_info": chroma_manager.get_collection_info()
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/query/", response_model=ResponseSchema)
async def process_query(query: QueryRequest):
try:
response = rag_system.query(query.question)
return {
"result": response,
"collection_info": chroma_manager.get_collection_info()
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/collection-info/", response_model=ResponseSchema)
async def get_collection_info(): async def get_collection_info():
try: try:
return { info = chroma_manager.get_collection_info()
"result": "Current collection status", return info
"collection_info": chroma_manager.get_collection_info()
}
except Exception as e: except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
+70 -21
View File
@@ -1,42 +1,91 @@
from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder
from langchain_groq import ChatGroq from langchain_groq import ChatGroq
from langchain_core.prompts import ChatPromptTemplate from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough from langchain_core.runnables import RunnablePassthrough
from chroma_manager import ChromaManager from config import settings
from config import RERANKER_NAME, GROQ_MODEL, GROQ_API_KEY from utils import CustomEmbeddings, CustomCrossEncoder
from langchain_chroma import Chroma
class RAGSystem: class RAGSystem:
def __init__(self): def __init__(self):
self.chroma_manager = ChromaManager() self.embeddings = CustomEmbeddings(settings.MODEL_NAME)
self.reranker = CrossEncoderReranker(
model=CustomCrossEncoder(settings.RERANKER_NAME),
top_n=5
)
self.vector_store = Chroma(
collection_name=settings.COLLECTION_NAME,
persist_directory=settings.CHROMA_PATH,
embedding_function=self.embeddings
)
self.retriever = ContextualCompressionRetriever(
base_compressor=self.reranker,
base_retriever=self.vector_store.as_retriever(search_kwargs={"k": 10})
)
self.llm = ChatGroq( self.llm = ChatGroq(
temperature=0.01, temperature=0.01,
groq_api_key=GROQ_API_KEY, groq_api_key=settings.GROQ_API_KEY,
model_name=GROQ_MODEL model_name=settings.GROQ_MODEL
)
self._init_retriever()
self._init_chain()
def _init_retriever(self):
model = HuggingFaceCrossEncoder(model_name=RERANKER_NAME)
reranker = CrossEncoderReranker(model=model, top_n=5)
self.retriever = ContextualCompressionRetriever(
base_compressor=reranker,
base_retriever=self.chroma_manager.vector_store.as_retriever(search_kwargs={"k": 10})
) )
def _init_chain(self): self.prompt = ChatPromptTemplate.from_template("""
template = """...""" # Your existing template here Act like you are Adriana James, write marketing copy in her signature style. Just mimic her style and provide the answer to the user's query. Make sure that you are Adriana James, and you are providing the answer to the user's query.
Here is some of her past **Email_Templates**:
Template - 1:
Dear friend,
As we approach the final days of 2024, I wanted to reach out with a message of hope and possibility for the year ahead. The dawn of 2025 brings with it an opportunity not just for fresh starts, but for transformative growth and achievement.
You may have already begun thinking about your aspirations for the coming year. Whether you have or haven't, I'd like to personally invite you to join me for an intimate goal-setting session where we'll work together to crystallize your vision for 2025.
I believe that every remarkable success story begins with clarity - knowing exactly what you want and placing it firmly in your future Time Line in a way that makes it inevitable. This isn't just about writing down wishes; it's about crafting the blueprint for your next chapter.
Before our session, I encourage you to reflect on three crucial questions:
1. What is the most important achievement you envision for 2025?
2. How can you leverage your unique experiences and skills to create positive change in the world?
3. What stepping stones will you need to place along your path to ensure your primary goal becomes reality?
Here's what makes this journey so powerful: as you pursue specific goals, you naturally develop new skills, strategies, and behaviors. Sometimes, achieving a goal requires you to become an entirely new version of yourself - and that transformation is often the most valuable reward of all.
Join me for this complimentary goal-setting session:
Date: Thursday 15 January 2025
Time: 4pm AEDT
Register Today
The more attention you invest in this process, the more you'll free yourself from limitations, unleash your creativity, and uncover possibilities you never imagined. This creates a beautiful cycle: greater goals lead to greater successes, which build self-confidence and positive momentum.
Register today for this special session. I look forward to helping you lay the foundation for an extraordinary 2025.
Be Well!
Template - 2:
Hi [[contact.first_name]],
I trust you've been putting the valuable insights from our recent Goal-Setting Masterclass to good use. I hope you've had a chance to dive into the videos and set your sights on exciting goals for 2025 across all areas of your life -career, relationships, finance, health, and beyond.
Now, I'm thrilled to invite you to join me for an exclusive live Q&A session on Monday, February 3rd, 2025, at 7PM AEDT. This is your opportunity to delve deeper into the techniques shared and learn more about how to make 2025 truly exceptional.
Whether you're looking to fine-tune your objectives, overcome obstacles, or gain more insights into applying these powerful techniques, I'm here to support you. Let's work together to make sure you're fully equipped to create a prosperous and successful year in every aspect of your life.
Come prepared with your questions, and let's turn your 2025 goals into reality!
Zoom details
Be Well!
Dr Adriana James and team
For more information
visit www.nlpcoaching.com or email us via info@nlpcoaching.com | Copyright 2025 The Tad James Company. All rights reserved. Australia/International: 90-96 Bourke Road Alexandria, NSW 2015 United States / International: 1450 W Horizon Ridge Pkway #544, Henderson NV, 89012 Unsubscribe
Query: {question}
Adriana James Resource Context: {context}
Now, write marketing copy in Adriana James' signature style from the context(Adriana James content) above and provide the answer to the user's query.
Note: Don't provide anything extra. Just give me the response no extra words nothing at all. Just the response to the user's query.
""")
prompt = ChatPromptTemplate.from_template(template)
self.rag_chain = ( self.rag_chain = (
{"context": self.retriever, "question": RunnablePassthrough()} {"context": self.retriever, "question": RunnablePassthrough()}
| prompt | self.prompt
| self.llm | self.llm
| StrOutputParser() | StrOutputParser()
) )
def query(self, question: str): def get_response(self, question: str) -> str:
return self.rag_chain.invoke(question) return self.rag_chain.invoke(question)
+16 -12
View File
@@ -2,19 +2,23 @@ from pydantic import BaseModel
from typing import List, Optional from typing import List, Optional
class DocumentUpload(BaseModel): class DocumentUpload(BaseModel):
description: Optional[str] = None file: str # Base64 encoded file content
filename: str
class DocumentUpdate(BaseModel): metadata: Optional[dict] = {}
doc_id: str
new_content: str
metadata: dict
class DocumentDelete(BaseModel):
doc_id: str
class QueryRequest(BaseModel): class QueryRequest(BaseModel):
question: str question: str
class ResponseSchema(BaseModel): class DocumentResponse(BaseModel):
result: str id: str
collection_info: dict content: str
metadata: dict
class CollectionInfo(BaseModel):
collection_name: str
document_count: int
index_size: str
class UpdateDocumentRequest(BaseModel):
new_content: str
metadata: dict
+33 -18
View File
@@ -1,22 +1,37 @@
from langchain_community.document_loaders import DirectoryLoader import os
from langchain_text_splitters import RecursiveCharacterTextSplitter import requests
from typing import List, Tuple
from langchain_core.documents import Document from langchain_core.documents import Document
from config import DOCS_PATH from config import settings
from langchain.retrievers.document_compressors.cross_encoder import BaseCrossEncoder
class CustomEmbeddings:
def __init__(self, model_name: str):
self.model_name = model_name
def process_uploaded_files(): def embed_documents(self, texts: List[str]) -> List[List[float]]:
"""Process documents in the upload directory""" headers = {"Authorization": f"Bearer {settings.API_KEY}"}
loader = DirectoryLoader(DOCS_PATH, glob=["**/*.pdf", "**/*.txt"]) payload = {"model": self.model_name, "input": texts}
documents = loader.load() response = requests.post(f"{settings.SERVER_URL}/embeddings", json=payload, headers=headers)
response.raise_for_status()
return [item['embedding'] for item in response.json()['data']]
text_splitter = RecursiveCharacterTextSplitter( def embed_query(self, text: str) -> List[float]:
chunk_size=1000, return self.embed_documents([text])[0]
chunk_overlap=200
)
return text_splitter.split_documents(documents)
def save_upload_file(file, filename: str): class CustomCrossEncoder(BaseCrossEncoder):
"""Save uploaded file to documents directory""" def __init__(self, model_name: str):
file_path = DOCS_PATH / filename self.model_name = model_name
with open(file_path, "wb") as buffer:
buffer.write(file.file.read()) def score(self, text_pairs: List[Tuple[str, str]]) -> List[float]:
return file_path query, documents = text_pairs[0][0], [doc for _, doc in text_pairs]
headers = {"Authorization": f"Bearer {settings.API_KEY}"}
payload = {"model": self.model_name, "query": query, "documents": documents}
response = requests.post(f"{settings.SERVER_URL}/rerank", json=payload, headers=headers)
response.raise_for_status()
return [item['relevance_score'] for item in sorted(response.json()['results'], key=lambda x: x['index'])]
def save_uploaded_file(content: bytes, filename: str) -> str:
filepath = os.path.join(settings.DOCS_PATH, filename)
with open(filepath, "wb") as f:
f.write(content)
return filepath
+1
View File
@@ -0,0 +1 @@
# This file is intentionally left blank.
+18
View File
@@ -0,0 +1,18 @@
from datetime import datetime
# Cohere API Configuration
COHERE_API_KEY = "ZlABLjvSsT86iObp9cgIgNkx2BLPs62pZiXBczw9"
EMBEDDING_MODEL = "embed-english-v3.0" # Cohere model name
EMBEDDING_DIMENSION = 1024 # Dimension for Cohere embeddings
# FAISS Configuration
FAISS_INDEX_PATH = ""
METADATA_PATH = ""
# API Configuration
API_HOST = "0.0.0.0"
API_PORT = 5125
# Logging Configuration
CURRENT_TIME = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
CURRENT_USER = "tjc"
+16
View File
@@ -0,0 +1,16 @@
# utils/data_validator.py
from typing import Dict, Any
import pandas as pd
class DataValidator:
@staticmethod
def validate_crm_data(data: pd.DataFrame) -> bool:
"""Validate CRM data structure"""
required_columns = ['customer_id', 'interaction_date', 'interaction_type']
return all(col in data.columns for col in required_columns)
@staticmethod
def validate_training_data(data: Dict[str, Any]) -> bool:
"""Validate training material data"""
required_fields = ['content', 'category', 'level']
return all(field in data for field in required_fields)
+28
View File
@@ -0,0 +1,28 @@
# utils/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from config.settings import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class Security:
@staticmethod
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
@staticmethod
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
+70
View File
@@ -0,0 +1,70 @@
import json
from typing import List, Dict, Tuple
from concurrent.futures import ThreadPoolExecutor
import os
import numpy as np
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
from langchain_cohere import CohereEmbeddings
import faiss
from langchain_core.documents import Document
from config import COHERE_API_KEY, EMBEDDING_MODEL, EMBEDDING_DIMENSION
class VectorDB:
def __init__(self):
self._executor = ThreadPoolExecutor(max_workers=10)
self.COHERE_API_KEY = COHERE_API_KEY
os.environ["COHERE_API_KEY"] = self.COHERE_API_KEY
self.embeddings = CohereEmbeddings(model=EMBEDDING_MODEL)
self.index = faiss.IndexFlatL2(EMBEDDING_DIMENSION)
self.vector_score = FAISS(
embedding_function=self.embeddings,
index=self.index,
docstore=InMemoryDocstore(),
index_to_docstore_id={},
)
def load_embeddings(self, file_id: str, file_path: str):
"""
Load embeddings from file
"""
try:
if not os.path.isdir(file_path):
raise Exception(f"{file_path} is not a valid directory.")
print("Files in directory: ", os.listdir(file_path))
print("Current working directory: ", os.getcwd())
os.chdir("/home/kowshik/work/ds_tjc/index/faiss_index")
print("Changed directory to: ", os.getcwd())
new_vector_store = FAISS.load_local(
folder_path=file_path,
index_name="index",
embeddings=self.embeddings,
allow_dangerous_deserialization=True,
)
return new_vector_store
except Exception as e:
raise Exception(f"Error loading embeddings: {str(e)}")
def search(self, new_vector_store, query: str, top_k: int = 5) -> List[Dict]:
"""
Search for similar documents and return serializable results
"""
try:
raw_results = new_vector_store.similarity_search_with_score(query, k=top_k)
# Convert results to serializable format
processed_results = []
for doc, score in raw_results:
processed_result = {
'content': doc.page_content,
'metadata': doc.metadata,
'score': float(score) # Convert numpy.float32 to Python float
}
processed_results.append(processed_result)
return processed_results
except Exception as e:
raise Exception(f"Error during search: {str(e)}")