starter transcript api added

This commit is contained in:
OwusuBlessing
2025-02-15 01:06:29 +01:00
parent 87ad727d6c
commit 24cf605f28
9 changed files with 330 additions and 0 deletions
+112
View File
@@ -0,0 +1,112 @@
import os
from typing import Optional
from fastapi import FastAPI, HTTPException, Security, Depends
from fastapi.security import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
import json
from pydantic import BaseModel
from langchain_openai import ChatOpenAI
import requests
import tempfile
from typing import Dict, Any
from fastapi.responses import Response
from datetime import datetime
from fastapi import HTTPException
from pydantic import BaseModel
from typing import Optional, Union, Dict, Any
import os
import requests
import os
from PyPDF2 import PdfReader
from scripts.transcriber import transcribe_media,group_words_into_sentences # Import the transcribe_media function
# Read settings from a .env file into the process environment before any of
# the configuration below looks them up.
load_dotenv()

# Shared secret that clients must present as a Bearer token.
API_KEY = os.environ.get("API_KEY_ACCESS")

# The FastAPI application object; the metadata is passed straight to FastAPI.
app = FastAPI(
    version="1.0.0",
    title="Microdot AI API",
    description="API For fire fighter",
)

# Cross-origin policy: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is a
# very permissive combination — confirm this is intended for production.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)

# Reads the raw "Authorization" header. auto_error=False is passed so that a
# missing header can be handled by the dependency that consumes this scheme.
api_key_header = APIKeyHeader(auto_error=False, name="Authorization")
async def get_api_key(api_key_header: str = Security(api_key_header)) -> str:
    """Validate the Bearer token carried in the Authorization header.

    Args:
        api_key_header: Raw value of the ``Authorization`` header, or a falsy
            value when the header is absent.

    Returns:
        The token that followed the ``Bearer `` prefix.

    Raises:
        HTTPException: 401 when the header is missing, does not start with
            ``Bearer ``, or the token does not match ``API_KEY``.
    """
    # Imported locally so this function does not depend on a module-level
    # import that may not exist in the file.
    import hmac

    if not api_key_header or not api_key_header.startswith('Bearer '):
        raise HTTPException(
            status_code=401,
            detail={"error": "Unauthorized", "message": "API key is missing or invalid."}
        )

    token = api_key_header.split(' ')[1]

    # An unset or empty server-side key must never authenticate anyone, and
    # the comparison is constant-time so response timing does not leak how
    # much of a guessed token was correct. Bytes are compared because
    # compare_digest rejects non-ASCII str arguments.
    expected = API_KEY
    if not expected or not hmac.compare_digest(
        token.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(
            status_code=401,
            detail={"error": "Unauthorized", "message": "API key does not match."}
        )
    return token
class TranscribeRequest(BaseModel):
    """Request body accepted by the transcription endpoint."""

    # URL of the remote media to transcribe.
    media_url: Optional[str] = None
    # Kind of media behind media_url: "audio" or "video". An explicit default
    # makes the field optional and mirrors transcribe_media's own default of
    # "audio", so omitting it no longer forwards None to a call that does
    # media_type.lower().
    media_type: Optional[str] = "audio"
class ChatResp(BaseModel):
    """Response model carrying only an optional error message."""

    # Error description; None when nothing went wrong.
    error: Union[str, None] = None
class TranscriptResponse(BaseModel):
    """Response model wrapping the transcript dictionary."""

    # Transcript payload; the transcribe endpoint fills it with the result of
    # group_words_into_sentences.
    transcript: Dict[Any, Any]
@app.post("/microdot-ai/transcribe")
async def chat_endpoint(
    request: TranscribeRequest,
    api_key: str = Depends(get_api_key)
):
    """Transcribe the media at ``request.media_url`` and group it by speaker.

    Args:
        request: Body with the media URL and its type ("audio" or "video").
        api_key: Token validated by ``get_api_key``; unused beyond enforcing
            authentication.

    Returns:
        TranscriptResponse whose ``transcript`` is the output of
        ``group_words_into_sentences``.

    Raises:
        HTTPException: 400 when no media URL is supplied; 500 when the
            transcription fails or its response cannot be parsed.
    """
    # Reject an empty request up front instead of silently returning null
    # with a 200 status.
    if not request.media_url:
        raise HTTPException(status_code=400, detail="media_url is required.")

    try:
        # NOTE: transcribe_media is a synchronous call, so the event loop is
        # blocked while it runs.
        # A null media_type falls back to "audio", the default used by
        # transcribe_media itself.
        transcription_response = transcribe_media(
            request.media_url,
            media_type=request.media_type or "audio",
        )
        if transcription_response is None:
            raise HTTPException(status_code=500, detail="Transcription failed.")
        print(f"Transcription response: {transcription_response}")  # Debugging print

        # Pull the word list out of the first channel's first alternative.
        words = transcription_response["results"]["channels"][0]["alternatives"][0]["words"]
        transcript = group_words_into_sentences(words=words)
        # TranscriptResponse declares only `transcript`, so nothing else is passed.
        return TranscriptResponse(transcript=transcript)
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged rather than
        # being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        print(f"Error processing chat request: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error processing chat request: {str(e)}"
        ) from e
@app.on_event("startup")
async def startup_event():
    """Startup hook registered on the app; currently does nothing."""
    return None
if __name__ == "__main__":
    # Start the server only when this module is executed directly.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 3000, "reload": True}
    uvicorn.run("app:app", **server_options)
View File
View File
View File
+28
View File
@@ -0,0 +1,28 @@
openai
pandas
python-dotenv
fastapi
uvicorn
langchain-community
langchain-openai
pydantic
pypdf
pypandoc
Spire.Doc
plum-dispatch==1.7.4
scikit-learn
werkzeug
python-multipart
langgraph
tiktoken
langchainhub
chromadb
langchain
langchain-text-splitters
beautifulsoup4
deepgram_sdk
moviepy
yt-dlp
ffmpeg-python
reportlab
anthropic
+190
View File
@@ -0,0 +1,190 @@
import os
import logging
import re
import uuid
import yt_dlp
from deepgram.utils import verboselogs
from dotenv import load_dotenv
load_dotenv()
from deepgram import DeepgramClient, PrerecordedOptions, FileSource
# Working directory for downloaded and extracted media: an "uploads" folder
# resolved relative to the parent of the current working directory. It is
# created at import time when it is not already there.
UPLOAD_FOLDER = os.path.join(os.getcwd(), "../uploads")
if not os.path.isdir(UPLOAD_FOLDER):
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def sanitize_filename(name: str) -> str:
    """
    Turn an arbitrary title into a string that is safe to use in a filename.

    Every character that is not a word character, whitespace or a hyphen is
    removed, surrounding whitespace is stripped, and each remaining
    whitespace character is replaced by an underscore.

    Args:
        name: The raw text, e.g. a media title.

    Returns:
        str: The sanitized text; empty when nothing usable remains.
    """
    stripped = re.sub(r'[^\w\s-]', '', name).strip()
    # Replace every whitespace character, not only ' ', so tabs and newlines
    # inside a title cannot end up in the filename.
    return re.sub(r'\s', '_', stripped)
def extract_audio(url: str, output_template=os.path.join(UPLOAD_FOLDER, "%(title)s.%(ext)s")) -> str:
    """
    Download a media URL with yt-dlp and extract its audio track as MP3.

    The file is first written according to ``output_template`` and then
    renamed to ``<sanitized title>_<unique id>.mp3`` inside ``UPLOAD_FOLDER``.

    Args:
        url: URL of the media to download.
        output_template: yt-dlp output template used for the download.

    Returns:
        str: The absolute path to the renamed audio file.

    Raises:
        FileNotFoundError: If the MP3 expected after post-processing is missing.
    """
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": output_template,
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "mp3",
            "preferredquality": "192",
        }],
        "quiet": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        # prepare_filename returns the name *before* post-processing, so the
        # extension is swapped for the ".mp3" produced by the extractor.
        original_filepath = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3"

    if not os.path.exists(original_filepath):
        # Show what is actually in the upload folder to help diagnose the miss.
        files = os.listdir(UPLOAD_FOLDER)
        print("Warning: Could not find expected file.")
        print("Files in upload folder:", files)
        raise FileNotFoundError(f"Expected audio file not found: {original_filepath}")

    # Fall back to "audio" when the title is missing, None or empty, and again
    # when sanitizing strips every character, so the final name always has a
    # readable stem.
    title = info.get('title') or 'audio'
    safe_title = sanitize_filename(title) or 'audio'

    # A random hex id keeps the final name unique across downloads of the
    # same title.
    unique_id = uuid.uuid4().hex
    new_audio_filename = f"{safe_title}_{unique_id}.mp3"
    new_audio_filepath = os.path.join(UPLOAD_FOLDER, new_audio_filename)

    os.rename(original_filepath, new_audio_filepath)
    print(f"Renamed file to: {new_audio_filepath}")

    return os.path.abspath(new_audio_filepath)
def transcribe_media(file_loc: str, media_type: str = "audio"):
    """
    Transcribe remote media using Deepgram.

    For "audio" the URL is handed to Deepgram's URL transcription. For
    "video" the audio is first extracted to a local file with
    ``extract_audio``, sent to Deepgram as a file, and the local file is
    deleted afterwards.

    Args:
        file_loc (str): URL to the remote audio or video file.
        media_type (str): "audio" or "video" (case-insensitive).

    Returns:
        The transcription response from Deepgram, or None when anything goes
        wrong (invalid media_type included); failures are printed, not raised.
    """
    # The API key is a secret and is deliberately never printed.
    api_key = os.getenv("DEEPGRAM_API_KEY2")
    # None means "no temporary file was created", so the cleanup below can
    # never touch an unrelated file.
    local_audio_path = None
    try:
        # Validate before doing any work; a non-string value (e.g. None) is
        # reported as an invalid media type rather than an AttributeError.
        kind = media_type.lower() if isinstance(media_type, str) else None
        if kind not in ("audio", "video"):
            raise ValueError("media_type must be either 'audio' or 'video'.")

        deepgram: DeepgramClient = DeepgramClient(api_key=api_key)
        options: PrerecordedOptions = PrerecordedOptions(
            model="nova-3",
            smart_format=True,
            diarize=True,
        )

        if kind == "audio":
            # For remote audio files, use the URL transcription method.
            response = deepgram.listen.rest.v("1").transcribe_url({"url": file_loc}, options)
        else:
            # For remote video files, first extract the audio locally.
            local_audio_path = extract_audio(file_loc)
            print(f"Extracted audio to: {local_audio_path}")
            with open(local_audio_path, "rb") as file:
                buffer_data = file.read()
            payload: FileSource = {"buffer": buffer_data}
            response = deepgram.listen.rest.v("1").transcribe_file(payload, options)

        print(f"Transcription response: {response}\n\n")
        return response
    except Exception as e:
        # Callers rely on a None return to detect failure.
        print(f"Exception during transcription: {e}")
        return None
    finally:
        # Single cleanup point: remove the extracted audio, if one was made.
        if local_audio_path is not None and os.path.exists(local_audio_path):
            try:
                os.remove(local_audio_path)
                print(f"Deleted local audio file: {local_audio_path}")
            except OSError as cleanup_error:
                # A failed delete must not replace the function's result.
                print(f"Could not delete local audio file {local_audio_path}: {cleanup_error}")
def _word_field(word_info, key, default=None):
    """Return ``word_info[key]``, or ``default`` when the key is absent."""
    try:
        return word_info[key]
    except KeyError:
        return default


def _build_sentence(sentence_words, speaker):
    """Assemble one sentence record from its non-empty list of word entries."""
    return {
        "sentence": " ".join(w["word"] for w in sentence_words),
        "speaker": speaker,
        "start": sentence_words[0]["start"],
        "end": sentence_words[-1]["end"],
        "words": sentence_words,
    }


def group_words_into_sentences(words, max_words=15):
    """
    Group word entries into consecutive per-speaker segments.

    A new segment starts whenever the speaker changes or the current segment
    already holds ``max_words`` words.

    Args:
        words: Sequence of word entries, each indexable by "start", "end",
            "punctuated_word" (falls back to "word" when absent) and
            "speaker" (treated as None when absent).
        max_words: Maximum number of words per segment. None or a
            non-positive value disables the cap, so only speaker changes
            split segments.

    Returns:
        dict: ``{"sentences": [...]}`` where every item has the keys
        "sentence", "speaker", "start", "end" and "words"; each word entry
        holds "word", "start" and "end".
    """
    limit = max_words if max_words and max_words > 0 else None

    sentences = []
    current_sentence = []
    current_speaker = None

    for word_info in words:
        text = _word_field(word_info, "punctuated_word")
        if text is None:
            text = word_info["word"]
        speaker = _word_field(word_info, "speaker")

        speaker_changed = speaker != current_speaker
        reached_limit = limit is not None and len(current_sentence) >= limit
        # Close the running segment before this word opens a new one.
        if current_sentence and (speaker_changed or reached_limit):
            sentences.append(_build_sentence(current_sentence, current_speaker))
            current_sentence = []
        current_speaker = speaker

        current_sentence.append({
            "word": text,
            "start": word_info["start"],
            "end": word_info["end"],
        })

    # Flush whatever is left after the last word.
    if current_sentence:
        sentences.append(_build_sentence(current_sentence, current_speaker))

    return {"sentences": sentences}
if __name__ == "__main__":
    # Manual smoke test: transcribe the sample video and print the result.
    # The sample audio URL is kept alongside it for quick switching.
    audio_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/019933724441Business%20English%20Conversation%20Lesson%2045_%20Meeting%20a%20New%20Colleague.mp3"
    video_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/038426704141Business%20English%20Conversation%20Lesson%2045_%20%20Meeting%20a%20New%20Colleague.mp4"
    print(transcribe_media(video_url, media_type="video"))
View File
View File
View File