audio preprocessing pipeline completed.

This commit is contained in:
timothyafolami
2024-08-09 16:33:21 +01:00
parent d7e56338eb
commit 10fc2622ec
7 changed files with 253 additions and 8 deletions
+120 -6
View File
@@ -11,6 +11,11 @@ from langchain_core.documents import Document
from text_extractor import TextExtractor
import os
import json
from groq import Groq
import re
import shutil
import numpy as np
from pydub import AudioSegment
import base64
import requests
from dotenv import load_dotenv
@@ -18,8 +23,11 @@ load_dotenv()
# OpenAI API key, read from the process environment (load_dotenv() runs just above)
api_key = os.getenv('OPENAI_API_KEY')
# Groq client, authenticated with GROQ_API_KEY; audio_to_text sends its Whisper requests through it
client = Groq(api_key = os.getenv('GROQ_API_KEY'))
# Whisper model identifier (NOTE(review): audio_to_text passes the same literal directly rather than this name)
model = 'whisper-large-v3'
# ----------------------------------------------------------------------------------------------------
# loading the embedding model
def load_embedding_model():
model_name = "BAAI/bge-small-en"
@@ -30,10 +38,12 @@ def load_embedding_model():
)
return embeddings
# ----------------------------------------------------------------------------------------------------
# instantiating the embedding model once, at import time
embeddings = load_embedding_model()
# --------------------------------------------------------TEXT PREPROCESSING--------------------------------------------
def create_documents(doc):
text = doc[0].page_content
metadata = doc[0].metadata
@@ -98,6 +108,7 @@ def load_document(document_path):
else:
raise ValueError(f"Unsupported document type for {document_path}")
# ----------------------------------------------------IMAGE PROCESSING------------------------------------------------
# Function to encode the image
def encode_image(image_path):
with open(image_path, "rb") as image_file:
@@ -171,6 +182,102 @@ def create_image_document(image_path):
else:
pass # if there's an error, we will return None
# -----------------------------------------------AUDIO PROCESSING-----------------------------------------------------
# Audio to Text
def audio_to_text(filepath):
    """Return the text produced for one audio file by the Whisper model.

    The file is read fully into memory and submitted, together with its
    path as the upload name, to the module-level Groq ``client`` through
    its ``audio.translations`` endpoint.

    Args:
        filepath: Path of the audio file to submit.

    Returns:
        The ``text`` attribute of the endpoint's response.
    """
    with open(filepath, "rb") as audio_file:
        # (name, bytes) pair expected by the client's ``file`` argument
        payload = (filepath, audio_file.read())
        response = client.audio.translations.create(
            file=payload,
            model="whisper-large-v3",
        )
    return response.text
def split_audio_by_duration(audio_file_path, chunk_duration_minutes, print_output=True):
    """Split an audio file into consecutive MP3 chunks written to disk.

    Chunks are exported to a ``<name>_chunks`` folder (relative to the
    current working directory), where ``<name>`` is the input file name
    without its final extension. Audio that is not longer than one chunk is
    exported whole as ``<name>_chunk1.mp3``.

    Args:
        audio_file_path: Path of the audio file to split.
        chunk_duration_minutes: Maximum chunk length in minutes. May be
            fractional, but must amount to at least one millisecond.
        print_output: When true, print each exported chunk path.

    Returns:
        A ``(chunk_folder, chunk_paths)`` tuple: the folder that holds the
        chunks and the chunk file paths in playback order.

    Raises:
        ValueError: If ``chunk_duration_minutes`` is not positive.
    """
    # Work in whole milliseconds so a fractional minute count cannot leak a
    # float into range() or the slice bounds below.
    chunk_length_ms = int(chunk_duration_minutes * 60 * 1000)
    if chunk_length_ms <= 0:
        raise ValueError(
            f"chunk_duration_minutes must be positive, got {chunk_duration_minutes!r}"
        )

    audio = AudioSegment.from_file(audio_file_path)
    audio_duration_ms = len(audio)

    # splitext drops only the final extension, so "talk.part1.wav" keeps its
    # full stem instead of collapsing to "talk" and colliding with siblings.
    base_filename = os.path.splitext(os.path.basename(audio_file_path))[0]
    chunk_folder = f"{base_filename}_chunks"
    os.makedirs(chunk_folder, exist_ok=True)

    # Ceiling division; always export at least one chunk, even for audio
    # shorter than (or exactly as long as) a single chunk.
    num_chunks = max(1, -(-audio_duration_ms // chunk_length_ms))

    chunk_paths = []
    for index in range(num_chunks):
        start_ms = index * chunk_length_ms
        end_ms = min(start_ms + chunk_length_ms, audio_duration_ms)
        chunk_filename = f"{chunk_folder}/{base_filename}_chunk{index + 1}.mp3"
        audio[start_ms:end_ms].export(chunk_filename, format="mp3")
        chunk_paths.append(chunk_filename)
        if print_output:
            print(f'Exporting {chunk_filename}')

    return chunk_folder, chunk_paths
def transcribe_audio_chunks(audio_file_path, chunk_duration_minutes):
    """Transcribe an audio file chunk by chunk into ``Document`` objects.

    The file is split with ``split_audio_by_duration``, each chunk is passed
    to ``audio_to_text``, and the temporary chunk folder is removed
    afterwards, including when transcription fails part-way through.

    Args:
        audio_file_path: Path of the audio file to transcribe.
        chunk_duration_minutes: Length of each chunk in minutes.

    Returns:
        A list with one ``Document`` per chunk, in playback order. Each
        document's metadata holds ``"filename"`` (the source file's base
        name) and ``"duration"`` (``"<start>-<end> minutes"``, where the end
        of the last chunk is clamped to the audio length rounded up to whole
        minutes).
    """
    # Measure the source once, up front, instead of decoding the whole file
    # again for every chunk. Rounded up so a partial final minute is covered.
    total_ms = len(AudioSegment.from_file(audio_file_path))
    total_min = -(-total_ms // 60000)

    chunk_folder, chunk_paths = split_audio_by_duration(audio_file_path, chunk_duration_minutes)
    chunk_name_pattern = re.compile(r'(.*)_chunk(\d+)\.mp3$')

    documents = []
    try:
        for chunk_path in chunk_paths:
            transcript = audio_to_text(chunk_path)

            # Recover the source base name and 1-based chunk index from the
            # "<base>_chunk<N>.mp3" name produced by the splitter.
            chunk_filename = os.path.basename(chunk_path)
            match = chunk_name_pattern.search(chunk_filename)
            if match:
                base_filename = match.group(1)
                chunk_index = int(match.group(2))
            else:
                # Unexpected name format: treat it as the first chunk.
                base_filename = os.path.splitext(chunk_filename)[0]
                chunk_index = 1

            start_min = (chunk_index - 1) * chunk_duration_minutes
            # Clamp so the last chunk does not claim time past the end of the audio.
            end_min = min(chunk_index * chunk_duration_minutes, total_min)

            metadata = {
                "filename": base_filename,
                "duration": f"{start_min}-{end_min} minutes"
            }
            documents.append(Document(page_content=transcript, metadata=metadata))
    finally:
        # Always clean up the temporary chunks, even if a transcription call raised.
        shutil.rmtree(chunk_folder)

    return documents
# creating a function to create audio document
def create_audio_document(audio_file_path, chunk_duration_minutes=3):
    """Build the list of transcript documents for one audio file.

    Delegates to ``transcribe_audio_chunks`` with the given chunk length
    (3 minutes unless overridden) and returns its result unchanged.
    """
    return transcribe_audio_chunks(audio_file_path, chunk_duration_minutes)
#-----------------------------------------------------OTHERS--------------------------------------------------------------
def save_embedded_data(embeddings, key="data"):
    """Save ``embeddings`` locally under ``vec-db/index/faiss_index_<key>``.

    NOTE(review): despite its name, ``embeddings`` must expose
    ``save_local`` -- presumably a FAISS vector store; confirm with callers.
    """
    embeddings.save_local(f"vec-db/index/faiss_index_{key}")
@@ -223,6 +330,16 @@ def load_documents_from_directory(directory_path: str):
# adding the document name to the doc_names list
doc_names.append(doc[0].metadata['filename'])
print(f"Document {doc[0].metadata['filename']} loaded")
elif extension in audio_doc:
# creating an audio document
doc = create_audio_document(path)
# appending the document to the documents list
documents.append(doc)
# appending the number of pages in the document
num_pages.append(len(doc))
# adding the document name to the doc_names list
doc_names.append(doc[0].metadata['filename'])
print(f"Document {doc[0].metadata['filename']} loaded")
# so we need to create a document id for each document
docs_id = [uuid4().hex for i in range(len(documents))]
@@ -282,11 +399,8 @@ def add_documents_to_vector_store(embeddings, documents: list, docs_id: list, nu
def search(db, query, k=3):
    """Run a similarity search and summarise the matching documents.

    Args:
        db: Object exposing ``similarity_search(query, k)`` that returns
            documents with ``page_content`` and ``metadata`` attributes.
        query: Query text to search for.
        k: Number of documents to request.

    Returns:
        A ``(top_content, combined_content, info)`` tuple: the page content
        of the first hit, the page content of every hit joined with a
        trailing newline each, and a list holding a copy of each hit's
        metadata dict. When nothing matches, returns ``("", "", [])``.
    """
    docs = db.similarity_search(query, k)
    if not docs:
        # No hits: avoid the IndexError that docs[0] would raise.
        return "", "", []

    combined = "".join(f"{doc.page_content}\n" for doc in docs)
    # dict(...) copies the metadata so callers cannot mutate the stored documents.
    info = [dict(doc.metadata) for doc in docs]
    return docs[0].page_content, combined, info