"""Transcribe remote audio/video with Deepgram, extracting audio via yt-dlp."""

import os
import logging
import re
import uuid
import yt_dlp
from deepgram.utils import verboselogs
from dotenv import load_dotenv

# Load variables from a .env file before the Deepgram client classes are
# imported (this ordering is kept from the original file).
load_dotenv()

from deepgram import DeepgramClient, PrerecordedOptions, FileSource  # noqa: E402

# Folder for file uploads/downloads, resolved relative to the current working
# directory at import time. Example media URLs live in the __main__ block below.
UPLOAD_FOLDER = os.path.join(os.getcwd(), "../uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)


def sanitize_filename(name: str) -> str:
    """
    Return ``name`` reduced to characters that are safe in most file systems.

    Everything except word characters, whitespace and hyphens is dropped, the
    result is stripped, and every remaining whitespace character becomes an
    underscore.
    """
    cleaned = re.sub(r"[^\w\s-]", "", name).strip()
    # Replace tabs/newlines as well as spaces: the first pattern keeps all
    # whitespace, so a plain str.replace(' ', '_') would leave control
    # whitespace inside the file name.
    return re.sub(r"\s", "_", cleaned)


def extract_audio(url: str, output_template=os.path.join(UPLOAD_FOLDER, "%(title)s.%(ext)s")) -> str:
    """
    Download a media URL with yt-dlp and extract its audio track as MP3.

    The extracted file is renamed to ``<sanitized title>_<uuid hex>.mp3`` and
    placed in ``UPLOAD_FOLDER`` (even when ``output_template`` points
    elsewhere), so repeated downloads of the same title never collide.

    Args:
        url: URL of the media to download.
        output_template: yt-dlp output template used for the initial download.

    Returns:
        str: The absolute path to the renamed audio file.

    Raises:
        FileNotFoundError: If the MP3 expected after post-processing is missing.
    """
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": output_template,
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "mp3",
            "preferredquality": "192",
        }],
        "quiet": True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        # prepare_filename returns the filename *before* postprocessing,
        # so swap the extension for the one the extractor produces.
        original_filepath = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3"

    if not os.path.exists(original_filepath):
        # Debug aid: show what actually landed in the upload folder.
        files = os.listdir(UPLOAD_FOLDER)
        print("Warning: Could not find expected file.")
        print("Files in upload folder:", files)
        raise FileNotFoundError(f"Expected audio file not found: {original_filepath}")

    # Build a collision-free name from the sanitized title plus a random id.
    safe_title = sanitize_filename(info.get('title', 'audio'))
    unique_id = uuid.uuid4().hex
    new_audio_filepath = os.path.join(UPLOAD_FOLDER, f"{safe_title}_{unique_id}.mp3")

    os.rename(original_filepath, new_audio_filepath)
    print(f"Renamed file to: {new_audio_filepath}")

    return os.path.abspath(new_audio_filepath)


def transcribe_media(file_loc: str, media_type: str = "audio"):
    """
    Transcribe media using Deepgram.

    If media_type is "audio", the remote URL is handed to Deepgram's URL
    transcription. If media_type is "video", the audio is first extracted
    locally (into the upload folder), transcribed as a file, and the local
    audio file is deleted afterwards.

    Args:
        file_loc (str): URL to the remote audio or video file.
        media_type (str): "audio" or "video" (case-insensitive).

    Returns:
        The transcription response from Deepgram, or None if any step failed
        (including an unsupported media_type); the error is printed.
    """
    api_key = os.getenv("DEEPGRAM_API_KEY2")
    # Never echo the key itself: stdout routinely ends up in shared logs.
    if not api_key:
        print("Warning: DEEPGRAM_API_KEY2 is not set.")

    # Only set once extract_audio has produced a file, so the cleanup below
    # can never touch a file this call did not create.
    local_audio_path = None
    try:
        deepgram: DeepgramClient = DeepgramClient(api_key=api_key)
        options: PrerecordedOptions = PrerecordedOptions(
            model="nova-3",
            smart_format=True,
            diarize=True,
        )

        kind = media_type.lower()
        if kind == "audio":
            # Remote audio: let Deepgram fetch the URL itself.
            return deepgram.listen.rest.v("1").transcribe_url({"url": file_loc}, options)

        if kind == "video":
            # Remote video: extract the audio locally, then upload the bytes.
            local_audio_path = extract_audio(file_loc)
            print(f"Extracted audio to: {local_audio_path}")
            with open(local_audio_path, "rb") as file:
                payload: FileSource = {"buffer": file.read()}
            return deepgram.listen.rest.v("1").transcribe_file(payload, options)

        raise ValueError("media_type must be either 'audio' or 'video'.")
    except Exception as e:
        # Deliberate best-effort boundary: callers receive None on any failure.
        print(f"Exception during transcription: {e}")
        return None
    finally:
        # Single cleanup point for the temporary audio file.
        if local_audio_path is not None and os.path.exists(local_audio_path):
            try:
                os.remove(local_audio_path)
                print(f"Deleted local audio file: {local_audio_path}")
            except OSError as cleanup_error:
                print(f"Could not delete local audio file {local_audio_path}: {cleanup_error}")


def _build_sentence(sentence_words, speaker, include_speakers):
    """Assemble one sentence record from a non-empty run of word entries."""
    sentence_obj = {
        "sentence": " ".join(w["word"] for w in sentence_words),
        "start": sentence_words[0]["start"],
        "end": sentence_words[-1]["end"],
        "words": sentence_words,
    }
    # Only include speaker information if include_speakers is True.
    if include_speakers:
        sentence_obj["speaker"] = speaker
    return sentence_obj


def group_words_into_sentences(words, max_words=15, include_speakers=True):
    """
    Group words into sentences based on speaker changes.

    Consecutive words by the same speaker form one sentence. When
    include_speakers is False every word is attributed to a single default
    speaker, so all words end up in one sentence.

    Args:
        words: List of word objects from the transcription; each must support
            item access for "punctuated_word", "start", "end" and (when
            include_speakers is True) "speaker".
        max_words: Maximum number of words per sentence.
            NOTE(review): this limit is accepted but not currently applied;
            sentences are split on speaker changes only. Enforcing it would
            change the output shape for existing callers - confirm intent.
        include_speakers: Whether to include speaker information in the output
            (True for Pro plan, False for Freemium plan).

    Returns:
        A dictionary ``{"sentences": [...]}`` where each entry holds the joined
        "sentence" text, its "start" and "end" times, the per-word "words"
        list and, when include_speakers is True, the "speaker".
    """
    sentences = []
    current_sentence = []
    current_speaker = None

    for word_info in words:
        # Use a default speaker if not including speakers.
        speaker = word_info["speaker"] if include_speakers else "speaker_0"

        # A speaker change closes the sentence collected so far.
        if speaker != current_speaker:
            if current_sentence:
                sentences.append(
                    _build_sentence(current_sentence, current_speaker, include_speakers)
                )
                current_sentence = []
            current_speaker = speaker

        # Append word with metadata inside the current sentence.
        current_sentence.append({
            "word": word_info["punctuated_word"],
            "start": word_info["start"],
            "end": word_info["end"],
        })

    # Flush the last sentence if any words remain.
    if current_sentence:
        sentences.append(
            _build_sentence(current_sentence, current_speaker, include_speakers)
        )

    return {"sentences": sentences}


if __name__ == "__main__":
    # Example inputs: pass audio_url with media_type="audio" to exercise the
    # URL-based path instead of the video path used below.
    audio_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/019933724441Business%20English%20Conversation%20Lesson%2045_%20Meeting%20a%20New%20Colleague.mp3"
    video_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/038426704141Business%20English%20Conversation%20Lesson%2045_%20%20Meeting%20a%20New%20Colleague.mp4"

    response = transcribe_media(video_url, media_type="video")
    print(response)