# File: ds_sabaproject/names_collection.py (Python, 120 lines, 4.5 KiB)
# Repository file-view timestamp: 2024-10-28 23:20:31 +01:00
from openai import OpenAI
import os
import requests
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from tavily import TavilyClient
from langchain_core.prompts.prompt import PromptTemplate
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
import concurrent.futures
import json
from loguru import logger
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()

# Re-export OPENAI_API_KEY only when it is actually defined: assigning None to
# os.environ raises "TypeError: str expected, not NoneType", which hides the
# real problem (a missing key) behind an unrelated-looking error.
_openai_api_key = os.getenv("OPENAI_API_KEY")
if _openai_api_key is not None:
    os.environ["OPENAI_API_KEY"] = _openai_api_key

# Shared chat model used by extract_names().
llm = ChatOpenAI(model="gpt-4o")

# Bearer token for the Perplexity API; None when PERPLEXITY_AI_API is unset.
API_KEY = os.getenv('PERPLEXITY_AI_API')
def perplexity_data(prompt, api_key=API_KEY, timeout=120):
    """Send ``prompt`` to the Perplexity chat-completions API and return the reply.

    Args:
        prompt: Text sent as the user message.
        api_key: Bearer token placed in the ``Authorization`` header; defaults
            to the module-level ``API_KEY``.
        timeout: Seconds passed to ``requests.post`` as its ``timeout``. Pass
            ``None`` to wait indefinitely (the behaviour before this
            parameter existed).

    Returns:
        The ``content`` of the first choice's message when the request returns
        HTTP 200 and the body has the expected shape. Otherwise a descriptive
        error string is returned, so the result is always a ``str``.

    Raises:
        requests.exceptions.RequestException: If the HTTP request itself fails
            (connection error, timeout, ...); these are not converted into an
            error string.
    """
    url = "https://api.perplexity.ai/chat/completions"
    payload = {
        "model": "llama-3.1-sonar-huge-128k-online",
        "messages": [
            {
                "role": "system",
                "content": "Be precise and concise."
            },
            {
                "role": "user",
                "content": prompt
            }
        ],
        "temperature": 0.2,
        "top_p": 0.9,
        "return_citations": True,
        # NOTE(review): this limits the search to a single domain - confirm
        # that is intended for influencer lookups.
        "search_domain_filter": ["perplexity.ai"],
        "return_images": False,
        "return_related_questions": False,
        "search_recency_filter": "month",
        "top_k": 0,
        "stream": False,
        "presence_penalty": 0,
        "frequency_penalty": 1
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    # Without a timeout a stalled connection would block the calling thread
    # forever.
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)

    if response.status_code != 200:
        return f"Request failed with status code: {response.status_code}"

    try:
        # response.json() is inside the try so that a 200 response with a
        # non-JSON body (ValueError) or an unexpectedly typed payload
        # (TypeError) is reported the same way as a missing key or index.
        return response.json()['choices'][0]['message']['content']
    except (KeyError, IndexError, TypeError, ValueError):
        return "Unexpected response format."
def extract_names(influencer_names: str) -> dict:
    """Ask the shared chat model to pull influencer names out of a search result.

    The text in ``influencer_names`` is substituted into a fixed prompt that
    instructs the model to answer with JSON whose key is ``'names'``. The
    prompt is piped through the module-level ``llm`` and a
    ``JsonOutputParser``.

    Args:
        influencer_names: Raw search-result text that mentions influencers.

    Returns:
        Whatever the JSON parser produces from the model reply; the prompt
        requests an object with a ``'names'`` key, but that shape depends on
        the model's answer.
    """
    logger.info("Formatting Influencer Data")

    template_text = """<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a Influencer Data Extractor AI Agent tasked with extracting information from a search result\n
You are provided with three informations: \n
1. Search result of influencers names
Your job is to extract all the influencers names\n
Make sure to extract all the names there \n
You are to return this as a JSON output. The key should be 'names' in the JSON \n
Do not add to the search result, just return the JSON data of all the influencer names as expected.\n
<|eot_id|><|start_header_id|>user<|end_header_id|>
INFLUENCER_NAMEs: {influencer_names}\n
<|eot_id|><|start_header_id|>assistant<|end_header_id|>"""

    extraction_prompt = PromptTemplate(
        input_variables=["influencer_names"],
        template=template_text,
    )

    # prompt -> chat model -> JSON parser
    extraction_chain = extraction_prompt | llm | JsonOutputParser()
    return extraction_chain.invoke({"influencer_names": influencer_names})
def get_category_influencers(product_category: str):
    """Look up influencers for one product category and return their names.

    Queries ``perplexity_data`` with a fixed question about the category, then
    passes the returned text to ``extract_names``.

    Args:
        product_category: Category name inserted into the search question.

    Returns:
        The value returned by ``extract_names`` for the search text.
    """
    logger.info("Search for influencers names")
    # NOTE(review): perplexity_data returns an error string on a failed
    # request; that string is forwarded to extract_names unchanged.
    raw_search_text = perplexity_data(
        f"Give me a list of the names of the all the top and popular {product_category} influencers in USA. People that are known across the internet. I only need their names. Give me all that's available as much as possible."
    )

    logger.info("Extracting influencers names")
    return extract_names(raw_search_text)
def get_all_names(product_categories: list):
    """Fetch influencers for every product category using a thread pool.

    Each distinct category is submitted to ``get_category_influencers`` on a
    ``ThreadPoolExecutor`` and the results are gathered as the calls finish.

    Args:
        product_categories: Category names to look up. Repeated entries are
            searched only once.

    Returns:
        A dict mapping each category to the value returned by
        ``get_category_influencers``. A category whose lookup raised is
        logged and left out of the dict. An empty input yields ``{}``.
    """
    all_influencers = {}

    # dict.fromkeys drops repeated categories (keeping first-seen order) so
    # the same search is not issued several times only for the results to
    # overwrite each other under one key.
    unique_categories = dict.fromkeys(product_categories)

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Map each future back to the category it was submitted for.
        future_to_category = {
            executor.submit(get_category_influencers, category): category
            for category in unique_categories
        }
        for future in concurrent.futures.as_completed(future_to_category):
            category = future_to_category[future]
            try:
                # result() re-raises any exception from the worker thread.
                all_influencers[category] = future.result()
            except Exception as exc:
                # Best effort: one failing category must not abort the rest.
                logger.error(f"{category} generated an exception: {exc}")

    return all_influencers
# Repository file-view timestamp: 2024-11-27 20:44:26 +01:00
# Example usage:
# test = get_all_names(['Beauty'])
# print(test['Beauty']['names'])