Add tiered summarization based on pricing plans
- Implement advanced AI summarization with action items for the Pro plan
- Create basic bullet-point summarization for the Freemium plan
- Add plan tier validation and feature differentiation
- Support speaker identification in transcripts
- Define plan limits (600 minutes for Pro, 200 minutes for Freemium)
This commit is contained in:
@@ -0,0 +1,61 @@
|
||||
import anthropic
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
import json
|
||||
from src.prompt import advanced_summary_prompt, basic_summary_prompt, custom_template_prompt
|
||||
load_dotenv()
|
||||
|
||||
def general_summary(transcription, plan_tier="pro"):
    """
    Generate a summary of the transcription based on the user's plan tier.

    The "freemium" tier sends ``basic_summary_prompt`` with a 2000-token
    response budget. Any other value (including ``None``) falls back to the
    "pro" tier, which sends ``advanced_summary_prompt`` with 4000 tokens.

    Args:
        transcription: The transcription to summarize
        plan_tier: The user's plan tier ("freemium" or "pro"). Matched
            case-insensitively and ignoring surrounding whitespace.

    Returns:
        The model's reply parsed from JSON into Python objects

    Raises:
        json.JSONDecodeError: If the model's reply is not valid JSON.
    """
    client = anthropic.Anthropic(
        # "ANTHTROPIC_API_KEY" is the (misspelled) variable this module has
        # always read, so it keeps priority for existing deployments; the
        # correctly spelled name is accepted as a fallback.
        api_key=os.getenv("ANTHTROPIC_API_KEY") or os.getenv("ANTHROPIC_API_KEY"),
    )

    # Normalise the tier so None or padded values such as " Freemium " do not
    # crash or silently select the wrong feature set.
    tier = str(plan_tier or "pro").strip().lower()

    # Select the appropriate prompt based on the user's plan tier
    if tier == "freemium":
        prompt = basic_summary_prompt
        max_tokens = 2000  # Reduced token count for basic summaries
    else:  # Default to pro
        prompt = advanced_summary_prompt
        max_tokens = 4000

    message = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=max_tokens,
        messages=[
            {"role": "user", "content": f"{prompt}"},
            {"role": "user", "content": f"Transcription: {transcription}"},
        ],
    )

    text = message.content[0].text.strip()
    if text.startswith("```"):
        # The reply may arrive wrapped in a Markdown code fence
        # (``` or ```json); drop the opening fence line and the closing fence
        # so the payload itself can be parsed.
        text = text.partition("\n")[2].rstrip()
        if text.endswith("```"):
            text = text[:-3]
    return json.loads(text)
|
||||
|
||||
|
||||
|
||||
|
||||
def custom_summary(template, transcription):
    """
    Generate a summary of the transcription that follows a caller-supplied template.

    Sends ``custom_template_prompt``, the template and the transcription to
    the model as three user messages, with an 8000-token response budget.

    Args:
        template: The template the summary should follow
        transcription: The transcription to summarize

    Returns:
        The model's reply parsed from JSON into Python objects

    Raises:
        json.JSONDecodeError: If the model's reply is not valid JSON.
    """
    client = anthropic.Anthropic(
        # "ANTHTROPIC_API_KEY" is the (misspelled) variable this module has
        # always read, so it keeps priority for existing deployments; the
        # correctly spelled name is accepted as a fallback.
        api_key=os.getenv("ANTHTROPIC_API_KEY") or os.getenv("ANTHROPIC_API_KEY"),
    )
    message = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=8000,
        messages=[
            {"role": "user", "content": f"{custom_template_prompt}"},
            {"role": "user", "content": f"TEMPLATE : {template}"},
            {"role": "user", "content": f"Transcription: {transcription}"},
        ],
    )

    text = message.content[0].text.strip()
    if text.startswith("```"):
        # The reply may arrive wrapped in a Markdown code fence
        # (``` or ```json); drop the opening fence line and the closing fence
        # so the payload itself can be parsed.
        text = text.partition("\n")[2].rstrip()
        if text.endswith("```"):
            text = text[:-3]
    return json.loads(text)
|
||||
@@ -0,0 +1,190 @@
|
||||
import os
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
import yt_dlp
|
||||
from deepgram.utils import verboselogs
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
from deepgram import DeepgramClient, PrerecordedOptions, FileSource
|
||||
|
||||
# Define your URLs (example URLs)
|
||||
#audio_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/019933724441Business%20English%20Conversation%20Lesson%2045_%20Meeting%20a%20New%20Colleague.mp3"
|
||||
#video_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/038426704141Business%20English%20Conversation%20Lesson%2045_%20%20Meeting%20a%20New%20Colleague.mp4"
|
||||
|
||||
# Directory that receives downloaded/extracted media files.
# NOTE: the path is built from the process's current working directory
# (one level above it), not from this file's location, and the directory is
# created as a side effect of importing this module.
UPLOAD_FOLDER = os.path.join(os.getcwd(), "../uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
|
||||
|
||||
def sanitize_filename(name: str) -> str:
    """
    Remove characters from the filename that are not allowed in many file systems.

    Everything except word characters, whitespace and hyphens is dropped,
    leading/trailing whitespace is trimmed, and each remaining whitespace
    character (space, tab, newline, ...) becomes an underscore.

    Args:
        name: The raw name to clean up.

    Returns:
        The sanitized name; may be empty if nothing usable remains.
    """
    kept = re.sub(r'[^\w\s-]', '', name).strip()
    # Replace every whitespace character, not only ' ', so tabs and newlines
    # cannot leak into the filename. Runs are not collapsed, which keeps the
    # result for plain spaces unchanged.
    return re.sub(r'\s', '_', kept)
|
||||
|
||||
def extract_audio(url: str, output_template=os.path.join(UPLOAD_FOLDER, "%(title)s.%(ext)s")) -> str:
    """
    Download and extract audio from a video URL using yt-dlp.

    The audio is converted to MP3 by the FFmpegExtractAudio postprocessor and
    then renamed to ``<sanitized title>_<uuid>.mp3`` inside ``UPLOAD_FOLDER``.

    Args:
        url: URL of the media to download.
        output_template: yt-dlp output template used for the initial download.

    Returns:
        str: The absolute path to the downloaded audio file (with a unique id appended).

    Raises:
        FileNotFoundError: If the expected MP3 file is missing after the download.
    """
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": output_template,
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "mp3",
            "preferredquality": "192",
        }],
        "quiet": True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        # Prepare the filename from the info.
        # Note: prepare_filename returns the filename *before* postprocessing,
        # so we change the extension to mp3.
        original_filepath = os.path.splitext(ydl.prepare_filename(info))[0] + ".mp3"

    # Debug: list files in the upload folder
    if not os.path.exists(original_filepath):
        files = os.listdir(UPLOAD_FOLDER)
        print("Warning: Could not find expected file.")
        print("Files in upload folder:", files)
        raise FileNotFoundError(f"Expected audio file not found: {original_filepath}")

    # Get the video's title and sanitize it. The title can be present but
    # None, and a title made only of punctuation sanitizes to an empty
    # string; both cases fall back to "audio".
    title = info.get('title') or 'audio'
    safe_title = sanitize_filename(str(title)) or 'audio'

    # Generate a unique identifier
    unique_id = uuid.uuid4().hex  # Unique identifier in hex format

    # Construct the new filename with the unique id appended.
    new_audio_filename = f"{safe_title}_{unique_id}.mp3"
    new_audio_filepath = os.path.join(UPLOAD_FOLDER, new_audio_filename)

    # Rename the downloaded file to include the unique ID.
    os.rename(original_filepath, new_audio_filepath)
    print(f"Renamed file to: {new_audio_filepath}")

    # Return the absolute path to the renamed audio file.
    return os.path.abspath(new_audio_filepath)
|
||||
|
||||
def transcribe_media(file_loc: str, media_type: str = "audio"):
    """
    Transcribe media using Deepgram.

    If media_type is "audio" (remote URL), use Deepgram's URL transcription.
    If media_type is "video" (remote URL), extract audio locally (in the upload folder),
    transcribe via file, and then delete the local audio file.

    Args:
        file_loc (str): URL to the remote audio or video file.
        media_type (str): "audio" or "video" (case-insensitive).

    Returns:
        The transcription response from Deepgram, or None if anything fails.
        Failures, including an unsupported media_type, are printed and
        swallowed rather than raised.
    """
    api_key = os.getenv("DEEPGRAM_API_KEY2")
    # Never echo the key itself: stdout usually ends up in logs.
    if not api_key:
        print("Warning: DEEPGRAM_API_KEY2 is not set.")

    # Stays None unless this call actually extracted a local file, so the
    # cleanup below can never delete an unrelated file.
    local_audio_path = None
    try:
        deepgram: DeepgramClient = DeepgramClient(api_key=api_key)
        options: PrerecordedOptions = PrerecordedOptions(
            model="nova-3",
            smart_format=True,
            diarize=True,
        )

        kind = media_type.lower()
        if kind == "audio":
            # For remote audio files, use the URL transcription method.
            return deepgram.listen.rest.v("1").transcribe_url({"url": file_loc}, options)

        if kind == "video":
            # For remote video files, first extract the audio locally.
            local_audio_path = extract_audio(file_loc)
            print(f"Extracted audio to: {local_audio_path}")

            # Transcribe using the local file method.
            with open(local_audio_path, "rb") as file:
                buffer_data = file.read()
            payload: FileSource = {"buffer": buffer_data}
            return deepgram.listen.rest.v("1").transcribe_file(payload, options)

        raise ValueError("media_type must be either 'audio' or 'video'.")

    except Exception as e:
        # Best-effort contract: callers receive None instead of an exception.
        print(f"Exception during transcription: {e}")
        return None
    finally:
        # Clean up: delete the local audio file (runs on success and failure).
        if local_audio_path and os.path.exists(local_audio_path):
            try:
                os.remove(local_audio_path)
                print(f"Deleted local audio file: {local_audio_path}")
            except OSError as cleanup_error:
                # A failed cleanup must not discard the transcription result.
                print(f"Could not delete local audio file {local_audio_path}: {cleanup_error}")
|
||||
|
||||
|
||||
|
||||
def group_words_into_sentences(words, max_words=15):
    """
    Group word entries into sentence-like chunks.

    A new chunk starts whenever the speaker changes or the current chunk
    already holds ``max_words`` words.

    Args:
        words: Iterable of word dicts. Each must provide "start" and "end";
            "punctuated_word" (falling back to "word") supplies the text and
            "speaker" the speaker id (treated as None when absent).
            Presumably this is the per-word list of a Deepgram response;
            verify against the caller.
        max_words: Maximum number of words per chunk. ``None`` or a value
            <= 0 disables the limit, so only speaker changes split chunks.

    Returns:
        dict: ``{"sentences": [...]}`` where each item has the keys
        "sentence", "speaker", "start", "end" and "words" (a list of
        ``{"word", "start", "end"}`` dicts).
    """
    limit = max_words if max_words and max_words > 0 else None
    sentences = []
    chunk = []
    chunk_speaker = None

    for word_info in words:
        speaker = word_info.get("speaker")
        text = word_info.get("punctuated_word", word_info.get("word", ""))

        # If speaker changes or sentence reaches max length, start a new sentence
        chunk_full = limit is not None and len(chunk) >= limit
        if chunk and (speaker != chunk_speaker or chunk_full):
            sentences.append(_build_sentence(chunk, chunk_speaker))
            chunk = []
        chunk_speaker = speaker

        # Append word with metadata inside the current sentence
        chunk.append({"word": text, "start": word_info["start"], "end": word_info["end"]})

    # Append the last sentence if any words remain
    if chunk:
        sentences.append(_build_sentence(chunk, chunk_speaker))

    return {"sentences": sentences}


def _build_sentence(chunk, speaker):
    """Assemble one sentence record from a non-empty list of word entries."""
    return {
        "sentence": " ".join(w["word"] for w in chunk),
        "speaker": speaker,
        # Timing comes from the chunk's own first and last word.
        "start": chunk[0]["start"],
        "end": chunk[-1]["end"],
        "words": chunk,
    }
|
||||
|
||||
if __name__ == "__main__":
    # Manual smoke test: transcribe a sample remote video and print the raw
    # response. Only video_url is used below; audio_url is defined alongside it.
    audio_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/019933724441Business%20English%20Conversation%20Lesson%2045_%20Meeting%20a%20New%20Colleague.mp3"
    video_url = "https://s3.us-east-2.amazonaws.com/com.mkdlabs.images/baas/jordan/038426704141Business%20English%20Conversation%20Lesson%2045_%20%20Meeting%20a%20New%20Colleague.mp4"

    response = transcribe_media(video_url, media_type="video")
    print(response)
|
||||
Reference in New Issue
Block a user