Files
2025-07-24 23:31:47 +01:00

225 lines
8.3 KiB
Python

"""Simple video processing for memory module detection."""
import cv2
import logging
from pathlib import Path
from typing import List, Dict, Any
from exceptions import VideoProcessingError
logger = logging.getLogger(__name__)
class VideoProcessor:
"""Simple video processor for memory module detection."""
def __init__(self, detector):
self.detector = detector
def process_video(self, video_path: str, fps: int = 1, max_frames: int = 100) -> Dict[str, Any]:
"""
Process video file and detect memory modules in frames.
Args:
video_path: Path to video file
fps: Frames per second to process (1 = every second)
max_frames: Maximum frames to process
Returns:
Dictionary with detection results
"""
try:
if not Path(video_path).exists():
raise FileNotFoundError(f"Video file not found: {video_path}")
logger.info(f"Processing video: {video_path}")
# Open video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise VideoProcessingError("Could not open video file")
# Get video properties
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
video_fps = cap.get(cv2.CAP_PROP_FPS)
duration = total_frames / video_fps if video_fps > 0 else 0
# Calculate frame interval
frame_interval = max(1, int(video_fps / fps)) if video_fps > 0 else 1
logger.info(f"Video properties: {total_frames} frames, {video_fps:.2f}fps, {duration:.2f}s")
# Process frames
detections = []
frame_count = 0
processed_frames = 0
while cap.isOpened() and processed_frames < max_frames:
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_interval == 0:
try:
# Get timestamp
timestamp = frame_count / video_fps if video_fps > 0 else frame_count
# Detect memory modules in frame
result = self.detector._perform_detection(frame)
# Store frame detection
frame_detection = {
'frame_number': frame_count,
'timestamp': round(timestamp, 2),
'detections': result['detections'],
'detection_count': result['detection_count']
}
detections.append(frame_detection)
processed_frames += 1
logger.info(f"Frame {frame_count}: {result['detection_count']} detections")
except Exception as e:
logger.warning(f"Failed to process frame {frame_count}: {e}")
frame_count += 1
cap.release()
# Generate summary
total_detections = sum(d['detection_count'] for d in detections)
avg_detections = total_detections / len(detections) if detections else 0
result = {
'video_info': {
'path': video_path,
'total_frames': total_frames,
'fps': video_fps,
'duration': duration
},
'processing_info': {
'frames_processed': processed_frames,
'frame_interval': frame_interval,
'target_fps': fps
},
'detections': detections,
'summary': {
'total_detections': total_detections,
'avg_detections_per_frame': round(avg_detections, 2),
'frames_with_detections': len([d for d in detections if d['detection_count'] > 0])
}
}
logger.info(f"Video processing completed: {processed_frames} frames, {total_detections} total detections")
return result
except Exception as e:
logger.error(f"Video processing failed: {e}")
raise VideoProcessingError(f"Video processing failed: {str(e)}")
def create_annotated_video(self, video_path: str, output_path: str, fps: int = 1) -> str:
"""
Create annotated video with bounding boxes.
Args:
video_path: Input video path
output_path: Output video path
fps: Processing fps
Returns:
Path to annotated video
"""
try:
logger.info(f"Creating annotated video: {video_path} -> {output_path}")
# Open input video
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise VideoProcessingError("Could not open input video")
# Get video properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
video_fps = cap.get(cv2.CAP_PROP_FPS)
# Create output video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, video_fps, (width, height))
frame_interval = max(1, int(video_fps / fps)) if video_fps > 0 else 1
frame_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# Process every nth frame for detection
if frame_count % frame_interval == 0:
try:
result = self.detector._perform_detection(frame)
annotated_frame = result['annotated_image']
out.write(annotated_frame)
except Exception as e:
logger.warning(f"Failed to annotate frame {frame_count}: {e}")
out.write(frame) # Write original frame if annotation fails
else:
out.write(frame) # Write original frame
frame_count += 1
cap.release()
out.release()
logger.info(f"Annotated video created: {output_path}")
return output_path
except Exception as e:
logger.error(f"Annotated video creation failed: {e}")
raise VideoProcessingError(f"Annotated video creation failed: {str(e)}")
def extract_frames(video_path: str, output_dir: str, fps: int = 1) -> List[str]:
"""
Extract frames from video for individual processing.
Args:
video_path: Path to video file
output_dir: Directory to save frames
fps: Frames per second to extract
Returns:
List of extracted frame paths
"""
try:
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
cap = cv2.VideoCapture(video_path)
video_fps = cap.get(cv2.CAP_PROP_FPS)
frame_interval = max(1, int(video_fps / fps))
frame_paths = []
frame_count = 0
saved_count = 0
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_interval == 0:
frame_filename = f"frame_{saved_count:04d}.jpg"
frame_path = output_path / frame_filename
cv2.imwrite(str(frame_path), frame)
frame_paths.append(str(frame_path))
saved_count += 1
frame_count += 1
cap.release()
logger.info(f"Extracted {len(frame_paths)} frames to {output_dir}")
return frame_paths
except Exception as e:
logger.error(f"Frame extraction failed: {e}")
raise VideoProcessingError(f"Frame extraction failed: {str(e)}")