Files
viral_velocity/content_moderator.py
Aherobo Ovie Victor e559238be5 Initial commit
2025-10-27 18:43:42 +01:00

179 lines
7.0 KiB
Python

import os
import logging
from typing import Dict, Tuple, Optional
from google.cloud import vision
from PIL import Image
import io
# Set up logging
logger = logging.getLogger(__name__)
class ContentModerator:
"""
Content Safety & Moderation using Google Cloud Vision SafeSearch
Detects inappropriate content before image scoring
"""
def __init__(self):
"""Initialize the content moderator with Google Cloud Vision"""
logger.info("Initializing ContentModerator...")
# Check for Google Cloud credentials
credentials_path = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
if not credentials_path:
logger.warning("GOOGLE_APPLICATION_CREDENTIALS not found. Content moderation will be disabled.")
self.client = None
self.moderation_enabled = False
else:
try:
self.client = vision.ImageAnnotatorClient()
self.moderation_enabled = True
logger.info("Google Cloud Vision client initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Google Cloud Vision client: {e}")
self.client = None
self.moderation_enabled = False
def check_content_safety(self, image_path: str) -> Tuple[bool, Dict]:
"""
Check if image content is safe for processing
Returns:
Tuple[bool, Dict]: (is_safe, moderation_details)
"""
logger.info(f"Starting content safety check for: {image_path}")
if not self.moderation_enabled:
logger.warning("Content moderation disabled - skipping safety check")
return True, {"status": "moderation_disabled", "reason": "Google Cloud credentials not configured"}
try:
# Load image
with open(image_path, 'rb') as image_file:
content = image_file.read()
# Create image object for Google Cloud Vision
image = vision.Image(content=content)
# Perform SafeSearch detection
logger.info("Performing SafeSearch detection...")
response = self.client.safe_search_detection(image=image)
safe_search = response.safe_search_annotation
# Debug: Log the response structure
logger.info(f"SafeSearch response type: {type(safe_search)}")
logger.info(f"SafeSearch attributes: {dir(safe_search)}")
logger.info(f"Adult likelihood: {safe_search.adult}")
logger.info(f"Violence likelihood: {safe_search.violence}")
# Analyze results
moderation_result = self._analyze_safe_search_results(safe_search)
# Determine if content is safe
is_safe = moderation_result['is_safe']
logger.info(f"Content safety check complete. Safe: {is_safe}")
if not is_safe:
logger.warning(f"Content rejected: {moderation_result['rejection_reason']}")
return is_safe, moderation_result
except Exception as e:
logger.error(f"Content safety check failed: {e}", exc_info=True)
# In case of error, we'll allow the content but log the issue
return True, {
"status": "error",
"error": str(e),
"is_safe": True, # Default to safe on error
"rejection_reason": None
}
def _analyze_safe_search_results(self, safe_search) -> Dict:
"""
Analyze Google Cloud Vision SafeSearch results
Returns:
Dict: Detailed moderation analysis
"""
# Define risk levels - using the correct API structure
risk_levels = {
vision.Likelihood.UNKNOWN: 0,
vision.Likelihood.VERY_UNLIKELY: 1,
vision.Likelihood.UNLIKELY: 2,
vision.Likelihood.POSSIBLE: 3,
vision.Likelihood.LIKELY: 4,
vision.Likelihood.VERY_LIKELY: 5
}
# Get risk scores
adult_risk = risk_levels.get(safe_search.adult, 0)
violence_risk = risk_levels.get(safe_search.violence, 0)
racy_risk = risk_levels.get(safe_search.racy, 0)
medical_risk = risk_levels.get(safe_search.medical, 0)
spoof_risk = risk_levels.get(safe_search.spoof, 0)
# Define thresholds for rejection
REJECTION_THRESHOLDS = {
'adult': 4, # LIKELY or VERY_LIKELY
'violence': 4, # LIKELY or VERY_LIKELY
'racy': 5, # VERY_LIKELY only
'medical': 4, # LIKELY or VERY_LIKELY
'spoof': 4 # LIKELY or VERY_LIKELY
}
# Check for violations
violations = []
rejection_reason = None
if adult_risk >= REJECTION_THRESHOLDS['adult']:
violations.append(f"Adult content (risk level: {adult_risk})")
rejection_reason = "Contains inappropriate adult content"
if violence_risk >= REJECTION_THRESHOLDS['violence']:
violations.append(f"Violence (risk level: {violence_risk})")
rejection_reason = "Contains violent or graphic content"
if racy_risk >= REJECTION_THRESHOLDS['racy']:
violations.append(f"Racy content (risk level: {racy_risk})")
rejection_reason = "Contains suggestive or racy content"
if medical_risk >= REJECTION_THRESHOLDS['medical']:
violations.append(f"Medical content (risk level: {medical_risk})")
rejection_reason = "Contains medical or graphic content"
if spoof_risk >= REJECTION_THRESHOLDS['spoof']:
violations.append(f"Spoof content (risk level: {spoof_risk})")
rejection_reason = "Contains spoof or manipulated content"
# Determine if content is safe
is_safe = len(violations) == 0
return {
"is_safe": is_safe,
"rejection_reason": rejection_reason,
"violations": violations,
"risk_scores": {
"adult": adult_risk,
"violence": violence_risk,
"racy": racy_risk,
"medical": medical_risk,
"spoof": spoof_risk
},
"risk_levels": {
"adult": str(safe_search.adult),
"violence": str(safe_search.violence),
"racy": str(safe_search.racy),
"medical": str(safe_search.medical),
"spoof": str(safe_search.spoof)
},
"status": "completed"
}
def get_moderation_status(self) -> Dict:
"""Get the current moderation system status"""
return {
"moderation_enabled": self.moderation_enabled,
"client_initialized": self.client is not None,
"credentials_configured": os.getenv('GOOGLE_APPLICATION_CREDENTIALS') is not None
}