# File-listing header (not part of the program): "Files"
#   2025-07-24 23:31:47 +01:00
#   237 lines, 6.9 KiB, Python

import os
import uuid
import logging
import random
from pathlib import Path
from flask import Flask, request, jsonify, send_file, render_template
import cv2
from config import Config
from detector import MemoryDetector
from exceptions import DetectionError, FileUploadError, ValidationError, VideoProcessingError
from utils import allowed_file, setup_logging
from video_processor import VideoProcessor
# Flask application object; the request body size limit comes from Config.
app = Flask(__name__)
app.config.update(MAX_CONTENT_LENGTH=Config.MAX_FILE_SIZE)

# Configure logging, then create this module's logger.
setup_logging()
logger = logging.getLogger(__name__)

# Shared detector and video processor; both stay None until init_app() runs.
detector = None
video_processor = None
def init_app():
    """Prepare the module-level detector and video processor.

    Creates the configured directories, validates the model, then builds the
    ``MemoryDetector`` and the ``VideoProcessor`` that wraps it, storing both
    in the module globals ``detector`` and ``video_processor``.

    Raises:
        Exception: Any failure during setup is logged and re-raised unchanged.
    """
    global detector, video_processor
    try:
        # Filesystem and model checks come first so a bad setup fails early.
        Config.create_directories()
        Config.validate_model()

        detector_settings = {
            'model_path': Config.MODEL_PATH,
            'confidence_threshold': Config.CONFIDENCE_THRESHOLD,
            'image_size': Config.IMAGE_SIZE,
        }
        detector = MemoryDetector(**detector_settings)

        # The video processor reuses the detector created above.
        video_processor = VideoProcessor(detector)

        logger.info("Application initialized successfully")
    except Exception as init_error:
        logger.error(f"Application initialization failed: {init_error}")
        raise
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the frontend."""
    page = render_template('index.html')
    return page
@app.route('/api/health')
def health_check():
    """Report service status and whether the detector has been created.

    ``model_loaded`` is True once the module-level ``detector`` is no longer
    None.
    """
    payload = {
        'status': 'healthy',
        'service': 'Memory Detection API',
    }
    payload['model_loaded'] = detector is not None
    return jsonify(payload)
@app.route('/api/v1/detect', methods=['POST'])
def detect_memory():
    """Detect memory modules in an uploaded image.

    Expects a multipart upload with the file in the ``image`` field. The
    annotated image from the detector is written to ``Config.RESULT_FOLDER``
    under a random UUID-based ``.jpg`` name and advertised through
    ``/api/v1/results/<filename>``.

    Returns:
        A JSON response and status code:

        * 200 - ``success`` is True and ``data`` holds ``detections``,
          ``detection_count`` and ``result_image_url``.
        * 400 - upload, validation or detection error; ``error`` carries the
          exception message.
        * 503 - the module-level detector has not been initialized.
        * 500 - any other failure, reported with a generic message.
    """
    try:
        # Validate file upload
        if 'image' not in request.files:
            raise FileUploadError("No image file provided")
        file = request.files['image']
        # A missing (None) filename is treated the same as an empty one.
        if not file.filename:
            raise FileUploadError("No file selected")
        if not allowed_file(file.filename, Config.ALLOWED_EXTENSIONS):
            raise ValidationError(f"Invalid file type. Allowed: {Config.ALLOWED_EXTENSIONS}")

        # init_app() is only called from the __main__ guard, so the detector
        # can still be None here; answer explicitly instead of failing with
        # an AttributeError further down.
        if detector is None:
            logger.error("Detection requested before the detector was initialized")
            return jsonify({
                'success': False,
                'error': 'Detection model is not loaded'
            }), 503

        # Process image
        logger.info(f"Processing uploaded file: {file.filename}")
        image_bytes = file.read()
        result = detector.detect_from_bytes(image_bytes)

        # Save result image
        filename = f"{uuid.uuid4()}.jpg"
        output_path = Path(Config.RESULT_FOLDER) / filename
        # cv2.imwrite reports failure through its return value; without this
        # check the response would point at an image that was never written.
        if not cv2.imwrite(str(output_path), result['annotated_image']):
            raise OSError(f"Failed to write result image: {output_path}")

        return jsonify({
            'success': True,
            'data': {
                'detections': result['detections'],
                'detection_count': result['detection_count'],
                'result_image_url': f"/api/v1/results/{filename}"
            }
        })
    except (DetectionError, FileUploadError, ValidationError) as e:
        logger.error(f"Detection request failed: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 400
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Unexpected error in detection: {e}")
        return jsonify({
            'success': False,
            'error': 'Internal server error'
        }), 500
@app.route('/api/v1/results/<filename>')
def get_result_image(filename):
    """Serve a result image stored directly inside ``Config.RESULT_FOLDER``.

    Args:
        filename: File name taken from the URL; treated as untrusted input.

    Returns:
        The file contents on success, a 404 JSON response when ``filename``
        is not a bare file name or no regular file with that name exists in
        the results folder, or a 500 JSON response on unexpected errors.
    """
    try:
        # Reject anything that is not a single path component (for example a
        # value containing a drive or separator on platforms that honour
        # them) so the lookup cannot leave the results folder.
        if Path(filename).name != filename:
            return jsonify({'error': 'Image not found'}), 404

        # Use an absolute path so the existence check and send_file() look at
        # the same location regardless of how send_file resolves relative
        # paths.
        results_dir = Path(Config.RESULT_FOLDER).resolve()
        file_path = results_dir / filename

        # is_file() also rejects directories such as "..", which exists()
        # would have accepted.
        if not file_path.is_file():
            return jsonify({'error': 'Image not found'}), 404
        return send_file(file_path)
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Error serving result image: {e}")
        return jsonify({'error': 'Internal server error'}), 500
@app.route('/api/v1/detect/video', methods=['POST'])
def detect_video():
    """Process an uploaded video for memory module detection.

    Expects a multipart upload with the file in the ``video`` field and the
    optional integer form fields ``fps`` (default 1) and ``max_frames``
    (default 50). The upload is stored under ``Config.UPLOAD_FOLDER`` while
    it is processed and removed afterwards, whether or not processing
    succeeds.

    Returns:
        A JSON response and status code:

        * 200 - ``success`` is True and ``data`` holds ``video_filename``,
          ``processing_info``, ``detections`` and ``summary``.
        * 400 - upload, validation or video-processing error; ``error``
          carries the exception message.
        * 503 - the module-level video processor has not been initialized.
        * 500 - any other failure, reported with a generic message.
    """
    try:
        # Validate video upload
        if 'video' not in request.files:
            raise FileUploadError("No video file provided")
        file = request.files['video']
        # A missing (None) filename is treated the same as an empty one.
        if not file.filename:
            raise FileUploadError("No file selected")

        # Check video format
        video_extensions = {'mp4', 'avi', 'mov', 'mkv'}
        if not allowed_file(file.filename, video_extensions):
            raise ValidationError(f"Invalid video format. Allowed: {video_extensions}")

        # Get processing parameters; with type=int a value that cannot be
        # converted falls back to the default.
        # NOTE(review): zero/negative values are passed through unchanged;
        # confirm how VideoProcessor.process_video treats them.
        fps = request.form.get('fps', 1, type=int)
        max_frames = request.form.get('max_frames', 50, type=int)

        # init_app() is only called from the __main__ guard, so the processor
        # can still be None here; answer explicitly instead of failing with
        # an AttributeError further down.
        if video_processor is None:
            logger.error("Video processing requested before initialization")
            return jsonify({
                'success': False,
                'error': 'Detection model is not loaded'
            }), 503

        # Save uploaded video temporarily
        video_filename = f"temp_{uuid.uuid4()}.mp4"
        video_path = Path(Config.UPLOAD_FOLDER) / video_filename
        try:
            file.save(str(video_path))
            logger.info(f"Processing video: {file.filename}, fps={fps}, max_frames={max_frames}")
            result = video_processor.process_video(
                str(video_path),
                fps=fps,
                max_frames=max_frames
            )
        finally:
            # Always remove the temporary upload, including when saving or
            # processing raised, so failed requests do not fill the folder.
            video_path.unlink(missing_ok=True)

        return jsonify({
            'success': True,
            'data': {
                'video_filename': file.filename,
                'processing_info': result['processing_info'],
                'detections': result['detections'],
                'summary': result['summary']
            }
        })
    except (VideoProcessingError, FileUploadError, ValidationError) as e:
        logger.error(f"Video processing failed: {e}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 400
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Unexpected error in video processing: {e}")
        return jsonify({
            'success': False,
            'error': 'Internal server error'
        }), 500
@app.route('/api/v1/test-images')
def list_test_images():
    """List the names of the test images reported by ``Config``.

    Returns:
        A JSON response with ``success`` True and ``test_images`` set to the
        ``name`` of every entry returned by ``Config.get_test_images()``, or
        a 500 JSON response with ``success`` False on any failure.
    """
    try:
        test_images = Config.get_test_images()
        return jsonify({
            'success': True,
            'test_images': [img.name for img in test_images]
        })
    except Exception as e:
        # logger.exception keeps the traceback for unexpected failures.
        logger.exception(f"Error listing test images: {e}")
        # Carry the same 'success' flag as the success payload so clients
        # can branch on a single field.
        return jsonify({
            'success': False,
            'error': 'Internal server error'
        }), 500
@app.errorhandler(413)
def file_too_large(e):
    """Answer HTTP 413 errors with a JSON body naming the configured limit.

    Args:
        e: The error passed in by Flask; not used.

    Returns:
        A JSON error payload and the 413 status code.
    """
    message = f'File too large. Maximum size: {Config.MAX_FILE_SIZE} bytes'
    body = jsonify({'success': False, 'error': message})
    return body, 413
if __name__ == '__main__':
    # Build the detector and video processor first; init_app() re-raises on
    # failure, so the server is not started without them.
    init_app()

    # Start the Flask server with the configured host, port and debug flag.
    host, port = Config.HOST, Config.PORT
    logger.info(f"Starting server on {host}:{port}")
    app.run(host=host, port=port, debug=Config.DEBUG)