"""Collect social-media and contact data for influencers.

Each lookup asks Tavily a natural-language question and then has an OpenAI
chat model turn the free-text answer into JSON.

Example (performs real network calls)::

    data = get_all_influencer_data(
        influencer_names=["Kylie Swift"],
        category="Entertainment & Pop Culture",
    )
    data = socialmedia_details("Facebook", "Kylie Swift", "Entertainment")
    print(data)
"""

import concurrent.futures
import json
import os
from typing import Any, Union

from dotenv import load_dotenv
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts.prompt import PromptTemplate
from langchain_openai import ChatOpenAI
from loguru import logger
from tavily import TavilyClient

load_dotenv()

# Validate the environment BEFORE building any client: a missing key must
# surface as the EnvironmentError below rather than as a TypeError from
# assigning None into os.environ or as a client-constructor failure.
# (Re-exporting the keys into os.environ is unnecessary: os.getenv already
# reads from os.environ.)
_REQUIRED_ENV_VARS = ("OPENAI_API_KEY", "TAVILY_API_KEY", "PERPLEXITY_AI_API")

if not all(os.getenv(_var) for _var in _REQUIRED_ENV_VARS):
    logger.error("One or more API keys are missing from the environment variables.")
    raise EnvironmentError("Missing API keys.")

llm = ChatOpenAI(model="gpt-4o")

# Instantiating TavilyClient
tavily_client = TavilyClient()

# Platforms queried for every influencer; the lower-cased name is the key
# used in the combined result.
_PLATFORMS = ("Facebook", "Instagram", "Tiktok", "Youtube")

# The prompt texts below are runtime strings and are kept exactly as they
# were (wording and spelling included); only the source layout changed.
# NOTE(review): the <|...|> markers look like Llama-3 chat-template tokens
# while the configured model is gpt-4o -- confirm they are intended.
_SOCIAL_MEDIA_TEMPLATE = (
    "<|begin_of_text|><|start_header_id|>system<|end_header_id|> "
    "You are a Influencer Data Extractor AI Agent tasked with extracting "
    "information from a search result\n "
    "You are provided with three informations: \n "
    "1. The social media name \n "
    "2. The influencer social media name search result \n "
    "3. The influencer social media followers/subscribers search results \n "
    "Your job is to extract the influencer social media name, and social "
    "media followers or subscriber. \n "
    "You are to return this as a JSON output. Two data should be there, the "
    "influencer social media name, and social media followers or "
    "subscribers.\n "
    "Do this decently and properly. If the search result isn't showing the "
    "social media name or social media folloers/subscribers, return the data "
    "as NULL. \n "
    "Don not add to the search result, just return the JSON data as "
    "expected. \n"
    "Also rmember to replace social media with the given social media name "
    "in the JSON output.\n "
    "<|eot_id|><|start_header_id|>user<|end_header_id|> "
    "SOCIALMEDIA: {social_media} \n "
    "INFLUENCER_NAME: {influencer_name}\n "
    "SOCIALMEDIA_NAME: {socialmedia_name} \n "
    "SOCIALMEDIA_FOLLOWERS: {socialmedia_followers} \n "
    "<|eot_id|><|start_header_id|>assistant<|end_header_id|>"
)

_CONTACT_TEMPLATE = (
    "<|begin_of_text|><|start_header_id|>system<|end_header_id|> "
    "You are a Influencer Data Extractor AI Agent tasked with extracting "
    "information from a search result\n "
    "You are provided with three informations: \n "
    "1. The social media name \n "
    "2. The influencer contact info search result \n "
    "3. The influencer location or state search results \n "
    "Your job is to extract the influencer's contact info, and location. "
    "Make sure things are properly filtered and good. \n "
    "Pick relevalt info, that can be useful for reaching the influencer \n "
    "You are to return this as a JSON output. \n"
    "The contact and location should be the keys in the JSON \n "
    "Do not add to the search result, just return the JSON data as "
    "expected.\n "
    "<|eot_id|><|start_header_id|>user<|end_header_id|> "
    "INFLUENCER_NAME: {influencer_name}\n "
    "CONTACT: {contact} \n "
    "LOCATION: {location} \n "
    "<|eot_id|><|start_header_id|>assistant<|end_header_id|>"
)

_CLEANING_TEMPLATE = (
    "<|begin_of_text|><|start_header_id|>system<|end_header_id|> "
    "You are a data-cleaning AI agent. Your task is to review and correct "
    "the follower count data for influencers, while maintaining the exact "
    "structure of the input. "
    "Instructions: "
    "- Locate follower or subscriber counts within the data. "
    "- Convert shorthand notation to full integer values: "
    "- 'k' represents thousands (e.g., '25k' becomes 25000). "
    "- 'M' represents millions (e.g., '1.2M' becomes 1200000). "
    "- 'B' represents billions (e.g., '1B' becomes 1000000000). "
    "- Leave full integer counts as they are. \n"
    "- If a follower or subscriber count is missing, keep it as NULL. "
    "Return the cleaned data in the same JSON format as provided, with only "
    "corrected follower counts if needed. "
    "<|eot_id|><|start_header_id|>user<|end_header_id|> "
    "{influencer_data} "
    "<|eot_id|><|start_header_id|>assistant<|end_header_id|>"
)

_YOUTUBE_TEMPLATE = (
    "<|begin_of_text|><|start_header_id|>system<|end_header_id|> "
    "You are a data-extraction AI agent. Your task is to extract the YouTube "
    "channel name for each influencer from the provided data. If the YouTube "
    "channel name is not available, return null for that influencer. \n"
    "Instructions: "
    "- For each influencer, locate their YouTube channel name. "
    "- If the YouTube channel name is present, include it in the output. "
    "- If the YouTube channel name is missing, return null for that "
    "influencer. "
    "Return the output as a dictionary where the keys are the influencer "
    "names and the values are their corresponding YouTube channel names. "
    "<|eot_id|><|start_header_id|>user<|end_header_id|> "
    "{influencer_data} "
    "<|eot_id|><|start_header_id|>assistant<|end_header_id|>"
)


def _invoke_json_chain(template: str, variables: dict) -> Any:
    """Fill *template* with *variables*, send it to ``llm``, parse the reply as JSON."""
    prompt = PromptTemplate(template=template, input_variables=list(variables))
    chain = prompt | llm | JsonOutputParser()
    return chain.invoke(variables)


def _tavily_answer(question: str) -> str:
    """Print *question*, ask Tavily's Q&A search, print and return the answer."""
    print(question)
    answer = tavily_client.qna_search(query=question, search_depth='advanced', max_results=10)
    print(answer)
    return answer


def get_influencer_data(social_media: str, influencer_name: str, socialmedia_name: str, socialmedia_followers: str) -> dict:
    """Extract an influencer's handle and follower count from search answers.

    Args:
        social_media: Platform name, e.g. ``"Instagram"``.
        influencer_name: Name of the influencer the answers refer to.
        socialmedia_name: Search answer about the influencer's username.
        socialmedia_followers: Search answer about the follower count.

    Returns:
        The JSON object produced by the model. If the model call or JSON
        parsing fails, ``{"influencer_name": influencer_name,
        "followers": None}`` is returned instead of raising.
    """
    logger.info("Formatting Influencer Data")
    try:
        return _invoke_json_chain(
            _SOCIAL_MEDIA_TEMPLATE,
            {
                "social_media": social_media,
                "socialmedia_name": socialmedia_name,
                "socialmedia_followers": socialmedia_followers,
                "influencer_name": influencer_name,
            },
        )
    except Exception as e:
        # Best effort: one failed extraction must not abort the whole lookup.
        logger.error(f"Error extracting influencer data: {e}")
        return {"influencer_name": influencer_name, "followers": None}


def socialmedia_details(social_media: str, influencer_name: str, product_category: str):
    """Search for an influencer's username and follower count on one platform.

    Args:
        social_media: Platform name, e.g. ``"Facebook"``.
        influencer_name: Name of the influencer to look up.
        product_category: Category the influencer is known for; used to
            disambiguate the search questions.

    Returns:
        The structured result of :func:`get_influencer_data`.
    """
    logger.info(f"{social_media} name search")
    name = _tavily_answer(
        f"What is the {social_media} username of {influencer_name}, this person is a popular {product_category} influencer."
    )

    logger.info(f"{social_media} followers search")
    followers = _tavily_answer(
        f"How many {social_media} followers does {influencer_name} have? Note this is a {product_category} popular influencer."
    )

    logger.info(f"Formatting Influencer {social_media} Data")
    # Pass the platform name itself; wrapping it in braces here would build a
    # one-element set and render as "{'Facebook'}" inside the prompt.
    return get_influencer_data(
        social_media=social_media,
        influencer_name=influencer_name,
        socialmedia_name=name,
        socialmedia_followers=followers,
    )


def get_influencer_contact(influencer_name: str, contact: str, location: str) -> dict:
    """Extract an influencer's contact info and location from search answers.

    Args:
        influencer_name: Name of the influencer the answers refer to.
        contact: Search answer about contact details / e-mail.
        location: Search answer about the influencer's location.

    Returns:
        The JSON object produced by the model (the prompt asks for the keys
        ``contact`` and ``location``). If the model call or JSON parsing
        fails, ``{"contact": None, "location": None}`` is returned, mirroring
        the best-effort behaviour of :func:`get_influencer_data`.
    """
    logger.info("Formatting Influencer Data")
    try:
        return _invoke_json_chain(
            _CONTACT_TEMPLATE,
            {"contact": contact, "location": location, "influencer_name": influencer_name},
        )
    except Exception as e:
        logger.error(f"Error extracting influencer contact: {e}")
        return {"contact": None, "location": None}


def contact_details(influencer_name: str, product_category: str) -> dict:
    """Fetch and format influencer contact details.

    Args:
        influencer_name: Name of the influencer to look up.
        product_category: Category the influencer is known for; used to
            disambiguate the search questions.

    Returns:
        The structured result of :func:`get_influencer_contact`.
    """
    logger.info("contact details search")
    contact = _tavily_answer(
        f"What is the contact details or email of {influencer_name}, this person is a popular {product_category} influencer."
    )

    logger.info("Location details search")
    location = _tavily_answer(
        f"What is the location of {influencer_name}, this person is a popular {product_category} influencer. Check for state and country."
    )

    logger.info("Formatting Influencer contact details")
    return get_influencer_contact(influencer_name=influencer_name, contact=contact, location=location)


def influencer_data(influencer_name: str, product_category: str):
    """Gather all platform and contact data for one influencer.

    Args:
        influencer_name: Name of the influencer to look up.
        product_category: Category the influencer is known for.

    Returns:
        A JSON *string* encoding an object with the keys ``name``,
        ``facebook``, ``instagram``, ``tiktok``, ``youtube`` and ``contact``.
    """
    logger.info(f"Getting {influencer_name} social media data")

    response = {"name": influencer_name}
    for platform in _PLATFORMS:
        response[platform.lower()] = socialmedia_details(
            social_media=platform,
            influencer_name=influencer_name,
            product_category=product_category,
        )
    response["contact"] = contact_details(influencer_name=influencer_name, product_category=product_category)

    return json.dumps(response)


def clean_influencer_data(influencer_data: Union[str, list]) -> Any:
    """Ask the model to normalise follower counts (``25k`` -> ``25000`` etc.).

    Args:
        influencer_data: The data to clean; it is substituted into the prompt
            as text. :func:`get_all_influencer_data` passes a JSON string.
            (The parameter name shadows the module-level function of the
            same name inside this body; it is kept for caller compatibility.)

    Returns:
        The model's reply parsed as JSON.
    """
    logger.info("Cleaning and correcting influencer data")
    return _invoke_json_chain(_CLEANING_TEMPLATE, {"influencer_data": influencer_data})


def get_all_influencer_data(influencer_names: list, category: str):
    """Fetch details for all influencers of a category concurrently.

    Args:
        influencer_names: Names of the influencers to look up.
        category: Product category passed to every lookup.

    Returns:
        The cleaned data as returned by :func:`clean_influencer_data`, or an
        empty list when no influencer could be fetched. Influencers whose
        lookup raised are logged and left out. Results are collected in
        completion order, not in the order of *influencer_names*.
    """
    all_influencers_data = []

    # Using ThreadPoolExecutor to fetch influencer data concurrently
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit tasks for each influencer and collect the future objects
        future_to_influencer = {
            executor.submit(influencer_data, name, category): name
            for name in influencer_names
        }

        for future in concurrent.futures.as_completed(future_to_influencer):
            influencer = future_to_influencer[future]
            try:
                # influencer_data returns a JSON string; decode it so the
                # combined document below is a list of objects rather than a
                # list of doubly-encoded, escaped strings.
                influencer_details = json.loads(future.result())
            except Exception as exc:
                logger.error(f"{influencer} generated an exception: {exc}")
            else:
                all_influencers_data.append(influencer_details)

    if not all_influencers_data:
        # Nothing to clean; avoid an LLM round-trip on an empty document.
        logger.warning("No influencer data collected")
        return []

    # Convert the result to JSON format
    all_influencers_json = json.dumps(all_influencers_data, indent=4)

    # cleaning
    return clean_influencer_data(all_influencers_json)


def extract_youtube_data(influencer_data: Union[str, list]) -> dict:
    """Ask the model for each influencer's YouTube channel name.

    Args:
        influencer_data: The influencer data to inspect; it is substituted
            into the prompt as text. (The parameter name shadows the
            module-level function of the same name inside this body; it is
            kept for caller compatibility.)

    Returns:
        The model's reply parsed as JSON. The prompt asks for a mapping of
        influencer name to channel name, with null for unknown channels.
    """
    logger.info("Extracting YouTube channel names for influencers")
    return _invoke_json_chain(_YOUTUBE_TEMPLATE, {"influencer_data": influencer_data})