468 lines
20 KiB
Python
468 lines
20 KiB
Python
|
|
import os
|
||
|
|
import base64
|
||
|
|
import json
|
||
|
|
import logging
|
||
|
|
from typing import Dict, List, Optional, Tuple
|
||
|
|
import google.generativeai as genai
|
||
|
|
from PIL import Image
|
||
|
|
import io
|
||
|
|
import uuid
|
||
|
|
from dotenv import load_dotenv
|
||
|
|
|
||
|
|
# Read settings from the environment file before anything looks them up.
load_dotenv()

# Send INFO-and-above records to both a log file and the console.
logging.basicConfig(
    handlers=[logging.FileHandler('image_enhancer.log'), logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
class ImageEnhancer:
    """
    AI Image Enhancement using Google Gemini.

    Analyses an uploaded image, builds a set of enhancement prompts and
    generates one enhanced version per prompt (5 with the built-in prompts).
    """

    # Models tried in order; the first one that can be constructed is used.
    DEFAULT_MODEL_NAMES = ('gemini-2.0-flash-exp', 'gemini-1.5-flash')

    def __init__(self, api_key: Optional[str] = None, model_names: Optional[List[str]] = None):
        """
        Configure the Gemini SDK and select a model.

        Args:
            api_key: Gemini API key. When omitted, the GEMINI_API_KEY
                environment variable is used.
            model_names: Model names to try, in order. When omitted,
                DEFAULT_MODEL_NAMES is used.

        Raises:
            ValueError: If no API key was passed and GEMINI_API_KEY is unset.
            Exception: The error raised for the last candidate model when
                none of the candidates could be initialized.
        """
        logger.info("Initializing ImageEnhancer with Gemini...")

        api_key = api_key or os.getenv('GEMINI_API_KEY')
        if not api_key:
            logger.error("GEMINI_API_KEY not found in environment variables")
            raise ValueError("GEMINI_API_KEY not found in environment variables")

        logger.info("Gemini API key found, initializing client...")
        genai.configure(api_key=api_key)

        # NOTE(review): constructing a GenerativeModel may not contact the API,
        # in which case this fallback only covers local construction errors -
        # confirm against the SDK if availability checks are required here.
        candidates = list(model_names) if model_names else list(self.DEFAULT_MODEL_NAMES)
        last_position = len(candidates) - 1
        for position, name in enumerate(candidates):
            try:
                self.model = genai.GenerativeModel(name)
            except Exception as e:
                if position < last_position:
                    logger.warning(f"Failed to initialize {name}: {e}")
                    continue
                logger.error(f"Failed to initialize any Gemini model: {e}")
                # Bare raise keeps the original traceback intact.
                raise
            # Remember which candidate won so callers can report it.
            self.model_name = name
            logger.info(f"Using {name} model")
            break

        logger.info("ImageEnhancer initialization complete")
|
||
|
|
|
||
|
|
def enhance_image(self, image_path: str, user_preferences: Optional[Dict] = None) -> Dict:
    """
    Generate enhanced versions of an image, one per enhancement prompt.

    Args:
        image_path: Path to the original image
        user_preferences: User preferences for enhancement style; passed
            through to prompt generation

    Returns:
        On success, a dict with 'status' set to 'success' plus
        'original_image' (path, size, analysis), 'enhanced_images',
        'total_generated' and 'enhancement_prompts'.
        On failure, a dict with 'status' set to 'error', an 'error'
        message and an empty 'enhanced_images' list. Exceptions are
        logged and reported through that dict rather than raised.
    """
    logger.info(f"Starting image enhancement for: {image_path}")

    try:
        # Image.open keeps the underlying file open; the context manager
        # releases the handle on every exit path, including errors.
        with Image.open(image_path) as loaded_image:
            logger.info(f"Original image loaded. Size: {loaded_image.size}, Mode: {loaded_image.mode}")

            # Every later step re-encodes the picture as JPEG, which cannot
            # store alpha or palette modes (e.g. RGBA/P PNG uploads).
            jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
            if loaded_image.mode in jpeg_modes:
                original_image = loaded_image
            else:
                original_image = loaded_image.convert('RGB')

            # Analyze original image to understand what needs enhancement
            analysis = self._analyze_image_for_enhancement(original_image)
            logger.info(f"Image analysis complete: {analysis['issues_found']}")

            # Generate enhancement prompts based on analysis and user preferences
            enhancement_prompts = self._generate_enhancement_prompts(analysis, user_preferences)
            total_prompts = len(enhancement_prompts)
            logger.info(f"Generated {total_prompts} enhancement prompts")

            # Generate enhanced images; failed versions are skipped
            enhanced_images = []
            for version, prompt in enumerate(enhancement_prompts, start=1):
                logger.info(f"Generating enhanced image {version}/{total_prompts}...")
                enhanced_image = self._generate_enhanced_image(original_image, prompt, version)
                if enhanced_image:
                    enhanced_images.append(enhanced_image)

            logger.info(f"Successfully generated {len(enhanced_images)} enhanced images")

            return {
                'status': 'success',
                'original_image': {
                    'path': image_path,
                    'size': original_image.size,
                    'analysis': analysis
                },
                'enhanced_images': enhanced_images,
                'total_generated': len(enhanced_images),
                'enhancement_prompts': enhancement_prompts
            }

    except Exception as e:
        logger.error(f"Image enhancement failed: {str(e)}", exc_info=True)
        return {
            'status': 'error',
            'error': f'Enhancement failed: {str(e)}',
            'enhanced_images': []
        }
|
||
|
|
|
||
|
|
def _analyze_image_for_enhancement(self, image: Image) -> Dict:
    """
    Ask Gemini which aspects of the image could be improved.

    Returns the parsed JSON analysis. If the request, the JSON parsing or
    the lookup of 'issues_found' fails, a generic default analysis is
    returned instead and the error is logged.
    """
    logger.info("Analyzing image for enhancement opportunities...")

    # Used whenever no usable analysis can be obtained from the model.
    default_analysis = {
        "issues_found": ["general enhancement needed"],
        "enhancement_priorities": ["improve overall quality"],
        "overall_quality": "medium",
        "main_subject": "person or object",
        "background": "various",
        "lighting": "mixed"
    }

    prompt = """
    Analyze this image and identify specific issues that could be improved for social media.
    Focus on technical and compositional issues that can be enhanced without changing personal appearance.

    Look for:
    1. Blurry or out-of-focus areas
    2. Closed eyes in group photos
    3. Unwanted objects (fingers, passing people, etc.)
    4. Poor lighting or exposure
    5. Composition issues
    6. Color balance problems
    7. Noise or grain
    8. Cropping opportunities

    Return ONLY a JSON object with:
    {
        "issues_found": ["list of specific issues"],
        "enhancement_priorities": ["ordered list of what to fix first"],
        "overall_quality": "good/medium/poor",
        "main_subject": "description of main subject",
        "background": "description of background",
        "lighting": "description of lighting conditions"
    }
    """

    try:
        raw_reply = self._get_gemini_response(image, prompt)
        json_text = self._clean_json_response(raw_reply)
        parsed = json.loads(json_text)
        # The key lookup doubles as validation: a reply without
        # 'issues_found' raises here and falls through to the default.
        logger.info(f"Image analysis: {parsed['issues_found']} issues found")
    except Exception as e:
        logger.error(f"Image analysis failed: {e}")
        return default_analysis

    return parsed
|
||
|
|
|
||
|
|
def _generate_enhancement_prompts(self, analysis: Dict, user_preferences: Optional[Dict] = None) -> List[str]:
    """
    Build one enhancement prompt per focus area (5 in total).

    Each prompt consists of the focus sentence, an optional list of
    specific fixes derived from the first three entries of
    analysis['issues_found'], an optional styling sentence (only when both
    'aesthetic' and 'niche' are present in user_preferences) and a fixed
    instruction forbidding changes to personal appearance.

    Args:
        analysis: Image analysis; only 'issues_found' is read.
        user_preferences: Optional dict with 'aesthetic' and 'niche'.

    Returns:
        The list of prompt strings, in focus-area order.
    """
    logger.info("Generating enhancement prompts...")

    # The analysis comes from a model reply, so tolerate a missing value,
    # a bare string or any other non-list shape instead of crashing.
    issues = analysis.get('issues_found') or []
    if not isinstance(issues, (list, tuple)):
        issues = [issues]

    # Base enhancement focus areas (as per transcript requirements)
    enhancement_focuses = [
        "fix blurry areas and improve overall sharpness and focus",
        "correct closed eyes and improve facial clarity and expressions",
        "remove unwanted objects and clean up the background",
        "enhance lighting, exposure, and color balance",
        "improve composition, framing, and overall image quality"
    ]

    # Keyword groups are checked in order and the first matching group
    # decides the fix for an issue.
    keyword_fixes = [
        (('blur', 'focus'), "fix any blurry or out-of-focus areas"),
        (('eye',), "ensure all eyes are open and clear"),
        (('light', 'exposure'), "improve lighting and exposure"),
        (('color',), "enhance color balance and vibrancy"),
        (('noise',), "reduce noise and improve clarity"),
        (('object', 'finger'), "remove unwanted objects or distractions"),
    ]

    # The fixes do not depend on the focus area, so compute them once.
    specific_fixes = []
    for issue in issues[:3]:  # Focus on top 3 issues
        issue_text = str(issue).lower()
        for keywords, fix in keyword_fixes:
            if any(keyword in issue_text for keyword in keywords):
                # Two issues of the same kind must not repeat the same fix.
                if fix not in specific_fixes:
                    specific_fixes.append(fix)
                break

    # Everything after the focus sentence is identical for all prompts.
    shared_suffix = ""
    if specific_fixes:
        shared_suffix += "Specifically address: " + ", ".join(specific_fixes) + ". "

    # Add user preference context for styling (not personal appearance)
    if user_preferences:
        aesthetic = user_preferences.get('aesthetic', '')
        niche = user_preferences.get('niche', '')
        if aesthetic and niche:
            shared_suffix += f"Apply {aesthetic} aesthetic styling for {niche} content. "

    # Critical instructions: NO personal appearance changes
    shared_suffix += "IMPORTANT: Do NOT alter personal appearance, body shape, facial features, or make anyone look like a celebrity. Only fix technical image issues like blur, lighting, composition, and remove unwanted objects. Maintain the original person's appearance exactly as they are."

    prompts = [f"Enhance this image by {focus}. " + shared_suffix for focus in enhancement_focuses]

    logger.info(f"Generated {len(prompts)} enhancement prompts focused on fixing imperfections")
    return prompts
|
||
|
|
|
||
|
|
def _generate_enhanced_image(self, original_image: Image, prompt: str, version: int) -> Optional[Dict]:
    """
    Generate a single enhanced image with Gemini.

    The image and prompt are sent to the model. If any response part
    carries image data it is saved and described in the returned dict.
    Otherwise a locally processed placeholder is produced, using the first
    text part of the response (if any) as the recorded instructions.

    Args:
        original_image: Source picture.
        prompt: Enhancement prompt for this version.
        version: 1-based version number used in file names and logs.

    Returns:
        A dict with 'version', 'prompt', 'image_path' and
        'generation_method' (the placeholder adds further keys), or None
        when saving the image or any other step fails.
    """
    try:
        logger.info(f"Generating enhanced image version {version} with Gemini...")

        # JPEG cannot store alpha or palette modes, so normalise first.
        jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
        if original_image.mode in jpeg_modes:
            jpeg_ready = original_image
        else:
            jpeg_ready = original_image.convert('RGB')

        # Prepare the image for Gemini
        img_buffer = io.BytesIO()
        jpeg_ready.save(img_buffer, format='JPEG', quality=95)

        # Create the content for Gemini (image + text prompt)
        content = [
            {
                "mime_type": "image/jpeg",
                "data": img_buffer.getvalue()
            },
            prompt
        ]

        logger.info(f"Sending enhancement request to Gemini for version {version}...")

        try:
            response = self.model.generate_content(
                content,
                generation_config=genai.types.GenerationConfig(
                    temperature=0.7,
                    max_output_tokens=8192,
                )
            )
        except Exception as e:
            logger.error(f"Gemini generation failed: {e}")
            # Fallback to placeholder enhancement
            return self._create_enhanced_placeholder(jpeg_ready, prompt, version)

        # The SDK's `parts` accessor can raise (e.g. ValueError when the
        # response carries no single candidate); treat that as "no parts"
        # so the placeholder fallback below still runs.
        try:
            parts = list(response.parts or [])
        except (AttributeError, ValueError, IndexError) as e:
            logger.warning(f"Response for version {version} has no usable parts: {e}")
            parts = []

        logger.info(f"Response type: {type(response)}")
        logger.info(f"Response parts: {len(parts)}")

        # Look for image data in ALL parts before considering text: the
        # model may put an explanatory text part ahead of the image part.
        for i, part in enumerate(parts):
            logger.debug(f"Part {i}: {type(part)}")
            inline_data = getattr(part, 'inline_data', None)
            image_bytes = getattr(inline_data, 'data', None) if inline_data is not None else None
            if not image_bytes:
                continue

            logger.info(f"Found inline_data in part {i}")
            enhanced_image_path = self._save_gemini_image(image_bytes, version)
            if enhanced_image_path:
                logger.info(f"Enhanced image {version} saved to: {enhanced_image_path}")
                return {
                    'version': version,
                    'prompt': prompt,
                    'image_path': enhanced_image_path,
                    'generation_method': 'gemini-2.0-flash-preview'
                }
            logger.error(f"Failed to save enhanced image {version}")
            return None

        # If no image was generated, log the full response for debugging
        logger.error(f"No image generated in response for version {version}")
        logger.error(f"Full response: {response}")

        # Keep the first text part as the instructions attached to the placeholder
        enhancement_instructions = None
        for part in parts:
            part_text = getattr(part, 'text', None)
            if part_text:
                enhancement_instructions = part_text
                logger.info(f"Gemini provided enhancement instructions: {enhancement_instructions[:200]}...")
                break

        return self._create_enhanced_placeholder(jpeg_ready, prompt, version, enhancement_instructions)

    except Exception as e:
        # exc_info routes the traceback through the configured log handlers.
        logger.error(f"Failed to generate enhanced image {version}: {e}", exc_info=True)
        return None
|
||
|
|
|
||
|
|
def _save_gemini_image(self, image_data: bytes, version: int) -> Optional[str]:
    """
    Save a Gemini generated image as a JPEG file in "enhanced_images".

    Args:
        image_data: Encoded image bytes returned by the model.
        version: Version number embedded in the file name.

    Returns:
        Path of the written ".jpg" file, or None when the payload is
        empty, cannot be decoded as an image, or cannot be written.
    """
    try:
        # An empty payload would otherwise produce a zero-byte "image"
        # that is reported as a success.
        if not image_data:
            logger.error(f"Gemini returned no image data for version {version}")
            return None

        payload = bytes(image_data)

        # The file name promises JPEG, but the payload format is whatever
        # the model chose. Anything that does not start with the JPEG
        # signature is decoded and re-encoded so content and extension
        # agree; undecodable data raises and is reported as a failure.
        if not payload.startswith(b'\xff\xd8\xff'):
            jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
            with Image.open(io.BytesIO(payload)) as decoded:
                frame = decoded if decoded.mode in jpeg_modes else decoded.convert('RGB')
                encoded = io.BytesIO()
                frame.save(encoded, format='JPEG', quality=95)
            payload = encoded.getvalue()

        # Create enhanced images directory
        enhanced_dir = "enhanced_images"
        os.makedirs(enhanced_dir, exist_ok=True)

        # Generate unique filename
        filename = f"enhanced_v{version}_{uuid.uuid4().hex[:8]}.jpg"
        filepath = os.path.join(enhanced_dir, filename)

        # Encoding happened in memory, so the file is written in one step.
        with open(filepath, 'wb') as f:
            f.write(payload)

        logger.info(f"Gemini enhanced image {version} saved to: {filepath}")
        return filepath

    except Exception as e:
        logger.error(f"Failed to save Gemini image: {e}")
        return None
|
||
|
|
|
||
|
|
def _create_enhanced_placeholder(self, original_image: Image, prompt: str, version: int, instructions: Optional[str] = None) -> Optional[Dict]:
    """
    Create a placeholder enhanced image when Gemini doesn't generate images.

    Applies basic Pillow adjustments (brightness, contrast, color,
    sharpening) chosen by version number, plus extra adjustments triggered
    by keywords in the prompt, and saves the result as a JPEG in
    "enhanced_images".

    Args:
        original_image: Source picture; it is not modified.
        prompt: Enhancement prompt; scanned for keywords.
        version: Selects the adjustment preset (unknown versions use preset 1).
        instructions: Optional text returned by Gemini; its first 200
            characters are recorded in the result.

    Returns:
        A dict describing the saved file, or None if anything fails.
    """
    try:
        from PIL import ImageEnhance, ImageFilter

        logger.info(f"Creating enhanced placeholder for version {version}...")

        # Create enhanced images directory
        enhanced_dir = "enhanced_images"
        os.makedirs(enhanced_dir, exist_ok=True)

        # Generate unique filename
        filename = f"enhanced_v{version}_{uuid.uuid4().hex[:8]}.jpg"
        filepath = os.path.join(enhanced_dir, filename)

        # The result is written as JPEG, which cannot store alpha or
        # palette modes; convert those up front instead of failing at save.
        jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
        if original_image.mode in jpeg_modes:
            enhanced_image = original_image.copy()
        else:
            enhanced_image = original_image.convert('RGB')

        # Apply different enhancements based on version to create variety
        enhancement_factors = {
            1: {'brightness': 1.1, 'contrast': 1.05, 'color': 1.1, 'sharpness': True},
            2: {'brightness': 1.05, 'contrast': 1.1, 'color': 1.05, 'sharpness': True},
            3: {'brightness': 1.15, 'contrast': 1.0, 'color': 1.15, 'sharpness': False},
            4: {'brightness': 1.0, 'contrast': 1.15, 'color': 1.0, 'sharpness': True},
            5: {'brightness': 1.08, 'contrast': 1.08, 'color': 1.08, 'sharpness': True}
        }
        factors = enhancement_factors.get(version, enhancement_factors[1])

        # Brightness, contrast and color are applied in this fixed order;
        # a factor of 1.0 means "leave unchanged" and is skipped.
        adjustments = (
            ('brightness', ImageEnhance.Brightness),
            ('contrast', ImageEnhance.Contrast),
            ('color', ImageEnhance.Color),
        )
        for factor_name, enhancer_class in adjustments:
            if factors[factor_name] != 1.0:
                enhanced_image = enhancer_class(enhanced_image).enhance(factors[factor_name])

        # Sharpness
        if factors['sharpness']:
            enhanced_image = enhanced_image.filter(ImageFilter.SHARPEN)

        # Apply specific enhancements based on prompt.
        # NOTE(review): prompts built by _generate_enhancement_prompts end
        # with a fixed sentence that mentions "blur" and "lighting", so for
        # those prompts the first two checks below always fire.
        prompt_text = prompt.lower()
        if "blur" in prompt_text or "sharp" in prompt_text:
            enhanced_image = enhanced_image.filter(ImageFilter.SHARPEN)

        if "light" in prompt_text or "exposure" in prompt_text:
            enhanced_image = ImageEnhance.Brightness(enhanced_image).enhance(1.1)

        if "color" in prompt_text:
            enhanced_image = ImageEnhance.Color(enhanced_image).enhance(1.1)

        # Save the enhanced image
        enhanced_image.save(filepath, 'JPEG', quality=95)

        logger.info(f"Enhanced placeholder {version} saved to: {filepath}")
        return {
            'version': version,
            'prompt': prompt,
            'image_path': filepath,
            'generation_method': 'ai-enhanced-placeholder',
            'enhancement_factors': factors,
            'gemini_instructions': instructions[:200] if instructions else None
        }

    except Exception as e:
        logger.error(f"Failed to create enhanced placeholder {version}: {e}")
        return None
|
||
|
|
|
||
|
|
def _get_gemini_response(self, image: Image, prompt: str) -> str:
    """
    Send an image plus a text prompt to Gemini and return the reply text.

    Args:
        image: Picture to analyse; it is encoded as JPEG for the request.
        prompt: Instruction text sent alongside the image.

    Returns:
        The text of the model's response, or the string "{}" if encoding
        the image, calling the API or reading the response text fails
        (the error is logged).
    """
    try:
        # JPEG cannot store alpha or palette modes; without this step an
        # RGBA/P image fails to encode and the caller only ever sees "{}".
        jpeg_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
        jpeg_ready = image if image.mode in jpeg_modes else image.convert('RGB')

        # Prepare the image for Gemini
        img_buffer = io.BytesIO()
        jpeg_ready.save(img_buffer, format='JPEG')

        # Create the content for Gemini (image + text prompt)
        content = [
            {
                "mime_type": "image/jpeg",
                "data": img_buffer.getvalue()
            },
            prompt
        ]

        # Get response from Gemini
        response = self.model.generate_content(
            content,
            generation_config=genai.types.GenerationConfig(
                temperature=0.3,
                max_output_tokens=1000,
            )
        )

        return response.text

    except Exception as e:
        logger.error(f"Gemini API error: {e}")
        return "{}"
|
||
|
|
|
||
|
|
def _clean_json_response(self, response: str) -> str:
|
||
|
|
"""Clean JSON response by removing markdown formatting"""
|
||
|
|
cleaned_response = response.strip()
|
||
|
|
if cleaned_response.startswith('```json'):
|
||
|
|
cleaned_response = cleaned_response[7:]
|
||
|
|
if cleaned_response.startswith('```'):
|
||
|
|
cleaned_response = cleaned_response[3:]
|
||
|
|
if cleaned_response.endswith('```'):
|
||
|
|
cleaned_response = cleaned_response[:-3]
|
||
|
|
|
||
|
|
return cleaned_response.strip()
|
||
|
|
|