import json
import os
from datetime import datetime
from typing import Dict, List, Optional
from pathlib import Path
import openai

from .vector_store import VectorStore
from .brand_style import BrandStyle
from .config import Config


class Copywriter:
    """Generate marketing copy with OpenAI and keep a JSON log of every query.

    Each call to :meth:`generate_copy` writes one ``<query_id>.json`` file under
    ``data/user_queries``. The remaining public methods read those files back
    for history, analytics, feedback capture and training-data export.
    """

    def __init__(self):
        """Create the collaborators, the log directory and set the OpenAI key."""
        self.vector_store = VectorStore()
        self.brand_style = BrandStyle()
        self.user_queries_path = Path("data/user_queries")
        self.user_queries_path.mkdir(parents=True, exist_ok=True)

        # Initialize OpenAI
        openai.api_key = Config.OPENAI_API_KEY

    def generate_copy(self, request) -> Dict:
        """Generate marketing copy and log the user interaction.

        Args:
            request: Object exposing ``prompt``, ``content_type`` and ``tone``
                attributes. It is also handed to ``BrandStyle.get_prompt`` and
                to :meth:`_log_user_query`.

        Returns:
            A dict with ``result`` (the generated text), ``query_id`` and
            ``similar_campaigns_used`` (the number of campaigns returned by
            the vector store search).

        Raises:
            Exception: Whatever the underlying calls raise. If the query log
                was already created it is marked as failed before re-raising.
        """
        started_at = datetime.now()
        # Sentinels so the error handler knows how far processing got.
        query_log = None
        similar = []
        try:
            # Log the user query first
            query_log = self._log_user_query(request)
            query_id = query_log["query_id"]

            # Get similar content from vector store; normalise a falsy result
            # to an empty list so the len() calls below cannot fail on None.
            similar = (
                self.vector_store.search(request.prompt, request.content_type)
                or []
            )

            # Format similar content for context (at most three campaigns)
            similar_content = ""
            if similar:
                references = "".join(
                    f"{i}. {campaign.get('content', '')}\n"
                    for i, campaign in enumerate(similar[:3], 1)
                )
                similar_content = (
                    "\n\nSimilar past campaigns for reference:\n" + references
                )

            # Generate with OpenAI
            system_prompt = self.brand_style.get_prompt(request)
            user_prompt = (
                f"Create marketing copy for: {request.prompt}{similar_content}"
            )

            # NOTE(review): ChatCompletion.create is the pre-1.0 openai SDK
            # interface; confirm the pinned SDK version before upgrading.
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=0.7,
                max_tokens=500,
            )

            generated_copy = response.choices[0].message.content

            # Update the query log with the result
            self._update_query_log(
                query_id,
                generated_copy,
                True,
                processing_time=(datetime.now() - started_at).total_seconds(),
                similar_campaigns_count=len(similar),
            )

            # Store the generated copy for future reference
            new_campaign = {
                "content": generated_copy,
                "content_type": request.content_type,
                "metadata": {
                    "prompt": request.prompt,
                    "tone": request.tone,
                    "generated_at": datetime.now().isoformat(),
                    "query_id": query_id,
                },
            }

            # Add to vector store for future similarity searches
            self.vector_store.add_campaign(new_campaign)

            return {
                "result": generated_copy,
                "query_id": query_id,
                "similar_campaigns_used": len(similar),
            }

        except Exception as e:
            # Log the error in user queries (only if the log file was created)
            if query_log is not None:
                self._update_query_log(
                    query_log["query_id"],
                    str(e),
                    False,
                    processing_time=(
                        datetime.now() - started_at
                    ).total_seconds(),
                    similar_campaigns_count=len(similar),
                )
            # Bare raise keeps the original traceback intact.
            raise

    def _query_file(self, query_id: str) -> Optional[Path]:
        """Return the log file path for *query_id*, or None for an unsafe id.

        Ids that are empty or contain a path component (e.g. ``../x``) are
        rejected so callers cannot address files outside the log directory.
        """
        if not query_id or Path(query_id).name != query_id:
            return None
        return self.user_queries_path / f"{query_id}.json"

    def _log_user_query(self, request) -> Dict:
        """Log user query for AI training purposes.

        Args:
            request: A plain dict, or an object offering ``model_dump()`` or
                ``dict()`` that returns one.

        Returns:
            The log record that was written to ``<query_id>.json``.
        """
        query_id = f"query_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"

        # Handle both dict and Pydantic model objects; prefer model_dump()
        # when available because dict() is deprecated in Pydantic v2.
        if hasattr(request, "model_dump"):
            request_data = request.model_dump()
        elif hasattr(request, "dict"):
            request_data = request.dict()
        else:
            request_data = request

        query_log = {
            "query_id": query_id,
            "timestamp": datetime.now().isoformat(),
            "user_input": {
                "prompt": request_data.get("prompt"),
                "content_type": request_data.get("content_type"),
                "tone": request_data.get("tone"),
            },
            "status": "processing",
            "generated_output": None,
            "success": None,
            "processing_time": None,
            "similar_campaigns_count": 0,
        }

        # Save to user_queries folder
        query_file = self.user_queries_path / f"{query_id}.json"
        with open(query_file, "w", encoding="utf-8") as f:
            json.dump(query_log, f, indent=2)

        return query_log

    def _update_query_log(
        self,
        query_id: str,
        output: str,
        success: bool,
        *,
        processing_time: Optional[float] = None,
        similar_campaigns_count: Optional[int] = None,
    ):
        """Update the query log with results.

        Does nothing when the id is unsafe or the log file does not exist.

        Args:
            query_id: Identifier returned by :meth:`_log_user_query`.
            output: Generated text on success, or the error message on failure.
            success: Whether generation succeeded.
            processing_time: Elapsed seconds; stored only when provided.
            similar_campaigns_count: Number of similar campaigns found; stored
                only when provided.
        """
        query_file = self._query_file(query_id)
        if query_file is None or not query_file.exists():
            return

        with open(query_file, "r", encoding="utf-8") as f:
            query_log = json.load(f)

        query_log.update({
            "generated_output": output,
            "success": success,
            "status": "completed" if success else "failed",
            "completed_at": datetime.now().isoformat(),
        })
        # Leave the initial placeholder values alone unless told otherwise.
        if processing_time is not None:
            query_log["processing_time"] = processing_time
        if similar_campaigns_count is not None:
            query_log["similar_campaigns_count"] = similar_campaigns_count

        with open(query_file, "w", encoding="utf-8") as f:
            json.dump(query_log, f, indent=2)

    def get_query_history(self, limit: int = 10) -> List[Dict]:
        """Get recent user queries for analysis.

        Args:
            limit: Maximum number of records to return; negative values are
                treated as 0.

        Returns:
            Log records ordered by file modification time, newest first.
        """
        query_files = sorted(
            self.user_queries_path.glob("*.json"),
            key=lambda x: x.stat().st_mtime,
            reverse=True,
        )

        queries = []
        # Clamp so a negative limit does not slice from the end of the list.
        for file in query_files[:max(limit, 0)]:
            with open(file, "r", encoding="utf-8") as f:
                queries.append(json.load(f))

        return queries

    def get_query_analytics(self) -> Dict:
        """Get analytics on user queries for AI improvement.

        Returns:
            A dict with total/successful/failed counts, ``success_rate`` as a
            percentage of all logged queries, and per-content-type and
            per-tone counts. Missing or null content types are counted as
            ``"unknown"`` and missing or null tones as ``"default"``.
        """
        query_files = list(self.user_queries_path.glob("*.json"))

        total_queries = len(query_files)
        successful_queries = 0
        failed_queries = 0
        content_type_counts = {}
        tone_counts = {}

        for file in query_files:
            with open(file, "r", encoding="utf-8") as f:
                query = json.load(f)

            # "success" is None while a query is still processing, so such
            # records count towards neither bucket.
            if query.get("success"):
                successful_queries += 1
            elif query.get("success") is False:
                failed_queries += 1

            # The keys are always written (possibly as null), so fall back
            # with `or` rather than relying on dict.get defaults.
            user_input = query.get("user_input") or {}

            content_type = user_input.get("content_type") or "unknown"
            content_type_counts[content_type] = (
                content_type_counts.get(content_type, 0) + 1
            )

            tone = user_input.get("tone") or "default"
            tone_counts[tone] = tone_counts.get(tone, 0) + 1

        return {
            "total_queries": total_queries,
            "successful_queries": successful_queries,
            "failed_queries": failed_queries,
            "success_rate": (
                round(successful_queries / total_queries * 100, 2)
                if total_queries > 0
                else 0
            ),
            "content_type_distribution": content_type_counts,
            "tone_distribution": tone_counts,
        }

    def log_user_feedback(self, query_id: str, feedback: Dict):
        """Log user feedback on generated copy for training.

        Args:
            query_id: Identifier of the query the feedback refers to.
            feedback: Dict that may contain ``rating``, ``comments``,
                ``used_output`` and ``modifications_made``.

        Returns:
            True when the feedback was stored, False when the id is unsafe or
            no log file exists for it.
        """
        query_file = self._query_file(query_id)
        if query_file is None or not query_file.exists():
            return False

        with open(query_file, "r", encoding="utf-8") as f:
            query_log = json.load(f)

        query_log["user_feedback"] = {
            "rating": feedback.get("rating"),  # 1-5 scale
            "comments": feedback.get("comments"),
            "used_output": feedback.get("used_output", False),
            "modifications_made": feedback.get("modifications_made"),
            "feedback_timestamp": datetime.now().isoformat(),
        }

        with open(query_file, "w", encoding="utf-8") as f:
            json.dump(query_log, f, indent=2)

        return True

    def export_training_data(self, output_file: Optional[str] = None) -> str:
        """Export user queries and feedback for model training.

        Args:
            output_file: File name relative to ``data/``. Defaults to a
                timestamped ``training_data_<YYYYmmdd_HHMMSS>.json``.

        Returns:
            The path of the written file, as a string.
        """
        if not output_file:
            stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = f"training_data_{stamp}.json"

        training_data = []

        for file in self.user_queries_path.glob("*.json"):
            with open(file, "r", encoding="utf-8") as f:
                query = json.load(f)

            # Only include successful queries with feedback for training
            if query.get("success") and query.get("user_feedback"):
                training_data.append({
                    "input": query["user_input"]["prompt"],
                    "content_type": query["user_input"]["content_type"],
                    "tone": query["user_input"]["tone"],
                    "output": query["generated_output"],
                    "rating": query["user_feedback"]["rating"],
                    "used": query["user_feedback"]["used_output"],
                })

        output_path = Path("data") / output_file
        # Make sure the target directory exists when output_file has a subpath.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(training_data, f, indent=2)

        return str(output_path)