fix: Resolve multi-series initialization and validation issues

- Fix mflow_converter to properly handle multi-series configuration creation
- Update InferencePipeline to correctly initialize MultiDongle with multi-series config
- Add comprehensive multi-series configuration validation in mflow_converter
- Enhance deployment dialog to display multi-series configuration details
- Update the analysis and configuration tabs to show per-series port, model, and firmware details

This resolves the issue where multi-series mode was falling back to single-series
during inference initialization, ensuring proper multi-series dongle support.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
HuangMason320 2025-08-14 16:33:22 +08:00
parent ec940c3f2f
commit 2fea1eceec
5 changed files with 865 additions and 61 deletions
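For reference, a minimal sketch of the multi_series_config dictionary that the node and converter now hand to MultiDongle. The key names (series name -> port_ids, model_path, firmware_paths) are taken from the code below; the port numbers and file paths are placeholder values:

# Hypothetical example; ports and paths are illustrative, not from this commit.
multi_series_config = {
    'KL520': {
        'port_ids': [28, 32],
        'model_path': 'assets/Models/KL520/example.nef',
        'firmware_paths': {
            'scpu': 'assets/Firmware/KL520/fw_scpu.bin',
            'ncpu': 'assets/Firmware/KL520/fw_ncpu.bin',
        },
    },
    'KL720': {
        'port_ids': [30, 34],  # model_path and firmware_paths are optional per series
    },
}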

View File

@@ -19,6 +19,8 @@ class StageConfig:
model_path: str
upload_fw: bool
max_queue_size: int = 50
# Multi-series support
multi_series_config: Optional[Dict[str, Any]] = None # For multi-series mode
# Inter-stage processing
input_preprocessor: Optional[PreProcessor] = None # Before this stage
output_postprocessor: Optional[PostProcessor] = None # After this stage
@@ -43,15 +45,25 @@ class PipelineStage:
self.stage_id = config.stage_id
# Initialize MultiDongle for this stage
self.multidongle = MultiDongle(
port_id=config.port_ids,
scpu_fw_path=config.scpu_fw_path,
ncpu_fw_path=config.ncpu_fw_path,
model_path=config.model_path,
upload_fw=config.upload_fw,
auto_detect=config.auto_detect if hasattr(config, 'auto_detect') else False,
max_queue_size=config.max_queue_size
)
if config.multi_series_config:
# Multi-series mode
self.multidongle = MultiDongle(
multi_series_config=config.multi_series_config,
max_queue_size=config.max_queue_size
)
print(f"[Stage {self.stage_id}] Initialized in multi-series mode with config: {list(config.multi_series_config.keys())}")
else:
# Single-series mode (legacy)
self.multidongle = MultiDongle(
port_id=config.port_ids,
scpu_fw_path=config.scpu_fw_path,
ncpu_fw_path=config.ncpu_fw_path,
model_path=config.model_path,
upload_fw=config.upload_fw,
auto_detect=config.auto_detect if hasattr(config, 'auto_detect') else False,
max_queue_size=config.max_queue_size
)
print(f"[Stage {self.stage_id}] Initialized in single-series mode")
# Store preprocessor and postprocessor for later use
self.stage_preprocessor = config.stage_preprocessor
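A short sketch of how the converter (later in this commit) builds a StageConfig for the multi-series branch, leaving the single-series fields empty; PipelineStage then takes the multi-series path above instead of the legacy constructor call. The PipelineStage constructor signature itself is not shown in this hunk, so its call here is an assumption:

# Sketch: single-series fields are blank because multi_series_config drives initialization.
stage_config = StageConfig(
    stage_id='stage_1',
    port_ids=[],
    scpu_fw_path='',
    ncpu_fw_path='',
    model_path='',
    upload_fw=True,
    max_queue_size=50,
    multi_series_config={'KL520': {'port_ids': [28, 32]}},  # illustrative values
)
stage = PipelineStage(stage_config)  # assumed call; prints the multi-series init message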

View File

@@ -329,6 +329,9 @@ class MultiDongle:
self._input_queue = queue.Queue()
self._ordered_output_queue = queue.Queue()
# Create output queue for legacy compatibility
self._output_queue = self._ordered_output_queue # Point to the same queue
self.result_queues = {} # series_name -> queue
for series_name in multi_series_config.keys():
self.result_queues[series_name] = queue.Queue()
@@ -492,6 +495,15 @@ class MultiDongle:
Connect devices, upload firmware (if upload_fw is True), and upload model.
Must be called before start().
"""
if self.multi_series_mode:
# Multi-series initialization
self._initialize_multi_series()
else:
# Legacy single-series initialization
self._initialize_single_series()
def _initialize_single_series(self):
"""Initialize single-series (legacy) mode"""
# Connect device and assign to self.device_group
try:
print('[Connect Device]')
@@ -554,6 +566,102 @@ class MultiDongle:
else:
print("Warning: Could not get generic inference input descriptor from model.")
self.generic_inference_input_descriptor = None
def _initialize_multi_series(self):
"""Initialize multi-series mode"""
print('[Multi-Series Initialization]')
# Initialize each series separately
for series_name, config in self.series_config.items():
print(f'[Initializing {series_name}]')
# Get port IDs for this series
port_ids = config.get('port_ids', [])
if not port_ids:
print(f'Warning: No port IDs configured for {series_name}, skipping')
continue
# Connect devices for this series
try:
print(f' [Connect Devices] Port IDs: {port_ids}')
device_group = kp.core.connect_devices(usb_port_ids=port_ids)
self.device_groups[series_name] = device_group
print(f' - Success ({len(port_ids)} devices)')
except kp.ApiKPException as exception:
print(f'Error: connect devices failed for {series_name}, port IDs = {port_ids}, error = {str(exception)}')
continue
# Upload firmware if available
firmware_paths = config.get('firmware_paths')
if firmware_paths and 'scpu' in firmware_paths and 'ncpu' in firmware_paths:
try:
print(f' [Upload Firmware]')
kp.core.load_firmware_from_file(
device_group=device_group,
scpu_fw_path=firmware_paths['scpu'],
ncpu_fw_path=firmware_paths['ncpu']
)
print(f' - Success')
except kp.ApiKPException as exception:
print(f'Error: upload firmware failed for {series_name}, error = {str(exception)}')
continue
else:
print(f' [Upload Firmware] - Skipped (no firmware paths configured)')
# Upload model
model_path = config.get('model_path')
if model_path:
try:
print(f' [Upload Model]')
model_descriptor = kp.core.load_model_from_file(
device_group=device_group,
file_path=model_path
)
self.model_descriptors[series_name] = model_descriptor
print(f' - Success')
# Extract model input dimensions for this series
if model_descriptor and model_descriptor.models:
model = model_descriptor.models[0]
if hasattr(model, 'input_nodes') and model.input_nodes:
input_node = model.input_nodes[0]
shape = input_node.tensor_shape_info.data.shape_npu
model_input_shape = (shape[3], shape[2]) # (width, height)
model_input_channels = shape[1] # 3 for RGB
print(f' Model input shape: {model_input_shape}, channels: {model_input_channels}')
# Store series-specific model info
self.series_groups[series_name]['model_input_shape'] = model_input_shape
self.series_groups[series_name]['model_input_channels'] = model_input_channels
except kp.ApiKPException as exception:
print(f'Error: upload model failed for {series_name}, error = {str(exception)}')
continue
else:
print(f' [Upload Model] - Skipped (no model path configured)')
print('[Multi-Series Initialization Complete]')
# Set up legacy compatibility attributes using the first series
if self.device_groups:
first_series = next(iter(self.device_groups.keys()))
self.device_group = self.device_groups[first_series]
self.model_nef_descriptor = self.model_descriptors.get(first_series)
# Set up generic inference descriptor from first series
if self.model_nef_descriptor:
self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
model_id=self.model_nef_descriptor.models[0].id,
)
# Set model input shape from first series
if first_series in self.series_groups:
series_info = self.series_groups[first_series]
self.model_input_shape = series_info.get('model_input_shape', (640, 640))
self.model_input_channels = series_info.get('model_input_channels', 3)
else:
self.model_input_shape = (640, 640)
self.model_input_channels = 3
def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray:
"""
@@ -704,6 +812,13 @@ class MultiDongle:
Start the send and receive threads.
Must be called after initialize().
"""
if self.multi_series_mode:
self._start_multi_series()
else:
self._start_single_series()
def _start_single_series(self):
"""Start single-series (legacy) mode"""
if self.device_group is None:
raise RuntimeError("MultiDongle not initialized. Call initialize() first.")
@@ -717,12 +832,63 @@ class MultiDongle:
self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True)
self._receive_thread.start()
print("Receive thread started.")
def _start_multi_series(self):
"""Start multi-series mode"""
if not self.device_groups:
raise RuntimeError("MultiDongle not initialized. Call initialize() first.")
print("[Starting Multi-Series Threads]")
self._stop_event.clear()
# Start dispatcher thread
if self.dispatcher_thread is None or not self.dispatcher_thread.is_alive():
self.dispatcher_thread = threading.Thread(target=self._dispatcher_thread_func, daemon=True)
self.dispatcher_thread.start()
print("Dispatcher thread started.")
# Start send/receive threads for each series
for series_name in self.device_groups.keys():
# Start send thread for this series
if series_name not in self.send_threads or not self.send_threads[series_name].is_alive():
send_thread = threading.Thread(
target=self._multi_series_send_thread_func,
args=(series_name,),
daemon=True
)
self.send_threads[series_name] = send_thread
send_thread.start()
print(f"Send thread started for {series_name}.")
# Start receive thread for this series
if series_name not in self.receive_threads or not self.receive_threads[series_name].is_alive():
receive_thread = threading.Thread(
target=self._multi_series_receive_thread_func,
args=(series_name,),
daemon=True
)
self.receive_threads[series_name] = receive_thread
receive_thread.start()
print(f"Receive thread started for {series_name}.")
# Start result ordering thread
if self.result_ordering_thread is None or not self.result_ordering_thread.is_alive():
self.result_ordering_thread = threading.Thread(target=self._result_ordering_thread_func, daemon=True)
self.result_ordering_thread.start()
print("Result ordering thread started.")
def stop(self):
"""Improved stop method with better cleanup"""
if self._stop_event.is_set():
return # Already stopping
if self.multi_series_mode:
self._stop_multi_series()
else:
self._stop_single_series()
def _stop_single_series(self):
"""Stop single-series (legacy) mode"""
print("Stopping threads...")
self._stop_event.set()
@@ -751,6 +917,217 @@ class MultiDongle:
except kp.ApiKPException as e:
print(f"Error disconnecting device group: {e}")
self.device_group = None
def _stop_multi_series(self):
"""Stop multi-series mode"""
print("[Stopping Multi-Series Threads]")
self._stop_event.set()
# Clear input queue to unblock dispatcher
while not self._input_queue.empty():
try:
self._input_queue.get_nowait()
except queue.Empty:
break
# Signal dispatcher thread to wake up
self._input_queue.put(None)
# Clear series result queues
for series_name, result_queue in self.result_queues.items():
while not result_queue.empty():
try:
result_queue.get_nowait()
except queue.Empty:
break
# Stop all send threads
for series_name, send_thread in self.send_threads.items():
if send_thread and send_thread.is_alive():
send_thread.join(timeout=2.0)
if send_thread.is_alive():
print(f"Warning: Send thread for {series_name} didn't stop cleanly")
# Stop all receive threads
for series_name, receive_thread in self.receive_threads.items():
if receive_thread and receive_thread.is_alive():
receive_thread.join(timeout=2.0)
if receive_thread.is_alive():
print(f"Warning: Receive thread for {series_name} didn't stop cleanly")
# Stop dispatcher thread
if self.dispatcher_thread and self.dispatcher_thread.is_alive():
self.dispatcher_thread.join(timeout=2.0)
if self.dispatcher_thread.is_alive():
print("Warning: Dispatcher thread didn't stop cleanly")
# Stop result ordering thread
if self.result_ordering_thread and self.result_ordering_thread.is_alive():
self.result_ordering_thread.join(timeout=2.0)
if self.result_ordering_thread.is_alive():
print("Warning: Result ordering thread didn't stop cleanly")
# Disconnect all device groups
print("Disconnecting device groups...")
for series_name, device_group in self.device_groups.items():
try:
kp.core.disconnect_devices(device_group=device_group)
print(f"Device group for {series_name} disconnected successfully.")
except kp.ApiKPException as e:
print(f"Error disconnecting device group for {series_name}: {e}")
self.device_groups.clear()
def _dispatcher_thread_func(self):
"""Dispatcher thread: assigns tasks to dongles based on load balancing"""
print("Dispatcher thread started")
while not self._stop_event.is_set():
try:
task = self._input_queue.get(timeout=0.1)
if task is None: # Sentinel value
continue
# Select optimal dongle based on current load and capacity
selected_series = self._select_optimal_series()
if selected_series is None:
print("Warning: No series available for task dispatch")
continue
# Enqueue to selected series
self.result_queues[selected_series].put(task)
self.current_loads[selected_series] += 1
except queue.Empty:
continue
except Exception as e:
print(f"Error in dispatcher: {e}")
if not self._stop_event.is_set():
self._stop_event.set()
print("Dispatcher thread stopped")
def _multi_series_send_thread_func(self, series_name: str):
"""Send thread for specific dongle series - with tuple handling fix"""
print(f"Send worker started for {series_name}")
device_group = self.device_groups[series_name]
result_queue = self.result_queues[series_name]
model_descriptor = self.model_descriptors[series_name]
while not self._stop_event.is_set():
try:
task = result_queue.get(timeout=0.1)
if task is None:
continue
# Handle both tuple and dict formats
if isinstance(task, tuple):
# Legacy single-series format: (image_data, image_format)
image_data, image_format = task
sequence_id = getattr(self, '_inference_counter', 0)
self._inference_counter = sequence_id + 1
elif isinstance(task, dict):
# Multi-series format: dict with keys
image_data = task.get('image_data')
image_format = task.get('image_format', kp.ImageFormat.KP_IMAGE_FORMAT_RGB565)
sequence_id = task.get('sequence_id', 0)
else:
print(f"Error: Unknown task format: {type(task)}")
continue
if image_data is None:
print(f"Error: No image data in task")
continue
# Create inference descriptor for this task
inference_descriptor = kp.GenericImageInferenceDescriptor(
model_id=model_descriptor.models[0].id,
)
inference_descriptor.inference_number = sequence_id
inference_descriptor.input_node_image_list = [
kp.GenericInputNodeImage(
image=image_data,
image_format=image_format,
resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
)
]
# Send inference
kp.inference.generic_image_inference_send(
device_group=device_group,
generic_inference_input_descriptor=inference_descriptor
)
except queue.Empty:
continue
except kp.ApiKPException as e:
print(f"Error in {series_name} send worker: {e}")
if not self._stop_event.is_set():
self._stop_event.set()
except Exception as e:
print(f"Unexpected error in {series_name} send worker: {e}")
if not self._stop_event.is_set():
self._stop_event.set()
print(f"Send worker stopped for {series_name}")
def _multi_series_receive_thread_func(self, series_name: str):
"""Receive thread for specific dongle series"""
print(f"Receive worker started for {series_name}")
device_group = self.device_groups[series_name]
while not self._stop_event.is_set():
try:
# Receive inference result
raw_result = kp.inference.generic_image_inference_receive(device_group=device_group)
# Create result object
result = {
'sequence_id': raw_result.header.inference_number,
'result': raw_result,
'dongle_series': series_name,
'timestamp': time.time()
}
# Add to pending results for ordering
self.pending_results[result['sequence_id']] = result
self.current_loads[series_name] = max(0, self.current_loads[series_name] - 1)
except kp.ApiKPException as e:
if not self._stop_event.is_set():
print(f"Error in {series_name} receive worker: {e}")
self._stop_event.set()
except Exception as e:
print(f"Unexpected error in {series_name} receive worker: {e}")
print(f"Receive worker stopped for {series_name}")
def _result_ordering_thread_func(self):
"""Result ordering thread: ensures results are output in sequence order"""
print("Result ordering worker started")
while not self._stop_event.is_set():
# Check if next expected result is available
if self.next_output_sequence in self.pending_results:
result = self.pending_results.pop(self.next_output_sequence)
self._ordered_output_queue.put(result)
self.next_output_sequence += 1
# Clean up old pending results to prevent memory bloat
if len(self.pending_results) > 1000: # result_buffer_size
oldest_sequences = sorted(self.pending_results.keys())[:500]
for seq_id in oldest_sequences:
if seq_id < self.next_output_sequence:
self.pending_results.pop(seq_id, None)
else:
time.sleep(0.001) # Small delay to prevent busy waiting
print("Result ordering worker stopped")
def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None):
"""
@@ -773,7 +1150,22 @@ class MultiDongle:
else:
raise ValueError(f"Unsupported format: {format}")
self._input_queue.put((image_data, image_format_enum))
if self.multi_series_mode:
# In multi-series mode, create a task with sequence ID
with self.sequence_lock:
sequence_id = self.sequence_counter
self.sequence_counter += 1
task = {
'sequence_id': sequence_id,
'image_data': image_data,
'image_format': image_format_enum,
'timestamp': time.time()
}
self._input_queue.put(task)
else:
# In single-series mode, use the original format
self._input_queue.put((image_data, image_format_enum))
def get_output(self, timeout: float = None):
"""
@@ -783,7 +1175,15 @@ class MultiDongle:
:return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout.
"""
try:
return self._output_queue.get(block=timeout is not None, timeout=timeout)
if self.multi_series_mode:
# In multi-series mode, use the ordered output queue
result = self._ordered_output_queue.get(block=timeout is not None, timeout=timeout)
if result and isinstance(result, dict):
return result.get('result') # Extract the actual inference result
return result
else:
# In single-series mode, use the regular output queue
return self._output_queue.get(block=timeout is not None, timeout=timeout)
except queue.Empty:
return None
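A minimal usage sketch of the multi-series path added above, assuming a multi_series_config shaped like the example near the top of this page and assuming 'RGB565' is one of the format strings accepted by put_input (the accepted values are not visible in this hunk):

import cv2

dongles = MultiDongle(multi_series_config=multi_series_config, max_queue_size=100)
dongles.initialize()            # dispatches to _initialize_multi_series()
dongles.start()                 # dispatcher, per-series send/receive, and result-ordering threads

frame = cv2.imread('test.jpg')  # np.ndarray input
dongles.put_input(frame, format='RGB565')  # wrapped into a dict task with a sequence_id
result = dongles.get_output(timeout=1.0)   # results come back in sequence order, unwrapped
dongles.stop()                  # joins per-series threads and disconnects all device groups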

View File

@@ -463,6 +463,72 @@ class MFlowConverter:
print("="*60 + "\n")
def _build_multi_series_config_from_properties(self, properties: Dict[str, Any]) -> Dict[str, Any]:
"""Build multi-series configuration from node properties"""
try:
enabled_series = properties.get('enabled_series', [])
assets_folder = properties.get('assets_folder', '')
if not enabled_series:
print("Warning: No enabled_series found in multi-series mode")
return {}
multi_series_config = {}
for series in enabled_series:
# Get port IDs for this series
port_ids_str = properties.get(f'kl{series}_port_ids', '')
if not port_ids_str or not port_ids_str.strip():
print(f"Warning: No port IDs configured for KL{series}")
continue
# Parse port IDs (comma-separated string to list of integers)
try:
port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()]
if not port_ids:
continue
except ValueError:
print(f"Warning: Invalid port IDs for KL{series}: {port_ids_str}")
continue
# Build series configuration
series_config = {
"port_ids": port_ids
}
# Add model path if assets folder is configured
if assets_folder:
import os
model_folder = os.path.join(assets_folder, 'Models', f'KL{series}')
if os.path.exists(model_folder):
# Look for .nef files in the model folder
nef_files = [f for f in os.listdir(model_folder) if f.endswith('.nef')]
if nef_files:
series_config["model_path"] = os.path.join(model_folder, nef_files[0])
print(f"Found model for KL{series}: {series_config['model_path']}")
# Add firmware paths if available
firmware_folder = os.path.join(assets_folder, 'Firmware', f'KL{series}')
if os.path.exists(firmware_folder):
scpu_path = os.path.join(firmware_folder, 'fw_scpu.bin')
ncpu_path = os.path.join(firmware_folder, 'fw_ncpu.bin')
if os.path.exists(scpu_path) and os.path.exists(ncpu_path):
series_config["firmware_paths"] = {
"scpu": scpu_path,
"ncpu": ncpu_path
}
print(f"Found firmware for KL{series}: scpu={scpu_path}, ncpu={ncpu_path}")
multi_series_config[f'KL{series}'] = series_config
print(f"Configured KL{series} with {len(port_ids)} devices on ports {port_ids}")
return multi_series_config if multi_series_config else {}
except Exception as e:
print(f"Error building multi-series config from properties: {e}")
return {}
def _create_stage_configs(self, model_nodes: List[Dict], preprocess_nodes: List[Dict],
postprocess_nodes: List[Dict], connections: List[Dict]) -> List[StageConfig]:
"""Create StageConfig objects for each model node"""
@@ -502,16 +568,38 @@ class MFlowConverter:
# Queue size
max_queue_size = properties.get('max_queue_size', 50)
# Create StageConfig
stage_config = StageConfig(
stage_id=stage_id,
port_ids=port_ids,
scpu_fw_path=scpu_fw_path,
ncpu_fw_path=ncpu_fw_path,
model_path=model_path,
upload_fw=upload_fw,
max_queue_size=max_queue_size
)
# Check if multi-series mode is enabled
multi_series_mode = properties.get('multi_series_mode', False)
multi_series_config = None
if multi_series_mode:
# Build multi-series config from node properties
multi_series_config = self._build_multi_series_config_from_properties(properties)
print(f"Multi-series config for {stage_id}: {multi_series_config}")
# Create StageConfig for multi-series mode
stage_config = StageConfig(
stage_id=stage_id,
port_ids=[], # Will be handled by multi_series_config
scpu_fw_path='', # Will be handled by multi_series_config
ncpu_fw_path='', # Will be handled by multi_series_config
model_path='', # Will be handled by multi_series_config
upload_fw=upload_fw,
max_queue_size=max_queue_size,
multi_series_config=multi_series_config
)
else:
# Create StageConfig for single-series mode (legacy)
stage_config = StageConfig(
stage_id=stage_id,
port_ids=port_ids,
scpu_fw_path=scpu_fw_path,
ncpu_fw_path=ncpu_fw_path,
model_path=model_path,
upload_fw=upload_fw,
max_queue_size=max_queue_size,
multi_series_config=None
)
stage_configs.append(stage_config)
@@ -625,24 +713,89 @@ class MFlowConverter:
"""Validate individual stage configuration"""
errors = []
# Check model path
if not stage_config.model_path:
errors.append(f"Stage {stage_num}: Model path is required")
elif not os.path.exists(stage_config.model_path):
errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}")
# Check firmware paths if upload_fw is True
if stage_config.upload_fw:
if not os.path.exists(stage_config.scpu_fw_path):
errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}")
if not os.path.exists(stage_config.ncpu_fw_path):
errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}")
# Check if this is multi-series configuration
if stage_config.multi_series_config:
# Multi-series validation
errors.extend(self._validate_multi_series_config(stage_config.multi_series_config, stage_num))
else:
# Single-series validation (legacy)
# Check model path
if not stage_config.model_path:
errors.append(f"Stage {stage_num}: Model path is required")
elif not os.path.exists(stage_config.model_path):
errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}")
# Check port IDs
if not stage_config.port_ids:
errors.append(f"Stage {stage_num}: At least one port ID is required")
# Check firmware paths if upload_fw is True
if stage_config.upload_fw:
if not os.path.exists(stage_config.scpu_fw_path):
errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}")
if not os.path.exists(stage_config.ncpu_fw_path):
errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}")
# Check port IDs
if not stage_config.port_ids:
errors.append(f"Stage {stage_num}: At least one port ID is required")
return errors
def _validate_multi_series_config(self, multi_series_config: Dict[str, Any], stage_num: int) -> List[str]:
"""Validate multi-series configuration"""
errors = []
if not multi_series_config:
errors.append(f"Stage {stage_num}: Multi-series configuration is empty")
return errors
print(f"Validating multi-series config for stage {stage_num}: {list(multi_series_config.keys())}")
# Check each series configuration
for series_name, series_config in multi_series_config.items():
if not isinstance(series_config, dict):
errors.append(f"Stage {stage_num}: Invalid configuration for {series_name}")
continue
# Check port IDs
port_ids = series_config.get('port_ids', [])
if not port_ids:
errors.append(f"Stage {stage_num}: {series_name} has no port IDs configured")
continue
if not isinstance(port_ids, list) or not all(isinstance(p, int) for p in port_ids):
errors.append(f"Stage {stage_num}: {series_name} port IDs must be a list of integers")
continue
print(f" {series_name}: {len(port_ids)} ports configured")
# Check model path
model_path = series_config.get('model_path')
if model_path:
if not os.path.exists(model_path):
errors.append(f"Stage {stage_num}: {series_name} model file not found: {model_path}")
else:
print(f" {series_name}: Model validated: {model_path}")
else:
print(f" {series_name}: No model path specified (optional for multi-series)")
# Check firmware paths if specified
firmware_paths = series_config.get('firmware_paths')
if firmware_paths and isinstance(firmware_paths, dict):
scpu_path = firmware_paths.get('scpu')
ncpu_path = firmware_paths.get('ncpu')
if scpu_path and not os.path.exists(scpu_path):
errors.append(f"Stage {stage_num}: {series_name} SCPU firmware not found: {scpu_path}")
elif scpu_path:
print(f" {series_name}: SCPU firmware validated: {scpu_path}")
if ncpu_path and not os.path.exists(ncpu_path):
errors.append(f"Stage {stage_num}: {series_name} NCPU firmware not found: {ncpu_path}")
elif ncpu_path:
print(f" {series_name}: NCPU firmware validated: {ncpu_path}")
if not errors:
print(f"Stage {stage_num}: Multi-series configuration validation passed")
return errors
def convert_mflow_file(mflow_path: str, firmware_path: str = "./firmware") -> PipelineConfig:
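For clarity, a sketch of the node-properties dictionary that _build_multi_series_config_from_properties() consumes; the keys mirror the ExactModelNode properties added in the next file, and the values are illustrative:

properties = {
    'multi_series_mode': True,
    'enabled_series': ['520', '720'],
    'assets_folder': '/path/to/assets',  # optional; models and firmware are discovered under it
    'kl520_port_ids': '28,32',           # comma-separated strings, parsed into [28, 32]
    'kl720_port_ids': '30,34',
    'max_queue_size': 100,
}
# _build_multi_series_config_from_properties(properties) would yield roughly:
# {'KL520': {'port_ids': [28, 32], 'model_path': ..., 'firmware_paths': {...}},
#  'KL720': {'port_ids': [30, 34], ...}}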

View File

@@ -5,6 +5,8 @@ This module provides node implementations that exactly match the original
properties and behavior from the monolithic UI.py file.
"""
import os
try:
from NodeGraphQt import BaseNode
NODEGRAPH_AVAILABLE = True
@@ -119,6 +121,14 @@ class ExactModelNode(BaseNode):
self.create_property('multi_series_mode', False)
self.create_property('assets_folder', '')
self.create_property('enabled_series', ['520', '720'])
# Series-specific port ID configurations
self.create_property('kl520_port_ids', '')
self.create_property('kl720_port_ids', '')
self.create_property('kl630_port_ids', '')
self.create_property('kl730_port_ids', '')
self.create_property('kl540_port_ids', '')
self.create_property('max_queue_size', 100)
self.create_property('result_buffer_size', 1000)
self.create_property('batch_size', 1)
@@ -139,6 +149,14 @@ class ExactModelNode(BaseNode):
'multi_series_mode': {'type': 'bool', 'default': False, 'description': 'Enable multi-series dongle support'},
'assets_folder': {'type': 'file_path', 'filter': 'Folder', 'mode': 'directory'},
'enabled_series': {'type': 'list', 'options': ['520', '720', '630', '730', '540'], 'default': ['520', '720']},
# Series-specific port ID options
'kl520_port_ids': {'placeholder': 'e.g., 28,32 (comma-separated port IDs for KL520)', 'description': 'Port IDs for KL520 dongles'},
'kl720_port_ids': {'placeholder': 'e.g., 30,34 (comma-separated port IDs for KL720)', 'description': 'Port IDs for KL720 dongles'},
'kl630_port_ids': {'placeholder': 'e.g., 36,38 (comma-separated port IDs for KL630)', 'description': 'Port IDs for KL630 dongles'},
'kl730_port_ids': {'placeholder': 'e.g., 40,42 (comma-separated port IDs for KL730)', 'description': 'Port IDs for KL730 dongles'},
'kl540_port_ids': {'placeholder': 'e.g., 44,46 (comma-separated port IDs for KL540)', 'description': 'Port IDs for KL540 dongles'},
'max_queue_size': {'min': 1, 'max': 1000, 'default': 100},
'result_buffer_size': {'min': 100, 'max': 10000, 'default': 1000},
'batch_size': {'min': 1, 'max': 32, 'default': 1},
@@ -202,11 +220,25 @@ class ExactModelNode(BaseNode):
if multi_series_mode:
# Multi-series mode: show multi-series specific properties
return base_props + [
'assets_folder', 'enabled_series',
multi_props = ['assets_folder', 'enabled_series']
# Add port ID configurations for enabled series
try:
enabled_series = self.get_property('enabled_series') or []
for series in enabled_series:
port_prop = f'kl{series}_port_ids'
if port_prop not in multi_props: # Avoid duplicates
multi_props.append(port_prop)
except:
pass # If can't get enabled_series, just show basic properties
# Add other multi-series properties
multi_props.extend([
'max_queue_size', 'result_buffer_size', 'batch_size',
'enable_preprocessing', 'enable_postprocessing'
]
])
return base_props + multi_props
else:
# Single-series mode: show traditional properties
return base_props + [
@@ -229,8 +261,8 @@ class ExactModelNode(BaseNode):
multi_series_mode = self.get_property('multi_series_mode')
if multi_series_mode:
# Multi-series configuration
return {
# Multi-series configuration with series-specific port IDs
config = {
'multi_series_mode': True,
'assets_folder': self.get_property('assets_folder'),
'enabled_series': self.get_property('enabled_series'),
@@ -240,6 +272,13 @@ class ExactModelNode(BaseNode):
'enable_preprocessing': self.get_property('enable_preprocessing'),
'enable_postprocessing': self.get_property('enable_postprocessing')
}
# Build multi-series config for MultiDongle
multi_series_config = self._build_multi_series_config()
if multi_series_config:
config['multi_series_config'] = multi_series_config
return config
else:
# Single-series configuration
return {
@@ -265,6 +304,67 @@ class ExactModelNode(BaseNode):
'upload_fw': self.get_property('upload_fw', True)
}
def _build_multi_series_config(self):
"""Build multi-series configuration for MultiDongle"""
try:
enabled_series = self.get_property('enabled_series') or []
assets_folder = self.get_property('assets_folder') or ''
if not enabled_series:
return None
multi_series_config = {}
for series in enabled_series:
# Get port IDs for this series
port_ids_str = self.get_property(f'kl{series}_port_ids') or ''
if not port_ids_str.strip():
continue # Skip series without port IDs
# Parse port IDs (comma-separated string to list of integers)
try:
port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()]
if not port_ids:
continue
except ValueError:
print(f"Warning: Invalid port IDs for KL{series}: {port_ids_str}")
continue
# Build series configuration
series_config = {
"port_ids": port_ids
}
# Add model path if assets folder is configured
if assets_folder:
import os
model_folder = os.path.join(assets_folder, 'Models', f'KL{series}')
if os.path.exists(model_folder):
# Look for .nef files in the model folder
nef_files = [f for f in os.listdir(model_folder) if f.endswith('.nef')]
if nef_files:
series_config["model_path"] = os.path.join(model_folder, nef_files[0])
# Add firmware paths if available
firmware_folder = os.path.join(assets_folder, 'Firmware', f'KL{series}')
if os.path.exists(firmware_folder):
scpu_path = os.path.join(firmware_folder, 'fw_scpu.bin')
ncpu_path = os.path.join(firmware_folder, 'fw_ncpu.bin')
if os.path.exists(scpu_path) and os.path.exists(ncpu_path):
series_config["firmware_paths"] = {
"scpu": scpu_path,
"ncpu": ncpu_path
}
multi_series_config[f'KL{series}'] = series_config
return multi_series_config if multi_series_config else None
except Exception as e:
print(f"Error building multi-series config: {e}")
return None
def get_hardware_requirements(self):
"""Get hardware requirements for this node"""
if not NODEGRAPH_AVAILABLE:
@@ -429,6 +529,80 @@ class ExactModelNode(BaseNode):
except Exception as e:
return {'status': 'error', 'message': f'Error reading assets folder: {e}'}
def validate_configuration(self) -> tuple[bool, str]:
"""
Validate the current node configuration.
Returns:
Tuple of (is_valid, error_message)
"""
if not NODEGRAPH_AVAILABLE:
return True, ""
try:
multi_series_mode = self.get_property('multi_series_mode')
if multi_series_mode:
# Multi-series validation
enabled_series = self.get_property('enabled_series')
if not enabled_series:
return False, "No series enabled in multi-series mode"
# Check if at least one series has port IDs configured
has_valid_series = False
for series in enabled_series:
port_ids_str = self.get_property(f'kl{series}_port_ids', '')
if port_ids_str and port_ids_str.strip():
# Validate port ID format
try:
port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()]
if port_ids:
has_valid_series = True
print(f"Valid series config found for KL{series}: ports {port_ids}")
except ValueError:
print(f"Warning: Invalid port ID format for KL{series}: {port_ids_str}")
continue
if not has_valid_series:
return False, "At least one series must have valid port IDs configured"
# Assets folder validation (optional for multi-series)
assets_folder = self.get_property('assets_folder')
if assets_folder:
if not os.path.exists(assets_folder):
print(f"Warning: Assets folder does not exist: {assets_folder}")
else:
# Validate assets folder structure if provided
assets_info = self.get_assets_folder_info()
if assets_info.get('status') == 'error':
print(f"Warning: Assets folder issue: {assets_info.get('message', 'Unknown error')}")
print("Multi-series mode validation passed")
return True, ""
else:
# Single-series validation (legacy)
model_path = self.get_property('model_path')
if not model_path:
return False, "Model path is required"
if not os.path.exists(model_path):
return False, f"Model file does not exist: {model_path}"
# Check dongle series
dongle_series = self.get_property('dongle_series')
if dongle_series not in ['520', '720', '1080', 'Custom']:
return False, f"Invalid dongle series: {dongle_series}"
# Check number of dongles
num_dongles = self.get_property('num_dongles')
if not isinstance(num_dongles, int) or num_dongles < 1:
return False, "Number of dongles must be at least 1"
return True, ""
except Exception as e:
return False, f"Validation error: {str(e)}"
class ExactPreprocessNode(BaseNode):
"""Preprocessing node - exact match to original."""

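The assets-folder discovery used by ExactModelNode above (and by the converter in the previous file) assumes a layout like the following; the folder names come from the os.path.join lookups, while the .nef file name is arbitrary (the first .nef found per series is used):

assets_folder/
    Models/
        KL520/<model>.nef
        KL720/<model>.nef
    Firmware/
        KL520/fw_scpu.bin
        KL520/fw_ncpu.bin
        KL720/fw_scpu.bin
        KL720/fw_ncpu.bin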
View File

@@ -622,10 +622,32 @@ Stage Configurations:
for i, stage_config in enumerate(config.stage_configs, 1):
analysis_text += f"\nStage {i}: {stage_config.stage_id}\n"
analysis_text += f" Port IDs: {stage_config.port_ids}\n"
analysis_text += f" Model Path: {stage_config.model_path}\n"
analysis_text += f" SCPU Firmware: {stage_config.scpu_fw_path}\n"
analysis_text += f" NCPU Firmware: {stage_config.ncpu_fw_path}\n"
# Check if this is multi-series configuration
if stage_config.multi_series_config:
analysis_text += f" Mode: Multi-Series\n"
analysis_text += f" Series Configured: {list(stage_config.multi_series_config.keys())}\n"
# Show details for each series
for series_name, series_config in stage_config.multi_series_config.items():
analysis_text += f" \n {series_name} Configuration:\n"
analysis_text += f" Port IDs: {series_config.get('port_ids', [])}\n"
model_path = series_config.get('model_path', 'Not specified')
analysis_text += f" Model: {model_path}\n"
firmware_paths = series_config.get('firmware_paths', {})
if firmware_paths:
analysis_text += f" SCPU Firmware: {firmware_paths.get('scpu', 'Not specified')}\n"
analysis_text += f" NCPU Firmware: {firmware_paths.get('ncpu', 'Not specified')}\n"
else:
analysis_text += f" Firmware: Not specified\n"
else:
# Single-series (legacy) configuration
analysis_text += f" Mode: Single-Series\n"
analysis_text += f" Port IDs: {stage_config.port_ids}\n"
analysis_text += f" Model Path: {stage_config.model_path}\n"
analysis_text += f" SCPU Firmware: {stage_config.scpu_fw_path}\n"
analysis_text += f" NCPU Firmware: {stage_config.ncpu_fw_path}\n"
analysis_text += f" Upload Firmware: {stage_config.upload_fw}\n"
analysis_text += f" Max Queue Size: {stage_config.max_queue_size}\n"
@@ -663,23 +685,66 @@ Stage Configurations:
stage_group = QGroupBox(f"Stage {i}: {stage_config.stage_id}")
stage_layout = QFormLayout(stage_group)
# Create read-only fields for stage configuration
model_path_edit = QLineEdit(stage_config.model_path)
model_path_edit.setReadOnly(True)
stage_layout.addRow("Model Path:", model_path_edit)
scpu_fw_edit = QLineEdit(stage_config.scpu_fw_path)
scpu_fw_edit.setReadOnly(True)
stage_layout.addRow("SCPU Firmware:", scpu_fw_edit)
ncpu_fw_edit = QLineEdit(stage_config.ncpu_fw_path)
ncpu_fw_edit.setReadOnly(True)
stage_layout.addRow("NCPU Firmware:", ncpu_fw_edit)
port_ids_edit = QLineEdit(str(stage_config.port_ids))
port_ids_edit.setReadOnly(True)
stage_layout.addRow("Port IDs:", port_ids_edit)
# Check if this is multi-series configuration
if stage_config.multi_series_config:
# Multi-series configuration display
mode_edit = QLineEdit("Multi-Series")
mode_edit.setReadOnly(True)
stage_layout.addRow("Mode:", mode_edit)
series_edit = QLineEdit(str(list(stage_config.multi_series_config.keys())))
series_edit.setReadOnly(True)
stage_layout.addRow("Series:", series_edit)
# Show details for each series
for series_name, series_config in stage_config.multi_series_config.items():
series_label = QLabel(f"--- {series_name} ---")
series_label.setStyleSheet("font-weight: bold; color: #89b4fa;")
stage_layout.addRow(series_label)
port_ids_edit = QLineEdit(str(series_config.get('port_ids', [])))
port_ids_edit.setReadOnly(True)
stage_layout.addRow(f"{series_name} Port IDs:", port_ids_edit)
model_path = series_config.get('model_path', 'Not specified')
model_path_edit = QLineEdit(model_path)
model_path_edit.setReadOnly(True)
stage_layout.addRow(f"{series_name} Model:", model_path_edit)
firmware_paths = series_config.get('firmware_paths', {})
if firmware_paths:
scpu_path = firmware_paths.get('scpu', 'Not specified')
scpu_fw_edit = QLineEdit(scpu_path)
scpu_fw_edit.setReadOnly(True)
stage_layout.addRow(f"{series_name} SCPU FW:", scpu_fw_edit)
ncpu_path = firmware_paths.get('ncpu', 'Not specified')
ncpu_fw_edit = QLineEdit(ncpu_path)
ncpu_fw_edit.setReadOnly(True)
stage_layout.addRow(f"{series_name} NCPU FW:", ncpu_fw_edit)
else:
# Single-series configuration display
mode_edit = QLineEdit("Single-Series")
mode_edit.setReadOnly(True)
stage_layout.addRow("Mode:", mode_edit)
model_path_edit = QLineEdit(stage_config.model_path)
model_path_edit.setReadOnly(True)
stage_layout.addRow("Model Path:", model_path_edit)
scpu_fw_edit = QLineEdit(stage_config.scpu_fw_path)
scpu_fw_edit.setReadOnly(True)
stage_layout.addRow("SCPU Firmware:", scpu_fw_edit)
ncpu_fw_edit = QLineEdit(stage_config.ncpu_fw_path)
ncpu_fw_edit.setReadOnly(True)
stage_layout.addRow("NCPU Firmware:", ncpu_fw_edit)
port_ids_edit = QLineEdit(str(stage_config.port_ids))
port_ids_edit.setReadOnly(True)
stage_layout.addRow("Port IDs:", port_ids_edit)
# Common fields
queue_size_spin = QSpinBox()
queue_size_spin.setValue(stage_config.max_queue_size)
queue_size_spin.setReadOnly(True)