import os import requests from dotenv import load_dotenv from langchain_openai import ChatOpenAI from tavily import TavilyClient from langchain_core.prompts.prompt import PromptTemplate from langchain_core.output_parsers import StrOutputParser, JsonOutputParser from loguru import logger import concurrent.futures import json load_dotenv() os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") os.environ["TAVILY_API_KEY"] = os.getenv("TAVILY_API_KEY") os.environ["PERPLEXITY_AI_API"] = os.getenv("PERPLEXITY_AI_API") llm = ChatOpenAI(model="gpt-4o") # Instantiating TavilyClient tavily_client = TavilyClient() def get_influencer_data(social_media: str, influencer_name:str , socialmedia_name: str, socialmedia_followers:str) -> dict: logger.info(f"Formatting Influencer Data") initiator_prompt = PromptTemplate( template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a Influencer Data Extractor AI Agent tasked with extracting information from a search result\n You are provided with three informations: \n 1. The social media name \n 2. The influencer social media name search result \n 3. The influencer social media followers/subscribers search results \n Your job is to extract the influencer social media name, and social media followers or subscriber. \n You are to return this as a JSON output. Two data should be there, the influencer social media name, and social media followers or subscribers.\n Do this decently and properly. If the search result isn't showing the social media name or social media folloers/subscribers, return the data as NULL. \n Don not add to the search result, just return the JSON data as expected. Also rmember to replace social media with the given social media name in the JSON output.\n <|eot_id|><|start_header_id|>user<|end_header_id|> SOCIALMEDIA: {social_media} \n INFLUENCER_NAME: {influencer_name}\n SOCIALMEDIA_NAME: {socialmedia_name} \n SOCIALMEDIA_FOLLOWERS: {socialmedia_followers} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""", input_variables=["social_media", "socialmedia_name", "socialmedia_followers", "influencer_name"], ) initiator_router = initiator_prompt | llm | JsonOutputParser() output = initiator_router.invoke({"social_media":social_media, "socialmedia_name":socialmedia_name, "socialmedia_followers":socialmedia_followers, "influencer_name":influencer_name}) return output def socialmedia_details(social_media:str, influencer_name:str, product_category:str): logger.info(f"{social_media} name search") name_question = f"What is the {social_media} username of {influencer_name}, this person is a popular {product_category} influencer." print(name_question) name = tavily_client.qna_search(query=name_question, search_depth='advanced', max_results=10) print(name) logger.info(f"{social_media} followers search") followers_question = f"How many {social_media} followers does {influencer_name} have? Note this is a {product_category} popular influencer." print(followers_question) followers = tavily_client.qna_search(query=followers_question, search_depth='advanced', max_results=10) print(followers) logger.info(f"Formatting Influencer {social_media} Data") format_response = get_influencer_data(social_media={social_media}, influencer_name=influencer_name, socialmedia_name=name, socialmedia_followers=followers) return format_response def get_influencer_contact(influencer_name:str, contact:str, location:str) -> dict: logger.info(f"Formatting Influencer Data") initiator_prompt = PromptTemplate( template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are a Influencer Data Extractor AI Agent tasked with extracting information from a search result\n You are provided with three informations: \n 1. The social media name \n 2. The influencer contact info search result \n 3. The influencer location or state search results \n Your job is to extract the influencer's contact info, and location. Make sure things are properly filtered and good. \n Pick relevalt info, that can be useful for reaching the influencer \n You are to return this as a JSON output. The contact and location should be the keys in the JSON \n Do not add to the search result, just return the JSON data as expected.\n <|eot_id|><|start_header_id|>user<|end_header_id|> INFLUENCER_NAME: {influencer_name}\n CONTACT: {contact} \n LOCATION: {location} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>""", input_variables=["contact", "location", "influencer_name"], ) initiator_router = initiator_prompt | llm | JsonOutputParser() output = initiator_router.invoke({"contact":contact, "location":location, "influencer_name":influencer_name}) return output def contact_details(influencer_name:str, product_category:str): logger.info(f"contact details search") contact_question = f"What is the contact details or email of {influencer_name}, this person is a popular {product_category} influencer." print(contact_question) contact = tavily_client.qna_search(query=contact_question, search_depth='advanced', max_results=10) print(contact) logger.info(f"Location details search") location_question = f"What is the location of {influencer_name}, this person is a popular {product_category} influencer. Check for state and country." print(location_question) location = tavily_client.qna_search(query=location_question, search_depth='advanced', max_results=10) print(location) logger.info(f"Formatting Influencer contact details") format_response = get_influencer_contact(influencer_name=influencer_name, contact=contact, location=location) return format_response # creating a function to get all the influencer data def influencer_data(influencer_name: str, product_category:str): logger.info("Getting {influencer_name} social media data") facebook = socialmedia_details(social_media="Facebook", influencer_name=influencer_name, product_category=product_category) instagram = socialmedia_details(social_media="Instagram", influencer_name=influencer_name, product_category=product_category) tiktok = socialmedia_details(social_media="Tiktok", influencer_name=influencer_name, product_category=product_category) youtube = socialmedia_details(social_media="Youtube", influencer_name=influencer_name, product_category=product_category) contact_info = contact_details(influencer_name=influencer_name, product_category=product_category) response = { "name": influencer_name, "facebook": facebook, "instagram": instagram, "tiktok": tiktok, "youtube": youtube, "contact" : contact_info } return json.dumps(response) # Function to get all influencers details concurrently for a category def get_all_influencer_data(influencer_names: list, category: str): all_influencers_data = [] # Using ThreadPoolExecutor to fetch influencer data concurrently with concurrent.futures.ThreadPoolExecutor() as executor: # Submit tasks for each influencer and collect the future objects future_to_influencer = {executor.submit(influencer_data, name, category): name for name in influencer_names} for future in concurrent.futures.as_completed(future_to_influencer): influencer = future_to_influencer[future] try: influencer_details = future.result() # Get the result of the completed future all_influencers_data.append(influencer_details) except Exception as exc: logger.error(f"{influencer} generated an exception: {exc}") return all_influencers_data