import os import base64 import json import logging from typing import Dict, List, Tuple, Optional import openai from PIL import Image import io from dotenv import load_dotenv from content_moderator import ContentModerator # Load environment variables load_dotenv() # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('viral_velocity.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class ViralVelocityScorer: """ Viral Velocity Image Scorer using OpenAI GPT-4 Vision Implements the 4-pillar scoring system for social media images with personalization """ def __init__(self): """Initialize the scorer with OpenAI""" logger.info("Initializing ViralVelocityScorer...") api_key = os.getenv('OPENAI_API_KEY') if not api_key: logger.error("OPENAI_API_KEY not found in environment variables") raise ValueError("OPENAI_API_KEY not found in environment variables") logger.info("OpenAI API key found, initializing client...") # Initialize OpenAI client with explicit configuration try: self.client = openai.OpenAI(api_key=api_key) logger.info("OpenAI client initialized successfully") except Exception as e: logger.error(f"Failed to initialize OpenAI client: {e}") # Try alternative initialization method try: openai.api_key = api_key self.client = openai.OpenAI() logger.info("OpenAI client initialized with alternative method") except Exception as e2: logger.error(f"Alternative initialization also failed: {e2}") raise ValueError(f"Could not initialize OpenAI client: {e2}") # Scoring weights as per project specification self.weights = { 'technical_quality': 0.25, 'compositional_strength': 0.25, 'psychological_engagement': 0.30, 'trend_zeitgeist': 0.20 } logger.info(f"Scoring weights initialized: {self.weights}") # Initialize content moderator logger.info("Initializing content moderator...") self.content_moderator = ContentModerator() logger.info("ViralVelocityScorer initialization complete") def analyze_image(self, image_path: str, user_preferences: Optional[Dict] = None) -> Dict: """ Analyze an image and return comprehensive scoring (legacy method) """ logger.info(f"Starting legacy image analysis for: {image_path}") return self.analyze_image_efficient(image_path, user_preferences) def analyze_image_efficient(self, image_path: str, user_preferences: Optional[Dict] = None) -> Dict: """ Analyze an image more efficiently by combining multiple analyses into fewer API calls Now supports user preferences for personalized scoring """ logger.info(f"Starting efficient image analysis for: {image_path}") if user_preferences: logger.info(f"User preferences: {user_preferences}") else: logger.info("No user preferences provided, using generic scoring") try: # Load and prepare image logger.info("Loading image...") image = Image.open(image_path) logger.info(f"Image loaded successfully. Size: {image.size}, Mode: {image.mode}") # Content Safety & Moderation Check logger.info("Performing content safety check...") is_safe, moderation_result = self.content_moderator.check_content_safety(image_path) if not is_safe: logger.warning(f"Content rejected due to: {moderation_result['rejection_reason']}") return { 'status': 'rejected', 'message': 'Image content was flagged as inappropriate', 'rejection_reason': moderation_result['rejection_reason'], 'moderation_details': moderation_result, 'final_score': 0, 'recommendations': [ 'Please upload a different image that complies with our content guidelines', 'Ensure the image doesn\'t contain inappropriate, violent, or graphic content', 'Consider using images that are suitable for social media platforms' ] } logger.info("Content safety check passed - proceeding with scoring") # Get all scores in one API call logger.info("Getting all scores in a single API call...") scores_result = self._get_all_scores_combined(image, user_preferences) # Calculate weighted final score logger.info("Calculating weighted final score...") final_score = ( scores_result['technical_score'] * self.weights['technical_quality'] + scores_result['compositional_score'] * self.weights['compositional_strength'] + scores_result['psychological_score'] * self.weights['psychological_engagement'] + scores_result['trend_score'] * self.weights['trend_zeitgeist'] ) logger.info(f"Final weighted score: {final_score}") # Get detailed analyses in one API call logger.info("Getting detailed analyses...") details_result = self._get_all_details_combined(image, user_preferences) # Get recommendations logger.info("Generating recommendations...") recommendations = self._get_recommendations(image, final_score, user_preferences) result = { 'final_score': round(final_score, 1), 'technical_quality': { 'score': scores_result['technical_score'], 'weight': self.weights['technical_quality'], 'details': details_result['technical_details'] }, 'compositional_strength': { 'score': scores_result['compositional_score'], 'weight': self.weights['compositional_strength'], 'details': details_result['compositional_details'] }, 'psychological_engagement': { 'score': scores_result['psychological_score'], 'weight': self.weights['psychological_engagement'], 'details': details_result['psychological_details'] }, 'trend_zeitgeist': { 'score': scores_result['trend_score'], 'weight': self.weights['trend_zeitgeist'], 'details': details_result['trend_details'] }, 'recommendations': recommendations, 'user_preferences_used': user_preferences if user_preferences else None, 'content_moderation': { 'status': 'passed', 'details': moderation_result } } logger.info(f"Efficient analysis complete. Final score: {result['final_score']}/100") return result except Exception as e: logger.error(f"Efficient analysis failed with error: {str(e)}", exc_info=True) return {'error': f'Analysis failed: {str(e)}'} def _get_technical_quality_score(self, image: Image) -> float: """Get Technical Quality Score (25% weight)""" logger.info("Starting Technical Quality Score analysis...") prompt = """ Analyze this image for technical quality. Consider: 1. Sharpness & Focus (0-25 points): Is the image sharp and well-focused? 2. Resolution & Clarity (0-25 points): Is the image clear and high-resolution? 3. Image Noise (0-20 points): Is the image free from noise and artifacts? 4. Dynamic Range (0-15 points): Are highlights and shadows well-balanced? 5. Color Fidelity (0-15 points): Are colors natural and well-balanced? Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: { "score": [0-100], "sharpness_focus": [0-25], "resolution_clarity": [0-25], "noise": [0-20], "dynamic_range": [0-15], "color_fidelity": [0-15], "reasoning": "brief explanation" } """ logger.info("Sending Technical Quality prompt to OpenAI...") response = self._get_openai_response(image, prompt) logger.info(f"OpenAI Technical Quality response: {response}") try: cleaned_response = self._clean_json_response(response) logger.info(f"Cleaned response: {cleaned_response}") result = json.loads(cleaned_response) score = result.get('score', 0) logger.info(f"Technical Quality Score parsed successfully: {score}") return score except json.JSONDecodeError as e: logger.error(f"Failed to parse Technical Quality JSON: {e}") logger.error(f"Raw response: {response}") logger.error(f"Cleaned response: {cleaned_response}") return 50 # Default score if parsing fails def _get_compositional_strength_score(self, image: Image) -> float: """Get Compositional Strength Score (25% weight)""" logger.info("Starting Compositional Strength Score analysis...") prompt = """ Analyze this image for compositional strength. Consider: 1. Rule of Thirds (0-25 points): Is the subject positioned according to rule of thirds? 2. Leading Lines (0-20 points): Do lines guide the eye toward the subject? 3. Balance & Symmetry (0-20 points): Is the composition balanced? 4. Depth & Framing (0-20 points): Does the image have good depth and framing? 5. Subject Isolation (0-15 points): Is the subject well-isolated and prominent? Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: { "score": [0-100], "rule_of_thirds": [0-25], "leading_lines": [0-20], "balance_symmetry": [0-20], "depth_framing": [0-20], "subject_isolation": [0-15], "reasoning": "brief explanation" } """ logger.info("Sending Compositional Strength prompt to OpenAI...") response = self._get_openai_response(image, prompt) logger.info(f"OpenAI Compositional Strength response: {response}") try: cleaned_response = self._clean_json_response(response) logger.info(f"Cleaned response: {cleaned_response}") result = json.loads(cleaned_response) score = result.get('score', 0) logger.info(f"Compositional Strength Score parsed successfully: {score}") return score except json.JSONDecodeError as e: logger.error(f"Failed to parse Compositional Strength JSON: {e}") logger.error(f"Raw response: {response}") logger.error(f"Cleaned response: {cleaned_response}") return 50 def _get_psychological_engagement_score(self, image: Image) -> float: """Get Psychological Engagement Score (30% weight)""" logger.info("Starting Psychological Engagement Score analysis...") prompt = """ Analyze this image for psychological engagement potential. Consider: 1. Presence of Faces (0-30 points): Are there faces and are they engaging? 2. Emotional Resonance (0-25 points): Does the image evoke emotions? 3. Color Psychology (0-25 points): Do colors create the right mood? 4. Storytelling (0-20 points): Does the image tell a story? Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: { "score": [0-100], "faces": [0-30], "emotional_resonance": [0-25], "color_psychology": [0-25], "storytelling": [0-20], "reasoning": "brief explanation" } """ logger.info("Sending Psychological Engagement prompt to OpenAI...") response = self._get_openai_response(image, prompt) logger.info(f"OpenAI Psychological Engagement response: {response}") try: cleaned_response = self._clean_json_response(response) logger.info(f"Cleaned response: {cleaned_response}") result = json.loads(cleaned_response) score = result.get('score', 0) logger.info(f"Psychological Engagement Score parsed successfully: {score}") return score except json.JSONDecodeError as e: logger.error(f"Failed to parse Psychological Engagement JSON: {e}") logger.error(f"Raw response: {response}") logger.error(f"Cleaned response: {cleaned_response}") return 50 def _get_trend_zeitgeist_score(self, image: Image) -> float: """Get Trend & Zeitgeist Score (20% weight)""" logger.info("Starting Trend & Zeitgeist Score analysis...") prompt = """ Analyze this image for trend alignment and zeitgeist. Consider: 1. Aesthetic Alignment (0-60 points): Does it align with current visual trends? 2. Authenticity Index (0-40 points): Does it feel authentic and genuine? Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: { "score": [0-100], "aesthetic_alignment": [0-60], "authenticity": [0-40], "detected_aesthetic": "e.g., Y2K, Maximalist, Minimalist, etc.", "reasoning": "brief explanation" } """ logger.info("Sending Trend & Zeitgeist prompt to OpenAI...") response = self._get_openai_response(image, prompt) logger.info(f"OpenAI Trend & Zeitgeist response: {response}") try: cleaned_response = self._clean_json_response(response) logger.info(f"Cleaned response: {cleaned_response}") result = json.loads(cleaned_response) score = result.get('score', 0) logger.info(f"Trend & Zeitgeist Score parsed successfully: {score}") return score except json.JSONDecodeError as e: logger.error(f"Failed to parse Trend & Zeitgeist JSON: {e}") logger.error(f"Raw response: {response}") logger.error(f"Cleaned response: {cleaned_response}") return 50 def _get_all_scores_combined(self, image: Image, user_preferences: Optional[Dict] = None) -> Dict: """Get all scores in a single API call with personalization""" # Build personalized prompt based on user preferences if user_preferences: aesthetic = user_preferences.get('aesthetic', 'General') niche = user_preferences.get('niche', 'General') target_audience = user_preferences.get('target_audience', 'General') content_type = user_preferences.get('content_type', 'Social Media Post') brand_voice = user_preferences.get('brand_voice', 'Neutral') prompt = f""" Analyze this image for social media scoring with personalized criteria. USER PREFERENCES: - Aesthetic: {aesthetic} - Niche: {niche} - Target Audience: {target_audience} - Content Type: {content_type} - Brand Voice: {brand_voice} Score this image based on how well it aligns with the user's specific preferences above. Technical Quality (0-100): Sharpness, resolution, noise, dynamic range, color fidelity Compositional Strength (0-100): Rule of thirds, leading lines, balance, depth, subject isolation Psychological Engagement (0-100): Face detection, emotional resonance, color psychology, storytelling Trend & Zeitgeist (0-100): Alignment with {aesthetic} aesthetic for {niche} targeting {target_audience} Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: {{ "technical_score": [0-100], "compositional_score": [0-100], "psychological_score": [0-100], "trend_score": [0-100], "reasoning": "brief explanation considering user preferences" }} """ else: prompt = """ Analyze this image for social media scoring. Provide scores for all four categories in a single JSON response. Technical Quality (0-100): Sharpness, resolution, noise, dynamic range, color fidelity Compositional Strength (0-100): Rule of thirds, leading lines, balance, depth, subject isolation Psychological Engagement (0-100): Face detection, emotional resonance, color psychology, storytelling Trend & Zeitgeist (0-100): Aesthetic alignment, authenticity index Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: { "technical_score": [0-100], "compositional_score": [0-100], "psychological_score": [0-100], "trend_score": [0-100], "reasoning": "brief overall explanation" } """ logger.info("Sending combined scoring prompt to OpenAI...") response = self._get_openai_response(image, prompt) logger.info(f"OpenAI combined scoring response: {response}") try: cleaned_response = self._clean_json_response(response) logger.info(f"Cleaned combined response: {cleaned_response}") result = json.loads(cleaned_response) scores = { 'technical_score': result.get('technical_score', 50), 'compositional_score': result.get('compositional_score', 50), 'psychological_score': result.get('psychological_score', 50), 'trend_score': result.get('trend_score', 50) } logger.info(f"Combined scores parsed successfully: {scores}") return scores except json.JSONDecodeError as e: logger.error(f"Failed to parse combined scores JSON: {e}") logger.error(f"Raw response: {response}") return { 'technical_score': 50, 'compositional_score': 50, 'psychological_score': 50, 'trend_score': 50 } def _get_all_details_combined(self, image: Image, user_preferences: Optional[Dict] = None) -> Dict: """Get all detailed analyses in a single API call with personalization""" # Build personalized prompt based on user preferences if user_preferences: aesthetic = user_preferences.get('aesthetic', 'General') niche = user_preferences.get('niche', 'General') target_audience = user_preferences.get('target_audience', 'General') prompt = f""" Provide detailed analysis for this image in four categories, considering the user's preferences: - Aesthetic: {aesthetic} - Niche: {niche} - Target Audience: {target_audience} Return as a JSON object with: - technical_details: Brief technical analysis focusing on sharpness, noise, and color quality - compositional_details: Brief compositional analysis focusing on framing, balance, and visual flow - psychological_details: Brief psychological analysis focusing on emotional impact and engagement potential for {target_audience} - trend_details: Brief trend analysis focusing on alignment with {aesthetic} aesthetic for {niche} content Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: {{ "technical_details": "brief technical analysis", "compositional_details": "brief compositional analysis", "psychological_details": "brief psychological analysis", "trend_details": "brief trend analysis" }} """ else: prompt = """ Provide detailed analysis for this image in four categories. Return as a JSON object with: - technical_details: Brief technical analysis focusing on sharpness, noise, and color quality - compositional_details: Brief compositional analysis focusing on framing, balance, and visual flow - psychological_details: Brief psychological analysis focusing on emotional impact and engagement potential - trend_details: Brief trend analysis focusing on current aesthetic alignment and cultural relevance Return ONLY a raw JSON object (no markdown formatting, no code blocks) with: { "technical_details": "brief technical analysis", "compositional_details": "brief compositional analysis", "psychological_details": "brief psychological analysis", "trend_details": "brief trend analysis" } """ logger.info("Sending combined details prompt to OpenAI...") response = self._get_openai_response(image, prompt) logger.info(f"OpenAI combined details response: {response}") try: cleaned_response = self._clean_json_response(response) logger.info(f"Cleaned combined details response: {cleaned_response}") result = json.loads(cleaned_response) details = { 'technical_details': result.get('technical_details', 'Technical analysis not available'), 'compositional_details': result.get('compositional_details', 'Compositional analysis not available'), 'psychological_details': result.get('psychological_details', 'Psychological analysis not available'), 'trend_details': result.get('trend_details', 'Trend analysis not available') } logger.info(f"Combined details parsed successfully") return details except json.JSONDecodeError as e: logger.error(f"Failed to parse combined details JSON: {e}") logger.error(f"Raw response: {response}") return { 'technical_details': 'Technical analysis not available', 'compositional_details': 'Compositional analysis not available', 'psychological_details': 'Psychological analysis not available', 'trend_details': 'Trend analysis not available' } def _clean_json_response(self, response: str) -> str: """Clean JSON response by removing markdown formatting""" cleaned_response = response.strip() if cleaned_response.startswith('```json'): cleaned_response = cleaned_response[7:] # Remove ```json if cleaned_response.startswith('```'): cleaned_response = cleaned_response[3:] # Remove ``` if cleaned_response.endswith('```'): cleaned_response = cleaned_response[:-3] # Remove trailing ``` return cleaned_response.strip() def _get_openai_response(self, image: Image, prompt: str) -> str: """Get response from OpenAI GPT-4 Vision""" logger.info("Preparing image for OpenAI API...") try: # Convert PIL image to base64 img_buffer = io.BytesIO() image.save(img_buffer, format='JPEG') img_str = base64.b64encode(img_buffer.getvalue()).decode() logger.info(f"Image converted to base64. Size: {len(img_str)} characters") logger.info("Sending request to OpenAI API...") response = self.client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "user", "content": [ {"type": "text", "text": prompt}, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{img_str}" } } ] } ], max_tokens=500 ) content = response.choices[0].message.content logger.info(f"OpenAI API response received. Length: {len(content)} characters") logger.info(f"OpenAI API usage: {response.usage}") return content except Exception as e: logger.error(f"OpenAI API error: {e}", exc_info=True) return "{}" def _get_technical_details(self, image: Image) -> str: """Get detailed technical analysis""" logger.info("Getting detailed technical analysis...") prompt = "Provide a brief technical analysis of this image focusing on sharpness, noise, and color quality." response = self._get_openai_response(image, prompt) logger.info(f"Technical details received: {len(response)} characters") return response def _get_compositional_details(self, image: Image) -> str: """Get detailed compositional analysis""" logger.info("Getting detailed compositional analysis...") prompt = "Provide a brief compositional analysis of this image focusing on framing, balance, and visual flow." response = self._get_openai_response(image, prompt) logger.info(f"Compositional details received: {len(response)} characters") return response def _get_psychological_details(self, image: Image) -> str: """Get detailed psychological analysis""" logger.info("Getting detailed psychological analysis...") prompt = "Provide a brief psychological analysis of this image focusing on emotional impact and engagement potential." response = self._get_openai_response(image, prompt) logger.info(f"Psychological details received: {len(response)} characters") return response def _get_trend_details(self, image: Image) -> str: """Get detailed trend analysis""" logger.info("Getting detailed trend analysis...") prompt = "Provide a brief trend analysis of this image focusing on current aesthetic alignment and cultural relevance." response = self._get_openai_response(image, prompt) logger.info(f"Trend details received: {len(response)} characters") return response def _get_recommendations(self, image: Image, final_score: float, user_preferences: Optional[Dict] = None) -> List[str]: """Get improvement recommendations based on score""" logger.info(f"Generating recommendations for score: {final_score}") # Build personalized prompt based on user preferences if user_preferences: aesthetic = user_preferences.get('aesthetic', 'General') niche = user_preferences.get('niche', 'General') target_audience = user_preferences.get('target_audience', 'General') content_type = user_preferences.get('content_type', 'Social Media Post') brand_voice = user_preferences.get('brand_voice', 'Neutral') prompt = f""" This image received a social media score of {final_score}/100. Provide 3 specific, actionable recommendations to improve the score, considering the user's preferences: - Aesthetic: {aesthetic} - Niche: {niche} - Target Audience: {target_audience} - Content Type: {content_type} - Brand Voice: {brand_voice} Focus on practical tips that could be implemented quickly. """ else: prompt = f""" This image received a social media score of {final_score}/100. Provide 3 specific, actionable recommendations to improve the score. Focus on practical tips that could be implemented quickly. """ response = self._get_openai_response(image, prompt) logger.info(f"Recommendations response: {response}") # Parse recommendations more intelligently lines = response.split('\n') recommendations = [] current_recommendation = "" for line in lines: line = line.strip() # Look for numbered recommendations (1., 2., 3., etc.) if line and (line.startswith('1.') or line.startswith('2.') or line.startswith('3.') or line.startswith('**1.') or line.startswith('**2.') or line.startswith('**3.')): # If we have a previous recommendation, save it if current_recommendation: recommendations.append(current_recommendation.strip()) # Start new recommendation clean_line = line.replace('**', '').strip() if clean_line.startswith('1.'): current_recommendation = clean_line[2:].strip() elif clean_line.startswith('2.'): current_recommendation = clean_line[2:].strip() elif clean_line.startswith('3.'): current_recommendation = clean_line[2:].strip() else: current_recommendation = clean_line elif line and current_recommendation and not line.startswith('To improve') and not line.startswith('Implementing'): # Continue building the current recommendation current_recommendation += " " + line # Add the last recommendation if current_recommendation: recommendations.append(current_recommendation.strip()) # If we didn't find numbered recommendations, try a simpler approach if len(recommendations) < 3: recommendations = [line.strip() for line in lines if line.strip() and not line.startswith('To improve') and not line.startswith('Implementing') and len(line.strip()) > 20][:3] logger.info(f"Parsed {len(recommendations)} recommendations") return recommendations # Test function def test_scorer(): """Test the scorer with a sample image""" logger.info("Starting test_scorer function...") scorer = ViralVelocityScorer() # You'll need to provide a test image path test_image_path = "test_image.jpg" # Replace with actual image path if os.path.exists(test_image_path): logger.info(f"Test image found: {test_image_path}") result = scorer.analyze_image(test_image_path) logger.info("Test completed successfully") print(json.dumps(result, indent=2)) else: logger.warning(f"Test image not found: {test_image_path}") print(f"Test image not found: {test_image_path}") print("Please provide a test image to run the scorer.") if __name__ == "__main__": test_scorer()