"""Simple video processing for memory module detection."""

import logging
from pathlib import Path
from typing import List, Dict, Any

import cv2

from exceptions import VideoProcessingError

logger = logging.getLogger(__name__)


def _frame_interval(video_fps: float, fps: int) -> int:
    """Return how many source frames to advance between sampled frames.

    Args:
        video_fps: Native frame rate reported by the capture (may be 0 when
            the container does not expose it).
        fps: Requested sampling rate in frames per second.

    Returns:
        A step of at least 1. Falls back to 1 (every frame) when the native
        frame rate is unknown.

    Raises:
        ValueError: If ``fps`` is not positive.
    """
    if fps <= 0:
        # Fail with a clear message instead of a bare ZeroDivisionError
        # (fps == 0) or a silent "every frame" fallback (fps < 0).
        raise ValueError(f"fps must be positive, got {fps}")
    if not video_fps > 0:
        return 1
    return max(1, int(video_fps / fps))


class VideoProcessor:
    """Simple video processor for memory module detection.

    The wrapped ``detector`` must provide ``_perform_detection(frame)``
    returning a mapping with the keys ``'detections'``, ``'detection_count'``
    and (for annotated output) ``'annotated_image'``.
    """

    def __init__(self, detector):
        self.detector = detector

    def process_video(self, video_path: str, fps: int = 1, max_frames: int = 100) -> Dict[str, Any]:
        """Process a video file and detect memory modules in sampled frames.

        Args:
            video_path: Path to video file
            fps: Frames per second to process (1 = every second); must be > 0
            max_frames: Maximum number of frames to run detection on

        Returns:
            Dictionary with ``video_info``, ``processing_info``, the per-frame
            ``detections`` list and a ``summary``.

        Raises:
            VideoProcessingError: If the file is missing, cannot be opened,
                ``fps`` is invalid, or any other failure occurs. Failures on
                individual frames are logged and skipped instead.
        """
        try:
            if not Path(video_path).exists():
                raise FileNotFoundError(f"Video file not found: {video_path}")

            logger.info("Processing video: %s", video_path)

            cap = cv2.VideoCapture(video_path)
            # try/finally guarantees the capture handle is released even when
            # opening or decoding fails part-way through.
            try:
                if not cap.isOpened():
                    raise VideoProcessingError("Could not open video file")

                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                video_fps = cap.get(cv2.CAP_PROP_FPS)
                duration = total_frames / video_fps if video_fps > 0 else 0
                frame_interval = _frame_interval(video_fps, fps)

                logger.info(
                    "Video properties: %s frames, %.2ffps, %.2fs",
                    total_frames, video_fps, duration,
                )

                detections = []
                frame_count = 0
                processed_frames = 0

                while cap.isOpened() and processed_frames < max_frames:
                    ret, frame = cap.read()
                    if not ret:
                        break

                    if frame_count % frame_interval == 0:
                        # Best effort: one bad frame must not abort the run.
                        try:
                            # Without a known frame rate the frame index
                            # doubles as the timestamp.
                            timestamp = frame_count / video_fps if video_fps > 0 else frame_count
                            frame_result = self.detector._perform_detection(frame)

                            detections.append({
                                'frame_number': frame_count,
                                'timestamp': round(timestamp, 2),
                                'detections': frame_result['detections'],
                                'detection_count': frame_result['detection_count'],
                            })
                            processed_frames += 1

                            logger.info(
                                "Frame %s: %s detections",
                                frame_count, frame_result['detection_count'],
                            )
                        except Exception as e:
                            logger.warning("Failed to process frame %s: %s", frame_count, e)

                    frame_count += 1
            finally:
                cap.release()

            total_detections = sum(d['detection_count'] for d in detections)
            avg_detections = total_detections / len(detections) if detections else 0

            result = {
                'video_info': {
                    'path': video_path,
                    'total_frames': total_frames,
                    'fps': video_fps,
                    'duration': duration,
                },
                'processing_info': {
                    'frames_processed': processed_frames,
                    'frame_interval': frame_interval,
                    'target_fps': fps,
                },
                'detections': detections,
                'summary': {
                    'total_detections': total_detections,
                    'avg_detections_per_frame': round(avg_detections, 2),
                    'frames_with_detections': sum(
                        1 for d in detections if d['detection_count'] > 0
                    ),
                },
            }

            logger.info(
                "Video processing completed: %s frames, %s total detections",
                processed_frames, total_detections,
            )
            return result

        except Exception as e:
            logger.error("Video processing failed: %s", e)
            raise VideoProcessingError(f"Video processing failed: {str(e)}") from e

    def create_annotated_video(self, video_path: str, output_path: str, fps: int = 1) -> str:
        """Create an annotated copy of a video with bounding boxes.

        Every frame of the input is written to the output; frames selected by
        the sampling interval are replaced by the detector's annotated image.

        Args:
            video_path: Input video path
            output_path: Output video path
            fps: Detection sampling rate in frames per second; must be > 0

        Returns:
            Path to annotated video

        Raises:
            VideoProcessingError: If the input cannot be opened, the output
                writer cannot be created, ``fps`` is invalid, or any other
                failure occurs. Frames that fail annotation are written
                unmodified instead.
        """
        try:
            logger.info("Creating annotated video: %s -> %s", video_path, output_path)

            cap = cv2.VideoCapture(video_path)
            out = None
            # Release both handles on every exit path so a failure does not
            # leak the capture or leave the output file unfinalized.
            try:
                if not cap.isOpened():
                    raise VideoProcessingError("Could not open input video")

                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                video_fps = cap.get(cv2.CAP_PROP_FPS)
                frame_interval = _frame_interval(video_fps, fps)

                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, video_fps, (width, height))
                if not out.isOpened():
                    # An unopened writer drops every frame silently; surface
                    # the failure instead of returning a path to nothing.
                    raise VideoProcessingError("Could not open output video for writing")

                frame_count = 0
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    if frame_count % frame_interval == 0:
                        try:
                            result = self.detector._perform_detection(frame)
                            out.write(result['annotated_image'])
                        except Exception as e:
                            logger.warning("Failed to annotate frame %s: %s", frame_count, e)
                            out.write(frame)  # Write original frame if annotation fails
                    else:
                        out.write(frame)  # Write original frame

                    frame_count += 1
            finally:
                cap.release()
                if out is not None:
                    out.release()

            logger.info("Annotated video created: %s", output_path)
            return output_path

        except Exception as e:
            logger.error("Annotated video creation failed: %s", e)
            raise VideoProcessingError(f"Annotated video creation failed: {str(e)}") from e


def extract_frames(video_path: str, output_dir: str, fps: int = 1) -> List[str]:
    """Extract frames from a video for individual processing.

    Frames are saved as ``frame_0000.jpg``, ``frame_0001.jpg``, ... inside
    ``output_dir``, which is created if it does not exist.

    Args:
        video_path: Path to video file
        output_dir: Directory to save frames
        fps: Frames per second to extract; must be > 0

    Returns:
        List of paths of the frames that were actually written

    Raises:
        VideoProcessingError: If the video cannot be opened, ``fps`` is
            invalid, or any other failure occurs.
    """
    try:
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                # Without this check an unreadable video silently yields [].
                raise VideoProcessingError("Could not open video file")

            video_fps = cap.get(cv2.CAP_PROP_FPS)
            frame_interval = _frame_interval(video_fps, fps)

            frame_paths = []
            frame_count = 0
            saved_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % frame_interval == 0:
                    frame_path = output_path / f"frame_{saved_count:04d}.jpg"
                    # imwrite reports failure through its return value; only
                    # record frames that really reached the disk.
                    if cv2.imwrite(str(frame_path), frame):
                        frame_paths.append(str(frame_path))
                        saved_count += 1
                    else:
                        logger.warning(
                            "Failed to write frame %s to %s", frame_count, frame_path
                        )

                frame_count += 1
        finally:
            cap.release()

        logger.info("Extracted %s frames to %s", len(frame_paths), output_dir)
        return frame_paths

    except Exception as e:
        logger.error("Frame extraction failed: %s", e)
        raise VideoProcessingError(f"Frame extraction failed: {str(e)}") from e