Files
ds_sabaproject/social_media_collection.py
T
timothyafolami b7a73606e0 recent data
2024-11-27 20:44:26 +01:00

166 lines
8.6 KiB
Python

import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from tavily import TavilyClient
from langchain_core.prompts.prompt import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from loguru import logger
import concurrent.futures
import json
load_dotenv()
os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
os.environ["TAVILY_API_KEY"] = os.getenv("TAVILY_API_KEY")
os.environ["PERPLEXITY_AI_API"] = os.getenv("PERPLEXITY_AI_API")
llm = ChatOpenAI(model="gpt-4o")
# Instantiating TavilyClient
tavily_client = TavilyClient()
# Check for missing environment variables
if not os.getenv("OPENAI_API_KEY") or not os.getenv("TAVILY_API_KEY") or not os.getenv("PERPLEXITY_AI_API"):
logger.error("One or more API keys are missing from the environment variables.")
raise EnvironmentError("Missing API keys.")
def get_influencer_data(social_media: str, influencer_name:str , socialmedia_name: str, socialmedia_followers:str) -> dict:
logger.info(f"Formatting Influencer Data")
initiator_prompt = PromptTemplate(
template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a Influencer Data Extractor AI Agent tasked with extracting information from a search result\n
You are provided with three informations: \n
1. The social media name \n
2. The influencer social media name search result \n
3. The influencer social media followers/subscribers search results \n
Your job is to extract the influencer social media name, and social media followers or subscriber. \n
You are to return this as a JSON output. Two data should be there, the influencer social media name, and social media followers or subscribers.\n
Do this decently and properly. If the search result isn't showing the social media name or social media folloers/subscribers, return the data as NULL. \n
Don not add to the search result, just return the JSON data as expected. Also rmember to replace social media with the given social media name in the JSON output.\n
<|eot_id|><|start_header_id|>user<|end_header_id|>
SOCIALMEDIA: {social_media} \n
INFLUENCER_NAME: {influencer_name}\n
SOCIALMEDIA_NAME: {socialmedia_name} \n
SOCIALMEDIA_FOLLOWERS: {socialmedia_followers} \n
<|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
input_variables=["social_media", "socialmedia_name", "socialmedia_followers", "influencer_name"],
)
initiator_router = initiator_prompt | llm | JsonOutputParser()
try:
output = initiator_router.invoke({"social_media":social_media, "socialmedia_name":socialmedia_name, "socialmedia_followers":socialmedia_followers, "influencer_name":influencer_name})
except Exception as e:
logger.error(f"Error extracting influencer data: {e}")
return {"influencer_name": influencer_name, "followers": None} # Return structured JSON
return output
def socialmedia_details(social_media:str, influencer_name:str, product_category:str):
logger.info(f"{social_media} name search")
name_question = f"What is the {social_media} username of {influencer_name}, this person is a popular {product_category} influencer."
print(name_question)
name = tavily_client.qna_search(query=name_question, search_depth='advanced', max_results=10)
print(name)
logger.info(f"{social_media} followers search")
followers_question = f"How many {social_media} followers does {influencer_name} have? Note this is a {product_category} popular influencer."
print(followers_question)
followers = tavily_client.qna_search(query=followers_question, search_depth='advanced', max_results=10)
print(followers)
logger.info(f"Formatting Influencer {social_media} Data")
format_response = get_influencer_data(social_media={social_media}, influencer_name=influencer_name, socialmedia_name=name, socialmedia_followers=followers)
return format_response
def get_influencer_contact(influencer_name:str, contact:str, location:str) -> dict:
logger.info(f"Formatting Influencer Data")
initiator_prompt = PromptTemplate(
template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a Influencer Data Extractor AI Agent tasked with extracting information from a search result\n
You are provided with three informations: \n
1. The social media name \n
2. The influencer contact info search result \n
3. The influencer location or state search results \n
Your job is to extract the influencer's contact info, and location. Make sure things are properly filtered and good. \n
Pick relevalt info, that can be useful for reaching the influencer \n
You are to return this as a JSON output. The contact and location should be the keys in the JSON \n
Do not add to the search result, just return the JSON data as expected.\n
<|eot_id|><|start_header_id|>user<|end_header_id|>
INFLUENCER_NAME: {influencer_name}\n
CONTACT: {contact} \n
LOCATION: {location} \n
<|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
input_variables=["contact", "location", "influencer_name"],
)
initiator_router = initiator_prompt | llm | JsonOutputParser()
output = initiator_router.invoke({"contact":contact, "location":location, "influencer_name":influencer_name})
return output
def contact_details(influencer_name: str, product_category: str) -> dict:
"""Fetch and format influencer contact details."""
logger.info(f"contact details search")
contact_question = f"What is the contact details or email of {influencer_name}, this person is a popular {product_category} influencer."
print(contact_question)
contact = tavily_client.qna_search(query=contact_question, search_depth='advanced', max_results=10)
print(contact)
logger.info(f"Location details search")
location_question = f"What is the location of {influencer_name}, this person is a popular {product_category} influencer. Check for state and country."
print(location_question)
location = tavily_client.qna_search(query=location_question, search_depth='advanced', max_results=10)
print(location)
logger.info(f"Formatting Influencer contact details")
format_response = get_influencer_contact(influencer_name=influencer_name, contact=contact, location=location)
return format_response
# creating a function to get all the influencer data
def influencer_data(influencer_name: str, product_category:str):
logger.info("Getting {influencer_name} social media data")
facebook = socialmedia_details(social_media="Facebook", influencer_name=influencer_name, product_category=product_category)
instagram = socialmedia_details(social_media="Instagram", influencer_name=influencer_name, product_category=product_category)
tiktok = socialmedia_details(social_media="Tiktok", influencer_name=influencer_name, product_category=product_category)
youtube = socialmedia_details(social_media="Youtube", influencer_name=influencer_name, product_category=product_category)
contact_info = contact_details(influencer_name=influencer_name, product_category=product_category)
response = {
"name": influencer_name,
"facebook": facebook,
"instagram": instagram,
"tiktok": tiktok,
"youtube": youtube,
"contact" : contact_info
}
return json.dumps(response)
# Function to get all influencers details concurrently for a category
def get_all_influencer_data(influencer_names: list, category: str):
all_influencers_data = []
# Using ThreadPoolExecutor to fetch influencer data concurrently
with concurrent.futures.ThreadPoolExecutor() as executor:
# Submit tasks for each influencer and collect the future objects
future_to_influencer = {executor.submit(influencer_data, name, category): name for name in influencer_names}
for future in concurrent.futures.as_completed(future_to_influencer):
influencer = future_to_influencer[future]
try:
influencer_details = future.result() # Get the result of the completed future
all_influencers_data.append(influencer_details)
except Exception as exc:
logger.error(f"{influencer} generated an exception: {exc}")
return all_influencers_data
# # data = get_all_influencer_data(influencer_names=['Kylie Swift'], category="Entertainment & Pop Culture")
# data = socialmedia_details("Facebook", "Kylie Swift", "Entertainment")
# print(data)