# 2024-10-28 23:20:31 +01:00
from openai import OpenAI
import os
import requests
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from tavily import TavilyClient
from langchain_core . prompts . prompt import PromptTemplate
from langchain_core . output_parsers import StrOutputParser , JsonOutputParser
import concurrent . futures
import json
from loguru import logger
# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

# Fail fast with a readable message when the OpenAI key is absent. The
# previous `os.environ[...] = os.getenv(...)` round-trip changed nothing when
# the key was set and raised an opaque TypeError ("str expected, not NoneType")
# when it was not.
if os.getenv("OPENAI_API_KEY") is None:
    raise RuntimeError(
        "OPENAI_API_KEY is not set; export it or add it to the .env file."
    )

# Shared chat model used by the extraction chain in this module.
llm = ChatOpenAI(model="gpt-4o")

# Perplexity bearer token; None when the variable is missing.
API_KEY = os.getenv('PERPLEXITY_AI_API')
def perplexity_data(prompt, api_key=API_KEY, *, timeout=120):
    """Send ``prompt`` to the Perplexity chat-completions endpoint.

    Args:
        prompt: Text sent as the ``user`` message.
        api_key: Bearer token placed in the ``Authorization`` header.
            Defaults to the module-level ``API_KEY``.
        timeout: Seconds ``requests`` waits on the server before giving up.
            Without a timeout a stalled connection blocks the caller
            indefinitely.

    Returns:
        The ``content`` of the first choice's message when the call succeeds.
        Failures are reported in-band as plain strings:
        ``"Request failed with status code: <code>"`` for any non-200 reply,
        and ``"Unexpected response format."`` when a 200 reply is not valid
        JSON or lacks ``choices[0].message.content``.

    Raises:
        requests.RequestException: Transport-level failures (including the
            timeout) are not caught here and propagate to the caller.
    """
    url = "https://api.perplexity.ai/chat/completions"
    payload = {
        "model": "llama-3.1-sonar-huge-128k-online",
        "messages": [
            {
                "role": "system",
                "content": "Be precise and concise.",
            },
            {
                "role": "user",
                "content": prompt,
            },
        ],
        "temperature": 0.2,
        "top_p": 0.9,
        "return_citations": True,
        # NOTE(review): this appears to limit the search to perplexity.ai
        # itself -- confirm that is intended for influencer lookups.
        "search_domain_filter": ["perplexity.ai"],
        "return_images": False,
        "return_related_questions": False,
        "search_recency_filter": "month",
        "top_k": 0,
        "stream": False,
        "presence_penalty": 0,
        "frequency_penalty": 1,
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    response = requests.post(url, json=payload, headers=headers, timeout=timeout)

    # Anything other than 200 is reported to the caller as a diagnostic string.
    if response.status_code != 200:
        return f"Request failed with status code: {response.status_code}"

    try:
        # Decoding happens inside the try so that a non-JSON body (ValueError)
        # or a null/ill-typed field (TypeError) is reported the same way as a
        # missing key or empty choices list.
        return response.json()['choices'][0]['message']['content']
    except (KeyError, IndexError, TypeError, ValueError):
        return "Unexpected response format."
def extract_names(influencer_names: str) -> dict:
    """Extract influencer names from raw search text with the module-level ``llm``.

    The text is inserted into a fixed prompt, sent through ``llm`` and parsed
    with ``JsonOutputParser``.

    Args:
        influencer_names: Free-text search result mentioning influencer names.

    Returns:
        A dict that always contains a ``"names"`` key. A bare JSON list from
        the model is wrapped as ``{"names": [...]}``; a dict without the key
        gets an empty ``"names"`` list added (and a warning is logged).

    Raises:
        ValueError: If the parsed output is neither a dict nor a list.
        Errors raised by the chain itself (model call, JSON parsing) are not
        caught here.
    """
    logger.info("Formatting Influencer Data")

    # NOTE(review): the template uses <|...|> chat-markup tokens that look like
    # a Llama-style format, while the module-level llm is configured as
    # gpt-4o -- confirm they are intended.
    # NOTE(review): the prompt announces "three informations" but lists only
    # one item; the wording is kept verbatim to avoid changing model behaviour.
    # Continuation lines are flush-left so no indentation leaks into the prompt.
    initiator_prompt = PromptTemplate(
        template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a Influencer Data Extractor AI Agent tasked with extracting information from a search result \n
You are provided with three informations: \n
1. Search result of influencers names
Your job is to extract all the influencers names \n
Make sure to extract all the names there \n
You are to return this as a JSON output. The key should be 'names' in the JSON \n
Do not add to the search result, just return the JSON data of all the influencer names as expected. \n
<|eot_id|><|start_header_id|>user<|end_header_id|>
INFLUENCER_NAMEs: {influencer_names} \n
<|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
        input_variables=["influencer_names"],
    )
    initiator_router = initiator_prompt | llm | JsonOutputParser()
    output = initiator_router.invoke({"influencer_names": influencer_names})

    # The parser returns whatever JSON the model produced; normalise it so
    # callers can rely on output["names"].
    if isinstance(output, list):
        return {"names": output}
    if not isinstance(output, dict):
        raise ValueError(
            f"Expected a JSON object with a 'names' key, got {type(output).__name__}"
        )
    if "names" not in output:
        logger.warning("Model output has no 'names' key; defaulting to an empty list")
        output["names"] = []
    return output
def get_category_influencers(product_category: str):
    """Look up influencer names for one product category.

    Asks Perplexity (via ``perplexity_data``) for popular influencers in the
    category, then passes the answer to ``extract_names`` for JSON extraction.

    Args:
        product_category: Category name interpolated into the search prompt.

    Returns:
        Whatever ``extract_names`` returns for the search text, or
        ``{"names": []}`` when ``perplexity_data`` reported a failure.
    """
    logger.info("Search for influencers names")
    names_prompt = (
        f"Give me a list of the names of the all the top and popular {product_category} influencers in USA. "
        "People that are known across the internet. I only need their names. "
        "Give me all that's available as much as possible."
    )
    search_names = perplexity_data(names_prompt)

    # perplexity_data signals failure with plain diagnostic strings. Feeding
    # those to the LLM as "search results" wastes a call and invites invented
    # names, so stop here instead.
    if isinstance(search_names, str):
        diagnostic = search_names.strip()
        if (
            diagnostic.startswith("Request failed with status code:")
            or diagnostic == "Unexpected response format."
        ):
            logger.error(
                f"Influencer search for {product_category} failed: {diagnostic}"
            )
            return {"names": []}

    logger.info("Extracting influencers names")
    return extract_names(search_names)
def get_all_names(product_categories: list):
    """Fetch influencers for every product category concurrently.

    Each category is handed to ``get_category_influencers`` on a thread pool.

    Args:
        product_categories: Category names; each one becomes a key of the
            result, so the items must be hashable.

    Returns:
        A dict mapping category -> result of ``get_category_influencers``.
        Categories whose lookup raised are logged and left out of the dict.
    """
    all_influencers = {}

    # Nothing to do: skip creating a thread pool altogether.
    if not product_categories:
        return all_influencers

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Map each future back to its category so results can be attributed
        # as they complete, in whatever order that happens.
        future_to_category = {
            executor.submit(get_category_influencers, category): category
            for category in product_categories
        }
        for future in concurrent.futures.as_completed(future_to_category):
            category = future_to_category[future]
            try:
                influencers = future.result()
            except Exception as exc:
                # Best effort: one failing category must not abort the rest.
                # logger.exception keeps the traceback alongside the message.
                logger.exception(f"{category} generated an exception: {exc}")
            else:
                all_influencers[category] = influencers

    return all_influencers
if __name__ == "__main__":
    # Manual smoke test. Guarded so that importing this module does not fire
    # network requests and LLM calls as a side effect.
    test = get_all_names(['Beauty'])
    beauty = test.get('Beauty')
    if beauty is None:
        # get_all_names omits categories whose lookup raised.
        print("No result for 'Beauty'; see the logged error.")
    else:
        print(beauty['names'])