parallel processing added

This commit is contained in:
timothyafolami
2024-08-15 23:17:17 +01:00
parent 179e51070a
commit 713354371e
8 changed files with 500 additions and 76 deletions
+92 -65
View File
@@ -6,14 +6,19 @@ from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders import Docx2txtLoader
from langchain_groq import ChatGroq
from langchain_core.prompts.prompt import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from uuid import uuid4
from langchain_core.documents import Document
from text_extractor import TextExtractor
import os
from concurrent.futures import ThreadPoolExecutor
import math
import json
from groq import Groq
import re
import time
import shutil
import numpy as np
from pydub import AudioSegment
@@ -26,10 +31,15 @@ load_dotenv()
# OpenAI API Key
api_key = os.getenv('OPENAI_API_KEY')
# setting up groq api key
# The key is read once and checked explicitly. The earlier
# `os.environ["GROQ_API_KEY"] = os.getenv('GROQ_API_KEY')` self-assignment did
# nothing when the variable was set and raised an opaque
# "str expected, not NoneType" TypeError when it was missing.
_groq_api_key = os.getenv('GROQ_API_KEY')
if _groq_api_key is None:
    raise RuntimeError(
        "GROQ_API_KEY is not set; define it in the environment or in the .env file"
    )
client = Groq(api_key=_groq_api_key)
# Model identifier; not referenced in the visible code
# (presumably used for audio transcription -- TODO confirm).
model = 'whisper-large-v3'
# chat set up
# temperature=0 and max_tokens=100 are passed to the chat model used by doc_summarizer.
GROQ_LLM = ChatGroq(temperature=0, model_name="llama3-8b-8192", max_tokens=100)
# ----------------------------------------------------------------------------------------------------
# loading the embedding model
def load_embedding_model():
@@ -337,6 +347,25 @@ def preprocess_video_data(video_path: str, time_interval: int):
return documents
#----------------------------------------------------DOC SUMMARIZER --------------------------------------------------
def doc_summarizer(document_page: list) -> str:
    """Ask the chat model for a short summary of the given document content.

    The content is substituted into a fixed chat prompt, sent to ``GROQ_LLM``
    and the reply is passed through ``StrOutputParser``.

    Args:
        document_page: Content to summarise; it is interpolated into the
            prompt's ``{document_page}`` placeholder.
            NOTE(review): annotated as ``list``, but the caller visible in
            this file passes the text of a single page -- confirm and
            tighten the annotation.

    Returns:
        The output of the prompt | model | parser chain.
    """
    summary_template = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>
Create a short summary of the document based on the provided text.
Start with: This document is about...
<|eot_id|><|start_header_id|>user<|end_header_id|>
DOCUMENT: {document_page} \n
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""
    summary_prompt = PromptTemplate(
        template=summary_template,
        input_variables=["document_page"],
    )
    # Compose prompt -> model -> plain-string parser, then run it once.
    summary_chain = summary_prompt | GROQ_LLM | StrOutputParser()
    return summary_chain.invoke({"document_page": document_page})
#-----------------------------------------------------OTHERS--------------------------------------------------------------
@@ -348,88 +377,86 @@ def load_embedded_data(embeddings=embeddings, key="data"):
embed_db = FAISS.load_local(f"index/faiss_index_{key}", embeddings, allow_dangerous_deserialization=True)
return embed_db
#-----------------------------------------------------Data Loading Process----------------------------------------------------
# helpers for loading a single file and for loading every document in a directory.
def process_document(path, extension, text_doc, image_doc, audio_doc, video_doc):
    """Load one file with the loader that matches its extension.

    The extension is checked against the text, image, audio and video
    lists in that order, and the first match decides the loader.

    Args:
        path: Path of the file to load.
        extension: File extension without the dot, compared as-is.
        text_doc: Extensions handled by ``load_document``.
        image_doc: Extensions handled by ``create_image_document``.
        audio_doc: Extensions handled by ``create_audio_document``.
        video_doc: Extensions handled by ``preprocess_video_data``
            (called with ``time_interval=30``).

    Returns:
        ``(doc, doc_name, num_pages)`` for a handled extension, or
        ``(None, None, None)`` when the extension is in none of the lists.
        For text files ``doc_name`` is the file name up to its first dot;
        for the other kinds it is ``doc[0].metadata['filename']``.
        ``num_pages`` is ``len(doc)``, except for images where it is 1.

    NOTE(review): the non-text branches index ``doc[0]`` and will raise
    ``IndexError`` if a loader returns an empty list.
    """
    # Name derived from the path; only the text branch keeps it.
    stem = os.path.basename(path).split('.')[0]

    if extension in text_doc:
        loaded = load_document(path)
        label, page_count = stem, len(loaded)
    elif extension in image_doc:
        loaded = create_image_document(path)
        label, page_count = loaded[0].metadata['filename'], 1
    elif extension in audio_doc:
        loaded = create_audio_document(path)
        label, page_count = loaded[0].metadata['filename'], len(loaded)
    elif extension in video_doc:
        loaded = preprocess_video_data(path, time_interval=30)
        label, page_count = loaded[0].metadata['filename'], len(loaded)
    else:
        return None, None, None  # Unhandled extension

    print(f"Document {label} loaded")
    return loaded, label, page_count
def load_documents_from_directory(directory_path: str):
    """Load every supported file in a directory and record its metadata.

    Files are loaded concurrently through ``process_document``. For each
    loaded document a summary of its first page is requested from
    ``doc_summarizer``. Names, generated ids, page counts and summaries
    are then merged into ``data.json`` inside ``directory_path``.

    Args:
        directory_path: Directory whose files should be loaded.

    Returns:
        ``(documents, docs_id, num_pages)``: the loaded documents, one
        generated hex id per document, and the page count per document,
        all in matching order.
    """
    text_doc = ['pdf', 'txt', 'docx', 'doc', 'md']
    image_doc = ['jpg', 'jpeg', 'png', 'gif', 'bmp']
    audio_doc = ['mp3', 'wav', 'flac', 'ogg', 'm4a']
    video_doc = ['mp4', 'avi', 'mkv', 'flv', 'mov']
    supported = set(text_doc + image_doc + audio_doc + video_doc)

    # Only dispatch files process_document can handle. Everything else
    # (including the data.json written below) would just come back as
    # (None, None, None) after occupying a worker and the per-file delay.
    files = [
        name for name in os.listdir(directory_path)
        if name.split('.')[-1] in supported
    ]

    def process_with_delay(file):
        result = process_document(
            os.path.join(directory_path, file),
            file.split('.')[-1],
            text_doc, image_doc, audio_doc, video_doc,
        )
        time.sleep(0.1)  # Introduce a 0.1s delay after processing each document
        return result

    with ThreadPoolExecutor() as executor:
        # executor.map yields results in the same order as `files`.
        results = list(executor.map(process_with_delay, files))

    documents = []
    doc_names = []
    num_pages = []
    doc_summary = []
    for doc, doc_name, pages in results:
        if doc is None:
            continue
        documents.append(doc)
        doc_names.append(doc_name)
        num_pages.append(pages)
        # Summarise from the first page. An empty document gets an empty
        # summary so the four lists stay aligned instead of raising
        # IndexError on doc[0].
        doc_summary.append(doc_summarizer(doc[0].page_content) if doc else "")

    docs_id = [uuid4().hex for _ in documents]
    json_file = os.path.join(directory_path, 'data.json')
    # NOTE(review): 'doc_summaary' is misspelled; kept unchanged because
    # readers of data.json elsewhere may rely on this exact key.
    data = {
        'doc_names': doc_names,
        'docs_id': docs_id,
        'num_pages': num_pages,
        'doc_summaary': doc_summary,
    }

    if os.path.exists(json_file):
        with open(json_file, 'r', encoding='utf-8') as f:
            existing_data = json.load(f)
        existing_data.update(data)
        data = existing_data

    # Rewrite the file from scratch. Overwriting in 'r+' mode without
    # truncating leaves stale trailing bytes (invalid JSON) whenever the
    # new payload is shorter than the old one.
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(data, f)

    return documents, docs_id, num_pages
@@ -475,6 +502,6 @@ def search(query, k=20):
all = []
info = []
for doc in docs:
all.append({doc.page_content})
# all.append({doc.page_content})
info.append(dict(doc.metadata))
return docs[0].page_content, all, info
return info