import io
import logging
import os
from typing import Dict, Optional, Tuple

from google.cloud import vision
from PIL import Image

# Set up logging
logger = logging.getLogger(__name__)


class ContentModerator:
    """
    Content Safety & Moderation using Google Cloud Vision SafeSearch.

    Detects inappropriate content before image scoring. When the
    GOOGLE_APPLICATION_CREDENTIALS environment variable is unset, or the
    Vision client cannot be constructed, moderation is disabled and every
    image is reported as safe.
    """

    # Numeric score assigned to each SafeSearch likelihood; higher is riskier.
    _RISK_LEVELS = {
        vision.Likelihood.UNKNOWN: 0,
        vision.Likelihood.VERY_UNLIKELY: 1,
        vision.Likelihood.UNLIKELY: 2,
        vision.Likelihood.POSSIBLE: 3,
        vision.Likelihood.LIKELY: 4,
        vision.Likelihood.VERY_LIKELY: 5,
    }

    # Minimum score at which a category causes rejection.
    REJECTION_THRESHOLDS = {
        'adult': 4,     # LIKELY or VERY_LIKELY
        'violence': 4,  # LIKELY or VERY_LIKELY
        'racy': 5,      # VERY_LIKELY only
        'medical': 4,   # LIKELY or VERY_LIKELY
        'spoof': 4,     # LIKELY or VERY_LIKELY
    }

    # (annotation attribute, violation label, rejection reason), in check order.
    _CATEGORIES = (
        ('adult', "Adult content", "Contains inappropriate adult content"),
        ('violence', "Violence", "Contains violent or graphic content"),
        ('racy', "Racy content", "Contains suggestive or racy content"),
        ('medical', "Medical content", "Contains medical or graphic content"),
        ('spoof', "Spoof content", "Contains spoof or manipulated content"),
    )

    def __init__(self) -> None:
        """Initialize the content moderator with Google Cloud Vision.

        Sets ``self.client`` and ``self.moderation_enabled``. Missing
        credentials or a client construction failure are logged and leave
        moderation disabled instead of raising.
        """
        logger.info("Initializing ContentModerator...")

        self.client = None
        self.moderation_enabled = False

        # Check for Google Cloud credentials
        credentials_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
        if not credentials_path:
            logger.warning(
                "GOOGLE_APPLICATION_CREDENTIALS not found. "
                "Content moderation will be disabled."
            )
            return

        try:
            self.client = vision.ImageAnnotatorClient()
        except Exception as e:
            # Deliberate best-effort: a broken client disables moderation
            # rather than preventing the application from starting.
            logger.error("Failed to initialize Google Cloud Vision client: %s", e)
            self.client = None
            return

        self.moderation_enabled = True
        logger.info("Google Cloud Vision client initialized successfully")

    def check_content_safety(self, image_path: str) -> Tuple[bool, Dict]:
        """
        Check if image content is safe for processing.

        Args:
            image_path: Path of the image file to read and submit to
                SafeSearch detection.

        Returns:
            Tuple[bool, Dict]: (is_safe, moderation_details).

            * Moderation disabled: ``(True, {"status": "moderation_disabled", ...})``.
            * Success: the dict built by ``_analyze_safe_search_results``
              (``status`` is ``"completed"``).
            * Any failure (unreadable file, API exception, or an error
              reported inside the API response): ``(True, {"status": "error", ...})``.
              The check fails open by design; callers that need to fail
              closed must inspect ``status``.
        """
        logger.info("Starting content safety check for: %s", image_path)

        if not self.moderation_enabled:
            logger.warning("Content moderation disabled - skipping safety check")
            return True, {
                "status": "moderation_disabled",
                "reason": "Google Cloud credentials not configured",
            }

        try:
            # Load image
            with open(image_path, 'rb') as image_file:
                content = image_file.read()

            # Create image object for Google Cloud Vision
            image = vision.Image(content=content)

            # Perform SafeSearch detection
            logger.info("Performing SafeSearch detection...")
            response = self.client.safe_search_detection(image=image)

            # The API reports per-image failures in the response body rather
            # than raising. Without this check a failed request would carry
            # an empty annotation (all UNKNOWN) and be reported as a
            # completed, safe result.
            if response.error.message:
                raise RuntimeError(
                    f"SafeSearch detection failed: {response.error.message}"
                )

            safe_search = response.safe_search_annotation

            # Structural dumps are diagnostic only, so keep them at DEBUG.
            logger.debug("SafeSearch response type: %s", type(safe_search))
            logger.debug("SafeSearch attributes: %s", dir(safe_search))
            logger.info("Adult likelihood: %s", safe_search.adult)
            logger.info("Violence likelihood: %s", safe_search.violence)

            # Analyze results
            moderation_result = self._analyze_safe_search_results(safe_search)

            # Determine if content is safe
            is_safe = moderation_result['is_safe']
            logger.info("Content safety check complete. Safe: %s", is_safe)

            if not is_safe:
                logger.warning(
                    "Content rejected: %s", moderation_result['rejection_reason']
                )

            return is_safe, moderation_result

        except Exception as e:
            # Top-level boundary: log with traceback, then allow the content.
            logger.exception("Content safety check failed: %s", e)
            return True, {
                "status": "error",
                "error": str(e),
                "is_safe": True,  # Default to safe on error
                "rejection_reason": None,
            }

    def _analyze_safe_search_results(self, safe_search) -> Dict:
        """
        Analyze Google Cloud Vision SafeSearch results.

        Args:
            safe_search: Annotation object exposing ``adult``, ``violence``,
                ``racy``, ``medical`` and ``spoof`` likelihood attributes.

        Returns:
            Dict: Detailed moderation analysis with keys ``is_safe``,
            ``rejection_reason``, ``violations``, ``risk_scores``,
            ``risk_levels`` and ``status``. ``violations`` lists every
            category at or above its threshold; ``rejection_reason`` holds
            the reason of the last such category in check order, or None.
        """
        risk_scores = {}
        risk_levels = {}
        violations = []
        rejection_reason = None

        for category, label, reason in self._CATEGORIES:
            likelihood = getattr(safe_search, category)
            # Likelihood values missing from the map score as 0 (not risky).
            risk = self._RISK_LEVELS.get(likelihood, 0)
            risk_scores[category] = risk
            risk_levels[category] = str(likelihood)

            if risk >= self.REJECTION_THRESHOLDS[category]:
                violations.append(f"{label} (risk level: {risk})")
                rejection_reason = reason

        return {
            "is_safe": not violations,
            "rejection_reason": rejection_reason,
            "violations": violations,
            "risk_scores": risk_scores,
            "risk_levels": risk_levels,
            "status": "completed",
        }

    def get_moderation_status(self) -> Dict:
        """Get the current moderation system status.

        Returns:
            Dict: ``moderation_enabled``, ``client_initialized`` and
            ``credentials_configured`` flags; the last is re-read from the
            environment on every call.
        """
        return {
            "moderation_enabled": self.moderation_enabled,
            "client_initialized": self.client is not None,
            "credentials_configured": (
                os.getenv('GOOGLE_APPLICATION_CREDENTIALS') is not None
            ),
        }