feat: Implement essential components for complete inference workflow

This commit is contained in:
Masonmason 2025-07-16 23:32:36 +08:00
parent ee4d1a3e4a
commit 7e173c42de
5 changed files with 464 additions and 1183 deletions

View File

@ -1,501 +1,132 @@
"""
Camera input source for the Cluster4NPU inference pipeline.
This module provides camera input capabilities with support for multiple cameras,
resolution configuration, and seamless integration with the InferencePipeline.
"""
import cv2 import cv2
import numpy as np
import threading import threading
import time import time
from typing import Optional, Callable, Tuple, Dict, Any from typing import Optional, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
class CameraSource:
@dataclass
class CameraConfig:
"""Configuration for camera input source."""
camera_index: int = 0
width: int = 640
height: int = 480
fps: int = 30
format: str = 'BGR'
auto_exposure: bool = True
brightness: float = 0.5
contrast: float = 0.5
saturation: float = 0.5
class DataSourceBase(ABC):
"""Abstract base class for data sources."""
@abstractmethod
def start(self) -> bool:
"""Start the data source."""
pass
@abstractmethod
def stop(self) -> None:
"""Stop the data source."""
pass
@abstractmethod
def is_running(self) -> bool:
"""Check if the data source is running."""
pass
@abstractmethod
def get_frame(self) -> Optional[np.ndarray]:
"""Get the next frame from the source."""
pass
class CameraSource(DataSourceBase):
""" """
Camera input source for real-time video capture. A class to handle camera input using cv2.VideoCapture.
It captures frames in a separate thread and can send them to a pipeline.
Features:
- Multiple camera index support
- Resolution and FPS configuration
- Format conversion (BGR model input format)
- Error handling for camera disconnection
- Thread-safe frame capture
""" """
def __init__(self,
def __init__(self, config: CameraConfig, frame_callback: Optional[Callable[[np.ndarray], None]] = None): camera_index: int = 0,
resolution: Optional[tuple[int, int]] = None,
fps: Optional[int] = None,
data_callback: Optional[Callable[[object], None]] = None):
""" """
Initialize camera source. Initializes the CameraSource.
Args: Args:
config: Camera configuration camera_index (int): The index of the camera to use.
frame_callback: Optional callback for each captured frame resolution (Optional[tuple[int, int]]): The desired resolution (width, height).
fps (Optional[int]): The desired frames per second.
data_callback (Optional[Callable[[object], None]]): A callback function to send data to.
""" """
self.config = config self.camera_index = camera_index
self.frame_callback = frame_callback self.resolution = resolution
self.fps = fps
self.data_callback = data_callback
# Camera capture object self.cap = None
self.cap: Optional[cv2.VideoCapture] = None self.running = False
self.thread = None
# Threading control
self._capture_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._frame_lock = threading.Lock()
# Current frame storage def initialize(self) -> bool:
self._current_frame: Optional[np.ndarray] = None
self._frame_count = 0
self._fps_counter = 0
self._last_fps_time = time.time()
self._actual_fps = 0.0
# Error handling
self._connection_lost = False
self._last_error: Optional[str] = None
def start(self) -> bool:
""" """
Start camera capture. Initializes the camera capture.
Returns: Returns:
bool: True if camera started successfully, False otherwise bool: True if initialization is successful, False otherwise.
""" """
if self.is_running(): print(f"Initializing camera at index {self.camera_index}...")
return True self.cap = cv2.VideoCapture(self.camera_index)
if not self.cap.isOpened():
try: print(f"Error: Could not open camera at index {self.camera_index}.")
# Initialize camera
self.cap = cv2.VideoCapture(self.config.camera_index)
if not self.cap.isOpened():
self._last_error = f"Failed to open camera {self.config.camera_index}"
return False
# Configure camera properties
self._configure_camera()
# Test camera capture
ret, frame = self.cap.read()
if not ret or frame is None:
self._last_error = "Failed to read initial frame from camera"
self.cap.release()
self.cap = None
return False
print(f"[CameraSource] Camera {self.config.camera_index} opened successfully")
print(f"[CameraSource] Resolution: {self.config.width}x{self.config.height}, FPS: {self.config.fps}")
# Start capture thread
self._stop_event.clear()
self._connection_lost = False
self._capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
self._capture_thread.start()
return True
except Exception as e:
self._last_error = f"Camera initialization error: {str(e)}"
if self.cap:
self.cap.release()
self.cap = None
return False return False
def stop(self) -> None: if self.resolution:
"""Stop camera capture.""" self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])
if not self.is_running(): self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])
return
print("[CameraSource] Stopping camera capture...") if self.fps:
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
# Signal stop
self._stop_event.set()
# Wait for capture thread to finish
if self._capture_thread and self._capture_thread.is_alive():
self._capture_thread.join(timeout=2.0)
# Release camera
if self.cap:
self.cap.release()
self.cap = None
# Clear current frame
with self._frame_lock:
self._current_frame = None
print("[CameraSource] Camera capture stopped")
def is_running(self) -> bool:
"""Check if camera is currently running."""
return (self.cap is not None and
self.cap.isOpened() and
self._capture_thread is not None and
self._capture_thread.is_alive() and
not self._stop_event.is_set())
def get_frame(self) -> Optional[np.ndarray]:
"""
Get the latest captured frame.
Returns:
Optional[np.ndarray]: Latest frame or None if no frame available
"""
with self._frame_lock:
if self._current_frame is not None:
return self._current_frame.copy()
return None
def get_stats(self) -> Dict[str, Any]:
"""
Get camera statistics.
Returns:
Dict[str, Any]: Statistics including FPS, frame count, etc.
"""
return {
'frame_count': self._frame_count,
'actual_fps': self._actual_fps,
'target_fps': self.config.fps,
'resolution': (self.config.width, self.config.height),
'camera_index': self.config.camera_index,
'connection_lost': self._connection_lost,
'last_error': self._last_error,
'is_running': self.is_running()
}
def _configure_camera(self) -> None:
"""Configure camera properties."""
if not self.cap:
return
# Set resolution
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.config.width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.config.height)
# Set FPS
self.cap.set(cv2.CAP_PROP_FPS, self.config.fps)
# Set other properties
if hasattr(cv2, 'CAP_PROP_AUTO_EXPOSURE'):
self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 1 if self.config.auto_exposure else 0)
self.cap.set(cv2.CAP_PROP_BRIGHTNESS, self.config.brightness)
self.cap.set(cv2.CAP_PROP_CONTRAST, self.config.contrast)
self.cap.set(cv2.CAP_PROP_SATURATION, self.config.saturation)
# Verify actual settings
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
print(f"[CameraSource] Actual resolution: {actual_width}x{actual_height}, FPS: {actual_fps}")
def _capture_loop(self) -> None:
"""Main capture loop running in separate thread."""
print("[CameraSource] Capture loop started")
frame_interval = 1.0 / self.config.fps
last_capture_time = time.time()
while not self._stop_event.is_set():
try:
# Control frame rate
current_time = time.time()
time_since_last = current_time - last_capture_time
if time_since_last < frame_interval:
sleep_time = frame_interval - time_since_last
time.sleep(sleep_time)
continue
last_capture_time = current_time
# Capture frame
if not self.cap or not self.cap.isOpened():
self._connection_lost = True
break
ret, frame = self.cap.read()
if not ret or frame is None:
print("[CameraSource] Failed to read frame from camera")
self._connection_lost = True
break
# Update frame
with self._frame_lock:
self._current_frame = frame
# Update statistics
self._frame_count += 1
self._fps_counter += 1
# Calculate actual FPS
if current_time - self._last_fps_time >= 1.0:
self._actual_fps = self._fps_counter / (current_time - self._last_fps_time)
self._fps_counter = 0
self._last_fps_time = current_time
# Call frame callback if provided
if self.frame_callback:
try:
self.frame_callback(frame)
except Exception as e:
print(f"[CameraSource] Frame callback error: {e}")
except Exception as e:
print(f"[CameraSource] Capture loop error: {e}")
self._last_error = str(e)
time.sleep(0.1) # Brief pause before retrying
print("[CameraSource] Capture loop ended")
def __enter__(self):
"""Context manager entry."""
if not self.start():
raise RuntimeError(f"Failed to start camera: {self._last_error}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.stop()
class CameraPipelineFeeder:
"""
Helper class to feed camera frames to InferencePipeline.
This class bridges the CameraSource and InferencePipeline,
handling frame format conversion and pipeline data feeding.
"""
def __init__(self, camera_source: CameraSource, pipeline, feed_rate: float = 30.0):
"""
Initialize camera pipeline feeder.
Args:
camera_source: CameraSource instance
pipeline: InferencePipeline instance
feed_rate: Rate at which to feed frames to pipeline (FPS)
"""
self.camera_source = camera_source
self.pipeline = pipeline
self.feed_rate = feed_rate
# Threading control
self._feed_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._is_feeding = False
# Statistics
self._frames_fed = 0
self._last_feed_time = 0.0
def start_feeding(self) -> bool:
"""
Start feeding camera frames to pipeline.
Returns:
bool: True if feeding started successfully
"""
if self._is_feeding:
return True
if not self.camera_source.is_running():
print("[CameraPipelineFeeder] Camera is not running")
return False
print("[CameraPipelineFeeder] Starting frame feeding...")
self._stop_event.clear()
self._is_feeding = True
self._feed_thread = threading.Thread(target=self._feed_loop, daemon=True)
self._feed_thread.start()
print("Camera initialized successfully.")
return True return True
def stop_feeding(self) -> None: def start(self):
"""Stop feeding frames to pipeline.""" """
if not self._is_feeding: Starts the frame capture thread.
"""
if self.running:
print("Camera source is already running.")
return return
print("[CameraPipelineFeeder] Stopping frame feeding...") if not self.cap or not self.cap.isOpened():
if not self.initialize():
return
self._stop_event.set() self.running = True
self._is_feeding = False self._stop_event.clear()
self.thread = threading.Thread(target=self._capture_loop, daemon=True)
self.thread.start()
print("Camera capture thread started.")
if self._feed_thread and self._feed_thread.is_alive(): def stop(self):
self._feed_thread.join(timeout=2.0) """
Stops the frame capture thread.
"""
self.running = False
if self.thread and self.thread.is_alive():
self._stop_event.set()
self.thread.join(timeout=2)
print("[CameraPipelineFeeder] Frame feeding stopped") if self.cap and self.cap.isOpened():
self.cap.release()
self.cap = None
print("Camera source stopped.")
def _feed_loop(self) -> None: def _capture_loop(self):
"""Main feeding loop.""" """
feed_interval = 1.0 / self.feed_rate The main loop for capturing frames from the camera.
last_feed_time = time.time() """
while self.running and not self._stop_event.is_set():
ret, frame = self.cap.read()
if not ret:
print("Error: Could not read frame from camera. Reconnecting...")
self.cap.release()
time.sleep(1)
self.initialize()
continue
while not self._stop_event.is_set(): if self.data_callback:
try:
current_time = time.time()
# Control feed rate
if current_time - last_feed_time < feed_interval:
time.sleep(0.001) # Small sleep to prevent busy waiting
continue
# Get frame from camera
frame = self.camera_source.get_frame()
if frame is None:
time.sleep(0.01)
continue
# Feed frame to pipeline
try: try:
from InferencePipeline import PipelineData # Assuming the callback is thread-safe or handles its own locking
self.data_callback(frame)
pipeline_data = PipelineData(
data=frame,
metadata={
'source': 'camera',
'camera_index': self.camera_source.config.camera_index,
'timestamp': current_time,
'frame_id': self._frames_fed
}
)
# Put data into pipeline
self.pipeline.put_data(pipeline_data)
self._frames_fed += 1
last_feed_time = current_time
except Exception as e: except Exception as e:
print(f"[CameraPipelineFeeder] Error feeding frame to pipeline: {e}") print(f"Error in data_callback: {e}")
time.sleep(0.1)
except Exception as e: # Control frame rate if FPS is set
print(f"[CameraPipelineFeeder] Feed loop error: {e}") if self.fps:
time.sleep(0.1) time.sleep(1.0 / self.fps)
def get_stats(self) -> Dict[str, Any]: def set_data_callback(self, callback: Callable[[object], None]):
"""Get feeding statistics.""" """
return { Sets the data callback function.
'frames_fed': self._frames_fed, """
'feed_rate': self.feed_rate, self.data_callback = callback
'is_feeding': self._is_feeding,
'camera_stats': self.camera_source.get_stats()
}
def get_frame(self) -> Optional[object]:
"""
Gets a single frame from the camera. Not recommended for continuous capture.
"""
if not self.cap or not self.cap.isOpened():
if not self.initialize():
return None
def list_available_cameras() -> Dict[int, Dict[str, Any]]: ret, frame = self.cap.read()
""" if not ret:
List all available camera devices. return None
return frame
Returns:
Dict[int, Dict[str, Any]]: Dictionary of camera index to camera info
"""
cameras = {}
for i in range(10): # Check first 10 camera indices
cap = cv2.VideoCapture(i)
if cap.isOpened():
# Get camera properties
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
cameras[i] = {
'index': i,
'width': width,
'height': height,
'fps': fps,
'backend': cap.getBackendName() if hasattr(cap, 'getBackendName') else 'Unknown'
}
cap.release()
return cameras
# Example usage and testing
if __name__ == "__main__":
# List available cameras
print("Available cameras:")
cameras = list_available_cameras()
for idx, info in cameras.items():
print(f" Camera {idx}: {info['width']}x{info['height']} @ {info['fps']} FPS ({info['backend']})")
if not cameras:
print("No cameras found!")
exit(1)
# Test camera capture
config = CameraConfig(
camera_index=0,
width=640,
height=480,
fps=30
)
def frame_callback(frame):
print(f"Frame captured: {frame.shape}")
camera = CameraSource(config, frame_callback)
try:
if camera.start():
print("Camera started successfully")
# Capture for 5 seconds
time.sleep(5)
# Print statistics
stats = camera.get_stats()
print(f"Statistics: {stats}")
else:
print("Failed to start camera")
finally:
camera.stop()

View File

@ -0,0 +1,83 @@
import json
import csv
import os
import time
from typing import Any, Dict, List
class ResultSerializer:
    """
    Serializes inference results into various formats.
    """
    def to_json(self, data: Dict[str, Any]) -> str:
        """
        Render *data* as a pretty-printed (2-space indented) JSON string.
        """
        return json.dumps(data, indent=2)
    def to_csv(self, data: List[Dict[str, Any]], fieldnames: List[str]) -> str:
        """
        Render a list of row dicts as CSV text with a header row.
        """
        import io
        buffer = io.StringIO()
        csv_writer = csv.DictWriter(buffer, fieldnames=fieldnames)
        csv_writer.writeheader()
        # Row-by-row write: equivalent to writerows(data).
        for row in data:
            csv_writer.writerow(row)
        return buffer.getvalue()
class FileOutputManager:
    """
    Manages writing results to files with timestamped names and directory organization.

    Files are written under ``<base_path>/<pipeline_name>/<YYYY-MM-DD>/`` with a
    ``<YYYYMMDD_HHMMSS>_<pipeline_id>.<format>`` filename.
    """
    def __init__(self, base_path: str = "./output"):
        """
        Initializes the FileOutputManager.
        Args:
            base_path (str): The base directory to save output files.
        """
        self.base_path = base_path
        self.serializer = ResultSerializer()
    def save_result(self, result_data: Dict[str, Any], pipeline_name: str, format: str = 'json'):
        """
        Saves a single result to a file.
        Args:
            result_data (Dict[str, Any]): The result data to save. For CSV a
                list of row dicts is also accepted (a single dict is wrapped).
            pipeline_name (str): The name of the pipeline that generated the result.
            format (str): The format to save the result in ('json' or 'csv').
        """
        # Reject unknown formats up front, before creating any directories.
        if format not in ('json', 'csv'):
            print(f"Error: Unsupported format '{format}'")
            return
        try:
            # Create directory structure: <base>/<pipeline>/<YYYY-MM-DD>/
            today = time.strftime("%Y-%m-%d")
            output_dir = os.path.join(self.base_path, pipeline_name, today)
            os.makedirs(output_dir, exist_ok=True)
            # Build a timestamped filename. result_data may be a list (CSV path),
            # and lists have no .get() -- derive the id from the first row instead
            # of raising AttributeError and losing the result.
            if isinstance(result_data, list):
                first_row = result_data[0] if result_data else {}
                result_id = first_row.get('pipeline_id', 'result')
            else:
                result_id = result_data.get('pipeline_id', 'result')
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            filename = f"{timestamp}_{result_id}.{format}"
            file_path = os.path.join(output_dir, filename)
            # Serialize and save.
            if format == 'json':
                content = self.serializer.to_json(result_data)
            else:  # csv: wrap a single dict into a one-row list
                data_to_save = result_data if isinstance(result_data, list) else [result_data]
                if not data_to_save:
                    return  # nothing to write for an empty list
                fieldnames = list(data_to_save[0].keys())
                content = self.serializer.to_csv(data_to_save, fieldnames)
            with open(file_path, 'w') as f:
                f.write(content)
            print(f"Result saved to {file_path}")
        except Exception as e:
            # Best-effort persistence: saving output must not crash the pipeline.
            print(f"Error saving result: {e}")

View File

@ -1,725 +1,129 @@
"""
Video file input source for the Cluster4NPU inference pipeline.
This module provides video file input capabilities with support for common video formats,
frame rate control, seeking, and batch processing capabilities.
"""
import cv2 import cv2
import numpy as np
import threading import threading
import time import time
import os from typing import Optional, Callable
from typing import Optional, Callable, Tuple, Dict, Any, List
from dataclasses import dataclass
from pathlib import Path
from camera_source import DataSourceBase
class VideoFileSource:
@dataclass
class VideoFileConfig:
"""Configuration for video file input source."""
file_path: str
target_fps: Optional[float] = None # If None, use original video FPS
loop: bool = False
start_frame: int = 0
end_frame: Optional[int] = None # If None, process until end
skip_frames: int = 0 # Skip every N frames
max_frames: Optional[int] = None # Maximum frames to process
resize_to: Optional[Tuple[int, int]] = None # (width, height)
preload_frames: bool = False # Preload frames to memory for faster access
class VideoFileSource(DataSourceBase):
""" """
Video file input source for processing video files frame by frame. A class to handle video file input using cv2.VideoCapture.
It reads frames from a video file and can send them to a pipeline.
Features:
- Support common video formats (MP4, AVI, MOV, MKV, etc.)
- Frame rate control and seeking
- Batch processing capabilities
- Progress tracking
- Loop playback support
- Frame skipping and range selection
""" """
def __init__(self,
def __init__(self, config: VideoFileConfig, frame_callback: Optional[Callable[[np.ndarray, int], None]] = None): file_path: str,
data_callback: Optional[Callable[[object], None]] = None,
loop: bool = False):
""" """
Initialize video file source. Initializes the VideoFileSource.
Args: Args:
config: Video file configuration file_path (str): The path to the video file.
frame_callback: Optional callback for each processed frame (frame, frame_number) data_callback (Optional[Callable[[object], None]]): A callback function to send data to.
loop (bool): Whether to loop the video when it ends.
""" """
self.config = config self.file_path = file_path
self.frame_callback = frame_callback self.data_callback = data_callback
self.loop = loop
# Video capture object self.cap = None
self.cap: Optional[cv2.VideoCapture] = None self.running = False
self.thread = None
# Video properties
self.total_frames = 0
self.original_fps = 0.0
self.video_width = 0
self.video_height = 0
self.video_duration = 0.0
# Threading control
self._playback_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._pause_event = threading.Event() self.fps = 0
self._frame_lock = threading.Lock()
# Current state def initialize(self) -> bool:
self._current_frame: Optional[np.ndarray] = None
self._current_frame_number = 0
self._frames_processed = 0
self._playback_started = False
# Progress tracking
self._start_time = 0.0
self._actual_fps = 0.0
self._fps_counter = 0
self._last_fps_time = 0.0
# Error handling
self._last_error: Optional[str] = None
# Frame preloading
self._preloaded_frames: List[np.ndarray] = []
self._preload_complete = False
def start(self) -> bool:
""" """
Start video file processing. Initializes the video capture from the file.
Returns: Returns:
bool: True if video started successfully, False otherwise bool: True if initialization is successful, False otherwise.
""" """
if self.is_running(): print(f"Initializing video source from {self.file_path}...")
return True self.cap = cv2.VideoCapture(self.file_path)
if not self.cap.isOpened():
try: print(f"Error: Could not open video file {self.file_path}.")
# Check if file exists
if not os.path.exists(self.config.file_path):
self._last_error = f"Video file not found: {self.config.file_path}"
return False
# Open video file
self.cap = cv2.VideoCapture(self.config.file_path)
if not self.cap.isOpened():
self._last_error = f"Failed to open video file: {self.config.file_path}"
return False
# Get video properties
self._get_video_properties()
# Validate configuration
if not self._validate_config():
return False
# Preload frames if requested
if self.config.preload_frames:
if not self._preload_frames():
return False
print(f"[VideoFileSource] Video opened successfully")
print(f"[VideoFileSource] File: {self.config.file_path}")
print(f"[VideoFileSource] Resolution: {self.video_width}x{self.video_height}")
print(f"[VideoFileSource] FPS: {self.original_fps}, Duration: {self.video_duration:.2f}s")
print(f"[VideoFileSource] Total frames: {self.total_frames}")
# Start playback thread
self._stop_event.clear()
self._pause_event.clear()
self._playback_thread = threading.Thread(target=self._playback_loop, daemon=True)
self._playback_thread.start()
return True
except Exception as e:
self._last_error = f"Video initialization error: {str(e)}"
if self.cap:
self.cap.release()
self.cap = None
return False return False
def stop(self) -> None: self.fps = self.cap.get(cv2.CAP_PROP_FPS)
"""Stop video file processing.""" if self.fps == 0:
if not self.is_running(): print("Warning: Could not determine video FPS. Defaulting to 30.")
self.fps = 30
print(f"Video source initialized successfully. FPS: {self.fps}")
return True
def start(self):
"""
Starts the frame reading thread.
"""
if self.running:
print("Video source is already running.")
return return
print("[VideoFileSource] Stopping video processing...") if not self.cap or not self.cap.isOpened():
if not self.initialize():
return
# Signal stop self.running = True
self._stop_event.set() self._stop_event.clear()
self.thread = threading.Thread(target=self._capture_loop, daemon=True)
self.thread.start()
print("Video capture thread started.")
# Wait for playback thread to finish def stop(self):
if self._playback_thread and self._playback_thread.is_alive(): """
self._playback_thread.join(timeout=2.0) Stops the frame reading thread.
"""
self.running = False
if self.thread and self.thread.is_alive():
self._stop_event.set()
self.thread.join(timeout=2)
# Release video capture if self.cap and self.cap.isOpened():
if self.cap:
self.cap.release() self.cap.release()
self.cap = None self.cap = None
print("Video source stopped.")
# Clear current frame def _capture_loop(self):
with self._frame_lock:
self._current_frame = None
# Clear preloaded frames
self._preloaded_frames.clear()
print("[VideoFileSource] Video processing stopped")
def pause(self) -> None:
"""Pause video playback."""
if self.is_running():
self._pause_event.set()
print("[VideoFileSource] Video paused")
def resume(self) -> None:
"""Resume video playback."""
if self.is_running():
self._pause_event.clear()
print("[VideoFileSource] Video resumed")
def is_running(self) -> bool:
"""Check if video processing is currently running."""
return (self.cap is not None and
self._playback_thread is not None and
self._playback_thread.is_alive() and
not self._stop_event.is_set())
def is_paused(self) -> bool:
"""Check if video playback is paused."""
return self._pause_event.is_set()
def seek_to_frame(self, frame_number: int) -> bool:
""" """
Seek to specific frame number. The main loop for reading frames from the video file.
Args:
frame_number: Frame number to seek to
Returns:
bool: True if seek successful
""" """
if not self.cap: while self.running and not self._stop_event.is_set():
return False
try:
frame_number = max(0, min(frame_number, self.total_frames - 1))
self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
self._current_frame_number = frame_number
return True
except Exception as e:
self._last_error = f"Seek error: {str(e)}"
return False
def seek_to_time(self, time_seconds: float) -> bool:
"""
Seek to specific time in video.
Args:
time_seconds: Time in seconds to seek to
Returns:
bool: True if seek successful
"""
if self.original_fps > 0:
frame_number = int(time_seconds * self.original_fps)
return self.seek_to_frame(frame_number)
return False
def get_frame(self) -> Optional[np.ndarray]:
"""
Get the current frame.
Returns:
Optional[np.ndarray]: Current frame or None if no frame available
"""
with self._frame_lock:
if self._current_frame is not None:
return self._current_frame.copy()
return None
def get_progress(self) -> Dict[str, Any]:
"""
Get processing progress information.
Returns:
Dict[str, Any]: Progress information
"""
if self.total_frames > 0:
progress_percent = (self._current_frame_number / self.total_frames) * 100
else:
progress_percent = 0.0
elapsed_time = time.time() - self._start_time if self._start_time > 0 else 0
return {
'current_frame': self._current_frame_number,
'total_frames': self.total_frames,
'progress_percent': progress_percent,
'frames_processed': self._frames_processed,
'elapsed_time': elapsed_time,
'actual_fps': self._actual_fps,
'is_complete': self._current_frame_number >= self.total_frames - 1
}
def get_stats(self) -> Dict[str, Any]:
"""
Get comprehensive video statistics.
Returns:
Dict[str, Any]: Video statistics
"""
stats = {
'file_path': self.config.file_path,
'video_width': self.video_width,
'video_height': self.video_height,
'original_fps': self.original_fps,
'target_fps': self.config.target_fps,
'video_duration': self.video_duration,
'total_frames': self.total_frames,
'loop_enabled': self.config.loop,
'preload_enabled': self.config.preload_frames,
'preload_complete': self._preload_complete,
'last_error': self._last_error,
'is_running': self.is_running(),
'is_paused': self.is_paused()
}
stats.update(self.get_progress())
return stats
def _get_video_properties(self) -> None:
"""Get video properties from OpenCV."""
if not self.cap:
return
self.total_frames = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
self.original_fps = self.cap.get(cv2.CAP_PROP_FPS)
self.video_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
self.video_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
if self.original_fps > 0:
self.video_duration = self.total_frames / self.original_fps
def _validate_config(self) -> bool:
"""Validate configuration against video properties."""
if self.config.start_frame >= self.total_frames:
self._last_error = f"Start frame ({self.config.start_frame}) >= total frames ({self.total_frames})"
return False
if self.config.end_frame and self.config.end_frame <= self.config.start_frame:
self._last_error = f"End frame ({self.config.end_frame}) <= start frame ({self.config.start_frame})"
return False
return True
def _preload_frames(self) -> bool:
"""Preload frames into memory for faster access."""
if not self.cap:
return False
print("[VideoFileSource] Preloading frames...")
try:
# Seek to start frame
self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame)
end_frame = self.config.end_frame or self.total_frames
max_frames = self.config.max_frames or (end_frame - self.config.start_frame)
frames_to_load = min(max_frames, end_frame - self.config.start_frame)
for i in range(frames_to_load):
ret, frame = self.cap.read()
if not ret or frame is None:
break
# Apply resizing if configured
if self.config.resize_to:
frame = cv2.resize(frame, self.config.resize_to)
self._preloaded_frames.append(frame)
# Skip frames if configured
for _ in range(self.config.skip_frames):
self.cap.read()
self._preload_complete = True
print(f"[VideoFileSource] Preloaded {len(self._preloaded_frames)} frames")
return True
except Exception as e:
self._last_error = f"Frame preloading error: {str(e)}"
return False
def _playback_loop(self) -> None:
"""Main playback loop running in separate thread."""
print("[VideoFileSource] Playback loop started")
self._start_time = time.time()
self._last_fps_time = self._start_time
target_fps = self.config.target_fps or self.original_fps
frame_interval = 1.0 / target_fps if target_fps > 0 else 0.033 # Default 30 FPS
# Set starting position
if not self.config.preload_frames:
self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame)
self._current_frame_number = self.config.start_frame
last_frame_time = time.time()
while not self._stop_event.is_set():
try:
# Handle pause
if self._pause_event.is_set():
time.sleep(0.1)
continue
# Control frame rate
current_time = time.time()
time_since_last = current_time - last_frame_time
if time_since_last < frame_interval:
sleep_time = frame_interval - time_since_last
time.sleep(sleep_time)
continue
# Get frame
frame = self._get_next_frame()
if frame is None:
if self.config.loop:
# Reset to start for looping
self._reset_to_start()
continue
else:
# End of video
break
# Update current frame
with self._frame_lock:
self._current_frame = frame
# Update statistics
self._frames_processed += 1
self._fps_counter += 1
last_frame_time = current_time
# Calculate actual FPS
if current_time - self._last_fps_time >= 1.0:
self._actual_fps = self._fps_counter / (current_time - self._last_fps_time)
self._fps_counter = 0
self._last_fps_time = current_time
# Call frame callback if provided
if self.frame_callback:
try:
self.frame_callback(frame, self._current_frame_number)
except Exception as e:
print(f"[VideoFileSource] Frame callback error: {e}")
# Check if we've reached the end frame
if self.config.end_frame and self._current_frame_number >= self.config.end_frame:
if self.config.loop:
self._reset_to_start()
else:
break
# Check max frames limit
if self.config.max_frames and self._frames_processed >= self.config.max_frames:
break
except Exception as e:
print(f"[VideoFileSource] Playback loop error: {e}")
self._last_error = str(e)
time.sleep(0.1)
print("[VideoFileSource] Playback loop ended")
def _get_next_frame(self) -> Optional[np.ndarray]:
"""Get the next frame from video source."""
if self.config.preload_frames:
# Get frame from preloaded frames
if self._current_frame_number - self.config.start_frame < len(self._preloaded_frames):
frame = self._preloaded_frames[self._current_frame_number - self.config.start_frame]
self._current_frame_number += 1
return frame
else:
return None
else:
# Read frame from video capture
if not self.cap:
return None
ret, frame = self.cap.read() ret, frame = self.cap.read()
if not ret or frame is None: if not ret:
if self.loop:
print("Video ended, looping...")
self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
continue
else:
print("Video ended.")
self.running = False
break
if self.data_callback:
try:
self.data_callback(frame)
except Exception as e:
print(f"Error in data_callback: {e}")
# Control frame rate
time.sleep(1.0 / self.fps)
def set_data_callback(self, callback: Callable[[object], None]):
"""
Sets the data callback function.
"""
self.data_callback = callback
def get_frame(self) -> Optional[object]:
"""
Gets a single frame from the video. Not recommended for continuous capture.
"""
if not self.cap or not self.cap.isOpened():
if not self.initialize():
return None return None
# Apply resizing if configured ret, frame = self.cap.read()
if self.config.resize_to: if not ret:
frame = cv2.resize(frame, self.config.resize_to) return None
return frame
self._current_frame_number += 1
# Skip frames if configured
for _ in range(self.config.skip_frames):
self.cap.read()
self._current_frame_number += 1
return frame
def _reset_to_start(self) -> None:
"""Reset video to start position for looping."""
if self.config.preload_frames:
self._current_frame_number = self.config.start_frame
else:
if self.cap:
self.cap.set(cv2.CAP_PROP_POS_FRAMES, self.config.start_frame)
self._current_frame_number = self.config.start_frame
def __enter__(self):
"""Context manager entry."""
if not self.start():
raise RuntimeError(f"Failed to start video: {self._last_error}")
return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: always stop the source, even on error."""
        self.stop()
class VideoPipelineFeeder:
    """
    Helper class to feed video frames to InferencePipeline.

    Bridges a VideoFileSource and an InferencePipeline: a background thread
    pulls frames from the source, wraps each in PipelineData with progress
    metadata, and pushes it into the pipeline via put_data().
    """

    def __init__(self, video_source: VideoFileSource, pipeline):
        """
        Initialize video pipeline feeder.

        Args:
            video_source: VideoFileSource instance (caller is responsible for
                starting/stopping it; feeding only works while it is running).
            pipeline: InferencePipeline instance accepting put_data(PipelineData).
        """
        self.video_source = video_source
        self.pipeline = pipeline

        # Threading control
        self._feed_thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._is_feeding = False

        # Statistics
        self._frames_fed = 0
        self._results_collected = 0
        # NOTE(review): nothing in this class ever appends to _results or
        # increments _results_collected — result collection appears unwired;
        # confirm whether the pipeline's result callback should populate these.
        self._results: List[Dict[str, Any]] = []

    def start_feeding(self) -> bool:
        """
        Start feeding video frames to pipeline.

        Idempotent: returns True immediately if feeding is already active.

        Returns:
            bool: True if feeding started successfully (False when the video
            source is not running).
        """
        if self._is_feeding:
            return True
        if not self.video_source.is_running():
            print("[VideoPipelineFeeder] Video source is not running")
            return False
        print("[VideoPipelineFeeder] Starting frame feeding...")
        self._stop_event.clear()
        self._is_feeding = True
        # Daemon thread so an unstopped feeder cannot block interpreter exit.
        self._feed_thread = threading.Thread(target=self._feed_loop, daemon=True)
        self._feed_thread.start()
        return True

    def stop_feeding(self) -> None:
        """Stop feeding frames to pipeline (waits up to 2s for the thread)."""
        if not self._is_feeding:
            return
        print("[VideoPipelineFeeder] Stopping frame feeding...")
        self._stop_event.set()
        self._is_feeding = False
        if self._feed_thread and self._feed_thread.is_alive():
            self._feed_thread.join(timeout=2.0)
        print("[VideoPipelineFeeder] Frame feeding stopped")

    def _feed_loop(self) -> None:
        """Main feeding loop."""
        while not self._stop_event.is_set():
            try:
                # Get frame from video source
                frame = self.video_source.get_frame()
                if frame is None:
                    # No frame: either the source finished (exit) or it is
                    # momentarily empty (back off briefly and retry).
                    if not self.video_source.is_running():
                        break
                    time.sleep(0.01)
                    continue
                # Feed frame to pipeline
                try:
                    # Imported here rather than at module top — presumably to
                    # avoid a circular import with InferencePipeline; confirm.
                    from InferencePipeline import PipelineData
                    progress = self.video_source.get_progress()
                    pipeline_data = PipelineData(
                        data=frame,
                        metadata={
                            'source': 'video_file',
                            'file_path': self.video_source.config.file_path,
                            'frame_number': progress['current_frame'],
                            'total_frames': progress['total_frames'],
                            'progress_percent': progress['progress_percent'],
                            'timestamp': time.time(),
                            'frame_id': self._frames_fed
                        }
                    )
                    # Put data into pipeline
                    self.pipeline.put_data(pipeline_data)
                    self._frames_fed += 1
                except Exception as e:
                    print(f"[VideoPipelineFeeder] Error feeding frame to pipeline: {e}")
                    time.sleep(0.1)
            except Exception as e:
                print(f"[VideoPipelineFeeder] Feed loop error: {e}")
                time.sleep(0.1)

    def get_results(self) -> List[Dict[str, Any]]:
        """Get collected results (a defensive copy of the internal list)."""
        return self._results.copy()

    def get_stats(self) -> Dict[str, Any]:
        """Get feeding statistics, including the video source's own stats."""
        return {
            'frames_fed': self._frames_fed,
            'results_collected': self._results_collected,
            'is_feeding': self._is_feeding,
            'video_stats': self.video_source.get_stats()
        }
def get_video_info(file_path: str) -> Dict[str, Any]:
    """
    Get video file information without opening it for processing.

    Args:
        file_path: Path to video file.

    Returns:
        Dict[str, Any]: Always contains 'file_path', 'exists', 'valid' and
        'error' (None on success). For a readable video it also contains
        'file_size', 'width', 'height', 'fps', 'frame_count' and 'duration'
        (seconds; 0.0 when the container reports an FPS of 0).
    """
    info = {
        'file_path': file_path,
        'exists': False,
        'valid': False,
        'error': None
    }
    try:
        if not os.path.exists(file_path):
            info['error'] = 'File does not exist'
            return info

        info['exists'] = True
        info['file_size'] = os.path.getsize(file_path)

        cap = cv2.VideoCapture(file_path)
        try:
            if not cap.isOpened():
                info['error'] = 'Cannot open video file'
                return info

            info['valid'] = True
            info['width'] = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            info['height'] = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            info['fps'] = cap.get(cv2.CAP_PROP_FPS)
            info['frame_count'] = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Guard against containers that report FPS as 0.
            if info['fps'] > 0:
                info['duration'] = info['frame_count'] / info['fps']
            else:
                info['duration'] = 0.0
        finally:
            # Fix: release the capture even if a property read raises, so the
            # underlying OS handle is never leaked (original skipped release
            # on the 'Cannot open' early return and on exceptions).
            cap.release()
    except Exception as e:
        info['error'] = str(e)
    return info
# Example usage and testing
if __name__ == "__main__":
    # Simple CLI driver: probe the given video file, then run a bounded
    # processing pass while reporting progress once per second.
    import sys

    if len(sys.argv) != 2:
        print("Usage: python video_source.py <video_file_path>")
        sys.exit(1)

    video_path = sys.argv[1]

    # Probe the file first so we can fail fast on unreadable input.
    info = get_video_info(video_path)
    print(f"Video info: {info}")
    if not info['valid']:
        print(f"Cannot process video: {info['error']}")
        sys.exit(1)

    def frame_callback(frame, frame_number):
        print(f"Processing frame {frame_number}: {frame.shape}")

    # Throttle to 10 FPS and cap at the first 100 frames — a quick smoke test.
    config = VideoFileConfig(
        file_path=video_path,
        target_fps=10,
        loop=False,
        start_frame=0,
        max_frames=100
    )
    video_source = VideoFileSource(config, frame_callback)

    try:
        if not video_source.start():
            print(f"Failed to start video processing: {video_source._last_error}")
        else:
            print("Video processing started")

            # Poll progress until playback finishes.
            while video_source.is_running():
                progress = video_source.get_progress()
                print(f"Progress: {progress['progress_percent']:.1f}% "
                      f"({progress['current_frame']}/{progress['total_frames']})")
                time.sleep(1)

            stats = video_source.get_stats()
            print(f"Final statistics: {stats}")
    finally:
        video_source.stop()

View File

@ -0,0 +1,163 @@
import threading
import time
from typing import Any, Dict, Optional
from .InferencePipeline import InferencePipeline, PipelineData
from .camera_source import CameraSource
from .video_source import VideoFileSource
from .result_handler import FileOutputManager
# Import other data sources as they are created
class WorkflowOrchestrator:
    """
    Coordinates the entire data flow from input source to the inference pipeline
    and handles the results.

    Lifecycle: start() wires the data source's frames into the pipeline via
    pipeline.put_data and wires pipeline results into handle_result, then
    starts the pipeline and finally the data source; stop() tears down the
    source first, then the pipeline.
    """
    def __init__(self, pipeline: InferencePipeline, input_config: Dict[str, Any], output_config: Dict[str, Any]):
        """
        Initializes the WorkflowOrchestrator.
        Args:
            pipeline (InferencePipeline): The configured inference pipeline.
            input_config (Dict[str, Any]): The configuration for the input source
                (keys read here: 'source_type', 'device_id', 'resolution',
                'fps', 'source_path').
            output_config (Dict[str, Any]): The configuration for the output
                (keys read here: 'output_type', 'destination', 'format').
        """
        self.pipeline = pipeline
        self.input_config = input_config
        self.output_config = output_config
        self.data_source = None
        self.result_handler = None
        self.running = False
        # NOTE(review): _stop_event is set/cleared but nothing visible waits
        # on it — confirm whether it is still needed.
        self._stop_event = threading.Event()
    def start(self):
        """
        Starts the workflow, including the data source and the pipeline.

        Callbacks are wired before anything is started so no frames or
        results are dropped during startup. Idempotent while running.
        """
        if self.running:
            print("Workflow is already running.")
            return
        print("Starting workflow orchestrator...")
        self.running = True
        self._stop_event.clear()
        # Create the result handler
        self.result_handler = self._create_result_handler()
        # Create and start the data source
        self.data_source = self._create_data_source()
        if not self.data_source:
            print("Error: Could not create data source. Aborting workflow.")
            self.running = False
            return
        # Set the pipeline's put_data method as the callback
        self.data_source.set_data_callback(self.pipeline.put_data)
        # Set the result callback on the pipeline
        if self.result_handler:
            self.pipeline.set_result_callback(self.handle_result)
        # Start the pipeline
        self.pipeline.initialize()
        self.pipeline.start()
        # Start the data source
        self.data_source.start()
        print("Workflow orchestrator started successfully.")
    def stop(self):
        """
        Stops the workflow gracefully.

        Stops the data source before the pipeline so no new frames are
        enqueued while the pipeline drains. No-op if not running.
        """
        if not self.running:
            return
        print("Stopping workflow orchestrator...")
        self.running = False
        self._stop_event.set()
        if self.data_source:
            self.data_source.stop()
        if self.pipeline:
            self.pipeline.stop()
        print("Workflow orchestrator stopped.")
    def _create_data_source(self) -> Optional[Any]:
        """
        Creates the appropriate data source based on the input configuration.

        Returns:
            Optional[Any]: A started-able source object, or None for an
            unsupported 'source_type'.
        """
        source_type = self.input_config.get('source_type', '').lower()
        print(f"Creating data source of type: {source_type}")
        if source_type == 'camera':
            return CameraSource(
                camera_index=self.input_config.get('device_id', 0),
                resolution=self._parse_resolution(self.input_config.get('resolution')),
                fps=self.input_config.get('fps', 30)
            )
        elif source_type == 'file':
            # Assuming 'file' means video file for now
            # NOTE(review): VideoFileSource is constructed elsewhere from a
            # VideoFileConfig + callback — confirm it also accepts
            # file_path/loop keyword arguments as used here.
            return VideoFileSource(
                file_path=self.input_config.get('source_path', ''),
                loop=True  # Or get from config if available
            )
        # Add other source types here (e.g., 'rtsp stream', 'image file')
        else:
            print(f"Error: Unsupported source type '{source_type}'")
            return None
    def _create_result_handler(self) -> Optional[Any]:
        """
        Creates the appropriate result handler based on the output configuration.

        Returns:
            Optional[Any]: A handler with save_result(), or None when the
            'output_type' is unsupported (results are then dropped).
        """
        output_type = self.output_config.get('output_type', '').lower()
        print(f"Creating result handler of type: {output_type}")
        if output_type == 'file':
            return FileOutputManager(
                base_path=self.output_config.get('destination', './output')
            )
        # Add other result handlers here
        else:
            print(f"Warning: Unsupported output type '{output_type}'. No results will be saved.")
            return None
    def handle_result(self, result_data: PipelineData):
        """
        Callback function to handle results from the pipeline.

        Serializes the PipelineData into a plain dict and delegates to the
        result handler; errors are logged, never propagated to the pipeline.
        """
        if self.result_handler:
            try:
                # Convert PipelineData to a dictionary for serialization
                result_dict = {
                    "pipeline_id": result_data.pipeline_id,
                    "timestamp": result_data.timestamp,
                    "metadata": result_data.metadata,
                    "stage_results": result_data.stage_results
                }
                self.result_handler.save_result(
                    result_dict,
                    self.pipeline.pipeline_name,
                    format=self.output_config.get('format', 'json').lower()
                )
            except Exception as e:
                print(f"Error handling result: {e}")
    def _parse_resolution(self, resolution_str: Optional[str]) -> Optional[tuple[int, int]]:
        """
        Parses a resolution string (e.g., '1920x1080') into a tuple.

        Returns:
            Optional[tuple[int, int]]: (width, height), or None for a missing
            or malformed string (a warning is printed in the malformed case).
        """
        if not resolution_str:
            return None
        try:
            width, height = map(int, resolution_str.lower().split('x'))
            return (width, height)
        except ValueError:
            print(f"Warning: Invalid resolution format '{resolution_str}'. Using default.")
            return None

# ============================================================================
# Modified file in this commit (deployment dashboard) — side-by-side diff
# residue follows; old and new columns are merged and not valid Python.
# ============================================================================
@ -46,6 +46,7 @@ except ImportError as e:
try: try:
from InferencePipeline import InferencePipeline from InferencePipeline import InferencePipeline
from Multidongle import MultiDongle from Multidongle import MultiDongle
from workflow_orchestrator import WorkflowOrchestrator
PIPELINE_AVAILABLE = True PIPELINE_AVAILABLE = True
except ImportError as e: except ImportError as e:
print(f"Warning: Pipeline system not available: {e}") print(f"Warning: Pipeline system not available: {e}")
@ -67,6 +68,7 @@ class DeploymentWorker(QThread):
super().__init__() super().__init__()
self.pipeline_data = pipeline_data self.pipeline_data = pipeline_data
self.should_stop = False self.should_stop = False
self.orchestrator = None
def run(self): def run(self):
"""Main deployment workflow.""" """Main deployment workflow."""
@ -121,16 +123,12 @@ class DeploymentWorker(QThread):
try: try:
pipeline = converter.create_inference_pipeline(config) pipeline = converter.create_inference_pipeline(config)
self.progress_updated.emit(80, "Initializing dongle connections...") self.progress_updated.emit(80, "Initializing workflow orchestrator...")
self.deployment_started.emit() self.deployment_started.emit()
# Initialize the pipeline # Create and start the orchestrator
pipeline.initialize() self.orchestrator = WorkflowOrchestrator(pipeline, config.input_config, config.output_config)
self.orchestrator.start()
self.progress_updated.emit(90, "Starting pipeline execution...")
# Start the pipeline
pipeline.start()
self.progress_updated.emit(100, "Pipeline deployed successfully!") self.progress_updated.emit(100, "Pipeline deployed successfully!")
self.deployment_completed.emit(True, f"Pipeline '{config.pipeline_name}' deployed with {len(config.stage_configs)} stages") self.deployment_completed.emit(True, f"Pipeline '{config.pipeline_name}' deployed with {len(config.stage_configs)} stages")
@ -144,6 +142,8 @@ class DeploymentWorker(QThread):
def stop(self): def stop(self):
"""Stop the deployment process.""" """Stop the deployment process."""
self.should_stop = True self.should_stop = True
if self.orchestrator:
self.orchestrator.stop()
class DeploymentDialog(QDialog): class DeploymentDialog(QDialog):