Files
ds_erp_ai/utils.py
T

473 lines
18 KiB
Python
Raw Normal View History

2024-08-05 22:14:19 +01:00
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
2024-08-07 17:50:40 +01:00
from langchain_text_splitters import RecursiveCharacterTextSplitter
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
2024-08-05 22:14:19 +01:00
from langchain_community.vectorstores import FAISS
2024-08-07 17:50:40 +01:00
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders import Docx2txtLoader
from uuid import uuid4
from langchain_core.documents import Document
2024-08-08 14:58:44 +01:00
from text_extractor import TextExtractor
2024-08-07 17:50:40 +01:00
import os
import json
2024-08-09 16:33:21 +01:00
from groq import Groq
import re
import shutil
import numpy as np
from pydub import AudioSegment
import base64
import requests
2024-08-13 21:30:01 +01:00
from moviepy.editor import VideoFileClip
import ffmpeg
from dotenv import load_dotenv
load_dotenv()
# OpenAI API Key
api_key = os.getenv('OPENAI_API_KEY')
2024-08-09 16:33:21 +01:00
client = Groq(api_key = os.getenv('GROQ_API_KEY'))
model = 'whisper-large-v3'
2024-08-05 22:14:19 +01:00
2024-08-09 16:33:21 +01:00
# ----------------------------------------------------------------------------------------------------
2024-08-05 22:14:19 +01:00
# loading the embedding model
def load_embedding_model():
model_name = "BAAI/bge-small-en"
model_kwargs = {"device": "cuda"} #can also be cpu
encode_kwargs = {"normalize_embeddings": True}
embeddings = HuggingFaceBgeEmbeddings(
model_name=model_name, model_kwargs=model_kwargs, encode_kwargs=encode_kwargs
)
return embeddings
2024-08-09 16:33:21 +01:00
# ----------------------------------------------------------------------------------------------------
2024-08-05 22:14:19 +01:00
# loading the embedding model
embeddings = load_embedding_model()
2024-08-09 16:33:21 +01:00
# --------------------------------------------------------TEXT PREPROCESSING--------------------------------------------
2024-08-13 21:30:01 +01:00
def create_documents(doc, file_type='text'):
2024-08-07 17:50:40 +01:00
text = doc[0].page_content
metadata = doc[0].metadata
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=10,
length_function=len,
is_separator_regex=False,
)
docs = text_splitter.create_documents([text])
# converting the text into documents
documents = []
for i, chunk in enumerate(docs):
# Increment page number based on the chunk index
doc_metadata = metadata.copy()
doc_metadata['page'] = i # Assign page number based on chunk index
2024-08-13 21:30:01 +01:00
doc_metadata['file_type'] = file_type
2024-08-07 17:50:40 +01:00
document = Document(page_content=chunk.page_content, metadata=doc_metadata)
documents.append(document)
return documents
def load_txt_document(document_path):
try:
txt_doc = TextLoader(document_path)
text = txt_doc.load()
# implementig document splitting
docs = create_documents(text)
return docs
except:
raise ValueError(f"Error loading -- {document_path}")
def load_docx_document(document_path):
try:
docx_doc = Docx2txtLoader(document_path)
text = docx_doc.load()
# implementig document splitting
docs = create_documents(text)
return docs
except:
raise ValueError(f"Error loading -- {document_path}")
# creating a function that checks the document type and loads the document
def load_pdf_document(document_path):
try:
pdf_doc = PyPDFLoader(document_path)
pages = pdf_doc.load_and_split()
return pages
except:
raise ValueError(f"Error loading -- {document_path}")
# A general function that loads textual documents
def load_document(document_path):
if document_path.endswith(".pdf"):
return load_pdf_document(document_path)
elif document_path.endswith(".txt"):
return load_txt_document(document_path)
elif document_path.endswith(".docx"):
return load_docx_document(document_path)
else:
raise ValueError(f"Unsupported document type for {document_path}")
2024-08-05 22:14:19 +01:00
2024-08-09 16:33:21 +01:00
# ----------------------------------------------------IMAGE PROCESSING------------------------------------------------
# Function to encode the image
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
# Vision API to process the image
def process_image(image_path):
global api_key
# Getting the base64 string
base64_image = encode_image(image_path)
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}"
}
try:
payload = {
"model": "gpt-4o-mini",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Whats in this image?"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}"
}
}
]
}
],
"max_tokens": 300
}
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
# returning the content of the response
response = response.json()['choices'][0]['message']['content']
except Exception as e:
response = "Image not good enough for processing"
return response
# create image document
2024-08-13 21:30:01 +01:00
def create_image_document(image_path, file_type='image'):
2024-08-08 14:58:44 +01:00
# getting the image name from the image path
image_name = image_path.split('/')[-1].split('.')[0]
# setting image name as metadata
2024-08-13 21:30:01 +01:00
metadata = {'filename': image_name, 'file_type': file_type}
2024-08-08 14:58:44 +01:00
text_extractor = TextExtractor()
text = text_extractor.read_text_from_image(image_path)
# removing special characters and line breaks
text = ''.join(e for e in text if e.isalnum() or e.isspace() or e == '\n')
# if the text is empty, then we will process the image with OpenAI vision model
if text == '':
text = process_image(image_path)
# checking if there's no value error or something, we will only return the text if there isnt any error
if text != "Image not good enough for processing":
# creating a document from the text
doc = Document(page_content=text, metadata=metadata)
# returning the document
return [doc]
else:
pass # if there's an error, we will return None
2024-08-07 17:50:40 +01:00
2024-08-09 16:33:21 +01:00
# -----------------------------------------------AUDIO PROCESSING-----------------------------------------------------
# Audio to Text
def audio_to_text(filepath):
with open(filepath, "rb") as file:
translation = client.audio.translations.create(
file=(filepath, file.read()),
model="whisper-large-v3",
)
return translation.text
def split_audio_by_duration(audio_file_path, chunk_duration_minutes, print_output=True):
# Convert chunk duration to milliseconds
chunk_length_ms = chunk_duration_minutes * 60 * 1000
# Load audio file
audio = AudioSegment.from_file(audio_file_path)
audio_duration_ms = len(audio)
# Create a temporary directory for storing chunks
base_filename = os.path.basename(audio_file_path).split('.')[0]
chunk_folder = f"{base_filename}_chunks"
if not os.path.exists(chunk_folder):
os.makedirs(chunk_folder)
chunk_paths = []
if audio_duration_ms > chunk_length_ms:
# Calculate the number of chunks
num_chunks = audio_duration_ms // chunk_length_ms + (1 if audio_duration_ms % chunk_length_ms != 0 else 0)
for i in range(num_chunks):
start_ms = i * chunk_length_ms
end_ms = min(start_ms + chunk_length_ms, audio_duration_ms)
chunk = audio[start_ms:end_ms]
chunk_filename = f"{chunk_folder}/{base_filename}_chunk{i+1}.mp3"
chunk.export(chunk_filename, format="mp3")
chunk_paths.append(chunk_filename)
if print_output:
print(f'Exporting {chunk_filename}')
else:
# If audio duration is less than the chunk duration, store the whole file as a single chunk
chunk_filename = f"{chunk_folder}/{base_filename}_chunk1.mp3"
audio.export(chunk_filename, format="mp3")
chunk_paths.append(chunk_filename)
if print_output:
print(f'Exporting {chunk_filename}')
return chunk_folder, chunk_paths
2024-08-13 21:30:01 +01:00
def transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type='audio'):
2024-08-09 16:33:21 +01:00
# Split the audio file into chunks
chunk_folder, chunk_paths = split_audio_by_duration(audio_file_path, chunk_duration_minutes)
documents = []
for chunk_path in chunk_paths:
# Transcribe the chunk
transcript = audio_to_text(chunk_path) # Assuming this function exists
# Extract the base filename and chunk index using regex
chunk_filename = os.path.basename(chunk_path)
match = re.search(r'(.*)_chunk(\d+)\.mp3$', chunk_filename)
if match:
base_filename = match.group(1)
chunk_index = int(match.group(2))
else:
# Default values in case of unexpected filename format
base_filename = os.path.splitext(chunk_filename)[0]
chunk_index = 1 # Assuming it's the first chunk
# Calculate the chunk's start and end times in minutes
start_min = (chunk_index - 1) * chunk_duration_minutes
end_min = chunk_index * chunk_duration_minutes
actual_end_min = min(end_min, (len(AudioSegment.from_file(audio_file_path)) // 60000)) # To handle the last chunk's actual duration
# Create a document with the transcript and metadata
metadata = {
"filename": base_filename,
2024-08-13 21:30:01 +01:00
"duration": f"{start_min}-{end_min} minutes",
"file_type": file_type,
2024-08-09 16:33:21 +01:00
}
document = Document(page_content=transcript, metadata=metadata)
documents.append(document)
# Delete the chunk folder after processing
shutil.rmtree(chunk_folder)
return documents
2024-08-13 21:30:01 +01:00
2024-08-09 16:33:21 +01:00
# creating a function to create audio document
2024-08-13 21:30:01 +01:00
def create_audio_document(audio_file_path, chunk_duration_minutes=3, file_type='audio'):
documents = transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type)
return documents
# ------------------------------------------------VIDEO PROCESSING-----------------------------------------------------
def preprocess_video_data(video_path: str, time_interval: int):
# Load the video file
video = VideoFileClip(video_path)
# Get the duration of the video
duration = video.duration
# create an audio version of the video
audio_path = video_path.replace('.mp4', '.mp3')
_ = video.audio.write_audiofile(audio_path)
# creating a snapshot of the videos at the time interval
# Extract the video filename without extension
video_name = os.path.splitext(os.path.basename(video_path))[0]
# Create a directory for snapshots using the video name
snapshot_dir = os.path.join(os.path.dirname(video_path), f"{video_name}_snapshots")
os.makedirs(snapshot_dir, exist_ok=True)
# Get the duration of the video using ffmpeg
probe = ffmpeg.probe(video_path)
duration = float(probe['format']['duration'])
# Loop through the video and take snapshots at 0s, 3min, 6min, etc.
2024-08-14 23:09:10 +01:00
for i in range(0, int(duration), time_interval):
2024-08-13 21:30:01 +01:00
# Calculate the time for the current frame
frame_time = i
# Save the snapshot as an image file in the created folder
frame_img = os.path.join(snapshot_dir, f"frame_at_{frame_time//60}min.png")
# Extract the frame using ffmpeg
(
ffmpeg
.input(video_path, ss=frame_time)
.output(frame_img, vframes=1)
.run()
)
print(f"Snapshots saved in {snapshot_dir}.")
# now creating document from the audio file
2024-08-14 23:09:10 +01:00
documents = create_audio_document(audio_path, chunk_duration_minutes=0.5, file_type='video')
2024-08-09 16:33:21 +01:00
return documents
2024-08-13 21:30:01 +01:00
2024-08-09 16:33:21 +01:00
#-----------------------------------------------------OTHERS--------------------------------------------------------------
2024-08-07 17:50:40 +01:00
def save_embedded_data(embeddings, key="data"):
embeddings.save_local(f"vec-db/index/faiss_index_{key}")
2024-08-05 22:14:19 +01:00
print("Embeddings saved")
2024-08-07 17:50:40 +01:00
def load_embedded_data(embeddings=embeddings, key="data"):
2024-08-05 22:14:19 +01:00
embed_db = FAISS.load_local(f"vec-db/index/faiss_index_{key}", embeddings, allow_dangerous_deserialization=True)
return embed_db
2024-08-07 17:50:40 +01:00
# creating a function to load all documents from a directory.
def load_documents_from_directory(directory_path: str):
text_doc = ['pdf', 'txt', 'docx', 'doc', 'md']
image_doc = ['jpg', 'jpeg', 'png', 'gif', 'bmp']
audio_doc = ['mp3', 'wav', 'flac', 'ogg', 'm4a']
video_doc = ['mp4', 'avi', 'mkv', 'flv', 'mov']
# accessing the name of the files in the directory
files = os.listdir(directory_path)
# creating a list to store the documents
documents = []
# another list for the document names
doc_names = []
# counting the number of pages in the document
num_pages= []
# iterating through the files in the directory
for file in files:
# updating the path
path = os.path.join(directory_path, file)
# getting the file extension and doc name
doc_name, extension = file.split('.')[0] , file.split('.')[-1]
# checking if the file is a text document
if extension in text_doc:
# loading the document
doc = load_document(path)
# appending the document to the documents list
documents.append(doc)
# appending the number of pages in the document
num_pages.append(len(doc))
# adding the document name to the doc_names list
doc_names.append(doc_name)
print(f"Document {doc_name} loaded")
2024-08-08 14:58:44 +01:00
elif extension in image_doc:
# creating an image document
doc = create_image_document(path)
# appending the document to the documents list
documents.append(doc)
# appending the number of pages in the document
num_pages.append(1)
# adding the document name to the doc_names list
doc_names.append(doc[0].metadata['filename'])
print(f"Document {doc[0].metadata['filename']} loaded")
2024-08-09 16:33:21 +01:00
elif extension in audio_doc:
# creating an audio document
doc = create_audio_document(path)
# appending the document to the documents list
documents.append(doc)
# appending the number of pages in the document
num_pages.append(len(doc))
# adding the document name to the doc_names list
doc_names.append(doc[0].metadata['filename'])
print(f"Document {doc[0].metadata['filename']} loaded")
2024-08-13 21:30:01 +01:00
elif extension in video_doc:
# creating a video document
2024-08-14 23:09:10 +01:00
doc = preprocess_video_data(path, time_interval=30)
2024-08-13 21:30:01 +01:00
# appending the document to the documents list
documents.append(doc)
# appending the number of pages in the document
num_pages.append(len(doc))
# adding the document name to the doc_names list
doc_names.append(doc[0].metadata['filename'])
print(f"Document {doc[0].metadata['filename']} loaded")
2024-08-07 17:50:40 +01:00
# so we need to create a document id for each document
docs_id = [uuid4().hex for i in range(len(documents))]
# creating a json file to store the documents, checking if it exists then open it, else create it
json_file = f"{directory_path}/documents.json"
if os.path.exists(json_file):
with open(json_file, 'r') as f:
data = json.load(f)
data['doc_names'] = doc_names
data['docs_id'] = docs_id
data['num_pages'] = num_pages
with open(json_file, 'w') as f:
json.dump(data, f)
else:
data = {'doc_names': doc_names, 'docs_id': docs_id, 'num_pages': num_pages}
with open(json_file, 'w') as f:
json.dump(data, f)
# returning the documents, and doc ids
return documents, docs_id, num_pages
# A function to create vector store
2024-08-14 23:09:10 +01:00
def create_vector_store(documents: list, docs_id: list, num_pages: list):
2024-08-07 17:50:40 +01:00
# index set up with the embedding dimension
index = faiss.IndexFlatL2(384)
# Initialize the FAISS vector store
vector_store = FAISS(
embedding_function=embeddings,
index=index,
docstore=InMemoryDocstore(),
index_to_docstore_id={},
)
# Now adding other documents to the store.
for i in range(len(documents)):
doc_id = docs_id[i]
page_ids = [doc_id+ str(i) for i in range(num_pages[i])]
vector_store.add_documents(documents=documents[i], ids=page_ids)
# saving the vector store automatically
save_embedded_data(vector_store, key="data")
return vector_store
# creating a function to add documents to the vector store
def add_documents_to_vector_store(embeddings, documents: list, docs_id: list, num_pages: list):
# loading the vector store
vector_store = load_embedded_data(embeddings)
for i in range(len(documents)):
doc_id = docs_id[i]
page_ids = [doc_id+ str(i) for i in range(num_pages[i])]
vector_store.add_documents(documents=documents[i], ids=page_ids)
print ("Documents added to the vector store")
2024-08-05 22:14:19 +01:00
# A document search function
2024-08-14 23:09:10 +01:00
def search(query, k=20):
# loading the embedded data
embed_db = load_embedded_data()
2024-08-13 22:16:12 +01:00
db = embed_db
docs = db.similarity_search(query, k)
all = []
info = []
for doc in docs:
all.append({doc.page_content})
info.append(dict(doc.metadata))
return docs[0].page_content, all, info