Files
ds_task_marketing_assistant_ai/backend/copywriter.py
T
2025-07-10 19:00:01 +01:00

230 lines
8.8 KiB
Python

import json
import os
from datetime import datetime
from typing import Dict, List, Optional
from pathlib import Path
import openai
from .vector_store import VectorStore
from .brand_style import BrandStyle
from .config import Config
class Copywriter:
def __init__(self):
self.vector_store = VectorStore()
self.brand_style = BrandStyle()
self.user_queries_path = Path("data/user_queries")
self.user_queries_path.mkdir(parents=True, exist_ok=True)
# Initialize OpenAI
openai.api_key = Config.OPENAI_API_KEY
def generate_copy(self, request) -> Dict:
"""Generate marketing copy and log user interaction"""
try:
# Log the user query first
query_log = self._log_user_query(request)
# Get similar content from vector store
similar = self.vector_store.search(request.prompt, request.content_type)
# Format similar content for context
similar_content = ""
if similar:
similar_content = "\n\nSimilar past campaigns for reference:\n"
for i, campaign in enumerate(similar[:3], 1):
similar_content += f"{i}. {campaign.get('content', '')}\n"
# Generate with OpenAI
system_prompt = self.brand_style.get_prompt(request)
user_prompt = f"Create marketing copy for: {request.prompt}{similar_content}"
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.7,
max_tokens=500
)
generated_copy = response.choices[0].message.content
# Update the query log with the result
self._update_query_log(query_log["query_id"], generated_copy, True)
# Store the generated copy for future reference
new_campaign = {
"content": generated_copy,
"content_type": request.content_type,
"metadata": {
"prompt": request.prompt,
"tone": request.tone,
"generated_at": datetime.now().isoformat(),
"query_id": query_log["query_id"]
}
}
# Add to vector store for future similarity searches
self.vector_store.add_campaign(new_campaign)
return {
"result": generated_copy,
"query_id": query_log["query_id"],
"similar_campaigns_used": len(similar)
}
except Exception as e:
# Log the error in user queries
if 'query_log' in locals():
self._update_query_log(query_log["query_id"], str(e), False)
raise e
def _log_user_query(self, request) -> Dict:
"""Log user query for AI training purposes"""
query_id = f"query_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
# Handle both dict and Pydantic model objects
if hasattr(request, 'dict'):
request_data = request.dict()
else:
request_data = request
query_log = {
"query_id": query_id,
"timestamp": datetime.now().isoformat(),
"user_input": {
"prompt": request_data.get("prompt"),
"content_type": request_data.get("content_type"),
"tone": request_data.get("tone")
},
"status": "processing",
"generated_output": None,
"success": None,
"processing_time": None,
"similar_campaigns_count": 0
}
# Save to user_queries folder
query_file = self.user_queries_path / f"{query_id}.json"
with open(query_file, 'w') as f:
json.dump(query_log, f, indent=2)
return query_log
def _update_query_log(self, query_id: str, output: str, success: bool):
"""Update the query log with results"""
query_file = self.user_queries_path / f"{query_id}.json"
if query_file.exists():
with open(query_file, 'r') as f:
query_log = json.load(f)
query_log.update({
"generated_output": output,
"success": success,
"status": "completed" if success else "failed",
"completed_at": datetime.now().isoformat()
})
with open(query_file, 'w') as f:
json.dump(query_log, f, indent=2)
def get_query_history(self, limit: int = 10) -> List[Dict]:
"""Get recent user queries for analysis"""
query_files = list(self.user_queries_path.glob("*.json"))
query_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
queries = []
for file in query_files[:limit]:
with open(file, 'r') as f:
queries.append(json.load(f))
return queries
def get_query_analytics(self) -> Dict:
"""Get analytics on user queries for AI improvement"""
query_files = list(self.user_queries_path.glob("*.json"))
total_queries = len(query_files)
successful_queries = 0
failed_queries = 0
content_type_counts = {}
tone_counts = {}
for file in query_files:
with open(file, 'r') as f:
query = json.load(f)
if query.get("success"):
successful_queries += 1
elif query.get("success") is False:
failed_queries += 1
content_type = query.get("user_input", {}).get("content_type", "unknown")
content_type_counts[content_type] = content_type_counts.get(content_type, 0) + 1
tone = query.get("user_input", {}).get("tone", "default")
tone_counts[tone] = tone_counts.get(tone, 0) + 1
return {
"total_queries": total_queries,
"successful_queries": successful_queries,
"failed_queries": failed_queries,
"success_rate": round(successful_queries / total_queries * 100, 2) if total_queries > 0 else 0,
"content_type_distribution": content_type_counts,
"tone_distribution": tone_counts
}
def log_user_feedback(self, query_id: str, feedback: Dict):
"""Log user feedback on generated copy for training"""
query_file = self.user_queries_path / f"{query_id}.json"
if query_file.exists():
with open(query_file, 'r') as f:
query_log = json.load(f)
query_log["user_feedback"] = {
"rating": feedback.get("rating"), # 1-5 scale
"comments": feedback.get("comments"),
"used_output": feedback.get("used_output", False),
"modifications_made": feedback.get("modifications_made"),
"feedback_timestamp": datetime.now().isoformat()
}
with open(query_file, 'w') as f:
json.dump(query_log, f, indent=2)
return True
return False
def export_training_data(self, output_file: str = None) -> str:
"""Export user queries and feedback for model training"""
if not output_file:
output_file = f"training_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
query_files = list(self.user_queries_path.glob("*.json"))
training_data = []
for file in query_files:
with open(file, 'r') as f:
query = json.load(f)
# Only include successful queries with feedback for training
if query.get("success") and query.get("user_feedback"):
training_example = {
"input": query["user_input"]["prompt"],
"content_type": query["user_input"]["content_type"],
"tone": query["user_input"]["tone"],
"output": query["generated_output"],
"rating": query["user_feedback"]["rating"],
"used": query["user_feedback"]["used_output"]
}
training_data.append(training_example)
output_path = Path("data") / output_file
with open(output_path, 'w') as f:
json.dump(training_data, f, indent=2)
return str(output_path)