From 7e173c42deca7b4a153ba9edc0fa3ac5faad16ed Mon Sep 17 00:00:00 2001 From: Masonmason Date: Wed, 16 Jul 2025 23:32:36 +0800 Subject: [PATCH] feat: Implement essential components for complete inference workflow --- .../core/functions/camera_source.py | 583 +++---------- .../core/functions/result_handler.py | 83 ++ cluster4npu_ui/core/functions/video_source.py | 802 +++--------------- .../core/functions/workflow_orchestrator.py | 163 ++++ cluster4npu_ui/ui/dialogs/deployment.py | 16 +- 5 files changed, 464 insertions(+), 1183 deletions(-) create mode 100644 cluster4npu_ui/core/functions/result_handler.py create mode 100644 cluster4npu_ui/core/functions/workflow_orchestrator.py diff --git a/cluster4npu_ui/core/functions/camera_source.py b/cluster4npu_ui/core/functions/camera_source.py index 86d430d..862a268 100644 --- a/cluster4npu_ui/core/functions/camera_source.py +++ b/cluster4npu_ui/core/functions/camera_source.py @@ -1,501 +1,132 @@ -""" -Camera input source for the Cluster4NPU inference pipeline. - -This module provides camera input capabilities with support for multiple cameras, -resolution configuration, and seamless integration with the InferencePipeline. -""" import cv2 -import numpy as np import threading import time -from typing import Optional, Callable, Tuple, Dict, Any -from dataclasses import dataclass -from abc import ABC, abstractmethod +from typing import Optional, Callable - -@dataclass -class CameraConfig: - """Configuration for camera input source.""" - camera_index: int = 0 - width: int = 640 - height: int = 480 - fps: int = 30 - format: str = 'BGR' - auto_exposure: bool = True - brightness: float = 0.5 - contrast: float = 0.5 - saturation: float = 0.5 - - -class DataSourceBase(ABC): - """Abstract base class for data sources.""" - - @abstractmethod - def start(self) -> bool: - """Start the data source.""" - pass - - @abstractmethod - def stop(self) -> None: - """Stop the data source.""" - pass - - @abstractmethod - def is_running(self) -> bool: - """Check if the data source is running.""" - pass - - @abstractmethod - def get_frame(self) -> Optional[np.ndarray]: - """Get the next frame from the source.""" - pass - - -class CameraSource(DataSourceBase): +class CameraSource: """ - Camera input source for real-time video capture. - - Features: - - Multiple camera index support - - Resolution and FPS configuration - - Format conversion (BGR → model input format) - - Error handling for camera disconnection - - Thread-safe frame capture + A class to handle camera input using cv2.VideoCapture. + It captures frames in a separate thread and can send them to a pipeline. """ - - def __init__(self, config: CameraConfig, frame_callback: Optional[Callable[[np.ndarray], None]] = None): + def __init__(self, + camera_index: int = 0, + resolution: Optional[tuple[int, int]] = None, + fps: Optional[int] = None, + data_callback: Optional[Callable[[object], None]] = None): """ - Initialize camera source. - + Initializes the CameraSource. + Args: - config: Camera configuration - frame_callback: Optional callback for each captured frame + camera_index (int): The index of the camera to use. + resolution (Optional[tuple[int, int]]): The desired resolution (width, height). + fps (Optional[int]): The desired frames per second. + data_callback (Optional[Callable[[object], None]]): A callback function to send data to. """ - self.config = config - self.frame_callback = frame_callback - - # Camera capture object - self.cap: Optional[cv2.VideoCapture] = None - - # Threading control - self._capture_thread: Optional[threading.Thread] = None + self.camera_index = camera_index + self.resolution = resolution + self.fps = fps + self.data_callback = data_callback + + self.cap = None + self.running = False + self.thread = None self._stop_event = threading.Event() - self._frame_lock = threading.Lock() - - # Current frame storage - self._current_frame: Optional[np.ndarray] = None - self._frame_count = 0 - self._fps_counter = 0 - self._last_fps_time = time.time() - self._actual_fps = 0.0 - - # Error handling - self._connection_lost = False - self._last_error: Optional[str] = None - - def start(self) -> bool: + + def initialize(self) -> bool: """ - Start camera capture. - + Initializes the camera capture. + Returns: - bool: True if camera started successfully, False otherwise + bool: True if initialization is successful, False otherwise. """ - if self.is_running(): - return True - - try: - # Initialize camera - self.cap = cv2.VideoCapture(self.config.camera_index) - - if not self.cap.isOpened(): - self._last_error = f"Failed to open camera {self.config.camera_index}" - return False - - # Configure camera properties - self._configure_camera() - - # Test camera capture - ret, frame = self.cap.read() - if not ret or frame is None: - self._last_error = "Failed to read initial frame from camera" - self.cap.release() - self.cap = None - return False - - print(f"[CameraSource] Camera {self.config.camera_index} opened successfully") - print(f"[CameraSource] Resolution: {self.config.width}x{self.config.height}, FPS: {self.config.fps}") - - # Start capture thread - self._stop_event.clear() - self._connection_lost = False - self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True) - self._capture_thread.start() - - return True - - except Exception as e: - self._last_error = f"Camera initialization error: {str(e)}" - if self.cap: - self.cap.release() - self.cap = None + print(f"Initializing camera at index {self.camera_index}...") + self.cap = cv2.VideoCapture(self.camera_index) + if not self.cap.isOpened(): + print(f"Error: Could not open camera at index {self.camera_index}.") return False - - def stop(self) -> None: - """Stop camera capture.""" - if not self.is_running(): + + if self.resolution: + self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]) + self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]) + + if self.fps: + self.cap.set(cv2.CAP_PROP_FPS, self.fps) + + print("Camera initialized successfully.") + return True + + def start(self): + """ + Starts the frame capture thread. + """ + if self.running: + print("Camera source is already running.") return + + if not self.cap or not self.cap.isOpened(): + if not self.initialize(): + return + + self.running = True + self._stop_event.clear() + self.thread = threading.Thread(target=self._capture_loop, daemon=True) + self.thread.start() + print("Camera capture thread started.") + + def stop(self): + """ + Stops the frame capture thread. + """ + self.running = False + if self.thread and self.thread.is_alive(): + self._stop_event.set() + self.thread.join(timeout=2) - print("[CameraSource] Stopping camera capture...") - - # Signal stop - self._stop_event.set() - - # Wait for capture thread to finish - if self._capture_thread and self._capture_thread.is_alive(): - self._capture_thread.join(timeout=2.0) - - # Release camera - if self.cap: + if self.cap and self.cap.isOpened(): self.cap.release() self.cap = None - - # Clear current frame - with self._frame_lock: - self._current_frame = None - - print("[CameraSource] Camera capture stopped") - - def is_running(self) -> bool: - """Check if camera is currently running.""" - return (self.cap is not None and - self.cap.isOpened() and - self._capture_thread is not None and - self._capture_thread.is_alive() and - not self._stop_event.is_set()) - - def get_frame(self) -> Optional[np.ndarray]: - """ - Get the latest captured frame. - - Returns: - Optional[np.ndarray]: Latest frame or None if no frame available - """ - with self._frame_lock: - if self._current_frame is not None: - return self._current_frame.copy() - return None - - def get_stats(self) -> Dict[str, Any]: - """ - Get camera statistics. - - Returns: - Dict[str, Any]: Statistics including FPS, frame count, etc. - """ - return { - 'frame_count': self._frame_count, - 'actual_fps': self._actual_fps, - 'target_fps': self.config.fps, - 'resolution': (self.config.width, self.config.height), - 'camera_index': self.config.camera_index, - 'connection_lost': self._connection_lost, - 'last_error': self._last_error, - 'is_running': self.is_running() - } - - def _configure_camera(self) -> None: - """Configure camera properties.""" - if not self.cap: - return - - # Set resolution - self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.config.width) - self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.config.height) - - # Set FPS - self.cap.set(cv2.CAP_PROP_FPS, self.config.fps) - - # Set other properties - if hasattr(cv2, 'CAP_PROP_AUTO_EXPOSURE'): - self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1 if self.config.auto_exposure else 0) - - self.cap.set(cv2.CAP_PROP_BRIGHTNESS, self.config.brightness) - self.cap.set(cv2.CAP_PROP_CONTRAST, self.config.contrast) - self.cap.set(cv2.CAP_PROP_SATURATION, self.config.saturation) - - # Verify actual settings - actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - actual_fps = self.cap.get(cv2.CAP_PROP_FPS) - - print(f"[CameraSource] Actual resolution: {actual_width}x{actual_height}, FPS: {actual_fps}") - - def _capture_loop(self) -> None: - """Main capture loop running in separate thread.""" - print("[CameraSource] Capture loop started") - - frame_interval = 1.0 / self.config.fps - last_capture_time = time.time() - - while not self._stop_event.is_set(): - try: - # Control frame rate - current_time = time.time() - time_since_last = current_time - last_capture_time - - if time_since_last < frame_interval: - sleep_time = frame_interval - time_since_last - time.sleep(sleep_time) - continue - - last_capture_time = current_time - - # Capture frame - if not self.cap or not self.cap.isOpened(): - self._connection_lost = True - break - - ret, frame = self.cap.read() - - if not ret or frame is None: - print("[CameraSource] Failed to read frame from camera") - self._connection_lost = True - break - - # Update frame - with self._frame_lock: - self._current_frame = frame - - # Update statistics - self._frame_count += 1 - self._fps_counter += 1 - - # Calculate actual FPS - if current_time - self._last_fps_time >= 1.0: - self._actual_fps = self._fps_counter / (current_time - self._last_fps_time) - self._fps_counter = 0 - self._last_fps_time = current_time - - # Call frame callback if provided - if self.frame_callback: - try: - self.frame_callback(frame) - except Exception as e: - print(f"[CameraSource] Frame callback error: {e}") - - except Exception as e: - print(f"[CameraSource] Capture loop error: {e}") - self._last_error = str(e) - time.sleep(0.1) # Brief pause before retrying - - print("[CameraSource] Capture loop ended") - - def __enter__(self): - """Context manager entry.""" - if not self.start(): - raise RuntimeError(f"Failed to start camera: {self._last_error}") - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - self.stop() + print("Camera source stopped.") + def _capture_loop(self): + """ + The main loop for capturing frames from the camera. + """ + while self.running and not self._stop_event.is_set(): + ret, frame = self.cap.read() + if not ret: + print("Error: Could not read frame from camera. Reconnecting...") + self.cap.release() + time.sleep(1) + self.initialize() + continue -class CameraPipelineFeeder: - """ - Helper class to feed camera frames to InferencePipeline. - - This class bridges the CameraSource and InferencePipeline, - handling frame format conversion and pipeline data feeding. - """ - - def __init__(self, camera_source: CameraSource, pipeline, feed_rate: float = 30.0): - """ - Initialize camera pipeline feeder. - - Args: - camera_source: CameraSource instance - pipeline: InferencePipeline instance - feed_rate: Rate at which to feed frames to pipeline (FPS) - """ - self.camera_source = camera_source - self.pipeline = pipeline - self.feed_rate = feed_rate - - # Threading control - self._feed_thread: Optional[threading.Thread] = None - self._stop_event = threading.Event() - self._is_feeding = False - - # Statistics - self._frames_fed = 0 - self._last_feed_time = 0.0 - - def start_feeding(self) -> bool: - """ - Start feeding camera frames to pipeline. - - Returns: - bool: True if feeding started successfully - """ - if self._is_feeding: - return True - - if not self.camera_source.is_running(): - print("[CameraPipelineFeeder] Camera is not running") - return False - - print("[CameraPipelineFeeder] Starting frame feeding...") - - self._stop_event.clear() - self._is_feeding = True - self._feed_thread = threading.Thread(target=self._feed_loop, daemon=True) - self._feed_thread.start() - - return True - - def stop_feeding(self) -> None: - """Stop feeding frames to pipeline.""" - if not self._is_feeding: - return - - print("[CameraPipelineFeeder] Stopping frame feeding...") - - self._stop_event.set() - self._is_feeding = False - - if self._feed_thread and self._feed_thread.is_alive(): - self._feed_thread.join(timeout=2.0) - - print("[CameraPipelineFeeder] Frame feeding stopped") - - def _feed_loop(self) -> None: - """Main feeding loop.""" - feed_interval = 1.0 / self.feed_rate - last_feed_time = time.time() - - while not self._stop_event.is_set(): - try: - current_time = time.time() - - # Control feed rate - if current_time - last_feed_time < feed_interval: - time.sleep(0.001) # Small sleep to prevent busy waiting - continue - - # Get frame from camera - frame = self.camera_source.get_frame() - if frame is None: - time.sleep(0.01) - continue - - # Feed frame to pipeline + if self.data_callback: try: - from InferencePipeline import PipelineData - - pipeline_data = PipelineData( - data=frame, - metadata={ - 'source': 'camera', - 'camera_index': self.camera_source.config.camera_index, - 'timestamp': current_time, - 'frame_id': self._frames_fed - } - ) - - # Put data into pipeline - self.pipeline.put_data(pipeline_data) - - self._frames_fed += 1 - last_feed_time = current_time - + # Assuming the callback is thread-safe or handles its own locking + self.data_callback(frame) except Exception as e: - print(f"[CameraPipelineFeeder] Error feeding frame to pipeline: {e}") - time.sleep(0.1) - - except Exception as e: - print(f"[CameraPipelineFeeder] Feed loop error: {e}") - time.sleep(0.1) - - def get_stats(self) -> Dict[str, Any]: - """Get feeding statistics.""" - return { - 'frames_fed': self._frames_fed, - 'feed_rate': self.feed_rate, - 'is_feeding': self._is_feeding, - 'camera_stats': self.camera_source.get_stats() - } + print(f"Error in data_callback: {e}") + + # Control frame rate if FPS is set + if self.fps: + time.sleep(1.0 / self.fps) + def set_data_callback(self, callback: Callable[[object], None]): + """ + Sets the data callback function. + """ + self.data_callback = callback -def list_available_cameras() -> Dict[int, Dict[str, Any]]: - """ - List all available camera devices. - - Returns: - Dict[int, Dict[str, Any]]: Dictionary of camera index to camera info - """ - cameras = {} - - for i in range(10): # Check first 10 camera indices - cap = cv2.VideoCapture(i) - if cap.isOpened(): - # Get camera properties - width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - fps = cap.get(cv2.CAP_PROP_FPS) - - cameras[i] = { - 'index': i, - 'width': width, - 'height': height, - 'fps': fps, - 'backend': cap.getBackendName() if hasattr(cap, 'getBackendName') else 'Unknown' - } - - cap.release() - - return cameras - - -# Example usage and testing -if __name__ == "__main__": - # List available cameras - print("Available cameras:") - cameras = list_available_cameras() - for idx, info in cameras.items(): - print(f" Camera {idx}: {info['width']}x{info['height']} @ {info['fps']} FPS ({info['backend']})") - - if not cameras: - print("No cameras found!") - exit(1) - - # Test camera capture - config = CameraConfig( - camera_index=0, - width=640, - height=480, - fps=30 - ) - - def frame_callback(frame): - print(f"Frame captured: {frame.shape}") - - camera = CameraSource(config, frame_callback) - - try: - if camera.start(): - print("Camera started successfully") - - # Capture for 5 seconds - time.sleep(5) - - # Print statistics - stats = camera.get_stats() - print(f"Statistics: {stats}") - - else: - print("Failed to start camera") - - finally: - camera.stop() \ No newline at end of file + def get_frame(self) -> Optional[object]: + """ + Gets a single frame from the camera. Not recommended for continuous capture. + """ + if not self.cap or not self.cap.isOpened(): + if not self.initialize(): + return None + + ret, frame = self.cap.read() + if not ret: + return None + return frame diff --git a/cluster4npu_ui/core/functions/result_handler.py b/cluster4npu_ui/core/functions/result_handler.py new file mode 100644 index 0000000..5078b35 --- /dev/null +++ b/cluster4npu_ui/core/functions/result_handler.py @@ -0,0 +1,83 @@ + +import json +import csv +import os +import time +from typing import Any, Dict, List + +class ResultSerializer: + """ + Serializes inference results into various formats. + """ + def to_json(self, data: Dict[str, Any]) -> str: + """ + Serializes data to a JSON string. + """ + return json.dumps(data, indent=2) + + def to_csv(self, data: List[Dict[str, Any]], fieldnames: List[str]) -> str: + """ + Serializes data to a CSV string. + """ + import io + output = io.StringIO() + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(data) + return output.getvalue() + +class FileOutputManager: + """ + Manages writing results to files with timestamped names and directory organization. + """ + def __init__(self, base_path: str = "./output"): + """ + Initializes the FileOutputManager. + + Args: + base_path (str): The base directory to save output files. + """ + self.base_path = base_path + self.serializer = ResultSerializer() + + def save_result(self, result_data: Dict[str, Any], pipeline_name: str, format: str = 'json'): + """ + Saves a single result to a file. + + Args: + result_data (Dict[str, Any]): The result data to save. + pipeline_name (str): The name of the pipeline that generated the result. + format (str): The format to save the result in ('json' or 'csv'). + """ + try: + # Create directory structure + today = time.strftime("%Y-%m-%d") + output_dir = os.path.join(self.base_path, pipeline_name, today) + os.makedirs(output_dir, exist_ok=True) + + # Create filename + timestamp = time.strftime("%Y%m%d_%H%M%S") + filename = f"{timestamp}_{result_data.get('pipeline_id', 'result')}.{format}" + file_path = os.path.join(output_dir, filename) + + # Serialize and save + if format == 'json': + content = self.serializer.to_json(result_data) + with open(file_path, 'w') as f: + f.write(content) + elif format == 'csv': + # For CSV, we expect a list of dicts. If it's a single dict, wrap it. + data_to_save = result_data if isinstance(result_data, list) else [result_data] + if data_to_save: + fieldnames = list(data_to_save[0].keys()) + content = self.serializer.to_csv(data_to_save, fieldnames) + with open(file_path, 'w') as f: + f.write(content) + else: + print(f"Error: Unsupported format '{format}'") + return + + print(f"Result saved to {file_path}") + + except Exception as e: + print(f"Error saving result: {e}") diff --git a/cluster4npu_ui/core/functions/video_source.py b/cluster4npu_ui/core/functions/video_source.py index 8e94200..623f53b 100644 --- a/cluster4npu_ui/core/functions/video_source.py +++ b/cluster4npu_ui/core/functions/video_source.py @@ -1,725 +1,129 @@ -""" -Video file input source for the Cluster4NPU inference pipeline. - -This module provides video file input capabilities with support for common video formats, -frame rate control, seeking, and batch processing capabilities. -""" import cv2 -import numpy as np import threading import time -import os -from typing import Optional, Callable, Tuple, Dict, Any, List -from dataclasses import dataclass -from pathlib import Path -from camera_source import DataSourceBase +from typing import Optional, Callable - -@dataclass -class VideoFileConfig: - """Configuration for video file input source.""" - file_path: str - target_fps: Optional[float] = None # If None, use original video FPS - loop: bool = False - start_frame: int = 0 - end_frame: Optional[int] = None # If None, process until end - skip_frames: int = 0 # Skip every N frames - max_frames: Optional[int] = None # Maximum frames to process - resize_to: Optional[Tuple[int, int]] = None # (width, height) - preload_frames: bool = False # Preload frames to memory for faster access - - -class VideoFileSource(DataSourceBase): +class VideoFileSource: """ - Video file input source for processing video files frame by frame. - - Features: - - Support common video formats (MP4, AVI, MOV, MKV, etc.) - - Frame rate control and seeking - - Batch processing capabilities - - Progress tracking - - Loop playback support - - Frame skipping and range selection + A class to handle video file input using cv2.VideoCapture. + It reads frames from a video file and can send them to a pipeline. """ - - def __init__(self, config: VideoFileConfig, frame_callback: Optional[Callable[[np.ndarray, int], None]] = None): + def __init__(self, + file_path: str, + data_callback: Optional[Callable[[object], None]] = None, + loop: bool = False): """ - Initialize video file source. - + Initializes the VideoFileSource. + Args: - config: Video file configuration - frame_callback: Optional callback for each processed frame (frame, frame_number) + file_path (str): The path to the video file. + data_callback (Optional[Callable[[object], None]]): A callback function to send data to. + loop (bool): Whether to loop the video when it ends. """ - self.config = config - self.frame_callback = frame_callback - - # Video capture object - self.cap: Optional[cv2.VideoCapture] = None - - # Video properties - self.total_frames = 0 - self.original_fps = 0.0 - self.video_width = 0 - self.video_height = 0 - self.video_duration = 0.0 - - # Threading control - self._playback_thread: Optional[threading.Thread] = None + self.file_path = file_path + self.data_callback = data_callback + self.loop = loop + + self.cap = None + self.running = False + self.thread = None self._stop_event = threading.Event() - self._pause_event = threading.Event() - self._frame_lock = threading.Lock() - - # Current state - self._current_frame: Optional[np.ndarray] = None - self._current_frame_number = 0 - self._frames_processed = 0 - self._playback_started = False - - # Progress tracking - self._start_time = 0.0 - self._actual_fps = 0.0 - self._fps_counter = 0 - self._last_fps_time = 0.0 - - # Error handling - self._last_error: Optional[str] = None - - # Frame preloading - self._preloaded_frames: List[np.ndarray] = [] - self._preload_complete = False - - def start(self) -> bool: + self.fps = 0 + + def initialize(self) -> bool: """ - Start video file processing. - + Initializes the video capture from the file. + Returns: - bool: True if video started successfully, False otherwise + bool: True if initialization is successful, False otherwise. """ - if self.is_running(): - return True - - try: - # Check if file exists - if not os.path.exists(self.config.file_path): - self._last_error = f"Video file not found: {self.config.file_path}" - return False - - # Open video file - self.cap = cv2.VideoCapture(self.config.file_path) - - if not self.cap.isOpened(): - self._last_error = f"Failed to open video file: {self.config.file_path}" - return False - - # Get video properties - self._get_video_properties() - - # Validate configuration - if not self._validate_config(): - return False - - # Preload frames if requested - if self.config.preload_frames: - if not self._preload_frames(): - return False - - print(f"[VideoFileSource] Video opened successfully") - print(f"[VideoFileSource] File: {self.config.file_path}") - print(f"[VideoFileSource] Resolution: {self.video_width}x{self.video_height}") - print(f"[VideoFileSource] FPS: {self.original_fps}, Duration: {self.video_duration:.2f}s") - print(f"[VideoFileSource] Total frames: {self.total_frames}") - - # Start playback thread - self._stop_event.clear() - self._pause_event.clear() - self._playback_thread = threading.Thread(target=self._playback_loop, daemon=True) - self._playback_thread.start() - - return True - - except Exception as e: - self._last_error = f"Video initialization error: {str(e)}" - if self.cap: - self.cap.release() - self.cap = None + print(f"Initializing video source from {self.file_path}...") + self.cap = cv2.VideoCapture(self.file_path) + if not self.cap.isOpened(): + print(f"Error: Could not open video file {self.file_path}.") return False - - def stop(self) -> None: - """Stop video file processing.""" - if not self.is_running(): + + self.fps = self.cap.get(cv2.CAP_PROP_FPS) + if self.fps == 0: + print("Warning: Could not determine video FPS. Defaulting to 30.") + self.fps = 30 + + print(f"Video source initialized successfully. FPS: {self.fps}") + return True + + def start(self): + """ + Starts the frame reading thread. + """ + if self.running: + print("Video source is already running.") return + + if not self.cap or not self.cap.isOpened(): + if not self.initialize(): + return + + self.running = True + self._stop_event.clear() + self.thread = threading.Thread(target=self._capture_loop, daemon=True) + self.thread.start() + print("Video capture thread started.") + + def stop(self): + """ + Stops the frame reading thread. + """ + self.running = False + if self.thread and self.thread.is_alive(): + self._stop_event.set() + self.thread.join(timeout=2) - print("[VideoFileSource] Stopping video processing...") - - # Signal stop - self._stop_event.set() - - # Wait for playback thread to finish - if self._playback_thread and self._playback_thread.is_alive(): - self._playback_thread.join(timeout=2.0) - - # Release video capture - if self.cap: + if self.cap and self.cap.isOpened(): self.cap.release() self.cap = None - - # Clear current frame - with self._frame_lock: - self._current_frame = None - - # Clear preloaded frames - self._preloaded_frames.clear() - - print("[VideoFileSource] Video processing stopped") - - def pause(self) -> None: - """Pause video playback.""" - if self.is_running(): - self._pause_event.set() - print("[VideoFileSource] Video paused") - - def resume(self) -> None: - """Resume video playback.""" - if self.is_running(): - self._pause_event.clear() - print("[VideoFileSource] Video resumed") - - def is_running(self) -> bool: - """Check if video processing is currently running.""" - return (self.cap is not None and - self._playback_thread is not None and - self._playback_thread.is_alive() and - not self._stop_event.is_set()) - - def is_paused(self) -> bool: - """Check if video playback is paused.""" - return self._pause_event.is_set() - - def seek_to_frame(self, frame_number: int) -> bool: + print("Video source stopped.") + + def _capture_loop(self): """ - Seek to specific frame number. - - Args: - frame_number: Frame number to seek to - - Returns: - bool: True if seek successful + The main loop for reading frames from the video file. """ - if not self.cap: - return False - - try: - frame_number = max(0, min(frame_number, self.total_frames - 1)) - self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) - self._current_frame_number = frame_number - return True - except Exception as e: - self._last_error = f"Seek error: {str(e)}" - return False - - def seek_to_time(self, time_seconds: float) -> bool: - """ - Seek to specific time in video. - - Args: - time_seconds: Time in seconds to seek to - - Returns: - bool: True if seek successful - """ - if self.original_fps > 0: - frame_number = int(time_seconds * self.original_fps) - return self.seek_to_frame(frame_number) - return False - - def get_frame(self) -> Optional[np.ndarray]: - """ - Get the current frame. - - Returns: - Optional[np.ndarray]: Current frame or None if no frame available - """ - with self._frame_lock: - if self._current_frame is not None: - return self._current_frame.copy() - return None - - def get_progress(self) -> Dict[str, Any]: - """ - Get processing progress information. - - Returns: - Dict[str, Any]: Progress information - """ - if self.total_frames > 0: - progress_percent = (self._current_frame_number / self.total_frames) * 100 - else: - progress_percent = 0.0 - - elapsed_time = time.time() - self._start_time if self._start_time > 0 else 0 - - return { - 'current_frame': self._current_frame_number, - 'total_frames': self.total_frames, - 'progress_percent': progress_percent, - 'frames_processed': self._frames_processed, - 'elapsed_time': elapsed_time, - 'actual_fps': self._actual_fps, - 'is_complete': self._current_frame_number >= self.total_frames - 1 - } - - def get_stats(self) -> Dict[str, Any]: - """ - Get comprehensive video statistics. - - Returns: - Dict[str, Any]: Video statistics - """ - stats = { - 'file_path': self.config.file_path, - 'video_width': self.video_width, - 'video_height': self.video_height, - 'original_fps': self.original_fps, - 'target_fps': self.config.target_fps, - 'video_duration': self.video_duration, - 'total_frames': self.total_frames, - 'loop_enabled': self.config.loop, - 'preload_enabled': self.config.preload_frames, - 'preload_complete': self._preload_complete, - 'last_error': self._last_error, - 'is_running': self.is_running(), - 'is_paused': self.is_paused() - } - - stats.update(self.get_progress()) - return stats - - def _get_video_properties(self) -> None: - """Get video properties from OpenCV.""" - if not self.cap: - return - - self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) - self.original_fps = self.cap.get(cv2.CAP_PROP_FPS) - self.video_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - self.video_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - - if self.original_fps > 0: - self.video_duration = self.total_frames / self.original_fps - - def _validate_config(self) -> bool: - """Validate configuration against video properties.""" - if self.config.start_frame >= self.total_frames: - self._last_error = f"Start frame ({self.config.start_frame}) >= total frames ({self.total_frames})" - return False - - if self.config.end_frame and self.config.end_frame <= self.config.start_frame: - self._last_error = f"End frame ({self.config.end_frame}) <= start frame ({self.config.start_frame})" - return False - - return True - - def _preload_frames(self) -> bool: - """Preload frames into memory for faster access.""" - if not self.cap: - return False - - print("[VideoFileSource] Preloading frames...") - - try: - # Seek to start frame - self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) - - end_frame = self.config.end_frame or self.total_frames - max_frames = self.config.max_frames or (end_frame - self.config.start_frame) - - frames_to_load = min(max_frames, end_frame - self.config.start_frame) - - for i in range(frames_to_load): - ret, frame = self.cap.read() - if not ret or frame is None: - break - - # Apply resizing if configured - if self.config.resize_to: - frame = cv2.resize(frame, self.config.resize_to) - - self._preloaded_frames.append(frame) - - # Skip frames if configured - for _ in range(self.config.skip_frames): - self.cap.read() - - self._preload_complete = True - print(f"[VideoFileSource] Preloaded {len(self._preloaded_frames)} frames") - - return True - - except Exception as e: - self._last_error = f"Frame preloading error: {str(e)}" - return False - - def _playback_loop(self) -> None: - """Main playback loop running in separate thread.""" - print("[VideoFileSource] Playback loop started") - - self._start_time = time.time() - self._last_fps_time = self._start_time - - target_fps = self.config.target_fps or self.original_fps - frame_interval = 1.0 / target_fps if target_fps > 0 else 0.033 # Default 30 FPS - - # Set starting position - if not self.config.preload_frames: - self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) - - self._current_frame_number = self.config.start_frame - last_frame_time = time.time() - - while not self._stop_event.is_set(): - try: - # Handle pause - if self._pause_event.is_set(): - time.sleep(0.1) - continue - - # Control frame rate - current_time = time.time() - time_since_last = current_time - last_frame_time - - if time_since_last < frame_interval: - sleep_time = frame_interval - time_since_last - time.sleep(sleep_time) - continue - - # Get frame - frame = self._get_next_frame() - if frame is None: - if self.config.loop: - # Reset to start for looping - self._reset_to_start() - continue - else: - # End of video - break - - # Update current frame - with self._frame_lock: - self._current_frame = frame - - # Update statistics - self._frames_processed += 1 - self._fps_counter += 1 - last_frame_time = current_time - - # Calculate actual FPS - if current_time - self._last_fps_time >= 1.0: - self._actual_fps = self._fps_counter / (current_time - self._last_fps_time) - self._fps_counter = 0 - self._last_fps_time = current_time - - # Call frame callback if provided - if self.frame_callback: - try: - self.frame_callback(frame, self._current_frame_number) - except Exception as e: - print(f"[VideoFileSource] Frame callback error: {e}") - - # Check if we've reached the end frame - if self.config.end_frame and self._current_frame_number >= self.config.end_frame: - if self.config.loop: - self._reset_to_start() - else: - break - - # Check max frames limit - if self.config.max_frames and self._frames_processed >= self.config.max_frames: - break - - except Exception as e: - print(f"[VideoFileSource] Playback loop error: {e}") - self._last_error = str(e) - time.sleep(0.1) - - print("[VideoFileSource] Playback loop ended") - - def _get_next_frame(self) -> Optional[np.ndarray]: - """Get the next frame from video source.""" - if self.config.preload_frames: - # Get frame from preloaded frames - if self._current_frame_number - self.config.start_frame < len(self._preloaded_frames): - frame = self._preloaded_frames[self._current_frame_number - self.config.start_frame] - self._current_frame_number += 1 - return frame - else: - return None - else: - # Read frame from video capture - if not self.cap: - return None - + while self.running and not self._stop_event.is_set(): ret, frame = self.cap.read() - if not ret or frame is None: - return None - - # Apply resizing if configured - if self.config.resize_to: - frame = cv2.resize(frame, self.config.resize_to) - - self._current_frame_number += 1 - - # Skip frames if configured - for _ in range(self.config.skip_frames): - self.cap.read() - self._current_frame_number += 1 - - return frame - - def _reset_to_start(self) -> None: - """Reset video to start position for looping.""" - if self.config.preload_frames: - self._current_frame_number = self.config.start_frame - else: - if self.cap: - self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame) - self._current_frame_number = self.config.start_frame - - def __enter__(self): - """Context manager entry.""" - if not self.start(): - raise RuntimeError(f"Failed to start video: {self._last_error}") - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """Context manager exit.""" - self.stop() - - -class VideoPipelineFeeder: - """ - Helper class to feed video frames to InferencePipeline. - - This class bridges the VideoFileSource and InferencePipeline, - handling frame format conversion and pipeline data feeding. - """ - - def __init__(self, video_source: VideoFileSource, pipeline): - """ - Initialize video pipeline feeder. - - Args: - video_source: VideoFileSource instance - pipeline: InferencePipeline instance - """ - self.video_source = video_source - self.pipeline = pipeline - - # Threading control - self._feed_thread: Optional[threading.Thread] = None - self._stop_event = threading.Event() - self._is_feeding = False - - # Statistics - self._frames_fed = 0 - self._results_collected = 0 - self._results: List[Dict[str, Any]] = [] - - def start_feeding(self) -> bool: - """ - Start feeding video frames to pipeline. - - Returns: - bool: True if feeding started successfully - """ - if self._is_feeding: - return True - - if not self.video_source.is_running(): - print("[VideoPipelineFeeder] Video source is not running") - return False - - print("[VideoPipelineFeeder] Starting frame feeding...") - - self._stop_event.clear() - self._is_feeding = True - self._feed_thread = threading.Thread(target=self._feed_loop, daemon=True) - self._feed_thread.start() - - return True - - def stop_feeding(self) -> None: - """Stop feeding frames to pipeline.""" - if not self._is_feeding: - return - - print("[VideoPipelineFeeder] Stopping frame feeding...") - - self._stop_event.set() - self._is_feeding = False - - if self._feed_thread and self._feed_thread.is_alive(): - self._feed_thread.join(timeout=2.0) - - print("[VideoPipelineFeeder] Frame feeding stopped") - - def _feed_loop(self) -> None: - """Main feeding loop.""" - while not self._stop_event.is_set(): - try: - # Get frame from video source - frame = self.video_source.get_frame() - if frame is None: - if not self.video_source.is_running(): - break - time.sleep(0.01) + if not ret: + if self.loop: + print("Video ended, looping...") + self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0) continue - - # Feed frame to pipeline + else: + print("Video ended.") + self.running = False + break + + if self.data_callback: try: - from InferencePipeline import PipelineData - - progress = self.video_source.get_progress() - - pipeline_data = PipelineData( - data=frame, - metadata={ - 'source': 'video_file', - 'file_path': self.video_source.config.file_path, - 'frame_number': progress['current_frame'], - 'total_frames': progress['total_frames'], - 'progress_percent': progress['progress_percent'], - 'timestamp': time.time(), - 'frame_id': self._frames_fed - } - ) - - # Put data into pipeline - self.pipeline.put_data(pipeline_data) - - self._frames_fed += 1 - + self.data_callback(frame) except Exception as e: - print(f"[VideoPipelineFeeder] Error feeding frame to pipeline: {e}") - time.sleep(0.1) - - except Exception as e: - print(f"[VideoPipelineFeeder] Feed loop error: {e}") - time.sleep(0.1) - - def get_results(self) -> List[Dict[str, Any]]: - """Get collected results.""" - return self._results.copy() - - def get_stats(self) -> Dict[str, Any]: - """Get feeding statistics.""" - return { - 'frames_fed': self._frames_fed, - 'results_collected': self._results_collected, - 'is_feeding': self._is_feeding, - 'video_stats': self.video_source.get_stats() - } - - -def get_video_info(file_path: str) -> Dict[str, Any]: - """ - Get video file information without opening for processing. - - Args: - file_path: Path to video file - - Returns: - Dict[str, Any]: Video information - """ - info = { - 'file_path': file_path, - 'exists': False, - 'valid': False, - 'error': None - } - - try: - if not os.path.exists(file_path): - info['error'] = 'File does not exist' - return info - - info['exists'] = True - info['file_size'] = os.path.getsize(file_path) - - cap = cv2.VideoCapture(file_path) - if not cap.isOpened(): - info['error'] = 'Cannot open video file' - return info - - info['valid'] = True - info['width'] = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - info['height'] = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - info['fps'] = cap.get(cv2.CAP_PROP_FPS) - info['frame_count'] = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - - if info['fps'] > 0: - info['duration'] = info['frame_count'] / info['fps'] - else: - info['duration'] = 0.0 - - cap.release() - - except Exception as e: - info['error'] = str(e) - - return info - - -# Example usage and testing -if __name__ == "__main__": - # Test video file processing - import sys - - if len(sys.argv) != 2: - print("Usage: python video_source.py ") - sys.exit(1) - - video_path = sys.argv[1] - - # Get video info - info = get_video_info(video_path) - print(f"Video info: {info}") - - if not info['valid']: - print(f"Cannot process video: {info['error']}") - sys.exit(1) - - # Test video processing - config = VideoFileConfig( - file_path=video_path, - target_fps=10, # Process at 10 FPS - loop=False, - start_frame=0, - max_frames=100 # Process only first 100 frames - ) - - def frame_callback(frame, frame_number): - print(f"Processing frame {frame_number}: {frame.shape}") - - video_source = VideoFileSource(config, frame_callback) - - try: - if video_source.start(): - print("Video processing started") + print(f"Error in data_callback: {e}") - # Monitor progress - while video_source.is_running(): - progress = video_source.get_progress() - print(f"Progress: {progress['progress_percent']:.1f}% " - f"({progress['current_frame']}/{progress['total_frames']})") - time.sleep(1) - - # Print final statistics - stats = video_source.get_stats() - print(f"Final statistics: {stats}") - - else: - print(f"Failed to start video processing: {video_source._last_error}") - - finally: - video_source.stop() \ No newline at end of file + # Control frame rate + time.sleep(1.0 / self.fps) + + def set_data_callback(self, callback: Callable[[object], None]): + """ + Sets the data callback function. + """ + self.data_callback = callback + + def get_frame(self) -> Optional[object]: + """ + Gets a single frame from the video. Not recommended for continuous capture. + """ + if not self.cap or not self.cap.isOpened(): + if not self.initialize(): + return None + + ret, frame = self.cap.read() + if not ret: + return None + return frame diff --git a/cluster4npu_ui/core/functions/workflow_orchestrator.py b/cluster4npu_ui/core/functions/workflow_orchestrator.py new file mode 100644 index 0000000..d3aedac --- /dev/null +++ b/cluster4npu_ui/core/functions/workflow_orchestrator.py @@ -0,0 +1,163 @@ + +import threading +import time +from typing import Any, Dict, Optional + +from .InferencePipeline import InferencePipeline, PipelineData +from .camera_source import CameraSource +from .video_source import VideoFileSource +from .result_handler import FileOutputManager +# Import other data sources as they are created + +class WorkflowOrchestrator: + """ + Coordinates the entire data flow from input source to the inference pipeline + and handles the results. + """ + def __init__(self, pipeline: InferencePipeline, input_config: Dict[str, Any], output_config: Dict[str, Any]): + """ + Initializes the WorkflowOrchestrator. + + Args: + pipeline (InferencePipeline): The configured inference pipeline. + input_config (Dict[str, Any]): The configuration for the input source. + output_config (Dict[str, Any]): The configuration for the output. + """ + self.pipeline = pipeline + self.input_config = input_config + self.output_config = output_config + self.data_source = None + self.result_handler = None + self.running = False + self._stop_event = threading.Event() + + def start(self): + """ + Starts the workflow, including the data source and the pipeline. + """ + if self.running: + print("Workflow is already running.") + return + + print("Starting workflow orchestrator...") + self.running = True + self._stop_event.clear() + + # Create the result handler + self.result_handler = self._create_result_handler() + + # Create and start the data source + self.data_source = self._create_data_source() + if not self.data_source: + print("Error: Could not create data source. Aborting workflow.") + self.running = False + return + + # Set the pipeline's put_data method as the callback + self.data_source.set_data_callback(self.pipeline.put_data) + + # Set the result callback on the pipeline + if self.result_handler: + self.pipeline.set_result_callback(self.handle_result) + + # Start the pipeline + self.pipeline.initialize() + self.pipeline.start() + + # Start the data source + self.data_source.start() + + print("Workflow orchestrator started successfully.") + + def stop(self): + """ + Stops the workflow gracefully. + """ + if not self.running: + return + + print("Stopping workflow orchestrator...") + self.running = False + self._stop_event.set() + + if self.data_source: + self.data_source.stop() + + if self.pipeline: + self.pipeline.stop() + + print("Workflow orchestrator stopped.") + + def _create_data_source(self) -> Optional[Any]: + """ + Creates the appropriate data source based on the input configuration. + """ + source_type = self.input_config.get('source_type', '').lower() + print(f"Creating data source of type: {source_type}") + + if source_type == 'camera': + return CameraSource( + camera_index=self.input_config.get('device_id', 0), + resolution=self._parse_resolution(self.input_config.get('resolution')), + fps=self.input_config.get('fps', 30) + ) + elif source_type == 'file': + # Assuming 'file' means video file for now + return VideoFileSource( + file_path=self.input_config.get('source_path', ''), + loop=True # Or get from config if available + ) + # Add other source types here (e.g., 'rtsp stream', 'image file') + else: + print(f"Error: Unsupported source type '{source_type}'") + return None + + def _create_result_handler(self) -> Optional[Any]: + """ + Creates the appropriate result handler based on the output configuration. + """ + output_type = self.output_config.get('output_type', '').lower() + print(f"Creating result handler of type: {output_type}") + + if output_type == 'file': + return FileOutputManager( + base_path=self.output_config.get('destination', './output') + ) + # Add other result handlers here + else: + print(f"Warning: Unsupported output type '{output_type}'. No results will be saved.") + return None + + def handle_result(self, result_data: PipelineData): + """ + Callback function to handle results from the pipeline. + """ + if self.result_handler: + try: + # Convert PipelineData to a dictionary for serialization + result_dict = { + "pipeline_id": result_data.pipeline_id, + "timestamp": result_data.timestamp, + "metadata": result_data.metadata, + "stage_results": result_data.stage_results + } + self.result_handler.save_result( + result_dict, + self.pipeline.pipeline_name, + format=self.output_config.get('format', 'json').lower() + ) + except Exception as e: + print(f"Error handling result: {e}") + + def _parse_resolution(self, resolution_str: Optional[str]) -> Optional[tuple[int, int]]: + """ + Parses a resolution string (e.g., '1920x1080') into a tuple. + """ + if not resolution_str: + return None + try: + width, height = map(int, resolution_str.lower().split('x')) + return (width, height) + except ValueError: + print(f"Warning: Invalid resolution format '{resolution_str}'. Using default.") + return None diff --git a/cluster4npu_ui/ui/dialogs/deployment.py b/cluster4npu_ui/ui/dialogs/deployment.py index b55113a..4d084f8 100644 --- a/cluster4npu_ui/ui/dialogs/deployment.py +++ b/cluster4npu_ui/ui/dialogs/deployment.py @@ -46,6 +46,7 @@ except ImportError as e: try: from InferencePipeline import InferencePipeline from Multidongle import MultiDongle + from workflow_orchestrator import WorkflowOrchestrator PIPELINE_AVAILABLE = True except ImportError as e: print(f"Warning: Pipeline system not available: {e}") @@ -67,6 +68,7 @@ class DeploymentWorker(QThread): super().__init__() self.pipeline_data = pipeline_data self.should_stop = False + self.orchestrator = None def run(self): """Main deployment workflow.""" @@ -121,16 +123,12 @@ class DeploymentWorker(QThread): try: pipeline = converter.create_inference_pipeline(config) - self.progress_updated.emit(80, "Initializing dongle connections...") + self.progress_updated.emit(80, "Initializing workflow orchestrator...") self.deployment_started.emit() - # Initialize the pipeline - pipeline.initialize() - - self.progress_updated.emit(90, "Starting pipeline execution...") - - # Start the pipeline - pipeline.start() + # Create and start the orchestrator + self.orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config) + self.orchestrator.start() self.progress_updated.emit(100, "Pipeline deployed successfully!") self.deployment_completed.emit(True, f"Pipeline '{config.pipeline_name}' deployed with {len(config.stage_configs)} stages") @@ -144,6 +142,8 @@ class DeploymentWorker(QThread): def stop(self): """Stop the deployment process.""" self.should_stop = True + if self.orchestrator: + self.orchestrator.stop() class DeploymentDialog(QDialog):