Files
ds_fire_fighter/utils.py
T
2024-08-15 21:18:38 +01:00

481 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores import FAISS
from langchain_community.document_loaders import PyPDFLoader
from langchain_community.document_loaders import TextLoader
from langchain_community.document_loaders import Docx2txtLoader
from uuid import uuid4
from langchain_core.documents import Document
from text_extractor import TextExtractor
import os
import math
import json
from groq import Groq
import re
import shutil
import numpy as np
from pydub import AudioSegment
import base64
import requests
from moviepy.editor import VideoFileClip
import ffmpeg
from dotenv import load_dotenv
# Run python-dotenv's loader first so the os.getenv() calls below can see
# values it provides.
load_dotenv()
# OpenAI API key; process_image() sends it as the bearer token.
api_key = os.getenv('OPENAI_API_KEY')
# Groq client used by audio_to_text() for audio transcription.
client = Groq(api_key = os.getenv('GROQ_API_KEY'))
# NOTE(review): audio_to_text() hard-codes this same model name and does not
# read this variable.
model = 'whisper-large-v3'
# ----------------------------------------------------------------------------------------------------
# loading the embedding model
def load_embedding_model(model_name="BAAI/bge-small-en", device=None):
    """Build the HuggingFace BGE embedding model used by this module.

    Args:
        model_name: Hugging Face model id to load. create_vector_store()
            builds a 384-dimensional index, so any replacement model must
            produce 384-dimensional vectors.
        device: Device string passed to the model ("cuda", "cpu", ...).
            When None, "cuda" is chosen if torch reports a usable GPU and
            "cpu" otherwise, so the module also imports on CPU-only hosts.

    Returns:
        A HuggingFaceBgeEmbeddings instance configured with
        ``normalize_embeddings=True``.
    """
    if device is None:
        try:
            import torch

            device = "cuda" if torch.cuda.is_available() else "cpu"
        except ImportError:
            device = "cpu"

    return HuggingFaceBgeEmbeddings(
        model_name=model_name,
        model_kwargs={"device": device},
        encode_kwargs={"normalize_embeddings": True},
    )
# ----------------------------------------------------------------------------------------------------
# Module-level embedding model, built once at import time. It is the default
# argument of load_embedded_data() and the embedding function used by
# create_vector_store().
embeddings = load_embedding_model()
# --------------------------------------------------------TEXT PREPROCESSING--------------------------------------------
def create_documents(doc, file_type='text'):
    """Split the first loaded document into chunked ``Document`` objects.

    Only ``doc[0]`` is read: its text is split with a
    RecursiveCharacterTextSplitter (chunk size 1000, overlap 10) and every
    chunk receives a copy of the source metadata.

    Args:
        doc: Sequence whose first element exposes ``page_content`` and
            ``metadata``.
        file_type: Value stored under the ``'file_type'`` metadata key.

    Returns:
        A list of Document objects, one per chunk. ``metadata['page']`` is
        the zero-based chunk index, not a page number of the source file.
    """
    source = doc[0]
    source_text = source.page_content
    source_metadata = source.metadata

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=10,
        length_function=len,
        is_separator_regex=False,
    )
    pieces = splitter.create_documents([source_text])

    return [
        Document(
            page_content=piece.page_content,
            # Copy the shared metadata, then overwrite the per-chunk keys.
            metadata={**source_metadata, 'page': index, 'file_type': file_type},
        )
        for index, piece in enumerate(pieces)
    ]
def load_txt_document(document_path):
    """Load a plain-text file and split it into chunked documents.

    Args:
        document_path: Path of the text file to load.

    Returns:
        The list of Document chunks produced by create_documents().

    Raises:
        ValueError: If loading or splitting fails. The underlying exception
            is chained as ``__cause__`` so the real reason stays visible.
    """
    try:
        loaded = TextLoader(document_path).load()
        # Split the loaded text into chunked documents.
        return create_documents(loaded)
    except Exception as err:
        # `except Exception` rather than a bare `except:` so that
        # KeyboardInterrupt and SystemExit are not converted to ValueError.
        raise ValueError(f"Error loading -- {document_path}") from err
def load_docx_document(document_path):
    """Load a .docx file and split it into chunked documents.

    Args:
        document_path: Path of the .docx file to load.

    Returns:
        The list of Document chunks produced by create_documents().

    Raises:
        ValueError: If loading or splitting fails. The underlying exception
            is chained as ``__cause__`` so the real reason stays visible.
    """
    try:
        loaded = Docx2txtLoader(document_path).load()
        # Split the extracted text into chunked documents.
        return create_documents(loaded)
    except Exception as err:
        # `except Exception` rather than a bare `except:` so that
        # KeyboardInterrupt and SystemExit are not converted to ValueError.
        raise ValueError(f"Error loading -- {document_path}") from err
# Loads a PDF and returns the documents produced by PyPDFLoader.
def load_pdf_document(document_path):
    """Load a PDF with PyPDFLoader and return its split documents.

    Args:
        document_path: Path of the PDF file to load.

    Returns:
        The list returned by ``PyPDFLoader.load_and_split()``. Each entry
        gets ``metadata['file_type'] = 'text'`` (unless already set) so PDFs
        carry the same key the other text loaders set via create_documents().

    Raises:
        ValueError: If the PDF cannot be loaded. The underlying exception is
            chained as ``__cause__``.
    """
    try:
        pages = PyPDFLoader(document_path).load_and_split()
    except Exception as err:
        # `except Exception` rather than a bare `except:` so that
        # KeyboardInterrupt and SystemExit are not converted to ValueError.
        raise ValueError(f"Error loading -- {document_path}") from err

    for page in pages:
        page.metadata.setdefault('file_type', 'text')
    return pages
# A general function that loads textual documents
def load_document(document_path):
    """Dispatch a textual document to the loader matching its extension.

    The extension is matched case-insensitively, so ``REPORT.PDF`` is treated
    like ``report.pdf``. Markdown files are read as plain text.

    Args:
        document_path: Path ending in .pdf, .txt, .md or .docx.

    Returns:
        The list of documents produced by the selected loader.

    Raises:
        ValueError: If the extension is not supported (legacy ``.doc`` files
            included), or if the selected loader fails.
    """
    lowered = document_path.lower()
    if lowered.endswith(".pdf"):
        return load_pdf_document(document_path)
    if lowered.endswith((".txt", ".md")):
        return load_txt_document(document_path)
    if lowered.endswith(".docx"):
        return load_docx_document(document_path)
    raise ValueError(f"Unsupported document type for {document_path}")
# ----------------------------------------------------IMAGE PROCESSING------------------------------------------------
# Base64 helper for image files
def encode_image(image_path):
    """Return the contents of ``image_path`` as a base64 string.

    The file is read in binary mode and the base64 bytes are decoded as
    UTF-8 text.
    """
    with open(image_path, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
# Vision API to process the image
def process_image(image_path):
    """Ask the OpenAI chat-completions endpoint to describe an image.

    The image is sent inline as a base64 data URL. The MIME type is guessed
    from the file name and falls back to ``image/jpeg`` when it cannot be
    determined or is not an image type.

    Args:
        image_path: Path of the image file to describe.

    Returns:
        The text of the first choice in the response, or the sentinel string
        ``"Image not good enough for processing"`` when the request fails or
        the response has an unexpected shape. create_image_document()
        compares against that exact sentinel.
    """
    import mimetypes

    base64_image = encode_image(image_path)
    guessed_type = mimetypes.guess_type(image_path)[0]
    if not guessed_type or not guessed_type.startswith("image/"):
        guessed_type = "image/jpeg"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Whats in this image?"
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{guessed_type};base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }
    try:
        # Without a timeout, a stalled connection would block the caller
        # indefinitely.
        response = requests.post(
            "https://api.openai.com/v1/chat/completions",
            headers=headers,
            json=payload,
            timeout=60,
        )
        return response.json()['choices'][0]['message']['content']
    except Exception as err:
        # Deliberately best-effort: report the cause instead of hiding it,
        # then return the sentinel the caller checks for.
        print(f"Image processing failed for {image_path}: {err}")
        return "Image not good enough for processing"
# create image document
def create_image_document(image_path, file_type='image'):
    """Turn an image into a single-element list of ``Document``.

    Text is first read from the image with TextExtractor. If nothing usable
    comes back (empty or whitespace-only after cleaning), the image is
    described with process_image() instead.

    Args:
        image_path: Path of the image file.
        file_type: Value stored under the ``'file_type'`` metadata key.

    Returns:
        ``[Document]`` whose metadata holds ``'filename'`` (the base name up
        to its first dot) and ``'file_type'``, or ``None`` when
        process_image() reports that the image could not be processed.
    """
    # os.path.basename also copes with platform-specific separators.
    image_name = os.path.basename(image_path).split('.')[0]
    metadata = {'filename': image_name, 'file_type': file_type}

    # NOTE(review): assumes read_text_from_image returns a string; `or ''`
    # guards against a None result.
    raw_text = TextExtractor().read_text_from_image(image_path) or ''
    # Keep only alphanumeric and whitespace characters (newlines included).
    text = ''.join(ch for ch in raw_text if ch.isalnum() or ch.isspace())

    # Whitespace-only OCR output carries no information, so it also triggers
    # the vision-model fallback.
    if not text.strip():
        text = process_image(image_path)

    if text == "Image not good enough for processing":
        return None
    return [Document(page_content=text, metadata=metadata)]
# -----------------------------------------------AUDIO PROCESSING-----------------------------------------------------
# Audio to Text
def audio_to_text(filepath):
    """Send an audio file to the Groq audio translations endpoint.

    The file is read fully into memory and submitted with the
    ``whisper-large-v3`` model through the module-level ``client``.

    Args:
        filepath: Path of the audio file to submit.

    Returns:
        The ``text`` attribute of the response.
    """
    with open(filepath, "rb") as audio_handle:
        upload = (filepath, audio_handle.read())
        result = client.audio.translations.create(
            file=upload,
            model="whisper-large-v3",
        )
    return result.text
def split_audio_by_duration(audio_file_path, chunk_duration_minutes, print_output=True):
    """Split an audio file into fixed-length MP3 chunks on disk.

    Chunks are written to a folder named ``<base>_chunks``, created relative
    to the current working directory, where ``<base>`` is the file name up to
    its first dot. Audio no longer than one chunk is exported whole as a
    single chunk.

    Args:
        audio_file_path: Path of the audio file to split.
        chunk_duration_minutes: Length of each chunk in minutes; may be
            fractional, must be greater than zero.
        print_output: When True, print a line for every exported chunk.

    Returns:
        A ``(chunk_folder, chunk_paths)`` tuple: the folder name and the
        chunk file paths in playback order.

    Raises:
        ValueError: If ``chunk_duration_minutes`` is not greater than zero.
    """
    # Reject bad durations up front: zero would divide by zero below and a
    # negative value would silently produce no chunks.
    if chunk_duration_minutes <= 0:
        raise ValueError("chunk_duration_minutes must be greater than zero")

    chunk_length_ms = chunk_duration_minutes * 60 * 1000

    audio = AudioSegment.from_file(audio_file_path)
    audio_duration_ms = len(audio)

    base_filename = os.path.basename(audio_file_path).split('.')[0]
    chunk_folder = f"{base_filename}_chunks"
    os.makedirs(chunk_folder, exist_ok=True)

    if audio_duration_ms > chunk_length_ms:
        # math.ceil so a trailing partial chunk is still exported.
        num_chunks = math.ceil(audio_duration_ms / chunk_length_ms)
        segments = (
            audio[i * chunk_length_ms:min((i + 1) * chunk_length_ms, audio_duration_ms)]
            for i in range(num_chunks)
        )
    else:
        # Short audio: keep the whole file as the only chunk.
        segments = [audio]

    chunk_paths = []
    for chunk_number, segment in enumerate(segments, start=1):
        chunk_filename = f"{chunk_folder}/{base_filename}_chunk{chunk_number}.mp3"
        segment.export(chunk_filename, format="mp3")
        chunk_paths.append(chunk_filename)
        if print_output:
            print(f'Exporting {chunk_filename}')

    return chunk_folder, chunk_paths
def transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type='audio'):
    """Split an audio file into chunks and transcribe each into a Document.

    Args:
        audio_file_path: Path of the audio file to transcribe.
        chunk_duration_minutes: Chunk length in minutes, forwarded to
            split_audio_by_duration().
        file_type: Value stored under the ``'file_type'`` metadata key.

    Returns:
        A list with one Document per chunk. Metadata holds ``'filename'``,
        ``'duration'`` (formatted ``"<start>-<end> minutes"``) and
        ``'file_type'``. The duration is the chunk's nominal window, so the
        end of the last chunk may exceed the real length of the audio.
    """
    chunk_folder, chunk_paths = split_audio_by_duration(audio_file_path, chunk_duration_minutes)
    # Compiled once instead of on every iteration.
    chunk_pattern = re.compile(r'(.*)_chunk(\d+)\.mp3$')

    documents = []
    try:
        for chunk_path in chunk_paths:
            transcript = audio_to_text(chunk_path)

            # Recover the base name and the 1-based chunk index from the
            # file name written by split_audio_by_duration().
            chunk_filename = os.path.basename(chunk_path)
            match = chunk_pattern.search(chunk_filename)
            if match:
                base_filename = match.group(1)
                chunk_index = int(match.group(2))
            else:
                # Unexpected file name: treat it as the first chunk.
                base_filename = os.path.splitext(chunk_filename)[0]
                chunk_index = 1

            start_min = (chunk_index - 1) * chunk_duration_minutes
            end_min = chunk_index * chunk_duration_minutes

            metadata = {
                "filename": base_filename,
                "duration": f"{start_min}-{end_min} minutes",
                "file_type": file_type,
            }
            documents.append(Document(page_content=transcript, metadata=metadata))
    finally:
        # Remove the temporary chunk folder even when transcription fails.
        shutil.rmtree(chunk_folder)

    return documents
# Entry point for turning an audio file into documents
def create_audio_document(audio_file_path, chunk_duration_minutes=3, file_type='audio'):
    """Transcribe an audio file into a list of Documents.

    Thin wrapper around transcribe_audio_chunks() that supplies a default
    chunk length of 3 minutes.
    """
    return transcribe_audio_chunks(audio_file_path, chunk_duration_minutes, file_type)
# ------------------------------------------------VIDEO PROCESSING-----------------------------------------------------
def preprocess_video_data(video_path: str, time_interval: int):
    """Extract snapshots from a video and transcribe its audio track.

    One PNG frame is saved every ``time_interval`` seconds into a folder
    named after the video (without extension), next to the video file. The
    audio track is written to a temporary ``.mp3`` beside the video,
    transcribed in 0.5-minute chunks, and then removed.

    Args:
        video_path: Path of the video file.
        time_interval: Seconds between snapshots; must be positive.

    Returns:
        The list of Documents from create_audio_document(), tagged with
        ``file_type='video'``.

    Raises:
        ValueError: If ``time_interval`` is not positive, or the video has
            no audio track.
    """
    if time_interval <= 0:
        raise ValueError("time_interval must be a positive number of seconds")

    # Derive the audio path from the extension-less name. A plain
    # `.replace('.mp4', '.mp3')` leaves non-mp4 paths unchanged, which made
    # the audio export overwrite the source video and the cleanup delete it.
    audio_path = os.path.splitext(video_path)[0] + '.mp3'

    video_name = os.path.splitext(os.path.basename(video_path))[0]
    snapshot_dir = os.path.join(os.path.dirname(video_path), f"{video_name}")

    try:
        video = VideoFileClip(video_path)
        try:
            if video.audio is None:
                raise ValueError(f"No audio track found in {video_path}")
            video.audio.write_audiofile(audio_path)
        finally:
            # Release the reader resources held by the clip.
            video.close()

        os.makedirs(snapshot_dir, exist_ok=True)

        # Duration in seconds as reported by ffmpeg's probe.
        probe = ffmpeg.probe(video_path)
        total_seconds = int(float(probe['format']['duration']))

        for start_time in range(0, total_seconds, time_interval):
            end_time = min(start_time + time_interval, total_seconds)
            # Snapshot files are named '<start>-<end>s.png'.
            frame_img = os.path.join(snapshot_dir, f"{start_time}-{end_time}s.png")
            (
                ffmpeg
                .input(video_path, ss=start_time)
                .output(frame_img, vframes=1)
                # Overwrite snapshots left by an earlier run instead of
                # letting ffmpeg stop to ask for confirmation.
                .run(overwrite_output=True)
            )
        print(f"Snapshots saved in {snapshot_dir}.")

        # NOTE(review): the 0.5-minute chunk length is fixed and does not
        # follow `time_interval`; confirm whether the two should be linked.
        return create_audio_document(audio_path, chunk_duration_minutes=0.5, file_type='video')
    finally:
        # Remove the temporary audio file on success and on failure.
        if os.path.exists(audio_path):
            os.remove(audio_path)
#-----------------------------------------------------OTHERS--------------------------------------------------------------
def save_embedded_data(embeddings, key="data"):
    """Persist a vector store under ``index/faiss_index_<key>``.

    Args:
        embeddings: Object exposing ``save_local(path)``. Despite the name,
            create_vector_store() passes a FAISS vector store here.
        key: Suffix that selects the index folder.
    """
    target_dir = f"index/faiss_index_{key}"
    embeddings.save_local(target_dir)
    print("Embeddings saved")
def load_embedded_data(embeddings=embeddings, key="data"):
    """Load the FAISS vector store saved under ``index/faiss_index_<key>``.

    Args:
        embeddings: Embedding function to attach to the loaded store;
            defaults to the module-level embedding model.
        key: Suffix that selects the index folder.

    Returns:
        The vector store returned by ``FAISS.load_local``.

    NOTE(security): ``allow_dangerous_deserialization=True`` opts in to
    pickle-based loading. Only load index folders this application wrote.
    """
    index_dir = f"index/faiss_index_{key}"
    return FAISS.load_local(index_dir, embeddings, allow_dangerous_deserialization=True)
# creating a function to load all documents from a directory.
def load_documents_from_directory(directory_path: str):
    """Load every supported file in a directory into documents.

    Files are dispatched on the text after the last dot of their name
    (case-sensitive): text files to load_document(), images to
    create_image_document(), audio to create_audio_document() and video to
    preprocess_video_data(). Other files are ignored. A file whose loader
    returns nothing (``None`` or an empty list) is skipped with a message
    instead of crashing the whole run.

    The names, generated ids and page counts are also written to
    ``documents.json`` inside the directory; other keys already present in
    that file are kept.

    Args:
        directory_path: Directory to scan (not recursive).

    Returns:
        A ``(documents, docs_id, num_pages)`` tuple of parallel lists:
        ``documents[i]`` is the list of Documents for one file,
        ``docs_id[i]`` its generated hex id and ``num_pages[i]`` the number
        of Documents in it.
    """
    text_doc = ['pdf', 'txt', 'docx', 'doc', 'md']
    image_doc = ['jpg', 'jpeg', 'png', 'gif', 'bmp']
    audio_doc = ['mp3', 'wav', 'flac', 'ogg', 'm4a']
    video_doc = ['mp4', 'avi', 'mkv', 'flv', 'mov']

    documents = []
    doc_names = []
    num_pages = []

    for file in os.listdir(directory_path):
        path = os.path.join(directory_path, file)
        extension = file.split('.')[-1]

        if extension in text_doc:
            doc = load_document(path)
        elif extension in image_doc:
            doc = create_image_document(path)
        elif extension in audio_doc:
            doc = create_audio_document(path)
        elif extension in video_doc:
            doc = preprocess_video_data(path, time_interval=30)
        else:
            continue

        # create_image_document() returns None for unusable images, and the
        # other loaders can return an empty list; indexing doc[0] would fail.
        if not doc:
            print(f"Document {file} skipped: no content could be extracted")
            continue

        if extension in text_doc:
            # Text loaders set no 'filename' metadata; use the file name up
            # to its first dot.
            doc_name = os.path.basename(path).split('.')[0]
        else:
            doc_name = doc[0].metadata['filename']

        documents.append(doc)
        # Image documents are single-element lists, so this is 1 for them.
        num_pages.append(len(doc))
        doc_names.append(doc_name)
        print(f"Document {doc_name} loaded")

    # One generated id per loaded file.
    docs_id = [uuid4().hex for _ in documents]

    # Update documents.json in place, keeping unrelated keys it holds.
    json_file = os.path.join(directory_path, "documents.json")
    data = {}
    if os.path.exists(json_file):
        with open(json_file, 'r') as f:
            data = json.load(f)
    data['doc_names'] = doc_names
    data['docs_id'] = docs_id
    data['num_pages'] = num_pages
    with open(json_file, 'w') as f:
        json.dump(data, f)

    return documents, docs_id, num_pages
# Builds a fresh FAISS vector store from loaded documents
def create_vector_store(documents: list, docs_id: list, num_pages: list):
    """Create a FAISS vector store, fill it and save it to disk.

    Args:
        documents: List of per-file Document lists.
        docs_id: Id string for each entry of ``documents``.
        num_pages: Number of ids to generate for each entry of ``documents``.

    Returns:
        The populated vector store, which is also saved through
        save_embedded_data() with key ``"data"``.

    Each Document is stored under the id ``<doc_id><n>``, where ``n`` counts
    from 0 within its file. The index is a flat L2 index of dimension 384.
    """
    store = FAISS(
        embedding_function=embeddings,
        index=faiss.IndexFlatL2(384),
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
    )

    for position, pages in enumerate(documents):
        prefix = docs_id[position]
        ids = [prefix + str(page_no) for page_no in range(num_pages[position])]
        store.add_documents(documents=pages, ids=ids)

    # Persist immediately so the index on disk matches the returned store.
    save_embedded_data(store, key="data")
    return store
# creating a function to add documents to the vector store
def add_documents_to_vector_store(embeddings, documents: list, docs_id: list, num_pages: list):
    """Add documents to the saved vector store and persist the result.

    The store is loaded from disk, extended, and saved back under the same
    ``"data"`` key. Without the save step the additions would exist only in
    the local object, which is discarded when the function returns.

    Args:
        embeddings: Embedding function passed to load_embedded_data().
        documents: List of per-file Document lists.
        docs_id: Id string for each entry of ``documents``.
        num_pages: Number of ids to generate for each entry of ``documents``.

    Returns:
        The updated vector store (previously the function returned None).
    """
    vector_store = load_embedded_data(embeddings)

    for position, pages in enumerate(documents):
        doc_id = docs_id[position]
        page_ids = [doc_id + str(page_no) for page_no in range(num_pages[position])]
        vector_store.add_documents(documents=pages, ids=page_ids)

    # Write the extended index back to where load_embedded_data() reads it.
    save_embedded_data(vector_store, key="data")
    print ("Documents added to the vector store")
    return vector_store
# A document search function
def search(query, k=20):
    """Run a similarity search against the saved vector store.

    Args:
        query: Query text.
        k: Maximum number of documents to retrieve.

    Returns:
        A ``(best_text, contents, info)`` tuple: the page content of the top
        hit, a list with one single-element set per hit holding its page
        content, and a list of metadata dicts in the same order. When there
        are no hits the result is ``("", [], [])`` instead of an IndexError.
    """
    # The index is reloaded from disk on every call.
    docs = load_embedded_data().similarity_search(query, k)
    if not docs:
        return "", [], []

    # NOTE(review): each entry is a one-element *set* ({text}), kept as-is
    # because callers may depend on that shape; confirm whether a plain
    # string was intended.
    contents = [{doc.page_content} for doc in docs]
    info = [dict(doc.metadata) for doc in docs]
    return docs[0].page_content, contents, info