""" Video file input source for the Cluster4NPU inference pipeline. This module provides video file input capabilities with support for common video formats, frame rate control, seeking, and batch processing capabilities. """ import cv2 import numpy as np import threading import time import os from typing import Optional, Callable, Tuple, Dict, Any, List from dataclasses import dataclass from pathlib import Path from camera_source import DataSourceBase @dataclass class VideoFileConfig: """Configuration for video file input source.""" file_path: str target_fps: Optional[float] = None # If None, use original video FPS loop: bool = False start_frame: int = 0 end_frame: Optional[int] = None # If None, process until end skip_frames: int = 0 # Skip every N frames max_frames: Optional[int] = None # Maximum frames to process resize_to: Optional[Tuple[int, int]] = None # (width, height) preload_frames: bool = False # Preload frames to memory for faster access class VideoFileSource(DataSourceBase): """ Video file input source for processing video files frame by frame. Features: - Support common video formats (MP4, AVI, MOV, MKV, etc.) - Frame rate control and seeking - Batch processing capabilities - Progress tracking - Loop playback support - Frame skipping and range selection """ def __init__(self, config: VideoFileConfig, frame_callback: Optional[Callable[[np.ndarray, int], None]] = None): """ Initialize video file source. Args: config: Video file configuration frame_callback: Optional callback for each processed frame (frame, frame_number) """ self.config = config self.frame_callback = frame_callback # Video capture object self.cap: Optional[cv2.VideoCapture] = None # Video properties self.total_frames = 0 self.original_fps = 0.0 self.video_width = 0 self.video_height = 0 self.video_duration = 0.0 # Threading control self._playback_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._pause_event = threading.Event() self._frame_lock = threading.Lock() # Current state self._current_frame: Optional[np.ndarray] = None self._current_frame_number = 0 self._frames_processed = 0 self._playback_started = False # Progress tracking self._start_time = 0.0 self._actual_fps = 0.0 self._fps_counter = 0 self._last_fps_time = 0.0 # Error handling self._last_error: Optional[str] = None # Frame preloading self._preloaded_frames: List[np.ndarray] = [] self._preload_complete = False def start(self) -> bool: """ Start video file processing. Returns: bool: True if video started successfully, False otherwise """ if self.is_running(): return True try: # Check if file exists if not os.path.exists(self.config.file_path): self._last_error = f"Video file not found: {self.config.file_path}" return False # Open video file self.cap = cv2.VideoCapture(self.config.file_path) if not self.cap.isOpened(): self._last_error = f"Failed to open video file: {self.config.file_path}" return False # Get video properties self._get_video_properties() # Validate configuration if not self._validate_config(): return False # Preload frames if requested if self.config.preload_frames: if not self._preload_frames(): return False print(f"[VideoFileSource] Video opened successfully") print(f"[VideoFileSource] File: {self.config.file_path}") print(f"[VideoFileSource] Resolution: {self.video_width}x{self.video_height}") print(f"[VideoFileSource] FPS: {self.original_fps}, Duration: {self.video_duration:.2f}s") print(f"[VideoFileSource] Total frames: {self.total_frames}") # Start playback thread self._stop_event.clear() self._pause_event.clear() self._playback_thread = threading.Thread(target=self._playback_loop, daemon=True) self._playback_thread.start() return True except Exception as e: self._last_error = f"Video initialization error: {str(e)}" if self.cap: self.cap.release() self.cap = None return False def stop(self) -> None: """Stop video file processing.""" if not self.is_running(): return print("[VideoFileSource] Stopping video processing...") # Signal stop self._stop_event.set() # Wait for playback thread to finish if self._playback_thread and self._playback_thread.is_alive(): self._playback_thread.join(timeout=2.0) # Release video capture if self.cap: self.cap.release() self.cap = None # Clear current frame with self._frame_lock: self._current_frame = None # Clear preloaded frames self._preloaded_frames.clear() print("[VideoFileSource] Video processing stopped") def pause(self) -> None: """Pause video playback.""" if self.is_running(): self._pause_event.set() print("[VideoFileSource] Video paused") def resume(self) -> None: """Resume video playback.""" if self.is_running(): self._pause_event.clear() print("[VideoFileSource] Video resumed") def is_running(self) -> bool: """Check if video processing is currently running.""" return (self.cap is not None and self._playback_thread is not None and self._playback_thread.is_alive() and not self._stop_event.is_set()) def is_paused(self) -> bool: """Check if video playback is paused.""" return self._pause_event.is_set() def seek_to_frame(self, frame_number: int) -> bool: """ Seek to specific frame number. Args: frame_number: Frame number to seek to Returns: bool: True if seek successful """ if not self.cap: return False try: frame_number = max(0, min(frame_number, self.total_frames - 1)) self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) self._current_frame_number = frame_number return True except Exception as e: self._last_error = f"Seek error: {str(e)}" return False def seek_to_time(self, time_seconds: float) -> bool: """ Seek to specific time in video. Args: time_seconds: Time in seconds to seek to Returns: bool: True if seek successful """ if self.original_fps > 0: frame_number = int(time_seconds * self.original_fps) return self.seek_to_frame(frame_number) return False def get_frame(self) -> Optional[np.ndarray]: """ Get the current frame. Returns: Optional[np.ndarray]: Current frame or None if no frame available """ with self._frame_lock: if self._current_frame is not None: return self._current_frame.copy() return None def get_progress(self) -> Dict[str, Any]: """ Get processing progress information. Returns: Dict[str, Any]: Progress information """ if self.total_frames > 0: progress_percent = (self._current_frame_number / self.total_frames) * 100 else: progress_percent = 0.0 elapsed_time = time.time() - self._start_time if self._start_time > 0 else 0 return { 'current_frame': self._current_frame_number, 'total_frames': self.total_frames, 'progress_percent': progress_percent, 'frames_processed': self._frames_processed, 'elapsed_time': elapsed_time, 'actual_fps': self._actual_fps, 'is_complete': self._current_frame_number >= self.total_frames - 1 } def get_stats(self) -> Dict[str, Any]: """ Get comprehensive video statistics. Returns: Dict[str, Any]: Video statistics """ stats = { 'file_path': self.config.file_path, 'video_width': self.video_width, 'video_height': self.video_height, 'original_fps': self.original_fps, 'target_fps': self.config.target_fps, 'video_duration': self.video_duration, 'total_frames': self.total_frames, 'loop_enabled': self.config.loop, 'preload_enabled': self.config.preload_frames, 'preload_complete': self._preload_complete, 'last_error': self._last_error, 'is_running': self.is_running(), 'is_paused': self.is_paused() } stats.update(self.get_progress()) return stats def _get_video_properties(self) -> None: """Get video properties from OpenCV.""" if not self.cap: return self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) self.original_fps = self.cap.get(cv2.CAP_PROP_FPS) self.video_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) self.video_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) if self.original_fps > 0: self.video_duration = self.total_frames / self.original_fps def _validate_config(self) -> bool: """Validate configuration against video properties.""" if self.config.start_frame >= self.total_frames: self._last_error = f"Start frame ({self.config.start_frame}) >= total frames ({self.total_frames})" return False if self.config.end_frame and self.config.end_frame <= self.config.start_frame: self._last_error = f"End frame ({self.config.end_frame}) <= start frame ({self.config.start_frame})" return False return True def _preload_frames(self) -> bool: """Preload frames into memory for faster access.""" if not self.cap: return False print("[VideoFileSource] Preloading frames...") try: # Seek to start frame self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) end_frame = self.config.end_frame or self.total_frames max_frames = self.config.max_frames or (end_frame - self.config.start_frame) frames_to_load = min(max_frames, end_frame - self.config.start_frame) for i in range(frames_to_load): ret, frame = self.cap.read() if not ret or frame is None: break # Apply resizing if configured if self.config.resize_to: frame = cv2.resize(frame, self.config.resize_to) self._preloaded_frames.append(frame) # Skip frames if configured for _ in range(self.config.skip_frames): self.cap.read() self._preload_complete = True print(f"[VideoFileSource] Preloaded {len(self._preloaded_frames)} frames") return True except Exception as e: self._last_error = f"Frame preloading error: {str(e)}" return False def _playback_loop(self) -> None: """Main playback loop running in separate thread.""" print("[VideoFileSource] Playback loop started") self._start_time = time.time() self._last_fps_time = self._start_time target_fps = self.config.target_fps or self.original_fps frame_interval = 1.0 / target_fps if target_fps > 0 else 0.033 # Default 30 FPS # Set starting position if not self.config.preload_frames: self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) self._current_frame_number = self.config.start_frame last_frame_time = time.time() while not self._stop_event.is_set(): try: # Handle pause if self._pause_event.is_set(): time.sleep(0.1) continue # Control frame rate current_time = time.time() time_since_last = current_time - last_frame_time if time_since_last < frame_interval: sleep_time = frame_interval - time_since_last time.sleep(sleep_time) continue # Get frame frame = self._get_next_frame() if frame is None: if self.config.loop: # Reset to start for looping self._reset_to_start() continue else: # End of video break # Update current frame with self._frame_lock: self._current_frame = frame # Update statistics self._frames_processed += 1 self._fps_counter += 1 last_frame_time = current_time # Calculate actual FPS if current_time - self._last_fps_time >= 1.0: self._actual_fps = self._fps_counter / (current_time - self._last_fps_time) self._fps_counter = 0 self._last_fps_time = current_time # Call frame callback if provided if self.frame_callback: try: self.frame_callback(frame, self._current_frame_number) except Exception as e: print(f"[VideoFileSource] Frame callback error: {e}") # Check if we've reached the end frame if self.config.end_frame and self._current_frame_number >= self.config.end_frame: if self.config.loop: self._reset_to_start() else: break # Check max frames limit if self.config.max_frames and self._frames_processed >= self.config.max_frames: break except Exception as e: print(f"[VideoFileSource] Playback loop error: {e}") self._last_error = str(e) time.sleep(0.1) print("[VideoFileSource] Playback loop ended") def _get_next_frame(self) -> Optional[np.ndarray]: """Get the next frame from video source.""" if self.config.preload_frames: # Get frame from preloaded frames if self._current_frame_number - self.config.start_frame < len(self._preloaded_frames): frame = self._preloaded_frames[self._current_frame_number - self.config.start_frame] self._current_frame_number += 1 return frame else: return None else: # Read frame from video capture if not self.cap: return None ret, frame = self.cap.read() if not ret or frame is None: return None # Apply resizing if configured if self.config.resize_to: frame = cv2.resize(frame, self.config.resize_to) self._current_frame_number += 1 # Skip frames if configured for _ in range(self.config.skip_frames): self.cap.read() self._current_frame_number += 1 return frame def _reset_to_start(self) -> None: """Reset video to start position for looping.""" if self.config.preload_frames: self._current_frame_number = self.config.start_frame else: if self.cap: self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) self._current_frame_number = self.config.start_frame def __enter__(self): """Context manager entry.""" if not self.start(): raise RuntimeError(f"Failed to start video: {self._last_error}") return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.stop() class VideoPipelineFeeder: """ Helper class to feed video frames to InferencePipeline. This class bridges the VideoFileSource and InferencePipeline, handling frame format conversion and pipeline data feeding. """ def __init__(self, video_source: VideoFileSource, pipeline): """ Initialize video pipeline feeder. Args: video_source: VideoFileSource instance pipeline: InferencePipeline instance """ self.video_source = video_source self.pipeline = pipeline # Threading control self._feed_thread: Optional[threading.Thread] = None self._stop_event = threading.Event() self._is_feeding = False # Statistics self._frames_fed = 0 self._results_collected = 0 self._results: List[Dict[str, Any]] = [] def start_feeding(self) -> bool: """ Start feeding video frames to pipeline. Returns: bool: True if feeding started successfully """ if self._is_feeding: return True if not self.video_source.is_running(): print("[VideoPipelineFeeder] Video source is not running") return False print("[VideoPipelineFeeder] Starting frame feeding...") self._stop_event.clear() self._is_feeding = True self._feed_thread = threading.Thread(target=self._feed_loop, daemon=True) self._feed_thread.start() return True def stop_feeding(self) -> None: """Stop feeding frames to pipeline.""" if not self._is_feeding: return print("[VideoPipelineFeeder] Stopping frame feeding...") self._stop_event.set() self._is_feeding = False if self._feed_thread and self._feed_thread.is_alive(): self._feed_thread.join(timeout=2.0) print("[VideoPipelineFeeder] Frame feeding stopped") def _feed_loop(self) -> None: """Main feeding loop.""" while not self._stop_event.is_set(): try: # Get frame from video source frame = self.video_source.get_frame() if frame is None: if not self.video_source.is_running(): break time.sleep(0.01) continue # Feed frame to pipeline try: from InferencePipeline import PipelineData progress = self.video_source.get_progress() pipeline_data = PipelineData( data=frame, metadata={ 'source': 'video_file', 'file_path': self.video_source.config.file_path, 'frame_number': progress['current_frame'], 'total_frames': progress['total_frames'], 'progress_percent': progress['progress_percent'], 'timestamp': time.time(), 'frame_id': self._frames_fed } ) # Put data into pipeline self.pipeline.put_data(pipeline_data) self._frames_fed += 1 except Exception as e: print(f"[VideoPipelineFeeder] Error feeding frame to pipeline: {e}") time.sleep(0.1) except Exception as e: print(f"[VideoPipelineFeeder] Feed loop error: {e}") time.sleep(0.1) def get_results(self) -> List[Dict[str, Any]]: """Get collected results.""" return self._results.copy() def get_stats(self) -> Dict[str, Any]: """Get feeding statistics.""" return { 'frames_fed': self._frames_fed, 'results_collected': self._results_collected, 'is_feeding': self._is_feeding, 'video_stats': self.video_source.get_stats() } def get_video_info(file_path: str) -> Dict[str, Any]: """ Get video file information without opening for processing. Args: file_path: Path to video file Returns: Dict[str, Any]: Video information """ info = { 'file_path': file_path, 'exists': False, 'valid': False, 'error': None } try: if not os.path.exists(file_path): info['error'] = 'File does not exist' return info info['exists'] = True info['file_size'] = os.path.getsize(file_path) cap = cv2.VideoCapture(file_path) if not cap.isOpened(): info['error'] = 'Cannot open video file' return info info['valid'] = True info['width'] = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) info['height'] = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) info['fps'] = cap.get(cv2.CAP_PROP_FPS) info['frame_count'] = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if info['fps'] > 0: info['duration'] = info['frame_count'] / info['fps'] else: info['duration'] = 0.0 cap.release() except Exception as e: info['error'] = str(e) return info # Example usage and testing if __name__ == "__main__": # Test video file processing import sys if len(sys.argv) != 2: print("Usage: python video_source.py ") sys.exit(1) video_path = sys.argv[1] # Get video info info = get_video_info(video_path) print(f"Video info: {info}") if not info['valid']: print(f"Cannot process video: {info['error']}") sys.exit(1) # Test video processing config = VideoFileConfig( file_path=video_path, target_fps=10, # Process at 10 FPS loop=False, start_frame=0, max_frames=100 # Process only first 100 frames ) def frame_callback(frame, frame_number): print(f"Processing frame {frame_number}: {frame.shape}") video_source = VideoFileSource(config, frame_callback) try: if video_source.start(): print("Video processing started") # Monitor progress while video_source.is_running(): progress = video_source.get_progress() print(f"Progress: {progress['progress_percent']:.1f}% " f"({progress['current_frame']}/{progress['total_frames']})") time.sleep(1) # Print final statistics stats = video_source.get_stats() print(f"Final statistics: {stats}") else: print(f"Failed to start video processing: {video_source._last_error}") finally: video_source.stop()