From 87e7b99daa5eb38f7dfbf3c55a69f5e38c6b4136 Mon Sep 17 00:00:00 2001 From: kowshik Date: Fri, 7 Feb 2025 19:24:57 +0600 Subject: [PATCH] Add initial project structure with configuration, utilities, and API endpoints --- .env | 7 ++ .gitignore | 11 ++ README.md | 27 ++++ __init__.py | 35 ++++++ api/__init__.py | 0 api/endpoints.py | 35 ++++++ config/__init__.py | 4 + config/settings.py | 14 +++ requirements.txt | 11 ++ src/__init__.py | 0 src/marketing_assistant_ai/chroma_manager.py | 70 +++++++---- src/marketing_assistant_ai/config.py | 29 ++--- src/marketing_assistant_ai/main.py | 122 +++++++++---------- src/marketing_assistant_ai/rag.py | 93 ++++++++++---- src/marketing_assistant_ai/schemas.py | 28 +++-- src/marketing_assistant_ai/utils.py | 53 +++++--- utils/__init__.py | 1 + utils/config.py | 18 +++ utils/data_validator.py | 16 +++ utils/security.py | 28 +++++ utils/vector_db.py | 70 +++++++++++ 21 files changed, 513 insertions(+), 159 deletions(-) create mode 100644 .env create mode 100644 .gitignore create mode 100644 README.md create mode 100644 __init__.py create mode 100644 api/__init__.py create mode 100644 api/endpoints.py create mode 100644 config/__init__.py create mode 100644 config/settings.py create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 utils/__init__.py create mode 100644 utils/config.py create mode 100644 utils/data_validator.py create mode 100644 utils/security.py create mode 100644 utils/vector_db.py diff --git a/.env b/.env new file mode 100644 index 0000000..9fc22c2 --- /dev/null +++ b/.env @@ -0,0 +1,7 @@ +API_KEY=your_api_key +OPENAI_API_KEY=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 +DATABASE_URL=your_database_url +SECRET_KEY=your_secret_key +COHERE_API_KEY="AvFmArWCS6HtYD1Aa5vpFoAObjxYYK3JcO75pGcT" +GROQ_API_KEY = "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" +GROQ_MODEL_NAME = "llama-3.3-70b-versatile" \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7662eef --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +venv +datasets +data_processing +experiments +index +models +chroma_index +src/marketing_assistant_ai/__pycache__ +src/marketing_assistant_ai/chroma_manager.pyc +src/marketing_assistant_ai/llm_manager.pyc +src/marketing_assistant_ai/=3.35.0 diff --git a/README.md b/README.md new file mode 100644 index 0000000..1d17be1 --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +To use this code: + +1. Create a `.env` file in the root directory with your configuration: +```env +API_KEY=your_api_key +OPENAI_API_KEY=your_openai_api_key +DATABASE_URL=your_database_url +SECRET_KEY=your_secret_key +``` + +2. Install the requirements: +```bash +pip install -r requirements.txt +``` + +3. Run the API: +```bash +uvicorn api.endpoints:app --reload +``` + +This code provides a foundation for: +- Processing and analyzing CRM data +- Generating marketing content using AI +- Answering NLP and Timeline Therapy questions +- Securing the application +- Validating data +- Exposing functionality through API endpoints diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..85ef326 --- /dev/null +++ b/__init__.py @@ -0,0 +1,35 @@ +import os +import sys + + + + +""" +├── config/ +│ ├── __init__.py +│ └── settings.py +│ +├── data_processing/ +│ ├── __init__.py +│ ├── crm_processor.py +│ ├── content_processor.py +│ └── training_data_processor.py +│ +├── models/ +│ ├── __init__.py +│ ├── marketing_assistant.py +│ ├── qa_bot.py +│ └── practitioner_assistant.py +│ +├── utils/ +│ ├── __init__.py +│ ├── data_validator.py +│ └── security.py +│ +├── api/ +│ ├── __init__.py +│ └── endpoints.py +│ +└── requirements.txt +""" + diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/endpoints.py b/api/endpoints.py new file mode 100644 index 0000000..0dbf082 --- /dev/null +++ b/api/endpoints.py @@ -0,0 +1,35 @@ +# api/endpoints.py +from fastapi import FastAPI, HTTPException, Depends +from typing import Dict +from models.marketing_assistant import MarketingAssistant +from models.qa_bot import NLPQABot +from config.settings import settings + +app = FastAPI() +marketing_assistant = MarketingAssistant(api_key=settings.OPENAI_API_KEY) +qa_bot = NLPQABot() + +@app.post("/generate-marketing-content/") +async def generate_marketing_content( + content_type: str, + parameters: Dict +) -> Dict: + try: + content = marketing_assistant.generate_content( + content_type=content_type, + **parameters + ) + return {"content": content} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.post("/answer-question/") +async def answer_question( + question: str, + context: str +) -> Dict: + try: + answer = qa_bot.answer_question(question, context) + return {"answer": answer} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..781457b --- /dev/null +++ b/config/__init__.py @@ -0,0 +1,4 @@ +# config/__init__.py +from .settings import Settings + +settings = Settings() \ No newline at end of file diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..887356f --- /dev/null +++ b/config/settings.py @@ -0,0 +1,14 @@ +# config/settings.py +from pydantic_settings import BaseSettings +from typing import Optional + +class Settings(BaseSettings): + API_KEY: str + OPENAI_API_KEY: str + DATABASE_URL: str + SECRET_KEY: str + ALGORITHM: str = "HS256" + ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 + + class Config: + env_file = ".env" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..40ddbd0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,11 @@ +python-dotenv +fastapi +uvicorn +langchain_community +langchain_chroma +langchain_core +llama-index +chromadb +langchain_groq +python-multipart +pydantic>=2.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/marketing_assistant_ai/chroma_manager.py b/src/marketing_assistant_ai/chroma_manager.py index 8f8c6c8..a4720c3 100644 --- a/src/marketing_assistant_ai/chroma_manager.py +++ b/src/marketing_assistant_ai/chroma_manager.py @@ -1,55 +1,73 @@ -from typing import List, Optional -from langchain_core.documents import Document -from langchain_chroma import Chroma import uuid -import chromadb -from config import CHROMA_PATH, COLLECTION_NAME, MODEL_NAME -from langchain_huggingface import HuggingFaceEmbeddings +import os +from typing import List +from langchain_chroma import Chroma +from langchain_core.documents import Document +from config import settings +from utils import CustomEmbeddings class ChromaManager: def __init__(self): - self.embed_model = HuggingFaceEmbeddings( - model_name=MODEL_NAME, - encode_kwargs={'normalize_embeddings': True} - ) self.vector_store = Chroma( - collection_name=COLLECTION_NAME, - persist_directory=str(CHROMA_PATH), - embedding_function=self.embed_model + collection_name=settings.COLLECTION_NAME, + persist_directory=settings.CHROMA_PATH, + embedding_function=CustomEmbeddings(settings.MODEL_NAME) ) def get_collection_info(self): + index_size = 0 + if os.path.exists(settings.CHROMA_PATH): + for dirpath, _, filenames in os.walk(settings.CHROMA_PATH): + for f in filenames: + fp = os.path.join(dirpath, f) + index_size += os.path.getsize(fp) + return { + "collection_name": settings.COLLECTION_NAME, "document_count": self.vector_store._collection.count(), - "collection_name": COLLECTION_NAME + "index_size": f"{index_size/1024/1024:.2f} MB" } def add_documents(self, documents: List[Document]): try: ids = [str(uuid.uuid4()) for _ in documents] - texts = [doc.page_content for doc in documents] - metadatas = [doc.metadata for doc in documents] - - self.vector_store.add_texts( - texts=texts, - metadatas=metadatas, - ids=ids - ) + self.vector_store.add_documents(documents, ids=ids) return ids except Exception as e: - raise ValueError(f"Error adding documents: {str(e)}") + raise RuntimeError(f"Error adding documents: {str(e)}") def delete_document(self, doc_id: str): try: self.vector_store._collection.delete(ids=[doc_id]) return True except Exception as e: - raise ValueError(f"Delete error: {str(e)}") + raise RuntimeError(f"Delete error: {str(e)}") + + def get_all_files_metadata(self): + try: + print(len(self.vector_store.get()["ids"])) + for x in range(len(self.vector_store.get()["ids"])): + # print(db.get()["metadatas"][x]) + doc = self.vector_store.get()["metadatas"][x] + source = doc["source"] + print(source) + + return self.vector_store.get() + except Exception as e: + raise RuntimeError(f"Error retrieving files metadata: {str(e)}") def update_document(self, doc_id: str, new_content: str, metadata: dict): try: - new_doc = Document(page_content=new_content, metadata=metadata) self.delete_document(doc_id) + new_doc = Document(page_content=new_content, metadata=metadata) return self.add_documents([new_doc])[0] except Exception as e: - raise ValueError(f"Update error: {str(e)}") \ No newline at end of file + raise RuntimeError(f"Update error: {str(e)}") + + +# if __name__ == "__main__": +# chroma_manager = ChromaManager() +# #chroma_manager.create_collection() +# #chroma_manager.add_documents([Document(page_content="Test document", metadata={"source": "test"})]) +# print(chroma_manager.get_all_files_metadata()) +# print(chroma_manager.get_collection_info()) \ No newline at end of file diff --git a/src/marketing_assistant_ai/config.py b/src/marketing_assistant_ai/config.py index 14e56af..801080a 100644 --- a/src/marketing_assistant_ai/config.py +++ b/src/marketing_assistant_ai/config.py @@ -1,20 +1,15 @@ import os -from pathlib import Path -# Base directory -BASE_DIR = Path(__file__).parent.parent +class Settings: + MODEL_NAME = "BAAI/bge-large-en-v1.5" + RERANKER_NAME = "BAAI/bge-reranker-large" + GROQ_MODEL = "llama-3.3-70b-versatile" + #DOCS_PATH = "/home/kowshik/work/ds_tjc/datasets/Client-Assets" + DOCS_PATH = "/home/kowshik/work/ds_tjc/datasets/marketing_data" + CHROMA_PATH = "/home/kowshik/work/ds_tjc/chroma_index" + COLLECTION_NAME = "marketing_docs" + API_KEY = "4BkwTtVd5VwhTiFDdG3NfzgATrCq7aD8AjnvWNeivirTntHgRvL6Xe84ULHcVTLB" + SERVER_URL = "https://ma.rommelcorral.com" + GROQ_API_KEY = "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" -# Configuration -MODEL_NAME = "BAAI/bge-large-en-v1.5" -RERANKER_NAME = "BAAI/bge-reranker-base" -GROQ_MODEL = "llama-3.3-70b-versatile" -DOCS_PATH = BASE_DIR / "client_assets" -CHROMA_PATH = BASE_DIR / "chroma_index" -COLLECTION_NAME = "marketing_docs" - -# Create directories if they don't exist -DOCS_PATH.mkdir(exist_ok=True) -CHROMA_PATH.mkdir(exist_ok=True) - -# Groq API Key (Set through environment variable) -GROQ_API_KEY = os.getenv("GROQ_API_KEY") \ No newline at end of file +settings = Settings() \ No newline at end of file diff --git a/src/marketing_assistant_ai/main.py b/src/marketing_assistant_ai/main.py index 5897935..e28ae2d 100644 --- a/src/marketing_assistant_ai/main.py +++ b/src/marketing_assistant_ai/main.py @@ -1,87 +1,83 @@ -from fastapi import FastAPI, UploadFile, File, HTTPException -from fastapi.middleware.cors import CORSMiddleware -from schemas import DocumentUpload, DocumentUpdate, DocumentDelete, QueryRequest, ResponseSchema -from utils import process_uploaded_files, save_upload_file +from fastapi import FastAPI, HTTPException, UploadFile, File, Form +from fastapi.responses import JSONResponse +from typing import List +import base64 +from langchain_core.documents import Document from chroma_manager import ChromaManager from rag import RAGSystem -from config import DOCS_PATH -import uuid - -app = FastAPI(title="Marketing Assistant AI") - -# CORS Configuration -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_methods=["*"], - allow_headers=["*"], +from schemas import ( + DocumentUpload, + QueryRequest, + DocumentResponse, + CollectionInfo, + UpdateDocumentRequest ) +from utils import save_uploaded_file -# Initialize components +app = FastAPI() chroma_manager = ChromaManager() rag_system = RAGSystem() -@app.post("/upload/", response_model=ResponseSchema) -async def upload_document(file: UploadFile = File(...)): +@app.post("/upload/") +async def upload_document(file: UploadFile = File(...), file_category: str = Form(...)): try: - # Save file - filename = f"{uuid.uuid4()}_{file.filename}" - save_upload_file(file, filename) + if file_category not in ["email", "books", "article", "social"]: + raise HTTPException(status_code=400, detail="Invalid file category") - # Process and add to Chroma - documents = process_uploaded_files() - chroma_manager.add_documents(documents) + content = await file.read() + filepath = save_uploaded_file(content, file.filename) - return { - "result": "Documents processed successfully", - "collection_info": chroma_manager.get_collection_info() - } + document = Document( + page_content=str(content), # Convert bytes to string representation + metadata={"source": filepath, "filename": file.filename} + ) + + doc_id = chroma_manager.add_documents([document])[0] + return JSONResponse( + content={"message": "Document uploaded successfully", "doc_id": doc_id}, + status_code=201 + ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) -@app.put("/update/", response_model=ResponseSchema) -async def update_document(update_data: DocumentUpdate): +@app.post("/query/") +async def process_query(query: QueryRequest): + try: + response = rag_system.get_response(query.question) + return {"response": response} + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.delete("/documents/{doc_id}") +async def delete_document(doc_id: str): + try: + success = chroma_manager.delete_document(doc_id) + if success: + return {"message": "Document deleted successfully"} + raise HTTPException(status_code=404, detail="Document not found") + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + +@app.put("/documents/{doc_id}") +async def update_document(doc_id: str, update_data: UpdateDocumentRequest): try: new_id = chroma_manager.update_document( - update_data.doc_id, + doc_id, update_data.new_content, update_data.metadata ) - return { - "result": f"Document updated with ID: {new_id}", - "collection_info": chroma_manager.get_collection_info() - } + return {"message": "Document updated", "new_doc_id": new_id} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) -@app.delete("/delete/", response_model=ResponseSchema) -async def delete_document(delete_data: DocumentDelete): - try: - chroma_manager.delete_document(delete_data.doc_id) - return { - "result": "Document deleted successfully", - "collection_info": chroma_manager.get_collection_info() - } - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/query/", response_model=ResponseSchema) -async def process_query(query: QueryRequest): - try: - response = rag_system.query(query.question) - return { - "result": response, - "collection_info": chroma_manager.get_collection_info() - } - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) - -@app.get("/collection-info/", response_model=ResponseSchema) +@app.get("/collection-info/", response_model=CollectionInfo) async def get_collection_info(): try: - return { - "result": "Current collection status", - "collection_info": chroma_manager.get_collection_info() - } + info = chroma_manager.get_collection_info() + return info except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file + raise HTTPException(status_code=500, detail=str(e)) + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/src/marketing_assistant_ai/rag.py b/src/marketing_assistant_ai/rag.py index 7e0cb56..377f027 100644 --- a/src/marketing_assistant_ai/rag.py +++ b/src/marketing_assistant_ai/rag.py @@ -1,42 +1,91 @@ from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers.document_compressors import CrossEncoderReranker -from langchain_community.cross_encoders import HuggingFaceCrossEncoder from langchain_groq import ChatGroq from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough -from chroma_manager import ChromaManager -from config import RERANKER_NAME, GROQ_MODEL, GROQ_API_KEY +from config import settings +from utils import CustomEmbeddings, CustomCrossEncoder +from langchain_chroma import Chroma class RAGSystem: def __init__(self): - self.chroma_manager = ChromaManager() + self.embeddings = CustomEmbeddings(settings.MODEL_NAME) + self.reranker = CrossEncoderReranker( + model=CustomCrossEncoder(settings.RERANKER_NAME), + top_n=5 + ) + + self.vector_store = Chroma( + collection_name=settings.COLLECTION_NAME, + persist_directory=settings.CHROMA_PATH, + embedding_function=self.embeddings + ) + + self.retriever = ContextualCompressionRetriever( + base_compressor=self.reranker, + base_retriever=self.vector_store.as_retriever(search_kwargs={"k": 10}) + ) + self.llm = ChatGroq( temperature=0.01, - groq_api_key=GROQ_API_KEY, - model_name=GROQ_MODEL + groq_api_key=settings.GROQ_API_KEY, + model_name=settings.GROQ_MODEL ) - self._init_retriever() - self._init_chain() - - def _init_retriever(self): - model = HuggingFaceCrossEncoder(model_name=RERANKER_NAME) - reranker = CrossEncoderReranker(model=model, top_n=5) - self.retriever = ContextualCompressionRetriever( - base_compressor=reranker, - base_retriever=self.chroma_manager.vector_store.as_retriever(search_kwargs={"k": 10}) - ) - - def _init_chain(self): - template = """...""" # Your existing template here - prompt = ChatPromptTemplate.from_template(template) + self.prompt = ChatPromptTemplate.from_template(""" + Act like you are Adriana James, write marketing copy in her signature style. Just mimic her style and provide the answer to the user's query. Make sure that you are Adriana James, and you are providing the answer to the user's query. + + Here is some of her past **Email_Templates**: + + Template - 1: + Dear friend, + + As we approach the final days of 2024, I wanted to reach out with a message of hope and possibility for the year ahead. The dawn of 2025 brings with it an opportunity not just for fresh starts, but for transformative growth and achievement. + You may have already begun thinking about your aspirations for the coming year. Whether you have or haven't, I'd like to personally invite you to join me for an intimate goal-setting session where we'll work together to crystallize your vision for 2025. + I believe that every remarkable success story begins with clarity - knowing exactly what you want and placing it firmly in your future Time Line in a way that makes it inevitable. This isn't just about writing down wishes; it's about crafting the blueprint for your next chapter. + Before our session, I encourage you to reflect on three crucial questions: + + 1. What is the most important achievement you envision for 2025? + 2. How can you leverage your unique experiences and skills to create positive change in the world? + 3. What stepping stones will you need to place along your path to ensure your primary goal becomes reality? + + Here's what makes this journey so powerful: as you pursue specific goals, you naturally develop new skills, strategies, and behaviors. Sometimes, achieving a goal requires you to become an entirely new version of yourself - and that transformation is often the most valuable reward of all. + Join me for this complimentary goal-setting session: + Date: Thursday 15 January 2025 + Time: 4pm AEDT + Register Today + The more attention you invest in this process, the more you'll free yourself from limitations, unleash your creativity, and uncover possibilities you never imagined. This creates a beautiful cycle: greater goals lead to greater successes, which build self-confidence and positive momentum. + Register today for this special session. I look forward to helping you lay the foundation for an extraordinary 2025. + Be Well! + + Template - 2: + Hi [[contact.first_name]], + I trust you've been putting the valuable insights from our recent Goal-Setting Masterclass to good use. I hope you've had a chance to dive into the videos and set your sights on exciting goals for 2025 across all areas of your life -career, relationships, finance, health, and beyond. + Now, I'm thrilled to invite you to join me for an exclusive live Q&A session on Monday, February 3rd, 2025, at 7PM AEDT. This is your opportunity to delve deeper into the techniques shared and learn more about how to make 2025 truly exceptional. + Whether you're looking to fine-tune your objectives, overcome obstacles, or gain more insights into applying these powerful techniques, I'm here to support you. Let's work together to make sure you're fully equipped to create a prosperous and successful year in every aspect of your life. + Come prepared with your questions, and let's turn your 2025 goals into reality! + Zoom details + Be Well! + Dr Adriana James and team + For more information + visit www.nlpcoaching.com or email us via info@nlpcoaching.com | Copyright 2025 The Tad James Company. All rights reserved. Australia/International: 90-96 Bourke Road Alexandria, NSW 2015 United States / International: 1450 W Horizon Ridge Pkway #544, Henderson NV, 89012 Unsubscribe + + + Query: {question} + Adriana James Resource Context: {context} + + Now, write marketing copy in Adriana James' signature style from the context(Adriana James content) above and provide the answer to the user's query. + + Note: Don't provide anything extra. Just give me the response no extra words nothing at all. Just the response to the user's query. + """) + self.rag_chain = ( {"context": self.retriever, "question": RunnablePassthrough()} - | prompt + | self.prompt | self.llm | StrOutputParser() ) - def query(self, question: str): + def get_response(self, question: str) -> str: return self.rag_chain.invoke(question) \ No newline at end of file diff --git a/src/marketing_assistant_ai/schemas.py b/src/marketing_assistant_ai/schemas.py index 3a5aa9e..ae82327 100644 --- a/src/marketing_assistant_ai/schemas.py +++ b/src/marketing_assistant_ai/schemas.py @@ -2,19 +2,23 @@ from pydantic import BaseModel from typing import List, Optional class DocumentUpload(BaseModel): - description: Optional[str] = None - -class DocumentUpdate(BaseModel): - doc_id: str - new_content: str - metadata: dict - -class DocumentDelete(BaseModel): - doc_id: str + file: str # Base64 encoded file content + filename: str + metadata: Optional[dict] = {} class QueryRequest(BaseModel): question: str -class ResponseSchema(BaseModel): - result: str - collection_info: dict \ No newline at end of file +class DocumentResponse(BaseModel): + id: str + content: str + metadata: dict + +class CollectionInfo(BaseModel): + collection_name: str + document_count: int + index_size: str + +class UpdateDocumentRequest(BaseModel): + new_content: str + metadata: dict \ No newline at end of file diff --git a/src/marketing_assistant_ai/utils.py b/src/marketing_assistant_ai/utils.py index d086f51..ca263d7 100644 --- a/src/marketing_assistant_ai/utils.py +++ b/src/marketing_assistant_ai/utils.py @@ -1,22 +1,37 @@ -from langchain_community.document_loaders import DirectoryLoader -from langchain_text_splitters import RecursiveCharacterTextSplitter +import os +import requests +from typing import List, Tuple from langchain_core.documents import Document -from config import DOCS_PATH +from config import settings +from langchain.retrievers.document_compressors.cross_encoder import BaseCrossEncoder +class CustomEmbeddings: + def __init__(self, model_name: str): + self.model_name = model_name -def process_uploaded_files(): - """Process documents in the upload directory""" - loader = DirectoryLoader(DOCS_PATH, glob=["**/*.pdf", "**/*.txt"]) - documents = loader.load() - - text_splitter = RecursiveCharacterTextSplitter( - chunk_size=1000, - chunk_overlap=200 - ) - return text_splitter.split_documents(documents) + def embed_documents(self, texts: List[str]) -> List[List[float]]: + headers = {"Authorization": f"Bearer {settings.API_KEY}"} + payload = {"model": self.model_name, "input": texts} + response = requests.post(f"{settings.SERVER_URL}/embeddings", json=payload, headers=headers) + response.raise_for_status() + return [item['embedding'] for item in response.json()['data']] -def save_upload_file(file, filename: str): - """Save uploaded file to documents directory""" - file_path = DOCS_PATH / filename - with open(file_path, "wb") as buffer: - buffer.write(file.file.read()) - return file_path \ No newline at end of file + def embed_query(self, text: str) -> List[float]: + return self.embed_documents([text])[0] + +class CustomCrossEncoder(BaseCrossEncoder): + def __init__(self, model_name: str): + self.model_name = model_name + + def score(self, text_pairs: List[Tuple[str, str]]) -> List[float]: + query, documents = text_pairs[0][0], [doc for _, doc in text_pairs] + headers = {"Authorization": f"Bearer {settings.API_KEY}"} + payload = {"model": self.model_name, "query": query, "documents": documents} + response = requests.post(f"{settings.SERVER_URL}/rerank", json=payload, headers=headers) + response.raise_for_status() + return [item['relevance_score'] for item in sorted(response.json()['results'], key=lambda x: x['index'])] + +def save_uploaded_file(content: bytes, filename: str) -> str: + filepath = os.path.join(settings.DOCS_PATH, filename) + with open(filepath, "wb") as f: + f.write(content) + return filepath \ No newline at end of file diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..82789f2 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1 @@ +# This file is intentionally left blank. \ No newline at end of file diff --git a/utils/config.py b/utils/config.py new file mode 100644 index 0000000..a7edafb --- /dev/null +++ b/utils/config.py @@ -0,0 +1,18 @@ +from datetime import datetime + +# Cohere API Configuration +COHERE_API_KEY = "ZlABLjvSsT86iObp9cgIgNkx2BLPs62pZiXBczw9" +EMBEDDING_MODEL = "embed-english-v3.0" # Cohere model name +EMBEDDING_DIMENSION = 1024 # Dimension for Cohere embeddings + +# FAISS Configuration +FAISS_INDEX_PATH = "" +METADATA_PATH = "" + +# API Configuration +API_HOST = "0.0.0.0" +API_PORT = 5125 + +# Logging Configuration +CURRENT_TIME = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") +CURRENT_USER = "tjc" \ No newline at end of file diff --git a/utils/data_validator.py b/utils/data_validator.py new file mode 100644 index 0000000..ecd99c5 --- /dev/null +++ b/utils/data_validator.py @@ -0,0 +1,16 @@ +# utils/data_validator.py +from typing import Dict, Any +import pandas as pd + +class DataValidator: + @staticmethod + def validate_crm_data(data: pd.DataFrame) -> bool: + """Validate CRM data structure""" + required_columns = ['customer_id', 'interaction_date', 'interaction_type'] + return all(col in data.columns for col in required_columns) + + @staticmethod + def validate_training_data(data: Dict[str, Any]) -> bool: + """Validate training material data""" + required_fields = ['content', 'category', 'level'] + return all(field in data for field in required_fields) \ No newline at end of file diff --git a/utils/security.py b/utils/security.py new file mode 100644 index 0000000..0a52ff0 --- /dev/null +++ b/utils/security.py @@ -0,0 +1,28 @@ +# utils/security.py +from datetime import datetime, timedelta +from typing import Optional +from jose import JWTError, jwt +from passlib.context import CryptContext +from config.settings import settings + +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +class Security: + @staticmethod + def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode( + to_encode, + settings.SECRET_KEY, + algorithm=settings.ALGORITHM + ) + return encoded_jwt + + @staticmethod + def verify_password(plain_password: str, hashed_password: str) -> bool: + return pwd_context.verify(plain_password, hashed_password) \ No newline at end of file diff --git a/utils/vector_db.py b/utils/vector_db.py new file mode 100644 index 0000000..bb3f916 --- /dev/null +++ b/utils/vector_db.py @@ -0,0 +1,70 @@ +import json +from typing import List, Dict, Tuple +from concurrent.futures import ThreadPoolExecutor +import os +import numpy as np +from langchain_community.docstore.in_memory import InMemoryDocstore +from langchain_community.vectorstores import FAISS +from langchain_cohere import CohereEmbeddings +import faiss +from langchain_core.documents import Document +from config import COHERE_API_KEY, EMBEDDING_MODEL, EMBEDDING_DIMENSION + +class VectorDB: + def __init__(self): + self._executor = ThreadPoolExecutor(max_workers=10) + self.COHERE_API_KEY = COHERE_API_KEY + os.environ["COHERE_API_KEY"] = self.COHERE_API_KEY + self.embeddings = CohereEmbeddings(model=EMBEDDING_MODEL) + self.index = faiss.IndexFlatL2(EMBEDDING_DIMENSION) + self.vector_score = FAISS( + embedding_function=self.embeddings, + index=self.index, + docstore=InMemoryDocstore(), + index_to_docstore_id={}, + ) + + def load_embeddings(self, file_id: str, file_path: str): + """ + Load embeddings from file + """ + try: + if not os.path.isdir(file_path): + raise Exception(f"{file_path} is not a valid directory.") + print("Files in directory: ", os.listdir(file_path)) + print("Current working directory: ", os.getcwd()) + + os.chdir("/home/kowshik/work/ds_tjc/index/faiss_index") + print("Changed directory to: ", os.getcwd()) + + new_vector_store = FAISS.load_local( + folder_path=file_path, + index_name="index", + embeddings=self.embeddings, + allow_dangerous_deserialization=True, + ) + return new_vector_store + except Exception as e: + raise Exception(f"Error loading embeddings: {str(e)}") + + def search(self, new_vector_store, query: str, top_k: int = 5) -> List[Dict]: + """ + Search for similar documents and return serializable results + """ + try: + raw_results = new_vector_store.similarity_search_with_score(query, k=top_k) + + # Convert results to serializable format + processed_results = [] + for doc, score in raw_results: + processed_result = { + 'content': doc.page_content, + 'metadata': doc.metadata, + 'score': float(score) # Convert numpy.float32 to Python float + } + processed_results.append(processed_result) + + return processed_results + + except Exception as e: + raise Exception(f"Error during search: {str(e)}") \ No newline at end of file