import base64
import json
import os
import re
import shutil
from uuid import uuid4

import faiss
import ffmpeg
import numpy as np
import requests
from dotenv import load_dotenv
from groq import Groq
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.document_loaders import Docx2txtLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from moviepy.editor import VideoFileClip
from pydub import AudioSegment

from text_extractor import TextExtractor

load_dotenv()

# OpenAI API Key
api_key = os.getenv('OPENAI_API_KEY')
client = Groq(api_key=os.getenv('GROQ_API_KEY'))
model = 'whisper-large-v3'

# Sentinel text returned by process_image() when the vision call fails.
_IMAGE_FAILURE_MESSAGE = "Image not good enough for processing"

# Pattern of the chunk files written by split_audio_by_duration().
_CHUNK_NAME_RE = re.compile(r'(.*)_chunk(\d+)\.mp3$')


# ----------------------------------------------------------------------------------------------------
# loading the embedding model
def load_embedding_model(model_name="BAAI/bge-small-en", device="cuda"):
    """Build the HuggingFace BGE embedding model.

    Args:
        model_name: Hugging Face model identifier to load.
        device: Device passed through to the model ("cuda" or "cpu").

    Returns:
        A ``HuggingFaceBgeEmbeddings`` instance with normalized embeddings.
    """
    return HuggingFaceBgeEmbeddings(
        model_name=model_name,
        model_kwargs={"device": device},
        encode_kwargs={"normalize_embeddings": True},
    )


# ----------------------------------------------------------------------------------------------------
# loading the embedding model
embeddings = load_embedding_model()


# --------------------------------------------------------TEXT PREPROCESSING--------------------------------------------
def create_documents(doc, file_type='text'):
    """Split the first loaded document into chunked ``Document`` objects.

    Only ``doc[0]`` is read. Every chunk inherits a copy of its metadata,
    extended with ``page`` (the chunk index) and ``file_type``.

    Args:
        doc: Non-empty list of loaded documents.
        file_type: Label stored in each chunk's metadata.

    Returns:
        List of ``Document`` chunks.

    Raises:
        ValueError: If ``doc`` is empty.
    """
    if not doc:
        raise ValueError("No document content to split")

    text = doc[0].page_content
    metadata = doc[0].metadata

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=10,
        length_function=len,
        is_separator_regex=False,
    )
    # converting the text into documents
    chunks = text_splitter.create_documents([text])

    # The chunk index doubles as the page number.
    return [
        Document(
            page_content=chunk.page_content,
            metadata={**metadata, 'page': i, 'file_type': file_type},
        )
        for i, chunk in enumerate(chunks)
    ]


def load_txt_document(document_path):
    """Load a plain-text file and split it into chunked documents.

    Raises:
        ValueError: If the file cannot be loaded or split; the original
            exception is chained as the cause.
    """
    try:
        text = TextLoader(document_path).load()
        # implementing document splitting
        return create_documents(text)
    except Exception as err:
        raise ValueError(f"Error loading -- {document_path}") from err


def load_docx_document(document_path):
    """Load a .docx file and split it into chunked documents.

    Raises:
        ValueError: If the file cannot be loaded or split; the original
            exception is chained as the cause.
    """
    try:
        text = Docx2txtLoader(document_path).load()
        # implementing document splitting
        return create_documents(text)
    except Exception as err:
        raise ValueError(f"Error loading -- {document_path}") from err


def load_pdf_document(document_path):
    """Load a PDF and return the documents produced by ``load_and_split``.

    Each returned document gets ``file_type='text'`` in its metadata (unless
    already set) so PDFs carry the same key as the other text documents.

    Raises:
        ValueError: If the file cannot be loaded; the original exception is
            chained as the cause.
    """
    try:
        pages = PyPDFLoader(document_path).load_and_split()
    except Exception as err:
        raise ValueError(f"Error loading -- {document_path}") from err
    for page in pages:
        page.metadata.setdefault('file_type', 'text')
    return pages


# A general function that checks the document type and loads textual documents
def load_document(document_path):
    """Dispatch to the loader matching the file extension.

    Matching is case-insensitive. Markdown files are read as plain text.

    Raises:
        ValueError: If the extension is unsupported or loading fails.
    """
    lowered = document_path.lower()
    if lowered.endswith(".pdf"):
        return load_pdf_document(document_path)
    if lowered.endswith((".txt", ".md")):
        return load_txt_document(document_path)
    if lowered.endswith(".docx"):
        return load_docx_document(document_path)
    raise ValueError(f"Unsupported document type for {document_path}")


# ----------------------------------------------------IMAGE PROCESSING------------------------------------------------
# Function to encode the image
def encode_image(image_path):
    """Return the file's bytes as a base64-encoded UTF-8 string."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')


# Vision API to process the image
def process_image(image_path):
    """Ask the OpenAI chat-completions endpoint to describe an image.

    Returns:
        The description text from the first choice, or the string
        "Image not good enough for processing" when the request fails or
        the response does not have the expected structure.
    """
    # Getting the base64 string
    base64_image = encode_image(image_path)

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What’s in this image?"},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        },
                    },
                ],
            }
        ],
        "max_tokens": 300,
    }

    try:
        # A timeout keeps a stalled connection from hanging the pipeline.
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
        # returning the content of the response
        return response.json()['choices'][0]['message']['content']
    except (requests.RequestException, KeyError, IndexError, TypeError, ValueError):
        # Best effort: network errors, invalid JSON, and error payloads
        # without 'choices' all collapse into the sentinel message.
        return _IMAGE_FAILURE_MESSAGE


# create image document
def create_image_document(image_path, file_type='image'):
    """Create a single-document list describing an image.

    Text is first extracted with ``TextExtractor``. When nothing readable is
    found, the OpenAI vision model is asked for a description instead.

    Returns:
        ``[Document]`` on success, or ``None`` when the vision fallback
        reports that the image could not be processed.
    """
    # getting the image name from the image path
    image_name = os.path.basename(image_path).split('.')[0]
    # setting image name as metadata
    metadata = {'filename': image_name, 'file_type': file_type}

    text = TextExtractor().read_text_from_image(image_path)
    # removing special characters (whitespace, including line breaks, is kept)
    text = ''.join(ch for ch in text if ch.isalnum() or ch.isspace())

    # if no readable text was found, process the image with the OpenAI vision model
    if not text.strip():
        text = process_image(image_path)

    if text == _IMAGE_FAILURE_MESSAGE:
        # Callers must handle None: there is nothing to index for this image.
        return None

    return [Document(page_content=text, metadata=metadata)]


# -----------------------------------------------AUDIO PROCESSING-----------------------------------------------------
# Audio to Text
def audio_to_text(filepath):
    """Send an audio file to the Groq translations endpoint and return its text."""
    with open(filepath, "rb") as file:
        translation = client.audio.translations.create(
            file=(filepath, file.read()),
            model=model,
        )
    return translation.text


def split_audio_by_duration(audio_file_path, chunk_duration_minutes, print_output=True):
    """Split an audio file into MP3 chunks of at most the given duration.

    Chunks are written to ``<base_filename>_chunks/`` relative to the current
    working directory. Audio no longer than one chunk is exported as a single
    chunk.

    Args:
        audio_file_path: Path of the audio file to split.
        chunk_duration_minutes: Maximum chunk length in minutes (must be > 0).
        print_output: Print each exported chunk path when true.

    Returns:
        Tuple ``(chunk_folder, chunk_paths)``.

    Raises:
        ValueError: If the chunk duration is not positive.
    """
    # Convert chunk duration to milliseconds
    chunk_length_ms = int(chunk_duration_minutes * 60 * 1000)
    if chunk_length_ms <= 0:
        raise ValueError("chunk_duration_minutes must be greater than zero")

    # Load audio file
    audio = AudioSegment.from_file(audio_file_path)
    audio_duration_ms = len(audio)

    # Create a temporary directory for storing chunks
    base_filename = os.path.basename(audio_file_path).split('.')[0]
    chunk_folder = f"{base_filename}_chunks"
    os.makedirs(chunk_folder, exist_ok=True)

    # Ceiling division; always at least one chunk, even for very short audio.
    full_chunks, remainder_ms = divmod(audio_duration_ms, chunk_length_ms)
    num_chunks = max(1, full_chunks + (1 if remainder_ms else 0))

    chunk_paths = []
    for i in range(num_chunks):
        start_ms = i * chunk_length_ms
        end_ms = min(start_ms + chunk_length_ms, audio_duration_ms)
        chunk_filename = f"{chunk_folder}/{base_filename}_chunk{i+1}.mp3"
        audio[start_ms:end_ms].export(chunk_filename, format="mp3")
        chunk_paths.append(chunk_filename)
        if print_output:
            print(f'Exporting {chunk_filename}')

    return chunk_folder, chunk_paths


def transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type='audio'):
    """Transcribe an audio file chunk by chunk into ``Document`` objects.

    Each document's metadata holds the base filename, the nominal time window
    of the chunk (the last chunk may be shorter than the window suggests) and
    ``file_type``. The temporary chunk folder is removed afterwards, even if
    transcription fails.

    Returns:
        List of ``Document`` objects, one per chunk.
    """
    # Split the audio file into chunks
    chunk_folder, chunk_paths = split_audio_by_duration(audio_file_path, chunk_duration_minutes)

    documents = []
    try:
        for chunk_path in chunk_paths:
            # Transcribe the chunk
            transcript = audio_to_text(chunk_path)

            # Extract the base filename and chunk index from the chunk name
            chunk_filename = os.path.basename(chunk_path)
            match = _CHUNK_NAME_RE.search(chunk_filename)
            if match:
                base_filename = match.group(1)
                chunk_index = int(match.group(2))
            else:
                # Default values in case of unexpected filename format
                base_filename = os.path.splitext(chunk_filename)[0]
                chunk_index = 1

            # Calculate the chunk's start and end times in minutes
            start_min = (chunk_index - 1) * chunk_duration_minutes
            end_min = chunk_index * chunk_duration_minutes

            # Create a document with the transcript and metadata
            metadata = {
                "filename": base_filename,
                "duration": f"{start_min}-{end_min} minutes",
                "file_type": file_type,
            }
            documents.append(Document(page_content=transcript, metadata=metadata))
    finally:
        # Delete the chunk folder after processing, on success or failure.
        shutil.rmtree(chunk_folder, ignore_errors=True)

    return documents


# creating a function to create audio document
def create_audio_document(audio_file_path, chunk_duration_minutes=3, file_type='audio'):
    """Transcribe an audio file into documents using fixed-length chunks."""
    return transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type)


# ------------------------------------------------VIDEO PROCESSING-----------------------------------------------------
def preprocess_video_data(video_path: str, time_interval: int):
    """Extract audio and periodic snapshots from a video, then transcribe it.

    The audio track is written next to the video as ``<name>.mp3`` and a
    snapshot is saved every ``time_interval`` seconds into
    ``<name>_snapshots/``. Snapshot files are named ``frame_at_<N>min.png``
    when the interval is a whole number of minutes, otherwise
    ``frame_at_<N>s.png`` so names cannot collide.

    Args:
        video_path: Path of the video file.
        time_interval: Seconds between snapshots (must be > 0).

    Returns:
        List of ``Document`` objects transcribed from the audio track, tagged
        with ``file_type='video'``.

    Raises:
        ValueError: If ``time_interval`` is not positive or the video has no
            audio track.
    """
    if time_interval <= 0:
        raise ValueError("time_interval must be greater than zero")

    # Derive the audio path from the extension-less path. A plain
    # str.replace('.mp4', '.mp3') leaves other extensions untouched, which
    # would make the audio export overwrite the source video.
    audio_path = os.path.splitext(video_path)[0] + '.mp3'

    # create an audio version of the video
    video = VideoFileClip(video_path)
    try:
        if video.audio is None:
            raise ValueError(f"No audio track found in {video_path}")
        video.audio.write_audiofile(audio_path)
    finally:
        # Release the reader processes held by the clip.
        video.close()

    # Extract the video filename without extension
    video_name = os.path.splitext(os.path.basename(video_path))[0]

    # Create a directory for snapshots using the video name
    snapshot_dir = os.path.join(os.path.dirname(video_path), f"{video_name}_snapshots")
    os.makedirs(snapshot_dir, exist_ok=True)

    # Get the duration of the video using ffmpeg
    probe = ffmpeg.probe(video_path)
    duration = float(probe['format']['duration'])

    whole_minutes = time_interval % 60 == 0

    # Loop through the video and take a snapshot at every interval
    for frame_time in range(0, int(duration), time_interval):
        label = f"{frame_time//60}min" if whole_minutes else f"{frame_time}s"
        frame_img = os.path.join(snapshot_dir, f"frame_at_{label}.png")

        # Extract the frame using ffmpeg; overwrite so re-runs do not fail
        # on snapshots left by a previous run.
        (
            ffmpeg
            .input(video_path, ss=frame_time)
            .output(frame_img, vframes=1)
            .run(overwrite_output=True)
        )

    print(f"Snapshots saved in {snapshot_dir}.")

    # now creating document from the audio file
    return create_audio_document(audio_path, file_type='video')


#-----------------------------------------------------OTHERS--------------------------------------------------------------
def save_embedded_data(embeddings, key="data"):
    """Persist a vector store under ``vec-db/index/faiss_index_<key>``.

    Note: despite its name, ``embeddings`` is the object whose ``save_local``
    method is called (the vector store), not the embedding model.
    """
    embeddings.save_local(f"vec-db/index/faiss_index_{key}")
    print("Embeddings saved")


def load_embedded_data(embeddings=embeddings, key="data"):
    """Load the FAISS store saved under ``vec-db/index/faiss_index_<key>``.

    SECURITY: ``allow_dangerous_deserialization=True`` permits pickle
    deserialization of the stored index; only load index directories that
    this application wrote itself.
    """
    return FAISS.load_local(
        f"vec-db/index/faiss_index_{key}",
        embeddings,
        allow_dangerous_deserialization=True,
    )


# creating a function to load all documents from a directory.
def load_documents_from_directory(directory_path: str):
    """Load every supported file in a directory into lists of documents.

    Files are dispatched on their (case-insensitive) extension to the text,
    image, audio or video loaders. Sub-directories, files with unknown
    extensions, files whose loader raises ``ValueError`` and files that yield
    no content are skipped so one bad file does not abort the whole run.

    A ``documents.json`` summary (``doc_names``, ``docs_id``, ``num_pages``)
    is written into the directory; other keys of an existing, valid summary
    are preserved.

    Args:
        directory_path: Directory to scan (not recursive).

    Returns:
        Tuple ``(documents, docs_id, num_pages)`` of parallel lists: the
        per-file document lists, a fresh hex UUID per file, and the number
        of documents per file.
    """
    text_doc = {'pdf', 'txt', 'docx', 'doc', 'md'}
    image_doc = {'jpg', 'jpeg', 'png', 'gif', 'bmp'}
    audio_doc = {'mp3', 'wav', 'flac', 'ogg', 'm4a'}
    video_doc = {'mp4', 'avi', 'mkv', 'flv', 'mov'}

    # creating a list to store the documents
    documents = []
    # another list for the document names
    doc_names = []
    # counting the number of pages in the document
    num_pages = []

    # iterating through the files in the directory (sorted for a stable order)
    for file in sorted(os.listdir(directory_path)):
        path = os.path.join(directory_path, file)
        if '.' not in file or not os.path.isfile(path):
            continue

        # getting the file extension and doc name
        doc_name = file.split('.')[0]
        extension = file.rsplit('.', 1)[-1].lower()

        try:
            if extension in text_doc:
                doc = load_document(path)
            elif extension in image_doc:
                doc = create_image_document(path)
            elif extension in audio_doc:
                doc = create_audio_document(path)
            elif extension in video_doc:
                doc = preprocess_video_data(path, time_interval=180)
            else:
                continue
        except ValueError as err:
            # Raised for unsupported or unreadable files.
            print(f"Skipping {file}: {err}")
            continue

        # Loaders may return None (unprocessable image) or an empty list.
        if not doc:
            print(f"Skipping {file}: no content could be extracted")
            continue

        # Non-text loaders record their own name in the metadata.
        if extension not in text_doc:
            doc_name = doc[0].metadata['filename']

        documents.append(doc)
        num_pages.append(len(doc))
        doc_names.append(doc_name)
        print(f"Document {doc_name} loaded")

    # so we need to create a document id for each document
    docs_id = [uuid4().hex for _ in documents]

    # creating a json file to store the documents; reuse it when it already exists
    json_file = f"{directory_path}/documents.json"
    data = {}
    if os.path.exists(json_file):
        try:
            with open(json_file, 'r') as f:
                data = json.load(f)
        except json.JSONDecodeError:
            print(f"Ignoring unreadable {json_file}; it will be rewritten")
        if not isinstance(data, dict):
            data = {}

    data['doc_names'] = doc_names
    data['docs_id'] = docs_id
    data['num_pages'] = num_pages
    with open(json_file, 'w') as f:
        json.dump(data, f)

    # returning the documents, doc ids and page counts
    return documents, docs_id, num_pages


def _add_pages_to_store(vector_store, documents, docs_id, num_pages):
    """Add each document's pages to the store with ids ``<doc_id><page_index>``."""
    for pages, doc_id, page_count in zip(documents, docs_id, num_pages):
        if not pages:
            # Nothing to embed for this document.
            continue
        page_ids = [f"{doc_id}{page}" for page in range(page_count)]
        vector_store.add_documents(documents=pages, ids=page_ids)


# A function to create vector store
def create_vector_store(embeddings, documents: list, docs_id: list, num_pages: list):
    """Build a new FAISS vector store from the documents and save it.

    The index dimension is taken from the embedding model itself, so the
    store works with any embedding size.

    Args:
        embeddings: Embedding model used by the store.
        documents: List of per-file document lists.
        docs_id: One id per entry of ``documents``.
        num_pages: Number of documents in each entry of ``documents``.

    Returns:
        The populated FAISS vector store (also saved under key ``"data"``).
    """
    # index set up with the embedding dimension
    dimension = len(embeddings.embed_query("hello world"))
    index = faiss.IndexFlatL2(dimension)

    # Initialize the FAISS vector store
    vector_store = FAISS(
        embedding_function=embeddings,
        index=index,
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
    )

    # Now adding the documents to the store.
    _add_pages_to_store(vector_store, documents, docs_id, num_pages)

    # saving the vector store automatically
    save_embedded_data(vector_store, key="data")
    return vector_store


# creating a function to add documents to the vector store
def add_documents_to_vector_store(embeddings, documents: list, docs_id: list, num_pages: list):
    """Add documents to the saved vector store and persist the result.

    Args:
        embeddings: Embedding model used to load the store.
        documents: List of per-file document lists.
        docs_id: One id per entry of ``documents``.
        num_pages: Number of documents in each entry of ``documents``.

    Returns:
        The updated FAISS vector store.
    """
    # loading the vector store
    vector_store = load_embedded_data(embeddings)
    _add_pages_to_store(vector_store, documents, docs_id, num_pages)

    # Without saving, the additions would be lost when this function returns.
    save_embedded_data(vector_store, key="data")
    print("Documents added to the vector store")
    return vector_store


# A document search function
# loading the embedded data
# The path mirrors load_embedded_data's default key ("data"). Loading only
# when the index exists keeps the module importable before a store is built.
_DEFAULT_INDEX_DIR = "vec-db/index/faiss_index_data"
embed_db = load_embedded_data() if os.path.isdir(_DEFAULT_INDEX_DIR) else None


def search(query, k=4):
    """Return the ``k`` most similar documents for a query.

    The store is loaded on first use if it was not available at import time.

    Returns:
        Tuple ``(best_content, contents, info)``: the page content of the top
        hit (``""`` when there are no hits), a list of one-element sets each
        holding a hit's page content, and a list of metadata dicts.
    """
    global embed_db
    if embed_db is None:
        embed_db = load_embedded_data()

    docs = embed_db.similarity_search(query, k=k)
    if not docs:
        return "", [], []

    # Each content is wrapped in a one-element set; kept for compatibility
    # with existing callers of this return shape.
    contents = [{doc.page_content} for doc in docs]
    info = [dict(doc.metadata) for doc in docs]
    return docs[0].page_content, contents, info