import base64
import json
import mimetypes
import os
from uuid import uuid4

import faiss
import requests
from dotenv import load_dotenv
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.document_loaders import Docx2txtLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

from text_extractor import TextExtractor

load_dotenv()

# OpenAI API Key
api_key = os.getenv('OPENAI_API_KEY')

# Sentinel text returned by process_image when the vision request fails.
_IMAGE_PROCESSING_FAILED = "Image not good enough for processing"


def load_embedding_model(model_name="BAAI/bge-small-en", device="cuda",
                         normalize_embeddings=True):
    """Build the HuggingFace BGE embedding model used by the vector store.

    Args:
        model_name: Hugging Face model identifier.
        device: Device string forwarded to the model ("cuda" or "cpu").
        normalize_embeddings: Whether the encoder normalizes its vectors.

    Returns:
        A configured ``HuggingFaceBgeEmbeddings`` instance.
    """
    return HuggingFaceBgeEmbeddings(
        model_name=model_name,
        model_kwargs={"device": device},
        encode_kwargs={"normalize_embeddings": normalize_embeddings},
    )


# loading the embedding model once at import time; it is also the default
# argument of load_embedded_data below
embeddings = load_embedding_model()


def create_documents(doc):
    """Split the first loaded document into chunked ``Document`` objects.

    Only ``doc[0]`` is split. Each chunk receives a copy of that document's
    metadata with ``'page'`` set to the chunk index.

    Args:
        doc: List of loaded documents, as returned by a loader's ``load()``.

    Returns:
        A list of chunk documents; empty when ``doc`` is empty.
    """
    if not doc:
        # Nothing was loaded, so there is nothing to split.
        return []

    source = doc[0]
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=10,
        length_function=len,
        is_separator_regex=False,
    )
    chunks = text_splitter.create_documents([source.page_content])

    # The chunk index doubles as the "page" number in the metadata.
    return [
        Document(
            page_content=chunk.page_content,
            metadata={**source.metadata, 'page': index},
        )
        for index, chunk in enumerate(chunks)
    ]


def load_txt_document(document_path):
    """Load a plain-text file and split it into chunk documents.

    Raises:
        ValueError: If the file cannot be loaded or split; the original
            error is chained as the cause.
    """
    try:
        return create_documents(TextLoader(document_path).load())
    except Exception as err:
        raise ValueError(f"Error loading -- {document_path}") from err


def load_docx_document(document_path):
    """Load a .docx file and split it into chunk documents.

    Raises:
        ValueError: If the file cannot be loaded or split; the original
            error is chained as the cause.
    """
    try:
        return create_documents(Docx2txtLoader(document_path).load())
    except Exception as err:
        raise ValueError(f"Error loading -- {document_path}") from err


def load_pdf_document(document_path):
    """Load a PDF using the loader's own ``load_and_split``.

    Raises:
        ValueError: If the file cannot be loaded; the original error is
            chained as the cause.
    """
    try:
        return PyPDFLoader(document_path).load_and_split()
    except Exception as err:
        raise ValueError(f"Error loading -- {document_path}") from err


# A general function that checks the document type and loads textual documents
def load_document(document_path):
    """Dispatch to the loader matching the file suffix.

    The suffix check is case-insensitive. ``.md`` files are read with the
    plain-text loader.

    Raises:
        ValueError: For unsupported suffixes or when loading fails.
    """
    lowered = document_path.lower()
    if lowered.endswith(".pdf"):
        return load_pdf_document(document_path)
    if lowered.endswith((".txt", ".md")):
        return load_txt_document(document_path)
    if lowered.endswith(".docx"):
        return load_docx_document(document_path)
    raise ValueError(f"Unsupported document type for {document_path}")


# Function to encode the image
def encode_image(image_path):
    """Return the file's bytes as a base64-encoded UTF-8 string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')


# Vision API to process the image
def process_image(image_path, timeout=60):
    """Ask the OpenAI chat-completions endpoint to describe an image.

    Args:
        image_path: Path of the image file to send.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The model's reply text, or the string
        "Image not good enough for processing" when the request fails or
        the response does not have the expected shape.
    """
    # Getting the base64 string
    base64_image = encode_image(image_path)

    # Label the data URL with the real image type instead of always jpeg.
    mime_type = mimetypes.guess_type(image_path)[0]
    if not mime_type or not mime_type.startswith("image/"):
        mime_type = "image/jpeg"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "What’s in this image?",
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{mime_type};base64,{base64_image}",
                        },
                    },
                ],
            }
        ],
        "max_tokens": 300,
    }

    try:
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=timeout,
        )
        response.raise_for_status()
        # returning the content of the response
        return response.json()['choices'][0]['message']['content']
    except (requests.RequestException, KeyError, IndexError, TypeError,
            ValueError):
        # Best effort: callers compare against this sentinel text.
        return _IMAGE_PROCESSING_FAILED


# create image document
def create_image_document(image_path):
    """Turn an image into a single-element list holding one ``Document``.

    Text is first read with ``TextExtractor``. When that yields nothing
    usable, the image is described by ``process_image`` instead.

    Returns:
        ``[Document]`` whose metadata is ``{'filename': <image name>}``, or
        ``None`` when the vision request failed.
    """
    # image name = file name up to its first dot, used as metadata
    image_name = os.path.basename(image_path).split('.')[0]
    metadata = {'filename': image_name}

    text = TextExtractor().read_text_from_image(image_path) or ''

    # removing special characters (whitespace, including line breaks, is kept)
    text = ''.join(ch for ch in text if ch.isalnum() or ch.isspace())

    # if no usable text was extracted, process the image with the vision model
    if not text.strip():
        text = process_image(image_path)

    if text == _IMAGE_PROCESSING_FAILED:
        # The image could not be processed; callers must handle None.
        return None

    return [Document(page_content=text, metadata=metadata)]


def save_embedded_data(embeddings, key="data"):
    """Persist a vector store under ``vec-db/index/faiss_index_<key>``.

    Args:
        embeddings: Despite the name, the object to save; it must provide
            ``save_local`` (callers in this file pass a FAISS vector store).
        key: Suffix of the index directory name.
    """
    embeddings.save_local(f"vec-db/index/faiss_index_{key}")
    print("Embeddings saved")


def load_embedded_data(embeddings=embeddings, key="data"):
    """Load the FAISS store saved under ``vec-db/index/faiss_index_<key>``.

    Args:
        embeddings: Embedding model used to embed queries; defaults to the
            module-level model.
        key: Suffix of the index directory name.

    Returns:
        The loaded FAISS vector store.
    """
    # SECURITY: allow_dangerous_deserialization=True makes FAISS unpickle
    # the stored docstore. Only load indexes written by this application.
    embed_db = FAISS.load_local(
        f"vec-db/index/faiss_index_{key}",
        embeddings,
        allow_dangerous_deserialization=True,
    )
    return embed_db


# creating a function to load all documents from a directory.
def load_documents_from_directory(directory_path: str):
    """Load every supported text and image file found in a directory.

    Files are processed in sorted name order. Text files go through
    ``load_document`` and images through ``create_image_document``. A file
    that fails to load, or that yields no content, is reported and skipped
    so that one bad file does not abort the whole directory.

    The collected names, ids and page counts are also written to
    ``<directory_path>/documents.json``. Those three keys are overwritten;
    any other keys already in the file are kept.

    Args:
        directory_path: Directory to scan (not recursive).

    Returns:
        A tuple ``(documents, docs_id, num_pages)`` of parallel lists:
        the page/chunk list of each loaded file, a fresh uuid4 hex id per
        file, and the number of pages per file.
    """
    text_doc = ['pdf', 'txt', 'docx', 'doc', 'md']
    image_doc = ['jpg', 'jpeg', 'png', 'gif', 'bmp']

    documents = []   # one list of pages/chunks per loaded file
    doc_names = []   # display name per loaded file
    num_pages = []   # page count per loaded file

    for file in sorted(os.listdir(directory_path)):
        path = os.path.join(directory_path, file)
        # doc name is the part before the first dot, extension after the last
        parts = file.split('.')
        doc_name, extension = parts[0], parts[-1]

        if extension in text_doc:
            try:
                doc = load_document(path)
            except ValueError as err:
                # Unsupported or unreadable file: skip it, keep going.
                print(f"Skipping {file}: {err}")
                continue
            if not doc:
                print(f"Skipping {file}: no content found")
                continue
            page_count = len(doc)
        elif extension in image_doc:
            doc = create_image_document(path)
            if not doc:
                # create_image_document returns None when the image could
                # not be processed.
                print(f"Skipping {file}: image could not be processed")
                continue
            doc_name = doc[0].metadata['filename']
            page_count = 1
        else:
            continue

        documents.append(doc)
        num_pages.append(page_count)
        doc_names.append(doc_name)
        print(f"Document {doc_name} loaded")

    # so we need to create a document id for each document
    docs_id = [uuid4().hex for _ in documents]

    # update the json index if it exists, otherwise create it
    json_file = os.path.join(directory_path, "documents.json")
    data = {}
    if os.path.exists(json_file):
        try:
            with open(json_file, 'r') as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            loaded = None
        if isinstance(loaded, dict):
            data = loaded
    data['doc_names'] = doc_names
    data['docs_id'] = docs_id
    data['num_pages'] = num_pages
    with open(json_file, 'w') as f:
        json.dump(data, f)

    # returning the documents, and doc ids
    return documents, docs_id, num_pages


def _add_pages_to_store(vector_store, documents, docs_id, num_pages):
    """Add each file's pages to the store with ids ``<doc_id><page_index>``."""
    for pages, doc_id, page_count in zip(documents, docs_id, num_pages):
        page_ids = [doc_id + str(page_index) for page_index in range(page_count)]
        vector_store.add_documents(documents=pages, ids=page_ids)


# A function to create vector store
def create_vector_store(embeddings, documents: list, docs_id: list,
                        num_pages: list, dimension: int = 384):
    """Create a new FAISS vector store, fill it, and save it to disk.

    Args:
        embeddings: Embedding model used by the store.
        documents: One list of pages/chunks per file.
        docs_id: One id per file, parallel to ``documents``.
        num_pages: Page count per file, parallel to ``documents``.
        dimension: Size of the embedding vectors; must match the embedding
            model. Defaults to 384.

    Returns:
        The populated FAISS vector store, which is also saved with
        ``save_embedded_data`` under key "data".
    """
    # index set up with the embedding dimension
    index = faiss.IndexFlatL2(dimension)

    # Initialize the FAISS vector store
    vector_store = FAISS(
        embedding_function=embeddings,
        index=index,
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
    )

    _add_pages_to_store(vector_store, documents, docs_id, num_pages)

    # saving the vector store automatically
    save_embedded_data(vector_store, key="data")
    return vector_store


# creating a function to add documents to the vector store
def add_documents_to_vector_store(embeddings, documents: list, docs_id: list,
                                  num_pages: list):
    """Add documents to the saved vector store and persist the result.

    The store is loaded with ``load_embedded_data``, extended, and saved
    back under key "data" so the additions are not lost when the local
    store object is discarded.

    Args:
        embeddings: Embedding model used to load the store.
        documents: One list of pages/chunks per file.
        docs_id: One id per file, parallel to ``documents``.
        num_pages: Page count per file, parallel to ``documents``.

    Returns:
        The updated FAISS vector store.
    """
    # loading the vector store
    vector_store = load_embedded_data(embeddings)

    _add_pages_to_store(vector_store, documents, docs_id, num_pages)

    # Persist the additions; otherwise they vanish with the local object.
    save_embedded_data(vector_store, key="data")
    print ("Documents added to the vector store")
    return vector_store


# A document search function
def search(db, query, k=3):
    """Run a similarity search and summarize the hits.

    Args:
        db: Object providing ``similarity_search(query, k)``.
        query: Query text.
        k: Number of results to request.

    Returns:
        A tuple ``(best, combined, pages)``: the content of the first hit,
        the content of all hits each followed by a newline, and for each
        hit its ``'page'`` metadata (falling back to ``'filename'``, then
        ``None``). With no hits this is ``("", "", [])``.
    """
    docs = db.similarity_search(query, k)
    if not docs:
        return "", "", []

    combined = "".join(f"{doc.page_content}\n" for doc in docs)

    pages = []
    for doc in docs:
        metadata = doc.metadata
        # Test key presence explicitly because page 0 is a valid, falsy value.
        if 'page' in metadata:
            pages.append(metadata['page'])
        else:
            pages.append(metadata.get('filename'))

    return docs[0].page_content, combined, pages