from langchain_community.embeddings import HuggingFaceBgeEmbeddings from langchain_text_splitters import RecursiveCharacterTextSplitter import faiss from langchain_community.docstore.in_memory import InMemoryDocstore from langchain_community.vectorstores import FAISS from langchain_community.document_loaders import PyPDFLoader from langchain_community.document_loaders import TextLoader from langchain_community.document_loaders import Docx2txtLoader from langchain_groq import ChatGroq from langchain_core.prompts.prompt import PromptTemplate from langchain_core.output_parsers import StrOutputParser from uuid import uuid4 from langchain_core.documents import Document from text_extractor import TextExtractor import os, sys sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from loggings.logging_config import logger import random from PIL import Image, ImageDraw, ImageFont from concurrent.futures import ThreadPoolExecutor import math import json from groq import Groq import re import time import shutil import numpy as np from pydub import AudioSegment import base64 import requests from moviepy.editor import VideoFileClip import ffmpeg from dotenv import load_dotenv load_dotenv() # OpenAI API Key api_key = os.getenv('OPENAI_API_KEY') # setting up groq api key os.environ["GROQ_API_KEY"] = os.getenv('GROQ_API_KEY') client = Groq(api_key = os.getenv('GROQ_API_KEY')) model = 'whisper-large-v3' # chat set up GROQ_LLM = ChatGroq(temperature=0, model_name="llama3-8b-8192", max_tokens=100) # ---------------------------------------------------------------------------------------------------- # loading the embedding model def load_embedding_model(): model_name = "BAAI/bge-small-en" model_kwargs = {"device": "cpu"} #can also be cpu encode_kwargs = {"normalize_embeddings": True} embeddings = HuggingFaceBgeEmbeddings( model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs ) return embeddings # ---------------------------------------------------------------------------------------------------- # loading the embedding model logger.info("Loading the embedding model") embeddings = load_embedding_model() logger.info("Embedding model loaded") # --------------------------------------------------------TEXT PREPROCESSING-------------------------------------------- def create_documents(doc, file_type='text'): logger.info(f"Creating documents from text") text = doc[0].page_content metadata = doc[0].metadata text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=10, length_function=len, is_separator_regex=False, ) docs = text_splitter.create_documents([text]) # converting the text into documents documents = [] for i, chunk in enumerate(docs): # Increment page number based on the chunk index doc_metadata = metadata.copy() doc_metadata['page'] = i # Assign page number based on chunk index doc_metadata['file_type'] = file_type document = Document(page_content=chunk.page_content, metadata=doc_metadata) documents.append(document) return documents def load_txt_document(document_path): logger.info(f"Loading text document from {document_path}") try: txt_doc = TextLoader(document_path) text = txt_doc.load() # implementig document splitting docs = create_documents(text) return docs except: raise ValueError(f"Error loading -- {document_path}") def load_docx_document(document_path): logger.info(f"Loading docx document from {document_path}") try: docx_doc = Docx2txtLoader(document_path) text = docx_doc.load() # implementig document splitting docs = create_documents(text) return docs except: raise ValueError(f"Error loading -- {document_path}") # creating a function that checks the document type and loads the document def load_pdf_document(document_path): logger.info(f"Loading pdf document from {document_path}") try: pdf_doc = PyPDFLoader(document_path) pages = pdf_doc.load_and_split() return pages except: raise ValueError(f"Error loading -- {document_path}") # A general function that loads textual documents def load_document(document_path): if document_path.endswith(".pdf"): return load_pdf_document(document_path) elif document_path.endswith(".txt"): return load_txt_document(document_path) elif document_path.endswith(".docx"): return load_docx_document(document_path) else: raise ValueError(f"Unsupported document type for {document_path}") # ----------------------------------------------------IMAGE PROCESSING------------------------------------------------ # Function to encode the image def encode_image(image_path): logger.info(f"Encoding image {image_path}") with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') # Vision API to process the image def process_image(image_path): logger.info(f"Processing image {image_path}") global api_key # Getting the base64 string base64_image = encode_image(image_path) headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } try: payload = { "model": "gpt-4o-mini", "messages": [ { "role": "user", "content": [ { "type": "text", "text": "What’s in this image?" }, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{base64_image}" } } ] } ], "max_tokens": 300 } response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) # returning the content of the response response = response.json()['choices'][0]['message']['content'] except Exception as e: response = "Image not good enough for processing" return response # create image document def create_image_document(image_path, file_type='image'): logger.info(f"Creating image document from {image_path}") # getting the image name from the image path image_name = image_path.split('\\')[-1].split('.')[0] # setting image name as metadata metadata = {'source': image_name, 'file_type': file_type} text_extractor = TextExtractor() text = text_extractor.read_text_from_image(image_path) # removing special characters and line breaks text = ''.join(e for e in text if e.isalnum() or e.isspace() or e == '\n') # if the text is empty, then we will process the image with OpenAI vision model if text == '': text = process_image(image_path) # checking if there's no value error or something, we will only return the text if there isnt any error if text != "Image not good enough for processing": # creating a document from the text doc = Document(page_content=text, metadata=metadata) # returning the document return [doc] else: pass # if there's an error, we will return None # -----------------------------------------------AUDIO PROCESSING----------------------------------------------------- # Audio to Text def audio_to_text(filepath): logger.info(f"Transcribing audio file {filepath}") with open(filepath, "rb") as file: translation = client.audio.translations.create( file=(filepath, file.read()), model="whisper-large-v3", ) return translation.text def split_audio_by_duration(audio_file_path, chunk_duration_minutes, print_output=True): logger.info(f"Splitting audio file {audio_file_path} by duration") # Convert chunk duration to milliseconds chunk_length_ms = chunk_duration_minutes * 60 * 1000 # Load audio file audio = AudioSegment.from_file(audio_file_path) audio_duration_ms = len(audio) # Create a temporary directory for storing chunks base_filename = os.path.basename(audio_file_path).split('.')[0] chunk_folder = f"{base_filename}_chunks" if not os.path.exists(chunk_folder): os.makedirs(chunk_folder) chunk_paths = [] if audio_duration_ms > chunk_length_ms: # Calculate the number of chunks, using math.ceil to ensure rounding up num_chunks = math.ceil(audio_duration_ms / chunk_length_ms) for i in range(int(num_chunks)): start_ms = i * chunk_length_ms end_ms = min(start_ms + chunk_length_ms, audio_duration_ms) chunk = audio[start_ms:end_ms] chunk_filename = f"{chunk_folder}/{base_filename}_chunk{i+1}.mp3" chunk.export(chunk_filename, format="mp3") chunk_paths.append(chunk_filename) if print_output: print(f'Exporting {chunk_filename}') else: # If audio duration is less than the chunk duration, store the whole file as a single chunk chunk_filename = f"{chunk_folder}/{base_filename}_chunk1.mp3" audio.export(chunk_filename, format="mp3") chunk_paths.append(chunk_filename) if print_output: print(f'Exporting {chunk_filename}') return chunk_folder, chunk_paths def transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type='audio'): logger.info(f"Transcribing audio chunks from {audio_file_path}") # Split the audio file into chunks chunk_folder, chunk_paths = split_audio_by_duration(audio_file_path, chunk_duration_minutes) documents = [] for chunk_path in chunk_paths: # Transcribe the chunk transcript = audio_to_text(chunk_path) # Assuming this function exists # Extract the base filename and chunk index using regex chunk_filename = os.path.basename(chunk_path) match = re.search(r'(.*)_chunk(\d+)\.mp3$', chunk_filename) if match: base_filename = match.group(1) chunk_index = int(match.group(2)) else: # Default values in case of unexpected filename format base_filename = os.path.splitext(chunk_filename)[0] chunk_index = 1 # Assuming it's the first chunk # Calculate the chunk's start and end times in minutes start_min = (chunk_index - 1) * chunk_duration_minutes end_min = chunk_index * chunk_duration_minutes actual_end_min = min(end_min, (len(AudioSegment.from_file(audio_file_path)) // 60000)) # To handle the last chunk's actual duration # preparing the start and end min in a timestamp format, also also catching cases of decimal, making it a real time if start_min % 1 == 0: start_min = f"{int(start_min)}:00" end_min = f"{int(end_min)}:00" else: # splitting the decimal part of the start and end min start_min_int, start_min_dec = str(start_min).split('.') end_min_int, end_min_dec = str(end_min).split('.') # converting the decimal part to seconds start_sec = int(start_min_dec) * 6 end_sec = int(end_min_dec) * 6 start_min = f"{start_min_int}:{start_sec}" end_min = f"{end_min_int}:{end_sec}" # Create a document with the transcript and metadata metadata = { "source": base_filename, "timestamp": f"{start_min}-{end_min}", "file_type": file_type, } document = Document(page_content=transcript, metadata=metadata) documents.append(document) # Delete the chunk folder after processing shutil.rmtree(chunk_folder) # adding a delay time.sleep(0.2) return documents # creating a function to create audio document def create_audio_document(audio_file_path, chunk_duration_minutes=3, file_type='audio'): documents = transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type) return documents # ------------------------------------------------VIDEO PROCESSING----------------------------------------------------- def preprocess_video_data(video_path: str, time_interval: int): logger.info(f"Preprocessing video data from {video_path}") # Load the video file video = VideoFileClip(video_path) # Get the duration of the video duration = video.duration # create an audio version of the video audio_path = video_path.replace('.mp4', '.mp3') _ = video.audio.write_audiofile(audio_path) # creating a snapshot of the videos at the time interval # Extract the video filename without extension video_name = os.path.splitext(os.path.basename(video_path))[0] # Create a directory for snapshots using the video name snapshot_dir = os.path.join(os.path.dirname(video_path), f"{video_name}") os.makedirs(snapshot_dir, exist_ok=True) # Get the duration of the video using ffmpeg probe = ffmpeg.probe(video_path) duration = float(probe['format']['duration']) # Loop through the video and take snapshots at 0s, 3min, 6min, etc. for i in range(0, int(duration), time_interval): start_time = i end_time = min(i + time_interval, int(duration)) # Format the interval as 'start-end' interval_str = f"{start_time}-{end_time}" # Save the snapshot as an image file in the created folder frame_img = os.path.join(snapshot_dir, f"{interval_str}s.png") # Extract the frame using ffmpeg ( ffmpeg .input(video_path, ss=start_time) .output(frame_img, vframes=1) .run() ) print(f"Snapshots saved in {snapshot_dir}.") # now creating document from the audio file documents = create_audio_document(audio_path, chunk_duration_minutes=0.5, file_type='video') logger.info(f"Documents created from video {video_path}") # deleting the audio file os.remove(audio_path) return documents #----------------------------------------------------DOC SUMMARIZER -------------------------------------------------- def doc_summarizer(document_page: list) -> str: logger.info(f"Summarizing document") initiator_prompt = PromptTemplate( template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> Create a short summary of the document based on the provided text. Start with: This document is about... <|eot_id|><|start_header_id|>user<|end_header_id|> DOCUMENT: {document_page} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""", input_variables=["document_page"], ) initiator_router = initiator_prompt | GROQ_LLM | StrOutputParser() output = initiator_router.invoke({"document_page":document_page}) return output #-----------------------------------------------------OTHERS-------------------------------------------------------------- def save_embedded_data(embeddings, path = "index/faiss_index",): logger.info(f"Saving embeddings") embeddings.save_local(f"index/faiss_index") print("Embeddings saved") return 'saved' def load_embedded_data(embeddings=embeddings, path = "index/faiss_index"): logger.info(f"Loading embedded data") embed_db = FAISS.load_local(f"index/faiss_index", embeddings, allow_dangerous_deserialization=True) return embed_db #-----------------------------------------------------Data Loading Process---------------------------------------------------- # creating a function to load all documents from a directory. def process_document(path, extension, text_doc, image_doc, audio_doc, video_doc): doc_name = os.path.basename(path).split('.')[0] process_map = { "text": load_document, "image": create_image_document, "audio": create_audio_document, "video": preprocess_video_data } if extension in text_doc: doc = process_map["text"](path) num_pages = len(doc) elif extension in image_doc: doc = process_map["image"](path) num_pages = 1 doc_name = doc[0].metadata['source'].split('\\')[-1] elif extension in audio_doc: doc = process_map["audio"](path) num_pages = len(doc) doc_name = doc[0].metadata['source'] elif extension in video_doc: doc = process_map["video"](path, time_interval=30) num_pages = len(doc) doc_name = doc[0].metadata['source'] else: return None, None, None # Unhandled extension print(f"Document {doc_name} loaded") return doc, doc_name, num_pages def load_documents_from_directory(directory_path: str): text_doc = ['pdf', 'txt', 'docx', 'doc', 'md'] image_doc = ['jpg', 'jpeg', 'png', 'gif', 'bmp'] audio_doc = ['mp3', 'wav', 'flac', 'ogg', 'm4a'] video_doc = ['mp4', 'avi', 'mkv', 'flv', 'mov'] files = os.listdir(directory_path) documents = [] doc_names = [] num_pages = [] doc_summary = [] def process_with_delay(file): result = process_document(os.path.join(directory_path, file), file.split('.')[-1], text_doc, image_doc, audio_doc, video_doc) time.sleep(0.4) # Introduce a 0.4s delay between processing each document return result with ThreadPoolExecutor() as executor: results = executor.map(process_with_delay, files) for doc, doc_name, pages in results: if doc is not None: documents.append(doc) doc_names.append(doc_name) num_pages.append(pages) # creating doc summary first_page = doc[0].page_content summary = doc_summarizer(first_page) doc_summary.append(summary) # adding some delay time.sleep(0.5) docs_id = [uuid4().hex for _ in range(len(documents))] json_file = os.path.join(directory_path, 'data.json') # creating a dictionary for each document in the json file for i in range(len(documents)): data = {doc_names[i].split("\\")[-1]: {'doc_id':docs_id[i], 'num_pages': num_pages[i], 'doc_summary': doc_summary[i]}} if os.path.exists(json_file): with open(json_file, 'r+') as f: existing_data = json.load(f) existing_data.update(data) f.seek(0) json.dump(existing_data, f) else: with open(json_file, 'w') as f: json.dump(data, f) return documents, docs_id, num_pages # A function to create vector store def create_vector_store(documents: list, docs_id: list, num_pages: list): logger.info(f"Creating vector store") # index set up with the embedding dimension index = faiss.IndexFlatL2(384) # Initialize the FAISS vector store vector_store = FAISS( embedding_function=embeddings, index=index, docstore=InMemoryDocstore(), index_to_docstore_id={}, ) # Now adding other documents to the store. for i in range(len(documents)): doc_id = docs_id[i] page_ids = [doc_id+ str(i) for i in range(num_pages[i])] vector_store.add_documents(documents=documents[i], ids=page_ids) logger.info(f"Vector store created") logger.info(f"Saving the vector store") # saving the vector store automatically save_embedded_data(vector_store) logger.info(f"Vector store saved") return vector_store # creating a function to add documents to the vector store def add_documents_to_vector_store(embeddings, documents: list, docs_id: list, num_pages: list): # loading the vector store vector_store = load_embedded_data(embeddings) for i in range(len(documents)): doc_id = docs_id[i] page_ids = [doc_id+ str(i) for i in range(num_pages[i])] vector_store.add_documents(documents=documents[i], ids=page_ids) print ("Documents added to the vector store") #----------------------------------------------------------Thumbnail Generator----------------------------------------------------- def create_text_thumbnail(file_path): logger.info(f"Creating thumbnail for {file_path}") # Create a folder for thumbnails if it doesn't exist thumbnail_folder = os.path.join(os.path.dirname(file_path), 'thumbnails') os.makedirs(thumbnail_folder, exist_ok=True) # Extract file name (without extension) file_name = os.path.splitext(os.path.basename(file_path))[0] # Create a random background color background_color = tuple(random.randint(0, 255) for _ in range(3)) # Create an image with the random background color img = Image.new('RGB', (800, 400), color=background_color) # Initialize drawing context d = ImageDraw.Draw(img) # Load a font try: font = ImageFont.truetype("arial.ttf", 25) # Adjust the font size as needed except IOError: font = ImageFont.load_default() # Get the bounding box of the text text_bbox = d.textbbox((0, 0), file_name, font=font) text_width = text_bbox[2] - text_bbox[0] text_height = text_bbox[3] - text_bbox[1] # Calculate the position to center the text text_x = (img.width - text_width) / 2 text_y = (img.height - text_height) / 2 # Draw the text onto the image d.text((text_x, text_y), file_name, font=font, fill=(255, 255, 255)) # White text # Save the image thumbnail_path = os.path.join(thumbnail_folder, f"{file_name}.png") img.save(thumbnail_path) print(f"Thumbnail created: {thumbnail_path}") def process_directory(directory_path): supported_extensions = ['.txt', '.pdf', '.docx', '.mp3', '.m4a'] for file in os.listdir(directory_path): file_path = os.path.join(directory_path, file) if os.path.isfile(file_path): file_extension = os.path.splitext(file)[1].lower() if file_extension in supported_extensions: create_text_thumbnail(file_path) return "Done" #-----------------------------------------------------------SEARCH------------------------------------------------------- # A document search function def search(query, k=20): logger.info(f"Searching for {query}") # loading the embedded data embed_db = load_embedded_data() db = embed_db docs = db.similarity_search(query, k) logger.info(f"Search completed") all = [] info = [] for doc in docs: # all.append({doc.page_content}) info.append(dict(doc.metadata)) return info