Cluster/core/nodes/input_node.py
2025-07-17 17:04:56 +08:00

290 lines
10 KiB
Python

"""
Input node implementation for data source operations.
This module provides the InputNode class which handles various input data sources
including cameras, files, streams, and other media sources for the pipeline.
Main Components:
- InputNode: Core input data source node implementation
- Media source configuration and validation
- Stream management and configuration
Usage:
from cluster4npu_ui.core.nodes.input_node import InputNode
node = InputNode()
node.set_property('source_type', 'Camera')
node.set_property('device_id', 0)
"""
from .base_node import BaseNodeWithProperties
class InputNode(BaseNodeWithProperties):
    """
    Input data source node for pipeline data ingestion.

    This node handles various input data sources including cameras, files,
    RTSP streams, and other media sources for the processing pipeline.
    It exposes a single ``output`` port and no inputs; its behaviour is
    driven entirely by the business properties registered in
    ``setup_properties``.
    """

    __identifier__ = 'com.cluster.input_node'
    NODE_NAME = 'Input Node'

    # Source types that are addressed through the 'device_id' property.
    # The property description covers cameras *and* microphones, so all
    # three are validated the same way.
    _DEVICE_SOURCES = ('Camera', 'WebCam', 'Microphone')
    # Source types that are addressed through the 'stream_url' property.
    _STREAM_SOURCES = ('RTSP Stream', 'HTTP Stream')
    # Source types reported by supports_audio().
    _AUDIO_SOURCES = ('Microphone', 'File', 'RTSP Stream', 'HTTP Stream')
    # Source types reported by is_real_time() ('File' is the only omission).
    _REAL_TIME_SOURCES = (
        'Camera', 'WebCam', 'Microphone',
        'RTSP Stream', 'HTTP Stream', 'Screen Capture',
    )
    # URL prefixes accepted for stream sources.
    _STREAM_URL_SCHEMES = ('rtsp://', 'http://', 'https://')

    # Single source of truth for the preset resolutions: the 'resolution'
    # choices and get_resolution_tuple() are both derived from this table,
    # so the two cannot drift apart.
    _RESOLUTION_MAP = {
        '640x480': (640, 480),
        '1280x720': (1280, 720),
        '1920x1080': (1920, 1080),
        '2560x1440': (2560, 1440),
        '3840x2160': (3840, 2160),
    }
    # Fallback used when the 'resolution' value is not a known preset.
    _DEFAULT_RESOLUTION = (1920, 1080)

    # Properties copied verbatim into get_input_config(), in output order.
    _CONFIG_PROPERTIES = (
        'source_type',
        'device_id',
        'source_path',
        'resolution',
        'fps',
        'stream_url',
        'stream_timeout',
        'stream_buffer_size',
        'audio_sample_rate',
        'audio_channels',
        'enable_loop',
        'start_time',
        'duration',
        'color_format',
        'bit_depth',
    )

    def __init__(self):
        super().__init__()

        # Setup node connections (only output)
        self.add_output('output', color=(0, 255, 0))
        self.set_color(83, 133, 204)

        # Initialize properties
        self.setup_properties()

    def setup_properties(self):
        """Initialize input source-specific properties."""
        # Source type configuration
        self.create_business_property('source_type', 'Camera', [
            'Camera', 'Microphone', 'File', 'RTSP Stream', 'HTTP Stream',
            'WebCam', 'Screen Capture',
        ])

        # Device configuration
        self.create_business_property('device_id', 0, {
            'min': 0,
            'max': 10,
            'description': 'Device ID for camera or microphone',
        })
        self.create_business_property('source_path', '', {
            'type': 'file_path',
            'filter': 'Media files (*.mp4 *.avi *.mov *.mkv *.wav *.mp3 *.jpg *.png *.bmp)',
            'description': 'Path to media file or stream URL',
        })

        # Video configuration: presets come from _RESOLUTION_MAP (insertion
        # order is preserved), followed by the 'Custom' escape hatch.
        self.create_business_property(
            'resolution', '1920x1080', [*self._RESOLUTION_MAP, 'Custom']
        )
        self.create_business_property('custom_width', 1920, {
            'min': 320,
            'max': 7680,
            'description': 'Custom resolution width',
        })
        self.create_business_property('custom_height', 1080, {
            'min': 240,
            'max': 4320,
            'description': 'Custom resolution height',
        })
        self.create_business_property('fps', 30, {
            'min': 1,
            'max': 120,
            'description': 'Frames per second',
        })

        # Stream configuration
        self.create_business_property('stream_url', '', {
            'placeholder': 'rtsp://user:pass@host:port/path',
            'description': 'RTSP or HTTP stream URL',
        })
        self.create_business_property('stream_timeout', 10, {
            'min': 1,
            'max': 60,
            'description': 'Stream connection timeout in seconds',
        })
        self.create_business_property('stream_buffer_size', 1, {
            'min': 1,
            'max': 10,
            'description': 'Stream buffer size in frames',
        })

        # Audio configuration
        self.create_business_property('audio_sample_rate', 44100, [
            16000, 22050, 44100, 48000, 96000,
        ])
        self.create_business_property('audio_channels', 2, {
            'min': 1,
            'max': 8,
            'description': 'Number of audio channels',
        })

        # Advanced options
        self.create_business_property('enable_loop', False, {
            'description': 'Loop playback for file sources',
        })
        self.create_business_property('start_time', 0.0, {
            'min': 0.0,
            'max': 3600.0,
            'step': 0.1,
            'description': 'Start time in seconds for file sources',
        })
        self.create_business_property('duration', 0.0, {
            'min': 0.0,
            'max': 3600.0,
            'step': 0.1,
            'description': 'Duration in seconds (0 = entire file)',
        })

        # Color space and format
        self.create_business_property('color_format', 'RGB', [
            'RGB', 'BGR', 'YUV', 'GRAY',
        ])
        self.create_business_property('bit_depth', 8, [
            8, 10, 12, 16,
        ])

    @staticmethod
    def _is_int(value) -> bool:
        """Return True for real integers only (``bool`` is rejected)."""
        # bool is a subclass of int, so a plain isinstance() check would let
        # True/False through as 1/0.
        return isinstance(value, int) and not isinstance(value, bool)

    @staticmethod
    def _is_blank(value) -> bool:
        """Return True for falsy values and whitespace-only strings."""
        return not value or (isinstance(value, str) and not value.strip())

    def validate_configuration(self) -> tuple[bool, str]:
        """
        Validate the current node configuration.

        Checks run in a fixed order (source-specific settings, then custom
        resolution, then FPS) and stop at the first failure.

        Returns:
            Tuple of (is_valid, error_message). ``error_message`` is an
            empty string when the configuration is valid.
        """
        source_type = self.get_property('source_type')

        # Validate based on source type
        if source_type in self._DEVICE_SOURCES:
            device_id = self.get_property('device_id')
            if not self._is_int(device_id) or device_id < 0:
                return False, "Device ID must be a non-negative integer"
        elif source_type == 'File':
            if self._is_blank(self.get_property('source_path')):
                return False, "Source path is required for file input"
        elif source_type in self._STREAM_SOURCES:
            stream_url = self.get_property('stream_url')
            if self._is_blank(stream_url):
                return False, "Stream URL is required for stream input"
            # Basic URL validation. A non-string value is reported as a
            # format error rather than raising AttributeError.
            if not isinstance(stream_url, str) or not stream_url.startswith(
                self._STREAM_URL_SCHEMES
            ):
                return False, "Invalid stream URL format"

        # Validate resolution (only the custom dimensions can be wrong;
        # presets are fixed).
        if self.get_property('resolution') == 'Custom':
            width = self.get_property('custom_width')
            height = self.get_property('custom_height')
            if not self._is_int(width) or width < 320:
                return False, "Custom width must be at least 320 pixels"
            if not self._is_int(height) or height < 240:
                return False, "Custom height must be at least 240 pixels"

        # Validate FPS
        fps = self.get_property('fps')
        if not self._is_int(fps) or fps < 1:
            return False, "FPS must be at least 1"

        return True, ""

    def get_input_config(self) -> dict:
        """
        Get input configuration for pipeline execution.

        Returns:
            Dictionary containing ``node_id``, ``node_name`` and every
            property listed in ``_CONFIG_PROPERTIES``. ``custom_width`` and
            ``custom_height`` are added only when the resolution is
            ``'Custom'``.
        """
        config = {
            'node_id': self.id,
            'node_name': self.name(),
        }
        for prop_name in self._CONFIG_PROPERTIES:
            config[prop_name] = self.get_property(prop_name)

        # Add custom resolution if applicable
        if config['resolution'] == 'Custom':
            config['custom_width'] = self.get_property('custom_width')
            config['custom_height'] = self.get_property('custom_height')

        return config

    def get_resolution_tuple(self) -> tuple[int, int]:
        """
        Get resolution as (width, height) tuple.

        Returns:
            Tuple of (width, height). For ``'Custom'`` the custom width and
            height properties are returned as-is; an unrecognised preset
            falls back to (1920, 1080).
        """
        resolution = self.get_property('resolution')
        if resolution == 'Custom':
            return (
                self.get_property('custom_width'),
                self.get_property('custom_height'),
            )
        return self._RESOLUTION_MAP.get(resolution, self._DEFAULT_RESOLUTION)

    def get_estimated_bandwidth(self) -> dict:
        """
        Estimate bandwidth requirements for the input source.

        The raw figure is ``width * height * fps * bits_per_pixel``; the
        compressed figure assumes a flat 10:1 compression ratio.

        Returns:
            Dictionary with ``raw_bps``, ``compressed_bps``, ``raw_mbps``,
            ``compressed_mbps``, ``resolution``, ``fps`` and ``bit_depth``.
        """
        width, height = self.get_resolution_tuple()
        fps = self.get_property('fps')
        bit_depth = self.get_property('bit_depth')
        color_format = self.get_property('color_format')

        # Calculate bits per pixel: one channel for GRAY, three for
        # RGB/BGR/YUV.
        channels = 1 if color_format == 'GRAY' else 3
        bits_per_pixel = bit_depth * channels

        # Raw bandwidth (bits per second)
        raw_bandwidth = width * height * fps * bits_per_pixel
        # Estimated compressed bandwidth (assuming 10:1 compression)
        compressed_bandwidth = raw_bandwidth / 10

        return {
            'raw_bps': raw_bandwidth,
            'compressed_bps': compressed_bandwidth,
            'raw_mbps': raw_bandwidth / 1000000,
            'compressed_mbps': compressed_bandwidth / 1000000,
            'resolution': (width, height),
            'fps': fps,
            'bit_depth': bit_depth,
        }

    def supports_audio(self) -> bool:
        """Check if the current source type supports audio."""
        return self.get_property('source_type') in self._AUDIO_SOURCES

    def is_real_time(self) -> bool:
        """Check if the current source is real-time."""
        return self.get_property('source_type') in self._REAL_TIME_SOURCES