2025-07-10 19:00:01 +01:00
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from typing import Dict, List, Optional
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
import openai
|
2025-07-10 15:16:16 +01:00
|
|
|
from .vector_store import VectorStore
|
|
|
|
|
from .brand_style import BrandStyle
|
2025-07-10 19:00:01 +01:00
|
|
|
from .config import Config
|
2025-07-10 15:16:16 +01:00
|
|
|
|
|
|
|
|
class Copywriter:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.vector_store = VectorStore()
|
|
|
|
|
self.brand_style = BrandStyle()
|
2025-07-10 19:00:01 +01:00
|
|
|
self.user_queries_path = Path("data/user_queries")
|
|
|
|
|
self.user_queries_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
# Initialize OpenAI
|
|
|
|
|
openai.api_key = Config.OPENAI_API_KEY
|
|
|
|
|
|
|
|
|
|
def generate_copy(self, request) -> Dict:
|
|
|
|
|
"""Generate marketing copy and log user interaction"""
|
|
|
|
|
try:
|
|
|
|
|
# Log the user query first
|
|
|
|
|
query_log = self._log_user_query(request)
|
|
|
|
|
|
|
|
|
|
# Get similar content from vector store
|
|
|
|
|
similar = self.vector_store.search(request.prompt, request.content_type)
|
|
|
|
|
|
|
|
|
|
# Format similar content for context
|
|
|
|
|
similar_content = ""
|
|
|
|
|
if similar:
|
|
|
|
|
similar_content = "\n\nSimilar past campaigns for reference:\n"
|
|
|
|
|
for i, campaign in enumerate(similar[:3], 1):
|
|
|
|
|
similar_content += f"{i}. {campaign.get('content', '')}\n"
|
|
|
|
|
|
|
|
|
|
# Generate with OpenAI
|
|
|
|
|
system_prompt = self.brand_style.get_prompt(request)
|
|
|
|
|
user_prompt = f"Create marketing copy for: {request.prompt}{similar_content}"
|
|
|
|
|
|
|
|
|
|
response = openai.ChatCompletion.create(
|
|
|
|
|
model="gpt-3.5-turbo",
|
|
|
|
|
messages=[
|
|
|
|
|
{"role": "system", "content": system_prompt},
|
|
|
|
|
{"role": "user", "content": user_prompt}
|
|
|
|
|
],
|
|
|
|
|
temperature=0.7,
|
|
|
|
|
max_tokens=500
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
generated_copy = response.choices[0].message.content
|
|
|
|
|
|
|
|
|
|
# Update the query log with the result
|
|
|
|
|
self._update_query_log(query_log["query_id"], generated_copy, True)
|
|
|
|
|
|
|
|
|
|
# Store the generated copy for future reference
|
|
|
|
|
new_campaign = {
|
|
|
|
|
"content": generated_copy,
|
|
|
|
|
"content_type": request.content_type,
|
|
|
|
|
"metadata": {
|
|
|
|
|
"prompt": request.prompt,
|
|
|
|
|
"tone": request.tone,
|
|
|
|
|
"generated_at": datetime.now().isoformat(),
|
|
|
|
|
"query_id": query_log["query_id"]
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Add to vector store for future similarity searches
|
|
|
|
|
self.vector_store.add_campaign(new_campaign)
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"result": generated_copy,
|
|
|
|
|
"query_id": query_log["query_id"],
|
|
|
|
|
"similar_campaigns_used": len(similar)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# Log the error in user queries
|
|
|
|
|
if 'query_log' in locals():
|
|
|
|
|
self._update_query_log(query_log["query_id"], str(e), False)
|
|
|
|
|
raise e
|
2025-07-10 15:16:16 +01:00
|
|
|
|
2025-07-10 19:00:01 +01:00
|
|
|
def _log_user_query(self, request) -> Dict:
|
|
|
|
|
"""Log user query for AI training purposes"""
|
|
|
|
|
query_id = f"query_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
|
|
|
|
|
|
|
|
|
|
# Handle both dict and Pydantic model objects
|
|
|
|
|
if hasattr(request, 'dict'):
|
|
|
|
|
request_data = request.dict()
|
|
|
|
|
else:
|
|
|
|
|
request_data = request
|
|
|
|
|
|
|
|
|
|
query_log = {
|
|
|
|
|
"query_id": query_id,
|
|
|
|
|
"timestamp": datetime.now().isoformat(),
|
|
|
|
|
"user_input": {
|
|
|
|
|
"prompt": request_data.get("prompt"),
|
|
|
|
|
"content_type": request_data.get("content_type"),
|
|
|
|
|
"tone": request_data.get("tone")
|
|
|
|
|
},
|
|
|
|
|
"status": "processing",
|
|
|
|
|
"generated_output": None,
|
|
|
|
|
"success": None,
|
|
|
|
|
"processing_time": None,
|
|
|
|
|
"similar_campaigns_count": 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
# Save to user_queries folder
|
|
|
|
|
query_file = self.user_queries_path / f"{query_id}.json"
|
|
|
|
|
with open(query_file, 'w') as f:
|
|
|
|
|
json.dump(query_log, f, indent=2)
|
|
|
|
|
|
|
|
|
|
return query_log
|
|
|
|
|
|
|
|
|
|
def _update_query_log(self, query_id: str, output: str, success: bool):
|
|
|
|
|
"""Update the query log with results"""
|
|
|
|
|
query_file = self.user_queries_path / f"{query_id}.json"
|
|
|
|
|
|
|
|
|
|
if query_file.exists():
|
|
|
|
|
with open(query_file, 'r') as f:
|
|
|
|
|
query_log = json.load(f)
|
|
|
|
|
|
|
|
|
|
query_log.update({
|
|
|
|
|
"generated_output": output,
|
|
|
|
|
"success": success,
|
|
|
|
|
"status": "completed" if success else "failed",
|
|
|
|
|
"completed_at": datetime.now().isoformat()
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
with open(query_file, 'w') as f:
|
|
|
|
|
json.dump(query_log, f, indent=2)
|
|
|
|
|
|
|
|
|
|
def get_query_history(self, limit: int = 10) -> List[Dict]:
|
|
|
|
|
"""Get recent user queries for analysis"""
|
|
|
|
|
query_files = list(self.user_queries_path.glob("*.json"))
|
|
|
|
|
query_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
|
|
|
|
|
|
|
|
|
|
queries = []
|
|
|
|
|
for file in query_files[:limit]:
|
|
|
|
|
with open(file, 'r') as f:
|
|
|
|
|
queries.append(json.load(f))
|
|
|
|
|
|
|
|
|
|
return queries
|
|
|
|
|
|
|
|
|
|
def get_query_analytics(self) -> Dict:
|
|
|
|
|
"""Get analytics on user queries for AI improvement"""
|
|
|
|
|
query_files = list(self.user_queries_path.glob("*.json"))
|
|
|
|
|
|
|
|
|
|
total_queries = len(query_files)
|
|
|
|
|
successful_queries = 0
|
|
|
|
|
failed_queries = 0
|
|
|
|
|
content_type_counts = {}
|
|
|
|
|
tone_counts = {}
|
|
|
|
|
|
|
|
|
|
for file in query_files:
|
|
|
|
|
with open(file, 'r') as f:
|
|
|
|
|
query = json.load(f)
|
|
|
|
|
|
|
|
|
|
if query.get("success"):
|
|
|
|
|
successful_queries += 1
|
|
|
|
|
elif query.get("success") is False:
|
|
|
|
|
failed_queries += 1
|
|
|
|
|
|
|
|
|
|
content_type = query.get("user_input", {}).get("content_type", "unknown")
|
|
|
|
|
content_type_counts[content_type] = content_type_counts.get(content_type, 0) + 1
|
|
|
|
|
|
|
|
|
|
tone = query.get("user_input", {}).get("tone", "default")
|
|
|
|
|
tone_counts[tone] = tone_counts.get(tone, 0) + 1
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"total_queries": total_queries,
|
|
|
|
|
"successful_queries": successful_queries,
|
|
|
|
|
"failed_queries": failed_queries,
|
|
|
|
|
"success_rate": round(successful_queries / total_queries * 100, 2) if total_queries > 0 else 0,
|
|
|
|
|
"content_type_distribution": content_type_counts,
|
|
|
|
|
"tone_distribution": tone_counts
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def log_user_feedback(self, query_id: str, feedback: Dict):
|
|
|
|
|
"""Log user feedback on generated copy for training"""
|
|
|
|
|
query_file = self.user_queries_path / f"{query_id}.json"
|
|
|
|
|
|
|
|
|
|
if query_file.exists():
|
|
|
|
|
with open(query_file, 'r') as f:
|
|
|
|
|
query_log = json.load(f)
|
|
|
|
|
|
|
|
|
|
query_log["user_feedback"] = {
|
|
|
|
|
"rating": feedback.get("rating"), # 1-5 scale
|
|
|
|
|
"comments": feedback.get("comments"),
|
|
|
|
|
"used_output": feedback.get("used_output", False),
|
|
|
|
|
"modifications_made": feedback.get("modifications_made"),
|
|
|
|
|
"feedback_timestamp": datetime.now().isoformat()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
with open(query_file, 'w') as f:
|
|
|
|
|
json.dump(query_log, f, indent=2)
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def export_training_data(self, output_file: str = None) -> str:
|
|
|
|
|
"""Export user queries and feedback for model training"""
|
|
|
|
|
if not output_file:
|
|
|
|
|
output_file = f"training_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
|
|
|
|
|
|
|
|
query_files = list(self.user_queries_path.glob("*.json"))
|
|
|
|
|
training_data = []
|
|
|
|
|
|
|
|
|
|
for file in query_files:
|
|
|
|
|
with open(file, 'r') as f:
|
|
|
|
|
query = json.load(f)
|
|
|
|
|
|
|
|
|
|
# Only include successful queries with feedback for training
|
|
|
|
|
if query.get("success") and query.get("user_feedback"):
|
|
|
|
|
training_example = {
|
|
|
|
|
"input": query["user_input"]["prompt"],
|
|
|
|
|
"content_type": query["user_input"]["content_type"],
|
|
|
|
|
"tone": query["user_input"]["tone"],
|
|
|
|
|
"output": query["generated_output"],
|
|
|
|
|
"rating": query["user_feedback"]["rating"],
|
|
|
|
|
"used": query["user_feedback"]["used_output"]
|
|
|
|
|
}
|
|
|
|
|
training_data.append(training_example)
|
|
|
|
|
|
|
|
|
|
output_path = Path("data") / output_file
|
|
|
|
|
with open(output_path, 'w') as f:
|
|
|
|
|
json.dump(training_data, f, indent=2)
|
|
|
|
|
|
|
|
|
|
return str(output_path)
|