225 lines
8.3 KiB
Python
225 lines
8.3 KiB
Python
"""Simple video processing for memory module detection."""
|
|
|
|
import cv2
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any
|
|
from exceptions import VideoProcessingError
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VideoProcessor:
|
|
"""Simple video processor for memory module detection."""
|
|
|
|
def __init__(self, detector):
|
|
self.detector = detector
|
|
|
|
def process_video(self, video_path: str, fps: int = 1, max_frames: int = 100) -> Dict[str, Any]:
|
|
"""
|
|
Process video file and detect memory modules in frames.
|
|
|
|
Args:
|
|
video_path: Path to video file
|
|
fps: Frames per second to process (1 = every second)
|
|
max_frames: Maximum frames to process
|
|
|
|
Returns:
|
|
Dictionary with detection results
|
|
"""
|
|
try:
|
|
if not Path(video_path).exists():
|
|
raise FileNotFoundError(f"Video file not found: {video_path}")
|
|
|
|
logger.info(f"Processing video: {video_path}")
|
|
|
|
# Open video
|
|
cap = cv2.VideoCapture(video_path)
|
|
if not cap.isOpened():
|
|
raise VideoProcessingError("Could not open video file")
|
|
|
|
# Get video properties
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
video_fps = cap.get(cv2.CAP_PROP_FPS)
|
|
duration = total_frames / video_fps if video_fps > 0 else 0
|
|
|
|
# Calculate frame interval
|
|
frame_interval = max(1, int(video_fps / fps)) if video_fps > 0 else 1
|
|
|
|
logger.info(f"Video properties: {total_frames} frames, {video_fps:.2f}fps, {duration:.2f}s")
|
|
|
|
# Process frames
|
|
detections = []
|
|
frame_count = 0
|
|
processed_frames = 0
|
|
|
|
while cap.isOpened() and processed_frames < max_frames:
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
|
|
if frame_count % frame_interval == 0:
|
|
try:
|
|
# Get timestamp
|
|
timestamp = frame_count / video_fps if video_fps > 0 else frame_count
|
|
|
|
# Detect memory modules in frame
|
|
result = self.detector._perform_detection(frame)
|
|
|
|
# Store frame detection
|
|
frame_detection = {
|
|
'frame_number': frame_count,
|
|
'timestamp': round(timestamp, 2),
|
|
'detections': result['detections'],
|
|
'detection_count': result['detection_count']
|
|
}
|
|
detections.append(frame_detection)
|
|
processed_frames += 1
|
|
|
|
logger.info(f"Frame {frame_count}: {result['detection_count']} detections")
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to process frame {frame_count}: {e}")
|
|
|
|
frame_count += 1
|
|
|
|
cap.release()
|
|
|
|
# Generate summary
|
|
total_detections = sum(d['detection_count'] for d in detections)
|
|
avg_detections = total_detections / len(detections) if detections else 0
|
|
|
|
result = {
|
|
'video_info': {
|
|
'path': video_path,
|
|
'total_frames': total_frames,
|
|
'fps': video_fps,
|
|
'duration': duration
|
|
},
|
|
'processing_info': {
|
|
'frames_processed': processed_frames,
|
|
'frame_interval': frame_interval,
|
|
'target_fps': fps
|
|
},
|
|
'detections': detections,
|
|
'summary': {
|
|
'total_detections': total_detections,
|
|
'avg_detections_per_frame': round(avg_detections, 2),
|
|
'frames_with_detections': len([d for d in detections if d['detection_count'] > 0])
|
|
}
|
|
}
|
|
|
|
logger.info(f"Video processing completed: {processed_frames} frames, {total_detections} total detections")
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Video processing failed: {e}")
|
|
raise VideoProcessingError(f"Video processing failed: {str(e)}")
|
|
|
|
def create_annotated_video(self, video_path: str, output_path: str, fps: int = 1) -> str:
|
|
"""
|
|
Create annotated video with bounding boxes.
|
|
|
|
Args:
|
|
video_path: Input video path
|
|
output_path: Output video path
|
|
fps: Processing fps
|
|
|
|
Returns:
|
|
Path to annotated video
|
|
"""
|
|
try:
|
|
logger.info(f"Creating annotated video: {video_path} -> {output_path}")
|
|
|
|
# Open input video
|
|
cap = cv2.VideoCapture(video_path)
|
|
if not cap.isOpened():
|
|
raise VideoProcessingError("Could not open input video")
|
|
|
|
# Get video properties
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
video_fps = cap.get(cv2.CAP_PROP_FPS)
|
|
|
|
# Create output video writer
|
|
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
|
|
out = cv2.VideoWriter(output_path, fourcc, video_fps, (width, height))
|
|
|
|
frame_interval = max(1, int(video_fps / fps)) if video_fps > 0 else 1
|
|
frame_count = 0
|
|
|
|
while cap.isOpened():
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
|
|
# Process every nth frame for detection
|
|
if frame_count % frame_interval == 0:
|
|
try:
|
|
result = self.detector._perform_detection(frame)
|
|
annotated_frame = result['annotated_image']
|
|
out.write(annotated_frame)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to annotate frame {frame_count}: {e}")
|
|
out.write(frame) # Write original frame if annotation fails
|
|
else:
|
|
out.write(frame) # Write original frame
|
|
|
|
frame_count += 1
|
|
|
|
cap.release()
|
|
out.release()
|
|
|
|
logger.info(f"Annotated video created: {output_path}")
|
|
return output_path
|
|
|
|
except Exception as e:
|
|
logger.error(f"Annotated video creation failed: {e}")
|
|
raise VideoProcessingError(f"Annotated video creation failed: {str(e)}")
|
|
|
|
|
|
def extract_frames(video_path: str, output_dir: str, fps: int = 1) -> List[str]:
|
|
"""
|
|
Extract frames from video for individual processing.
|
|
|
|
Args:
|
|
video_path: Path to video file
|
|
output_dir: Directory to save frames
|
|
fps: Frames per second to extract
|
|
|
|
Returns:
|
|
List of extracted frame paths
|
|
"""
|
|
try:
|
|
output_path = Path(output_dir)
|
|
output_path.mkdir(parents=True, exist_ok=True)
|
|
|
|
cap = cv2.VideoCapture(video_path)
|
|
video_fps = cap.get(cv2.CAP_PROP_FPS)
|
|
frame_interval = max(1, int(video_fps / fps))
|
|
|
|
frame_paths = []
|
|
frame_count = 0
|
|
saved_count = 0
|
|
|
|
while cap.isOpened():
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
|
|
if frame_count % frame_interval == 0:
|
|
frame_filename = f"frame_{saved_count:04d}.jpg"
|
|
frame_path = output_path / frame_filename
|
|
cv2.imwrite(str(frame_path), frame)
|
|
frame_paths.append(str(frame_path))
|
|
saved_count += 1
|
|
|
|
frame_count += 1
|
|
|
|
cap.release()
|
|
logger.info(f"Extracted {len(frame_paths)} frames to {output_dir}")
|
|
return frame_paths
|
|
|
|
except Exception as e:
|
|
logger.error(f"Frame extraction failed: {e}")
|
|
raise VideoProcessingError(f"Frame extraction failed: {str(e)}") |