diff --git a/cluster4npu_ui/TODO.md b/cluster4npu_ui/TODO.md
new file mode 100644
index 0000000..d884355
--- /dev/null
+++ b/cluster4npu_ui/TODO.md
@@ -0,0 +1,290 @@
+# Cluster4NPU Pipeline TODO
+
+## Current Status
+✅ **Pipeline Core**: Multi-stage pipeline with device auto-detection working
+✅ **Hardware Integration**: Kneron NPU dongles connect and initialize successfully
+✅ **Auto-resize Preprocessing**: Model input shape detection and automatic preprocessing implemented
+❌ **Data Input Sources**: Camera and file input implementations are missing
+❌ **Result Persistence**: No result saving or output mechanisms
+❌ **End-to-End Workflow**: Gaps remain between UI configuration and core pipeline execution
+
+---
+
+## Priority 1: Essential Components for a Complete Inference Workflow
+
+### 1. Data Source Implementation
+**Status**: 🔴 Critical Missing Components
+**Location**: New classes in `core/functions/`, or extensions of existing ones
+
+#### 1.1 Camera Input Source
+- **File**: `core/functions/camera_source.py` (new)
+- **Class**: `CameraSource`
+- **Purpose**: Wrapper around `cv2.VideoCapture` for camera input
+- **Integration**: Connect to `InferencePipeline.put_data()`
+- **Features**:
+  - Multiple camera index support
+  - Resolution and FPS configuration
+  - Format conversion (BGR → model input format)
+  - Error handling for camera disconnection
+
+#### 1.2 Video File Input Source
+- **File**: `core/functions/video_source.py` (new)
+- **Class**: `VideoFileSource`
+- **Purpose**: Process video files frame by frame
+- **Integration**: Feed frames to InferencePipeline
+- **Features**:
+  - Support for common video formats (MP4, AVI, MOV)
+  - Frame rate control and seeking
+  - Batch processing capabilities
+  - Progress tracking
+
+#### 1.3 Image File Input Source
+- **File**: `core/functions/image_source.py` (new)
+- **Class**: `ImageFileSource`
+- **Purpose**: Process single images or image directories
+- **Integration**: Single-shot inference through the pipeline
+- **Features**:
+  - Support for common image formats (JPG, PNG, BMP)
+  - Batch directory processing
+  - Image validation and error handling
+
+#### 1.4 RTSP/HTTP Stream Source
+- **File**: `core/functions/stream_source.py` (new)
+- **Classes**: `RTSPSource`, `HTTPStreamSource`
+- **Purpose**: Process live video streams
+- **Integration**: Real-time streaming into the pipeline
+- **Features**:
+  - Stream connection management
+  - Reconnection on failure
+  - Buffer management and frame dropping
+
+### 2. Result Persistence System
+**Status**: 🔴 Critical Missing Components
+**Location**: `core/functions/result_handler.py` (new)
+
+#### 2.1 Result Serialization
+- **Class**: `ResultSerializer`
+- **Purpose**: Convert inference results to standard formats (a sketch follows at the end of this section)
+- **Features**:
+  - JSON export with timestamps
+  - CSV export for analytics
+  - Binary format for performance
+  - Configurable fields and formatting
+
+#### 2.2 File Output Manager
+- **Class**: `FileOutputManager`
+- **Purpose**: Handle result file writing and organization
+- **Features**:
+  - Timestamped file naming
+  - Directory organization by date/pipeline
+  - File rotation and cleanup
+  - Output format configuration
+
+#### 2.3 Real-time Result Streaming
+- **Class**: `ResultStreamer`
+- **Purpose**: Stream results to external systems
+- **Features**:
+  - WebSocket result broadcasting
+  - REST API endpoints
+  - Message queue integration (Redis, RabbitMQ)
+  - Custom callback system
+
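+A minimal sketch of the intended `ResultSerializer` JSON path (§2.1). The result dict
+shape (`data`, `metadata`) and both names are illustrative assumptions, not a final API:
+
+```python
+import json
+import time
+from pathlib import Path
+
+class ResultSerializer:
+    """Illustrative sketch: serialize one inference result to a JSON line."""
+
+    def to_json(self, result: dict) -> str:
+        record = {
+            "timestamp": time.time(),                # serialization time
+            "metadata": result.get("metadata", {}),  # e.g. source, frame_id (assumed keys)
+            "outputs": result.get("data"),           # raw inference outputs (assumed key)
+        }
+        # default=str keeps numpy scalars/arrays from breaking json.dumps
+        return json.dumps(record, default=str)
+
+def append_result(path: str, serializer: ResultSerializer, result: dict) -> None:
+    """Append one result as a JSON line (a simple precursor to FileOutputManager)."""
+    Path(path).parent.mkdir(parents=True, exist_ok=True)
+    with open(path, "a", encoding="utf-8") as f:
+        f.write(serializer.to_json(result) + "\n")
+```
+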
+### 3. Input/Output Integration Bridge
+**Status**: 🔴 Critical Missing Components
+**Location**: `core/functions/pipeline_manager.py` (new)
+
+#### 3.1 Pipeline Configuration Manager
+- **Class**: `PipelineConfigManager`
+- **Purpose**: Convert UI configurations to executable pipelines
+- **Integration**: Bridge between UI and core pipeline
+- **Features**:
+  - Parse UI node configurations
+  - Instantiate appropriate data sources
+  - Configure result handlers
+  - Manage pipeline lifecycle
+
+#### 3.2 Unified Workflow Orchestrator
+- **Class**: `WorkflowOrchestrator`
+- **Purpose**: Coordinate the complete data flow from input to output (see the wiring sketch at the end of Priority 3)
+- **Features**:
+  - Input source management
+  - Pipeline execution control
+  - Result handling and persistence
+  - Error recovery and logging
+
+---
+
+## Priority 2: Enhanced Preprocessing and Auto-resize
+
+### 4. Enhanced Preprocessing System
+**Status**: 🟡 Partially Implemented
+**Location**: `core/functions/Multidongle.py` (existing) + new preprocessing modules
+
+#### 4.1 Current Auto-resize Implementation
+- **Location**: `Multidongle.py:354-371` (preprocess_frame method)
+- **Features**: ✅ Already implemented
+  - Automatic model input shape detection
+  - Dynamic resizing based on model requirements
+  - Format conversion (BGR565, RGB8888, YUYV, RAW8)
+  - Aspect ratio handling
+
+#### 4.2 Enhanced Preprocessing Pipeline
+- **File**: `core/functions/preprocessor.py` (new)
+- **Class**: `AdvancedPreprocessor`
+- **Purpose**: Extended preprocessing capabilities
+- **Features**:
+  - **Smart cropping**: Maintain aspect ratio with intelligent cropping
+  - **Normalization**: Configurable pixel value normalization
+  - **Augmentation**: Real-time data augmentation for training
+  - **Multi-model support**: Different preprocessing for different models
+  - **Caching**: Preprocessed frame caching for performance
+
+#### 4.3 Model-Aware Preprocessing
+- **Enhancement**: Extend the existing `Multidongle` class
+- **Location**: `core/functions/Multidongle.py:188-199` (model_input_shape detection)
+- **Features**:
+  - **Dynamic preprocessing**: Adjust preprocessing based on model metadata
+  - **Model-specific optimization**: Tailored preprocessing for different model types
+  - **Preprocessing profiles**: Saved preprocessing configurations per model
+
+---
+
+## Priority 3: UI Integration and User Experience
+
+### 5. Dashboard Integration
+**Status**: 🟡 Partially Implemented
+**Location**: `ui/windows/dashboard.py` (existing)
+
+#### 5.1 Real-time Pipeline Monitoring
+- **Enhancement**: Extend the existing Dashboard class
+- **Features**:
+  - Live inference statistics
+  - Real-time result visualization
+  - Performance metrics dashboard
+  - Error monitoring and alerts
+
+#### 5.2 Input Source Configuration
+- **Integration**: Connect UI input nodes to actual data sources
+- **Features**:
+  - Camera selection and preview
+  - File browser integration
+  - Stream URL validation
+  - Input source testing
+
+### 6. Result Visualization
+**Status**: 🔴 Not Implemented
+**Location**: `ui/widgets/result_viewer.py` (new)
+
+#### 6.1 Result Display Widget
+- **Class**: `ResultViewer`
+- **Purpose**: Display inference results in the UI
+- **Features**:
+  - Real-time result streaming
+  - Result history and filtering
+  - Export capabilities
+  - Customizable display formats
+
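+A rough wiring sketch of how the Priority 1-3 pieces are meant to connect, using the
+`CameraSource` and `CameraPipelineFeeder` classes this changeset adds, together with the
+`ResultSerializer` sketch from Priority 1. The per-result callback attribute on the
+pipeline is an assumption, not a confirmed `InferencePipeline` API:
+
+```python
+import time
+from camera_source import CameraSource, CameraConfig, CameraPipelineFeeder
+
+def run_workflow(pipeline, output_path: str) -> None:
+    """Hypothetical orchestration: camera → InferencePipeline → JSON-lines file."""
+    camera = CameraSource(CameraConfig(camera_index=0, width=640, height=480, fps=30))
+    feeder = CameraPipelineFeeder(camera, pipeline, feed_rate=15.0)
+    serializer = ResultSerializer()  # from the Priority 1 sketch above
+
+    # ASSUMPTION: a per-result callback hook; the attribute name is a placeholder
+    # for whatever InferencePipeline actually exposes.
+    pipeline.result_callback = lambda result: append_result(output_path, serializer, result)
+
+    if camera.start() and feeder.start_feeding():
+        try:
+            while camera.is_running():
+                time.sleep(0.5)  # heartbeat slot for monitoring / error recovery
+        finally:
+            feeder.stop_feeding()
+            camera.stop()
+```
+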
+---
+
+## Priority 4: Advanced Features and Optimization
+
+### 7. Performance Optimization
+**Status**: 🟡 Basic Implementation
+**Location**: Multiple files
+
+#### 7.1 Memory Management
+- **Enhancement**: Optimize the existing queue systems
+- **Files**: `InferencePipeline.py`, `Multidongle.py`
+- **Features**:
+  - Smart queue sizing based on available memory
+  - Frame dropping under load
+  - Memory leak detection and prevention
+  - Garbage collection optimization
+
+#### 7.2 Multi-device Load Balancing
+- **Enhancement**: Extend the existing multi-dongle support
+- **Location**: `core/functions/Multidongle.py` (existing auto-detection)
+- **Features**:
+  - Intelligent device allocation
+  - Load balancing across devices
+  - Device health monitoring
+  - Automatic failover
+
+### 8. Error Handling and Recovery
+**Status**: 🟡 Basic Implementation
+**Location**: Throughout the codebase
+
+#### 8.1 Comprehensive Error Recovery
+- **Enhancement**: Extend the existing error handling
+- **Features**:
+  - Automatic device reconnection
+  - Pipeline restart on critical errors
+  - Input source recovery
+  - Result persistence on failure
+
+---
+
+## Implementation Roadmap
+
+### Phase 1: Core Data Flow (Weeks 1-2)
+1. ✅ **Complete**: Pipeline deployment and device initialization
+2. 🔄 **In Progress**: Auto-resize preprocessing (mostly implemented)
+3. **Next**: Implement a basic camera input source
+4. **Next**: Add simple result file output
+5. **Next**: Create a basic pipeline manager
+
+### Phase 2: Complete Workflow (Weeks 3-4)
+1. Add video file input support
+2. Implement comprehensive result persistence
+3. Create the UI integration bridge
+4. Add real-time monitoring
+
+### Phase 3: Advanced Features (Weeks 5-6)
+1. Enhanced preprocessing pipeline
+2. Performance optimization
+3. Advanced error handling
+4. Result visualization
+
+### Phase 4: Production Features (Weeks 7-8)
+1. Multi-device load balancing
+2. Advanced stream input support
+3. Analytics and reporting
+4. Configuration management
+
+---
+
+## Key Code Locations for the Current Auto-resize Implementation
+
+### Model Input Shape Detection
+- **File**: `core/functions/Multidongle.py`
+- **Lines**: 188-199 (model_input_shape property)
+- **Status**: ✅ Working - detects model input dimensions from NEF files
+
+### Automatic Preprocessing
+- **File**: `core/functions/Multidongle.py`
+- **Lines**: 354-371 (preprocess_frame method)
+- **Status**: ✅ Working - auto-resizes based on the model input shape
+- **Features**: Format conversion, aspect ratio handling
+
+### Pipeline Data Processing
+- **File**: `core/functions/InferencePipeline.py`
+- **Lines**: 165-240 (_process_data method)
+- **Status**: ✅ Working - integrates preprocessing with inference
+- **Features**: Inter-stage processing, result accumulation
+
+### Format Conversion
+- **File**: `core/functions/Multidongle.py`
+- **Lines**: 382-396 (_convert_format method)
+- **Status**: ✅ Working - supports BGR565, RGB8888, YUYV, RAW8
+
+---
+
+## Notes for Development
+
+1. **Auto-resize is already implemented** ✅ - the system automatically detects the model input shape and resizes accordingly
+2. **Priority should be on input sources** - camera and file input are the critical missing pieces
+3. **Result persistence is essential** - the current system only provides callbacks; file output is needed
+4. **UI integration gap** - the UI configuration does not connect to core pipeline execution
+5. 
**Performance is good** - Multi-threading and device management are solid foundations + +The core pipeline and preprocessing are working well - the focus should be on completing the input/output ecosystem around the existing robust inference engine. \ No newline at end of file diff --git a/cluster4npu_ui/core/functions/camera_source.py b/cluster4npu_ui/core/functions/camera_source.py new file mode 100644 index 0000000..86d430d --- /dev/null +++ b/cluster4npu_ui/core/functions/camera_source.py @@ -0,0 +1,501 @@ +""" +Camera input source for the Cluster4NPU inference pipeline. + +This module provides camera input capabilities with support for multiple cameras, +resolution configuration, and seamless integration with the InferencePipeline. +""" + +import cv2 +import numpy as np +import threading +import time +from typing import Optional, Callable, Tuple, Dict, Any +from dataclasses import dataclass +from abc import ABC, abstractmethod + + +@dataclass +class CameraConfig: + """Configuration for camera input source.""" + camera_index: int = 0 + width: int = 640 + height: int = 480 + fps: int = 30 + format: str = 'BGR' + auto_exposure: bool = True + brightness: float = 0.5 + contrast: float = 0.5 + saturation: float = 0.5 + + +class DataSourceBase(ABC): + """Abstract base class for data sources.""" + + @abstractmethod + def start(self) -> bool: + """Start the data source.""" + pass + + @abstractmethod + def stop(self) -> None: + """Stop the data source.""" + pass + + @abstractmethod + def is_running(self) -> bool: + """Check if the data source is running.""" + pass + + @abstractmethod + def get_frame(self) -> Optional[np.ndarray]: + """Get the next frame from the source.""" + pass + + +class CameraSource(DataSourceBase): + """ + Camera input source for real-time video capture. + + Features: + - Multiple camera index support + - Resolution and FPS configuration + - Format conversion (BGR → model input format) + - Error handling for camera disconnection + - Thread-safe frame capture + """ + + def __init__(self, config: CameraConfig, frame_callback: Optional[Callable[[np.ndarray], None]] = None): + """ + Initialize camera source. + + Args: + config: Camera configuration + frame_callback: Optional callback for each captured frame + """ + self.config = config + self.frame_callback = frame_callback + + # Camera capture object + self.cap: Optional[cv2.VideoCapture] = None + + # Threading control + self._capture_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._frame_lock = threading.Lock() + + # Current frame storage + self._current_frame: Optional[np.ndarray] = None + self._frame_count = 0 + self._fps_counter = 0 + self._last_fps_time = time.time() + self._actual_fps = 0.0 + + # Error handling + self._connection_lost = False + self._last_error: Optional[str] = None + + def start(self) -> bool: + """ + Start camera capture. 
+ + Returns: + bool: True if camera started successfully, False otherwise + """ + if self.is_running(): + return True + + try: + # Initialize camera + self.cap = cv2.VideoCapture(self.config.camera_index) + + if not self.cap.isOpened(): + self._last_error = f"Failed to open camera {self.config.camera_index}" + return False + + # Configure camera properties + self._configure_camera() + + # Test camera capture + ret, frame = self.cap.read() + if not ret or frame is None: + self._last_error = "Failed to read initial frame from camera" + self.cap.release() + self.cap = None + return False + + print(f"[CameraSource] Camera {self.config.camera_index} opened successfully") + print(f"[CameraSource] Resolution: {self.config.width}x{self.config.height}, FPS: {self.config.fps}") + + # Start capture thread + self._stop_event.clear() + self._connection_lost = False + self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True) + self._capture_thread.start() + + return True + + except Exception as e: + self._last_error = f"Camera initialization error: {str(e)}" + if self.cap: + self.cap.release() + self.cap = None + return False + + def stop(self) -> None: + """Stop camera capture.""" + if not self.is_running(): + return + + print("[CameraSource] Stopping camera capture...") + + # Signal stop + self._stop_event.set() + + # Wait for capture thread to finish + if self._capture_thread and self._capture_thread.is_alive(): + self._capture_thread.join(timeout=2.0) + + # Release camera + if self.cap: + self.cap.release() + self.cap = None + + # Clear current frame + with self._frame_lock: + self._current_frame = None + + print("[CameraSource] Camera capture stopped") + + def is_running(self) -> bool: + """Check if camera is currently running.""" + return (self.cap is not None and + self.cap.isOpened() and + self._capture_thread is not None and + self._capture_thread.is_alive() and + not self._stop_event.is_set()) + + def get_frame(self) -> Optional[np.ndarray]: + """ + Get the latest captured frame. + + Returns: + Optional[np.ndarray]: Latest frame or None if no frame available + """ + with self._frame_lock: + if self._current_frame is not None: + return self._current_frame.copy() + return None + + def get_stats(self) -> Dict[str, Any]: + """ + Get camera statistics. + + Returns: + Dict[str, Any]: Statistics including FPS, frame count, etc. 
+ """ + return { + 'frame_count': self._frame_count, + 'actual_fps': self._actual_fps, + 'target_fps': self.config.fps, + 'resolution': (self.config.width, self.config.height), + 'camera_index': self.config.camera_index, + 'connection_lost': self._connection_lost, + 'last_error': self._last_error, + 'is_running': self.is_running() + } + + def _configure_camera(self) -> None: + """Configure camera properties.""" + if not self.cap: + return + + # Set resolution + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.config.width) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.config.height) + + # Set FPS + self.cap.set(cv2.CAP_PROP_FPS, self.config.fps) + + # Set other properties + if hasattr(cv2, 'CAP_PROP_AUTO_EXPOSURE'): + self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1 if self.config.auto_exposure else 0) + + self.cap.set(cv2.CAP_PROP_BRIGHTNESS, self.config.brightness) + self.cap.set(cv2.CAP_PROP_CONTRAST, self.config.contrast) + self.cap.set(cv2.CAP_PROP_SATURATION, self.config.saturation) + + # Verify actual settings + actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + actual_fps = self.cap.get(cv2.CAP_PROP_FPS) + + print(f"[CameraSource] Actual resolution: {actual_width}x{actual_height}, FPS: {actual_fps}") + + def _capture_loop(self) -> None: + """Main capture loop running in separate thread.""" + print("[CameraSource] Capture loop started") + + frame_interval = 1.0 / self.config.fps + last_capture_time = time.time() + + while not self._stop_event.is_set(): + try: + # Control frame rate + current_time = time.time() + time_since_last = current_time - last_capture_time + + if time_since_last < frame_interval: + sleep_time = frame_interval - time_since_last + time.sleep(sleep_time) + continue + + last_capture_time = current_time + + # Capture frame + if not self.cap or not self.cap.isOpened(): + self._connection_lost = True + break + + ret, frame = self.cap.read() + + if not ret or frame is None: + print("[CameraSource] Failed to read frame from camera") + self._connection_lost = True + break + + # Update frame + with self._frame_lock: + self._current_frame = frame + + # Update statistics + self._frame_count += 1 + self._fps_counter += 1 + + # Calculate actual FPS + if current_time - self._last_fps_time >= 1.0: + self._actual_fps = self._fps_counter / (current_time - self._last_fps_time) + self._fps_counter = 0 + self._last_fps_time = current_time + + # Call frame callback if provided + if self.frame_callback: + try: + self.frame_callback(frame) + except Exception as e: + print(f"[CameraSource] Frame callback error: {e}") + + except Exception as e: + print(f"[CameraSource] Capture loop error: {e}") + self._last_error = str(e) + time.sleep(0.1) # Brief pause before retrying + + print("[CameraSource] Capture loop ended") + + def __enter__(self): + """Context manager entry.""" + if not self.start(): + raise RuntimeError(f"Failed to start camera: {self._last_error}") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.stop() + + +class CameraPipelineFeeder: + """ + Helper class to feed camera frames to InferencePipeline. + + This class bridges the CameraSource and InferencePipeline, + handling frame format conversion and pipeline data feeding. + """ + + def __init__(self, camera_source: CameraSource, pipeline, feed_rate: float = 30.0): + """ + Initialize camera pipeline feeder. 
+ + Args: + camera_source: CameraSource instance + pipeline: InferencePipeline instance + feed_rate: Rate at which to feed frames to pipeline (FPS) + """ + self.camera_source = camera_source + self.pipeline = pipeline + self.feed_rate = feed_rate + + # Threading control + self._feed_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._is_feeding = False + + # Statistics + self._frames_fed = 0 + self._last_feed_time = 0.0 + + def start_feeding(self) -> bool: + """ + Start feeding camera frames to pipeline. + + Returns: + bool: True if feeding started successfully + """ + if self._is_feeding: + return True + + if not self.camera_source.is_running(): + print("[CameraPipelineFeeder] Camera is not running") + return False + + print("[CameraPipelineFeeder] Starting frame feeding...") + + self._stop_event.clear() + self._is_feeding = True + self._feed_thread = threading.Thread(target=self._feed_loop, daemon=True) + self._feed_thread.start() + + return True + + def stop_feeding(self) -> None: + """Stop feeding frames to pipeline.""" + if not self._is_feeding: + return + + print("[CameraPipelineFeeder] Stopping frame feeding...") + + self._stop_event.set() + self._is_feeding = False + + if self._feed_thread and self._feed_thread.is_alive(): + self._feed_thread.join(timeout=2.0) + + print("[CameraPipelineFeeder] Frame feeding stopped") + + def _feed_loop(self) -> None: + """Main feeding loop.""" + feed_interval = 1.0 / self.feed_rate + last_feed_time = time.time() + + while not self._stop_event.is_set(): + try: + current_time = time.time() + + # Control feed rate + if current_time - last_feed_time < feed_interval: + time.sleep(0.001) # Small sleep to prevent busy waiting + continue + + # Get frame from camera + frame = self.camera_source.get_frame() + if frame is None: + time.sleep(0.01) + continue + + # Feed frame to pipeline + try: + from InferencePipeline import PipelineData + + pipeline_data = PipelineData( + data=frame, + metadata={ + 'source': 'camera', + 'camera_index': self.camera_source.config.camera_index, + 'timestamp': current_time, + 'frame_id': self._frames_fed + } + ) + + # Put data into pipeline + self.pipeline.put_data(pipeline_data) + + self._frames_fed += 1 + last_feed_time = current_time + + except Exception as e: + print(f"[CameraPipelineFeeder] Error feeding frame to pipeline: {e}") + time.sleep(0.1) + + except Exception as e: + print(f"[CameraPipelineFeeder] Feed loop error: {e}") + time.sleep(0.1) + + def get_stats(self) -> Dict[str, Any]: + """Get feeding statistics.""" + return { + 'frames_fed': self._frames_fed, + 'feed_rate': self.feed_rate, + 'is_feeding': self._is_feeding, + 'camera_stats': self.camera_source.get_stats() + } + + +def list_available_cameras() -> Dict[int, Dict[str, Any]]: + """ + List all available camera devices. 
+ + Returns: + Dict[int, Dict[str, Any]]: Dictionary of camera index to camera info + """ + cameras = {} + + for i in range(10): # Check first 10 camera indices + cap = cv2.VideoCapture(i) + if cap.isOpened(): + # Get camera properties + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + fps = cap.get(cv2.CAP_PROP_FPS) + + cameras[i] = { + 'index': i, + 'width': width, + 'height': height, + 'fps': fps, + 'backend': cap.getBackendName() if hasattr(cap, 'getBackendName') else 'Unknown' + } + + cap.release() + + return cameras + + +# Example usage and testing +if __name__ == "__main__": + # List available cameras + print("Available cameras:") + cameras = list_available_cameras() + for idx, info in cameras.items(): + print(f" Camera {idx}: {info['width']}x{info['height']} @ {info['fps']} FPS ({info['backend']})") + + if not cameras: + print("No cameras found!") + exit(1) + + # Test camera capture + config = CameraConfig( + camera_index=0, + width=640, + height=480, + fps=30 + ) + + def frame_callback(frame): + print(f"Frame captured: {frame.shape}") + + camera = CameraSource(config, frame_callback) + + try: + if camera.start(): + print("Camera started successfully") + + # Capture for 5 seconds + time.sleep(5) + + # Print statistics + stats = camera.get_stats() + print(f"Statistics: {stats}") + + else: + print("Failed to start camera") + + finally: + camera.stop() \ No newline at end of file diff --git a/cluster4npu_ui/core/functions/video_source.py b/cluster4npu_ui/core/functions/video_source.py new file mode 100644 index 0000000..8e94200 --- /dev/null +++ b/cluster4npu_ui/core/functions/video_source.py @@ -0,0 +1,725 @@ +""" +Video file input source for the Cluster4NPU inference pipeline. + +This module provides video file input capabilities with support for common video formats, +frame rate control, seeking, and batch processing capabilities. +""" + +import cv2 +import numpy as np +import threading +import time +import os +from typing import Optional, Callable, Tuple, Dict, Any, List +from dataclasses import dataclass +from pathlib import Path +from camera_source import DataSourceBase + + +@dataclass +class VideoFileConfig: + """Configuration for video file input source.""" + file_path: str + target_fps: Optional[float] = None # If None, use original video FPS + loop: bool = False + start_frame: int = 0 + end_frame: Optional[int] = None # If None, process until end + skip_frames: int = 0 # Skip every N frames + max_frames: Optional[int] = None # Maximum frames to process + resize_to: Optional[Tuple[int, int]] = None # (width, height) + preload_frames: bool = False # Preload frames to memory for faster access + + +class VideoFileSource(DataSourceBase): + """ + Video file input source for processing video files frame by frame. + + Features: + - Support common video formats (MP4, AVI, MOV, MKV, etc.) + - Frame rate control and seeking + - Batch processing capabilities + - Progress tracking + - Loop playback support + - Frame skipping and range selection + """ + + def __init__(self, config: VideoFileConfig, frame_callback: Optional[Callable[[np.ndarray, int], None]] = None): + """ + Initialize video file source. 
+ + Args: + config: Video file configuration + frame_callback: Optional callback for each processed frame (frame, frame_number) + """ + self.config = config + self.frame_callback = frame_callback + + # Video capture object + self.cap: Optional[cv2.VideoCapture] = None + + # Video properties + self.total_frames = 0 + self.original_fps = 0.0 + self.video_width = 0 + self.video_height = 0 + self.video_duration = 0.0 + + # Threading control + self._playback_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._pause_event = threading.Event() + self._frame_lock = threading.Lock() + + # Current state + self._current_frame: Optional[np.ndarray] = None + self._current_frame_number = 0 + self._frames_processed = 0 + self._playback_started = False + + # Progress tracking + self._start_time = 0.0 + self._actual_fps = 0.0 + self._fps_counter = 0 + self._last_fps_time = 0.0 + + # Error handling + self._last_error: Optional[str] = None + + # Frame preloading + self._preloaded_frames: List[np.ndarray] = [] + self._preload_complete = False + + def start(self) -> bool: + """ + Start video file processing. + + Returns: + bool: True if video started successfully, False otherwise + """ + if self.is_running(): + return True + + try: + # Check if file exists + if not os.path.exists(self.config.file_path): + self._last_error = f"Video file not found: {self.config.file_path}" + return False + + # Open video file + self.cap = cv2.VideoCapture(self.config.file_path) + + if not self.cap.isOpened(): + self._last_error = f"Failed to open video file: {self.config.file_path}" + return False + + # Get video properties + self._get_video_properties() + + # Validate configuration + if not self._validate_config(): + return False + + # Preload frames if requested + if self.config.preload_frames: + if not self._preload_frames(): + return False + + print(f"[VideoFileSource] Video opened successfully") + print(f"[VideoFileSource] File: {self.config.file_path}") + print(f"[VideoFileSource] Resolution: {self.video_width}x{self.video_height}") + print(f"[VideoFileSource] FPS: {self.original_fps}, Duration: {self.video_duration:.2f}s") + print(f"[VideoFileSource] Total frames: {self.total_frames}") + + # Start playback thread + self._stop_event.clear() + self._pause_event.clear() + self._playback_thread = threading.Thread(target=self._playback_loop, daemon=True) + self._playback_thread.start() + + return True + + except Exception as e: + self._last_error = f"Video initialization error: {str(e)}" + if self.cap: + self.cap.release() + self.cap = None + return False + + def stop(self) -> None: + """Stop video file processing.""" + if not self.is_running(): + return + + print("[VideoFileSource] Stopping video processing...") + + # Signal stop + self._stop_event.set() + + # Wait for playback thread to finish + if self._playback_thread and self._playback_thread.is_alive(): + self._playback_thread.join(timeout=2.0) + + # Release video capture + if self.cap: + self.cap.release() + self.cap = None + + # Clear current frame + with self._frame_lock: + self._current_frame = None + + # Clear preloaded frames + self._preloaded_frames.clear() + + print("[VideoFileSource] Video processing stopped") + + def pause(self) -> None: + """Pause video playback.""" + if self.is_running(): + self._pause_event.set() + print("[VideoFileSource] Video paused") + + def resume(self) -> None: + """Resume video playback.""" + if self.is_running(): + self._pause_event.clear() + print("[VideoFileSource] Video resumed") + + def 
is_running(self) -> bool: + """Check if video processing is currently running.""" + return (self.cap is not None and + self._playback_thread is not None and + self._playback_thread.is_alive() and + not self._stop_event.is_set()) + + def is_paused(self) -> bool: + """Check if video playback is paused.""" + return self._pause_event.is_set() + + def seek_to_frame(self, frame_number: int) -> bool: + """ + Seek to specific frame number. + + Args: + frame_number: Frame number to seek to + + Returns: + bool: True if seek successful + """ + if not self.cap: + return False + + try: + frame_number = max(0, min(frame_number, self.total_frames - 1)) + self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) + self._current_frame_number = frame_number + return True + except Exception as e: + self._last_error = f"Seek error: {str(e)}" + return False + + def seek_to_time(self, time_seconds: float) -> bool: + """ + Seek to specific time in video. + + Args: + time_seconds: Time in seconds to seek to + + Returns: + bool: True if seek successful + """ + if self.original_fps > 0: + frame_number = int(time_seconds * self.original_fps) + return self.seek_to_frame(frame_number) + return False + + def get_frame(self) -> Optional[np.ndarray]: + """ + Get the current frame. + + Returns: + Optional[np.ndarray]: Current frame or None if no frame available + """ + with self._frame_lock: + if self._current_frame is not None: + return self._current_frame.copy() + return None + + def get_progress(self) -> Dict[str, Any]: + """ + Get processing progress information. + + Returns: + Dict[str, Any]: Progress information + """ + if self.total_frames > 0: + progress_percent = (self._current_frame_number / self.total_frames) * 100 + else: + progress_percent = 0.0 + + elapsed_time = time.time() - self._start_time if self._start_time > 0 else 0 + + return { + 'current_frame': self._current_frame_number, + 'total_frames': self.total_frames, + 'progress_percent': progress_percent, + 'frames_processed': self._frames_processed, + 'elapsed_time': elapsed_time, + 'actual_fps': self._actual_fps, + 'is_complete': self._current_frame_number >= self.total_frames - 1 + } + + def get_stats(self) -> Dict[str, Any]: + """ + Get comprehensive video statistics. 
+ + Returns: + Dict[str, Any]: Video statistics + """ + stats = { + 'file_path': self.config.file_path, + 'video_width': self.video_width, + 'video_height': self.video_height, + 'original_fps': self.original_fps, + 'target_fps': self.config.target_fps, + 'video_duration': self.video_duration, + 'total_frames': self.total_frames, + 'loop_enabled': self.config.loop, + 'preload_enabled': self.config.preload_frames, + 'preload_complete': self._preload_complete, + 'last_error': self._last_error, + 'is_running': self.is_running(), + 'is_paused': self.is_paused() + } + + stats.update(self.get_progress()) + return stats + + def _get_video_properties(self) -> None: + """Get video properties from OpenCV.""" + if not self.cap: + return + + self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) + self.original_fps = self.cap.get(cv2.CAP_PROP_FPS) + self.video_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + self.video_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + if self.original_fps > 0: + self.video_duration = self.total_frames / self.original_fps + + def _validate_config(self) -> bool: + """Validate configuration against video properties.""" + if self.config.start_frame >= self.total_frames: + self._last_error = f"Start frame ({self.config.start_frame}) >= total frames ({self.total_frames})" + return False + + if self.config.end_frame and self.config.end_frame <= self.config.start_frame: + self._last_error = f"End frame ({self.config.end_frame}) <= start frame ({self.config.start_frame})" + return False + + return True + + def _preload_frames(self) -> bool: + """Preload frames into memory for faster access.""" + if not self.cap: + return False + + print("[VideoFileSource] Preloading frames...") + + try: + # Seek to start frame + self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) + + end_frame = self.config.end_frame or self.total_frames + max_frames = self.config.max_frames or (end_frame - self.config.start_frame) + + frames_to_load = min(max_frames, end_frame - self.config.start_frame) + + for i in range(frames_to_load): + ret, frame = self.cap.read() + if not ret or frame is None: + break + + # Apply resizing if configured + if self.config.resize_to: + frame = cv2.resize(frame, self.config.resize_to) + + self._preloaded_frames.append(frame) + + # Skip frames if configured + for _ in range(self.config.skip_frames): + self.cap.read() + + self._preload_complete = True + print(f"[VideoFileSource] Preloaded {len(self._preloaded_frames)} frames") + + return True + + except Exception as e: + self._last_error = f"Frame preloading error: {str(e)}" + return False + + def _playback_loop(self) -> None: + """Main playback loop running in separate thread.""" + print("[VideoFileSource] Playback loop started") + + self._start_time = time.time() + self._last_fps_time = self._start_time + + target_fps = self.config.target_fps or self.original_fps + frame_interval = 1.0 / target_fps if target_fps > 0 else 0.033 # Default 30 FPS + + # Set starting position + if not self.config.preload_frames: + self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) + + self._current_frame_number = self.config.start_frame + last_frame_time = time.time() + + while not self._stop_event.is_set(): + try: + # Handle pause + if self._pause_event.is_set(): + time.sleep(0.1) + continue + + # Control frame rate + current_time = time.time() + time_since_last = current_time - last_frame_time + + if time_since_last < frame_interval: + sleep_time = frame_interval - time_since_last + 
time.sleep(sleep_time) + continue + + # Get frame + frame = self._get_next_frame() + if frame is None: + if self.config.loop: + # Reset to start for looping + self._reset_to_start() + continue + else: + # End of video + break + + # Update current frame + with self._frame_lock: + self._current_frame = frame + + # Update statistics + self._frames_processed += 1 + self._fps_counter += 1 + last_frame_time = current_time + + # Calculate actual FPS + if current_time - self._last_fps_time >= 1.0: + self._actual_fps = self._fps_counter / (current_time - self._last_fps_time) + self._fps_counter = 0 + self._last_fps_time = current_time + + # Call frame callback if provided + if self.frame_callback: + try: + self.frame_callback(frame, self._current_frame_number) + except Exception as e: + print(f"[VideoFileSource] Frame callback error: {e}") + + # Check if we've reached the end frame + if self.config.end_frame and self._current_frame_number >= self.config.end_frame: + if self.config.loop: + self._reset_to_start() + else: + break + + # Check max frames limit + if self.config.max_frames and self._frames_processed >= self.config.max_frames: + break + + except Exception as e: + print(f"[VideoFileSource] Playback loop error: {e}") + self._last_error = str(e) + time.sleep(0.1) + + print("[VideoFileSource] Playback loop ended") + + def _get_next_frame(self) -> Optional[np.ndarray]: + """Get the next frame from video source.""" + if self.config.preload_frames: + # Get frame from preloaded frames + if self._current_frame_number - self.config.start_frame < len(self._preloaded_frames): + frame = self._preloaded_frames[self._current_frame_number - self.config.start_frame] + self._current_frame_number += 1 + return frame + else: + return None + else: + # Read frame from video capture + if not self.cap: + return None + + ret, frame = self.cap.read() + if not ret or frame is None: + return None + + # Apply resizing if configured + if self.config.resize_to: + frame = cv2.resize(frame, self.config.resize_to) + + self._current_frame_number += 1 + + # Skip frames if configured + for _ in range(self.config.skip_frames): + self.cap.read() + self._current_frame_number += 1 + + return frame + + def _reset_to_start(self) -> None: + """Reset video to start position for looping.""" + if self.config.preload_frames: + self._current_frame_number = self.config.start_frame + else: + if self.cap: + self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) + self._current_frame_number = self.config.start_frame + + def __enter__(self): + """Context manager entry.""" + if not self.start(): + raise RuntimeError(f"Failed to start video: {self._last_error}") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.stop() + + +class VideoPipelineFeeder: + """ + Helper class to feed video frames to InferencePipeline. + + This class bridges the VideoFileSource and InferencePipeline, + handling frame format conversion and pipeline data feeding. + """ + + def __init__(self, video_source: VideoFileSource, pipeline): + """ + Initialize video pipeline feeder. 
+ + Args: + video_source: VideoFileSource instance + pipeline: InferencePipeline instance + """ + self.video_source = video_source + self.pipeline = pipeline + + # Threading control + self._feed_thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._is_feeding = False + + # Statistics + self._frames_fed = 0 + self._results_collected = 0 + self._results: List[Dict[str, Any]] = [] + + def start_feeding(self) -> bool: + """ + Start feeding video frames to pipeline. + + Returns: + bool: True if feeding started successfully + """ + if self._is_feeding: + return True + + if not self.video_source.is_running(): + print("[VideoPipelineFeeder] Video source is not running") + return False + + print("[VideoPipelineFeeder] Starting frame feeding...") + + self._stop_event.clear() + self._is_feeding = True + self._feed_thread = threading.Thread(target=self._feed_loop, daemon=True) + self._feed_thread.start() + + return True + + def stop_feeding(self) -> None: + """Stop feeding frames to pipeline.""" + if not self._is_feeding: + return + + print("[VideoPipelineFeeder] Stopping frame feeding...") + + self._stop_event.set() + self._is_feeding = False + + if self._feed_thread and self._feed_thread.is_alive(): + self._feed_thread.join(timeout=2.0) + + print("[VideoPipelineFeeder] Frame feeding stopped") + + def _feed_loop(self) -> None: + """Main feeding loop.""" + while not self._stop_event.is_set(): + try: + # Get frame from video source + frame = self.video_source.get_frame() + if frame is None: + if not self.video_source.is_running(): + break + time.sleep(0.01) + continue + + # Feed frame to pipeline + try: + from InferencePipeline import PipelineData + + progress = self.video_source.get_progress() + + pipeline_data = PipelineData( + data=frame, + metadata={ + 'source': 'video_file', + 'file_path': self.video_source.config.file_path, + 'frame_number': progress['current_frame'], + 'total_frames': progress['total_frames'], + 'progress_percent': progress['progress_percent'], + 'timestamp': time.time(), + 'frame_id': self._frames_fed + } + ) + + # Put data into pipeline + self.pipeline.put_data(pipeline_data) + + self._frames_fed += 1 + + except Exception as e: + print(f"[VideoPipelineFeeder] Error feeding frame to pipeline: {e}") + time.sleep(0.1) + + except Exception as e: + print(f"[VideoPipelineFeeder] Feed loop error: {e}") + time.sleep(0.1) + + def get_results(self) -> List[Dict[str, Any]]: + """Get collected results.""" + return self._results.copy() + + def get_stats(self) -> Dict[str, Any]: + """Get feeding statistics.""" + return { + 'frames_fed': self._frames_fed, + 'results_collected': self._results_collected, + 'is_feeding': self._is_feeding, + 'video_stats': self.video_source.get_stats() + } + + +def get_video_info(file_path: str) -> Dict[str, Any]: + """ + Get video file information without opening for processing. 
+ + Args: + file_path: Path to video file + + Returns: + Dict[str, Any]: Video information + """ + info = { + 'file_path': file_path, + 'exists': False, + 'valid': False, + 'error': None + } + + try: + if not os.path.exists(file_path): + info['error'] = 'File does not exist' + return info + + info['exists'] = True + info['file_size'] = os.path.getsize(file_path) + + cap = cv2.VideoCapture(file_path) + if not cap.isOpened(): + info['error'] = 'Cannot open video file' + return info + + info['valid'] = True + info['width'] = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + info['height'] = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + info['fps'] = cap.get(cv2.CAP_PROP_FPS) + info['frame_count'] = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + + if info['fps'] > 0: + info['duration'] = info['frame_count'] / info['fps'] + else: + info['duration'] = 0.0 + + cap.release() + + except Exception as e: + info['error'] = str(e) + + return info + + +# Example usage and testing +if __name__ == "__main__": + # Test video file processing + import sys + + if len(sys.argv) != 2: + print("Usage: python video_source.py ") + sys.exit(1) + + video_path = sys.argv[1] + + # Get video info + info = get_video_info(video_path) + print(f"Video info: {info}") + + if not info['valid']: + print(f"Cannot process video: {info['error']}") + sys.exit(1) + + # Test video processing + config = VideoFileConfig( + file_path=video_path, + target_fps=10, # Process at 10 FPS + loop=False, + start_frame=0, + max_frames=100 # Process only first 100 frames + ) + + def frame_callback(frame, frame_number): + print(f"Processing frame {frame_number}: {frame.shape}") + + video_source = VideoFileSource(config, frame_callback) + + try: + if video_source.start(): + print("Video processing started") + + # Monitor progress + while video_source.is_running(): + progress = video_source.get_progress() + print(f"Progress: {progress['progress_percent']:.1f}% " + f"({progress['current_frame']}/{progress['total_frames']})") + time.sleep(1) + + # Print final statistics + stats = video_source.get_stats() + print(f"Final statistics: {stats}") + + else: + print(f"Failed to start video processing: {video_source._last_error}") + + finally: + video_source.stop() \ No newline at end of file