diff --git a/core/functions/InferencePipeline.py b/core/functions/InferencePipeline.py index f8dbc40..c9b6860 100644 --- a/core/functions/InferencePipeline.py +++ b/core/functions/InferencePipeline.py @@ -19,6 +19,8 @@ class StageConfig: model_path: str upload_fw: bool max_queue_size: int = 50 + # Multi-series support + multi_series_config: Optional[Dict[str, Any]] = None # For multi-series mode # Inter-stage processing input_preprocessor: Optional[PreProcessor] = None # Before this stage output_postprocessor: Optional[PostProcessor] = None # After this stage @@ -43,15 +45,25 @@ class PipelineStage: self.stage_id = config.stage_id # Initialize MultiDongle for this stage - self.multidongle = MultiDongle( - port_id=config.port_ids, - scpu_fw_path=config.scpu_fw_path, - ncpu_fw_path=config.ncpu_fw_path, - model_path=config.model_path, - upload_fw=config.upload_fw, - auto_detect=config.auto_detect if hasattr(config, 'auto_detect') else False, - max_queue_size=config.max_queue_size - ) + if config.multi_series_config: + # Multi-series mode + self.multidongle = MultiDongle( + multi_series_config=config.multi_series_config, + max_queue_size=config.max_queue_size + ) + print(f"[Stage {self.stage_id}] Initialized in multi-series mode with config: {list(config.multi_series_config.keys())}") + else: + # Single-series mode (legacy) + self.multidongle = MultiDongle( + port_id=config.port_ids, + scpu_fw_path=config.scpu_fw_path, + ncpu_fw_path=config.ncpu_fw_path, + model_path=config.model_path, + upload_fw=config.upload_fw, + auto_detect=config.auto_detect if hasattr(config, 'auto_detect') else False, + max_queue_size=config.max_queue_size + ) + print(f"[Stage {self.stage_id}] Initialized in single-series mode") # Store preprocessor and postprocessor for later use self.stage_preprocessor = config.stage_preprocessor diff --git a/core/functions/Multidongle.py b/core/functions/Multidongle.py index 130e234..5ae609b 100644 --- a/core/functions/Multidongle.py +++ b/core/functions/Multidongle.py @@ -329,6 +329,9 @@ class MultiDongle: self._input_queue = queue.Queue() self._ordered_output_queue = queue.Queue() + # Create output queue for legacy compatibility + self._output_queue = self._ordered_output_queue # Point to the same queue + self.result_queues = {} # series_name -> queue for series_name in multi_series_config.keys(): self.result_queues[series_name] = queue.Queue() @@ -492,6 +495,15 @@ class MultiDongle: Connect devices, upload firmware (if upload_fw is True), and upload model. Must be called before start(). """ + if self.multi_series_mode: + # Multi-series initialization + self._initialize_multi_series() + else: + # Legacy single-series initialization + self._initialize_single_series() + + def _initialize_single_series(self): + """Initialize single-series (legacy) mode""" # Connect device and assign to self.device_group try: print('[Connect Device]') @@ -554,6 +566,102 @@ class MultiDongle: else: print("Warning: Could not get generic inference input descriptor from model.") self.generic_inference_input_descriptor = None + + def _initialize_multi_series(self): + """Initialize multi-series mode""" + print('[Multi-Series Initialization]') + + # Initialize each series separately + for series_name, config in self.series_config.items(): + print(f'[Initializing {series_name}]') + + # Get port IDs for this series + port_ids = config.get('port_ids', []) + if not port_ids: + print(f'Warning: No port IDs configured for {series_name}, skipping') + continue + + # Connect devices for this series + try: + print(f' [Connect Devices] Port IDs: {port_ids}') + device_group = kp.core.connect_devices(usb_port_ids=port_ids) + self.device_groups[series_name] = device_group + print(f' - Success ({len(port_ids)} devices)') + except kp.ApiKPException as exception: + print(f'Error: connect devices failed for {series_name}, port IDs = {port_ids}, error = {str(exception)}') + continue + + # Upload firmware if available + firmware_paths = config.get('firmware_paths') + if firmware_paths and 'scpu' in firmware_paths and 'ncpu' in firmware_paths: + try: + print(f' [Upload Firmware]') + kp.core.load_firmware_from_file( + device_group=device_group, + scpu_fw_path=firmware_paths['scpu'], + ncpu_fw_path=firmware_paths['ncpu'] + ) + print(f' - Success') + except kp.ApiKPException as exception: + print(f'Error: upload firmware failed for {series_name}, error = {str(exception)}') + continue + else: + print(f' [Upload Firmware] - Skipped (no firmware paths configured)') + + # Upload model + model_path = config.get('model_path') + if model_path: + try: + print(f' [Upload Model]') + model_descriptor = kp.core.load_model_from_file( + device_group=device_group, + file_path=model_path + ) + self.model_descriptors[series_name] = model_descriptor + print(f' - Success') + + # Extract model input dimensions for this series + if model_descriptor and model_descriptor.models: + model = model_descriptor.models[0] + if hasattr(model, 'input_nodes') and model.input_nodes: + input_node = model.input_nodes[0] + shape = input_node.tensor_shape_info.data.shape_npu + model_input_shape = (shape[3], shape[2]) # (width, height) + model_input_channels = shape[1] # 3 for RGB + print(f' Model input shape: {model_input_shape}, channels: {model_input_channels}') + + # Store series-specific model info + self.series_groups[series_name]['model_input_shape'] = model_input_shape + self.series_groups[series_name]['model_input_channels'] = model_input_channels + + except kp.ApiKPException as exception: + print(f'Error: upload model failed for {series_name}, error = {str(exception)}') + continue + else: + print(f' [Upload Model] - Skipped (no model path configured)') + + print('[Multi-Series Initialization Complete]') + + # Set up legacy compatibility attributes using the first series + if self.device_groups: + first_series = next(iter(self.device_groups.keys())) + self.device_group = self.device_groups[first_series] + self.model_nef_descriptor = self.model_descriptors.get(first_series) + + # Set up generic inference descriptor from first series + if self.model_nef_descriptor: + self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor( + model_id=self.model_nef_descriptor.models[0].id, + ) + + # Set model input shape from first series + if first_series in self.series_groups: + series_info = self.series_groups[first_series] + self.model_input_shape = series_info.get('model_input_shape', (640, 640)) + self.model_input_channels = series_info.get('model_input_channels', 3) + else: + self.model_input_shape = (640, 640) + self.model_input_channels = 3 def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray: """ @@ -704,6 +812,13 @@ class MultiDongle: Start the send and receive threads. Must be called after initialize(). """ + if self.multi_series_mode: + self._start_multi_series() + else: + self._start_single_series() + + def _start_single_series(self): + """Start single-series (legacy) mode""" if self.device_group is None: raise RuntimeError("MultiDongle not initialized. Call initialize() first.") @@ -717,12 +832,63 @@ class MultiDongle: self._receive_thread = threading.Thread(target=self._receive_thread_func, daemon=True) self._receive_thread.start() print("Receive thread started.") + + def _start_multi_series(self): + """Start multi-series mode""" + if not self.device_groups: + raise RuntimeError("MultiDongle not initialized. Call initialize() first.") + + print("[Starting Multi-Series Threads]") + self._stop_event.clear() + + # Start dispatcher thread + if self.dispatcher_thread is None or not self.dispatcher_thread.is_alive(): + self.dispatcher_thread = threading.Thread(target=self._dispatcher_thread_func, daemon=True) + self.dispatcher_thread.start() + print("Dispatcher thread started.") + + # Start send/receive threads for each series + for series_name in self.device_groups.keys(): + # Start send thread for this series + if series_name not in self.send_threads or not self.send_threads[series_name].is_alive(): + send_thread = threading.Thread( + target=self._multi_series_send_thread_func, + args=(series_name,), + daemon=True + ) + self.send_threads[series_name] = send_thread + send_thread.start() + print(f"Send thread started for {series_name}.") + + # Start receive thread for this series + if series_name not in self.receive_threads or not self.receive_threads[series_name].is_alive(): + receive_thread = threading.Thread( + target=self._multi_series_receive_thread_func, + args=(series_name,), + daemon=True + ) + self.receive_threads[series_name] = receive_thread + receive_thread.start() + print(f"Receive thread started for {series_name}.") + + # Start result ordering thread + if self.result_ordering_thread is None or not self.result_ordering_thread.is_alive(): + self.result_ordering_thread = threading.Thread(target=self._result_ordering_thread_func, daemon=True) + self.result_ordering_thread.start() + print("Result ordering thread started.") def stop(self): """Improved stop method with better cleanup""" if self._stop_event.is_set(): return # Already stopping + if self.multi_series_mode: + self._stop_multi_series() + else: + self._stop_single_series() + + def _stop_single_series(self): + """Stop single-series (legacy) mode""" print("Stopping threads...") self._stop_event.set() @@ -751,6 +917,217 @@ class MultiDongle: except kp.ApiKPException as e: print(f"Error disconnecting device group: {e}") self.device_group = None + + def _stop_multi_series(self): + """Stop multi-series mode""" + print("[Stopping Multi-Series Threads]") + self._stop_event.set() + + # Clear input queue to unblock dispatcher + while not self._input_queue.empty(): + try: + self._input_queue.get_nowait() + except queue.Empty: + break + + # Signal dispatcher thread to wake up + self._input_queue.put(None) + + # Clear series result queues + for series_name, result_queue in self.result_queues.items(): + while not result_queue.empty(): + try: + result_queue.get_nowait() + except queue.Empty: + break + + # Stop all send threads + for series_name, send_thread in self.send_threads.items(): + if send_thread and send_thread.is_alive(): + send_thread.join(timeout=2.0) + if send_thread.is_alive(): + print(f"Warning: Send thread for {series_name} didn't stop cleanly") + + # Stop all receive threads + for series_name, receive_thread in self.receive_threads.items(): + if receive_thread and receive_thread.is_alive(): + receive_thread.join(timeout=2.0) + if receive_thread.is_alive(): + print(f"Warning: Receive thread for {series_name} didn't stop cleanly") + + # Stop dispatcher thread + if self.dispatcher_thread and self.dispatcher_thread.is_alive(): + self.dispatcher_thread.join(timeout=2.0) + if self.dispatcher_thread.is_alive(): + print("Warning: Dispatcher thread didn't stop cleanly") + + # Stop result ordering thread + if self.result_ordering_thread and self.result_ordering_thread.is_alive(): + self.result_ordering_thread.join(timeout=2.0) + if self.result_ordering_thread.is_alive(): + print("Warning: Result ordering thread didn't stop cleanly") + + # Disconnect all device groups + print("Disconnecting device groups...") + for series_name, device_group in self.device_groups.items(): + try: + kp.core.disconnect_devices(device_group=device_group) + print(f"Device group for {series_name} disconnected successfully.") + except kp.ApiKPException as e: + print(f"Error disconnecting device group for {series_name}: {e}") + + self.device_groups.clear() + + def _dispatcher_thread_func(self): + """Dispatcher thread: assigns tasks to dongles based on load balancing""" + print("Dispatcher thread started") + + while not self._stop_event.is_set(): + try: + task = self._input_queue.get(timeout=0.1) + if task is None: # Sentinel value + continue + + # Select optimal dongle based on current load and capacity + selected_series = self._select_optimal_series() + if selected_series is None: + print("Warning: No series available for task dispatch") + continue + + # Enqueue to selected series + self.result_queues[selected_series].put(task) + self.current_loads[selected_series] += 1 + + except queue.Empty: + continue + except Exception as e: + print(f"Error in dispatcher: {e}") + if not self._stop_event.is_set(): + self._stop_event.set() + + print("Dispatcher thread stopped") + + def _multi_series_send_thread_func(self, series_name: str): + """Send thread for specific dongle series - with tuple handling fix""" + print(f"Send worker started for {series_name}") + + device_group = self.device_groups[series_name] + result_queue = self.result_queues[series_name] + model_descriptor = self.model_descriptors[series_name] + + while not self._stop_event.is_set(): + try: + task = result_queue.get(timeout=0.1) + if task is None: + continue + + # Handle both tuple and dict formats + if isinstance(task, tuple): + # Legacy single-series format: (image_data, image_format) + image_data, image_format = task + sequence_id = getattr(self, '_inference_counter', 0) + self._inference_counter = sequence_id + 1 + elif isinstance(task, dict): + # Multi-series format: dict with keys + image_data = task.get('image_data') + image_format = task.get('image_format', kp.ImageFormat.KP_IMAGE_FORMAT_RGB565) + sequence_id = task.get('sequence_id', 0) + else: + print(f"Error: Unknown task format: {type(task)}") + continue + + if image_data is None: + print(f"Error: No image data in task") + continue + + # Create inference descriptor for this task + inference_descriptor = kp.GenericImageInferenceDescriptor( + model_id=model_descriptor.models[0].id, + ) + inference_descriptor.inference_number = sequence_id + + inference_descriptor.input_node_image_list = [ + kp.GenericInputNodeImage( + image=image_data, + image_format=image_format, + resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE, + padding_mode=kp.PaddingMode.KP_PADDING_CORNER, + normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON + ) + ] + + # Send inference + kp.inference.generic_image_inference_send( + device_group=device_group, + generic_inference_input_descriptor=inference_descriptor + ) + + except queue.Empty: + continue + except kp.ApiKPException as e: + print(f"Error in {series_name} send worker: {e}") + if not self._stop_event.is_set(): + self._stop_event.set() + except Exception as e: + print(f"Unexpected error in {series_name} send worker: {e}") + if not self._stop_event.is_set(): + self._stop_event.set() + + print(f"Send worker stopped for {series_name}") + + def _multi_series_receive_thread_func(self, series_name: str): + """Receive thread for specific dongle series""" + print(f"Receive worker started for {series_name}") + + device_group = self.device_groups[series_name] + + while not self._stop_event.is_set(): + try: + # Receive inference result + raw_result = kp.inference.generic_image_inference_receive(device_group=device_group) + + # Create result object + result = { + 'sequence_id': raw_result.header.inference_number, + 'result': raw_result, + 'dongle_series': series_name, + 'timestamp': time.time() + } + + # Add to pending results for ordering + self.pending_results[result['sequence_id']] = result + self.current_loads[series_name] = max(0, self.current_loads[series_name] - 1) + + except kp.ApiKPException as e: + if not self._stop_event.is_set(): + print(f"Error in {series_name} receive worker: {e}") + self._stop_event.set() + except Exception as e: + print(f"Unexpected error in {series_name} receive worker: {e}") + + print(f"Receive worker stopped for {series_name}") + + def _result_ordering_thread_func(self): + """Result ordering thread: ensures results are output in sequence order""" + print("Result ordering worker started") + + while not self._stop_event.is_set(): + # Check if next expected result is available + if self.next_output_sequence in self.pending_results: + result = self.pending_results.pop(self.next_output_sequence) + self._ordered_output_queue.put(result) + self.next_output_sequence += 1 + + # Clean up old pending results to prevent memory bloat + if len(self.pending_results) > 1000: # result_buffer_size + oldest_sequences = sorted(self.pending_results.keys())[:500] + for seq_id in oldest_sequences: + if seq_id < self.next_output_sequence: + self.pending_results.pop(seq_id, None) + else: + time.sleep(0.001) # Small delay to prevent busy waiting + + print("Result ordering worker stopped") def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None): """ @@ -773,7 +1150,22 @@ class MultiDongle: else: raise ValueError(f"Unsupported format: {format}") - self._input_queue.put((image_data, image_format_enum)) + if self.multi_series_mode: + # In multi-series mode, create a task with sequence ID + with self.sequence_lock: + sequence_id = self.sequence_counter + self.sequence_counter += 1 + + task = { + 'sequence_id': sequence_id, + 'image_data': image_data, + 'image_format': image_format_enum, + 'timestamp': time.time() + } + self._input_queue.put(task) + else: + # In single-series mode, use the original format + self._input_queue.put((image_data, image_format_enum)) def get_output(self, timeout: float = None): """ @@ -783,7 +1175,15 @@ class MultiDongle: :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout. """ try: - return self._output_queue.get(block=timeout is not None, timeout=timeout) + if self.multi_series_mode: + # In multi-series mode, use the ordered output queue + result = self._ordered_output_queue.get(block=timeout is not None, timeout=timeout) + if result and isinstance(result, dict): + return result.get('result') # Extract the actual inference result + return result + else: + # In single-series mode, use the regular output queue + return self._output_queue.get(block=timeout is not None, timeout=timeout) except queue.Empty: return None diff --git a/core/functions/mflow_converter.py b/core/functions/mflow_converter.py index 246f301..10c88fc 100644 --- a/core/functions/mflow_converter.py +++ b/core/functions/mflow_converter.py @@ -463,6 +463,72 @@ class MFlowConverter: print("="*60 + "\n") + def _build_multi_series_config_from_properties(self, properties: Dict[str, Any]) -> Dict[str, Any]: + """Build multi-series configuration from node properties""" + try: + enabled_series = properties.get('enabled_series', []) + assets_folder = properties.get('assets_folder', '') + + if not enabled_series: + print("Warning: No enabled_series found in multi-series mode") + return {} + + multi_series_config = {} + + for series in enabled_series: + # Get port IDs for this series + port_ids_str = properties.get(f'kl{series}_port_ids', '') + if not port_ids_str or not port_ids_str.strip(): + print(f"Warning: No port IDs configured for KL{series}") + continue + + # Parse port IDs (comma-separated string to list of integers) + try: + port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()] + if not port_ids: + continue + except ValueError: + print(f"Warning: Invalid port IDs for KL{series}: {port_ids_str}") + continue + + # Build series configuration + series_config = { + "port_ids": port_ids + } + + # Add model path if assets folder is configured + if assets_folder: + import os + model_folder = os.path.join(assets_folder, 'Models', f'KL{series}') + if os.path.exists(model_folder): + # Look for .nef files in the model folder + nef_files = [f for f in os.listdir(model_folder) if f.endswith('.nef')] + if nef_files: + series_config["model_path"] = os.path.join(model_folder, nef_files[0]) + print(f"Found model for KL{series}: {series_config['model_path']}") + + # Add firmware paths if available + firmware_folder = os.path.join(assets_folder, 'Firmware', f'KL{series}') + if os.path.exists(firmware_folder): + scpu_path = os.path.join(firmware_folder, 'fw_scpu.bin') + ncpu_path = os.path.join(firmware_folder, 'fw_ncpu.bin') + + if os.path.exists(scpu_path) and os.path.exists(ncpu_path): + series_config["firmware_paths"] = { + "scpu": scpu_path, + "ncpu": ncpu_path + } + print(f"Found firmware for KL{series}: scpu={scpu_path}, ncpu={ncpu_path}") + + multi_series_config[f'KL{series}'] = series_config + print(f"Configured KL{series} with {len(port_ids)} devices on ports {port_ids}") + + return multi_series_config if multi_series_config else {} + + except Exception as e: + print(f"Error building multi-series config from properties: {e}") + return {} + def _create_stage_configs(self, model_nodes: List[Dict], preprocess_nodes: List[Dict], postprocess_nodes: List[Dict], connections: List[Dict]) -> List[StageConfig]: """Create StageConfig objects for each model node""" @@ -502,16 +568,38 @@ class MFlowConverter: # Queue size max_queue_size = properties.get('max_queue_size', 50) - # Create StageConfig - stage_config = StageConfig( - stage_id=stage_id, - port_ids=port_ids, - scpu_fw_path=scpu_fw_path, - ncpu_fw_path=ncpu_fw_path, - model_path=model_path, - upload_fw=upload_fw, - max_queue_size=max_queue_size - ) + # Check if multi-series mode is enabled + multi_series_mode = properties.get('multi_series_mode', False) + multi_series_config = None + + if multi_series_mode: + # Build multi-series config from node properties + multi_series_config = self._build_multi_series_config_from_properties(properties) + print(f"Multi-series config for {stage_id}: {multi_series_config}") + + # Create StageConfig for multi-series mode + stage_config = StageConfig( + stage_id=stage_id, + port_ids=[], # Will be handled by multi_series_config + scpu_fw_path='', # Will be handled by multi_series_config + ncpu_fw_path='', # Will be handled by multi_series_config + model_path='', # Will be handled by multi_series_config + upload_fw=upload_fw, + max_queue_size=max_queue_size, + multi_series_config=multi_series_config + ) + else: + # Create StageConfig for single-series mode (legacy) + stage_config = StageConfig( + stage_id=stage_id, + port_ids=port_ids, + scpu_fw_path=scpu_fw_path, + ncpu_fw_path=ncpu_fw_path, + model_path=model_path, + upload_fw=upload_fw, + max_queue_size=max_queue_size, + multi_series_config=None + ) stage_configs.append(stage_config) @@ -625,24 +713,89 @@ class MFlowConverter: """Validate individual stage configuration""" errors = [] - # Check model path - if not stage_config.model_path: - errors.append(f"Stage {stage_num}: Model path is required") - elif not os.path.exists(stage_config.model_path): - errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}") - - # Check firmware paths if upload_fw is True - if stage_config.upload_fw: - if not os.path.exists(stage_config.scpu_fw_path): - errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}") - if not os.path.exists(stage_config.ncpu_fw_path): - errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}") + # Check if this is multi-series configuration + if stage_config.multi_series_config: + # Multi-series validation + errors.extend(self._validate_multi_series_config(stage_config.multi_series_config, stage_num)) + else: + # Single-series validation (legacy) + # Check model path + if not stage_config.model_path: + errors.append(f"Stage {stage_num}: Model path is required") + elif not os.path.exists(stage_config.model_path): + errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}") - # Check port IDs - if not stage_config.port_ids: - errors.append(f"Stage {stage_num}: At least one port ID is required") + # Check firmware paths if upload_fw is True + if stage_config.upload_fw: + if not os.path.exists(stage_config.scpu_fw_path): + errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}") + if not os.path.exists(stage_config.ncpu_fw_path): + errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}") + + # Check port IDs + if not stage_config.port_ids: + errors.append(f"Stage {stage_num}: At least one port ID is required") return errors + + def _validate_multi_series_config(self, multi_series_config: Dict[str, Any], stage_num: int) -> List[str]: + """Validate multi-series configuration""" + errors = [] + + if not multi_series_config: + errors.append(f"Stage {stage_num}: Multi-series configuration is empty") + return errors + + print(f"Validating multi-series config for stage {stage_num}: {list(multi_series_config.keys())}") + + # Check each series configuration + for series_name, series_config in multi_series_config.items(): + if not isinstance(series_config, dict): + errors.append(f"Stage {stage_num}: Invalid configuration for {series_name}") + continue + + # Check port IDs + port_ids = series_config.get('port_ids', []) + if not port_ids: + errors.append(f"Stage {stage_num}: {series_name} has no port IDs configured") + continue + + if not isinstance(port_ids, list) or not all(isinstance(p, int) for p in port_ids): + errors.append(f"Stage {stage_num}: {series_name} port IDs must be a list of integers") + continue + + print(f" {series_name}: {len(port_ids)} ports configured") + + # Check model path + model_path = series_config.get('model_path') + if model_path: + if not os.path.exists(model_path): + errors.append(f"Stage {stage_num}: {series_name} model file not found: {model_path}") + else: + print(f" {series_name}: Model validated: {model_path}") + else: + print(f" {series_name}: No model path specified (optional for multi-series)") + + # Check firmware paths if specified + firmware_paths = series_config.get('firmware_paths') + if firmware_paths and isinstance(firmware_paths, dict): + scpu_path = firmware_paths.get('scpu') + ncpu_path = firmware_paths.get('ncpu') + + if scpu_path and not os.path.exists(scpu_path): + errors.append(f"Stage {stage_num}: {series_name} SCPU firmware not found: {scpu_path}") + elif scpu_path: + print(f" {series_name}: SCPU firmware validated: {scpu_path}") + + if ncpu_path and not os.path.exists(ncpu_path): + errors.append(f"Stage {stage_num}: {series_name} NCPU firmware not found: {ncpu_path}") + elif ncpu_path: + print(f" {series_name}: NCPU firmware validated: {ncpu_path}") + + if not errors: + print(f"Stage {stage_num}: Multi-series configuration validation passed") + + return errors def convert_mflow_file(mflow_path: str, firmware_path: str = "./firmware") -> PipelineConfig: diff --git a/core/nodes/exact_nodes.py b/core/nodes/exact_nodes.py index 46c4e1d..084545a 100644 --- a/core/nodes/exact_nodes.py +++ b/core/nodes/exact_nodes.py @@ -5,6 +5,8 @@ This module provides node implementations that exactly match the original properties and behavior from the monolithic UI.py file. """ +import os + try: from NodeGraphQt import BaseNode NODEGRAPH_AVAILABLE = True @@ -119,6 +121,14 @@ class ExactModelNode(BaseNode): self.create_property('multi_series_mode', False) self.create_property('assets_folder', '') self.create_property('enabled_series', ['520', '720']) + + # Series-specific port ID configurations + self.create_property('kl520_port_ids', '') + self.create_property('kl720_port_ids', '') + self.create_property('kl630_port_ids', '') + self.create_property('kl730_port_ids', '') + self.create_property('kl540_port_ids', '') + self.create_property('max_queue_size', 100) self.create_property('result_buffer_size', 1000) self.create_property('batch_size', 1) @@ -139,6 +149,14 @@ class ExactModelNode(BaseNode): 'multi_series_mode': {'type': 'bool', 'default': False, 'description': 'Enable multi-series dongle support'}, 'assets_folder': {'type': 'file_path', 'filter': 'Folder', 'mode': 'directory'}, 'enabled_series': {'type': 'list', 'options': ['520', '720', '630', '730', '540'], 'default': ['520', '720']}, + + # Series-specific port ID options + 'kl520_port_ids': {'placeholder': 'e.g., 28,32 (comma-separated port IDs for KL520)', 'description': 'Port IDs for KL520 dongles'}, + 'kl720_port_ids': {'placeholder': 'e.g., 30,34 (comma-separated port IDs for KL720)', 'description': 'Port IDs for KL720 dongles'}, + 'kl630_port_ids': {'placeholder': 'e.g., 36,38 (comma-separated port IDs for KL630)', 'description': 'Port IDs for KL630 dongles'}, + 'kl730_port_ids': {'placeholder': 'e.g., 40,42 (comma-separated port IDs for KL730)', 'description': 'Port IDs for KL730 dongles'}, + 'kl540_port_ids': {'placeholder': 'e.g., 44,46 (comma-separated port IDs for KL540)', 'description': 'Port IDs for KL540 dongles'}, + 'max_queue_size': {'min': 1, 'max': 1000, 'default': 100}, 'result_buffer_size': {'min': 100, 'max': 10000, 'default': 1000}, 'batch_size': {'min': 1, 'max': 32, 'default': 1}, @@ -202,11 +220,25 @@ class ExactModelNode(BaseNode): if multi_series_mode: # Multi-series mode: show multi-series specific properties - return base_props + [ - 'assets_folder', 'enabled_series', + multi_props = ['assets_folder', 'enabled_series'] + + # Add port ID configurations for enabled series + try: + enabled_series = self.get_property('enabled_series') or [] + for series in enabled_series: + port_prop = f'kl{series}_port_ids' + if port_prop not in multi_props: # Avoid duplicates + multi_props.append(port_prop) + except: + pass # If can't get enabled_series, just show basic properties + + # Add other multi-series properties + multi_props.extend([ 'max_queue_size', 'result_buffer_size', 'batch_size', 'enable_preprocessing', 'enable_postprocessing' - ] + ]) + + return base_props + multi_props else: # Single-series mode: show traditional properties return base_props + [ @@ -229,8 +261,8 @@ class ExactModelNode(BaseNode): multi_series_mode = self.get_property('multi_series_mode') if multi_series_mode: - # Multi-series configuration - return { + # Multi-series configuration with series-specific port IDs + config = { 'multi_series_mode': True, 'assets_folder': self.get_property('assets_folder'), 'enabled_series': self.get_property('enabled_series'), @@ -240,6 +272,13 @@ class ExactModelNode(BaseNode): 'enable_preprocessing': self.get_property('enable_preprocessing'), 'enable_postprocessing': self.get_property('enable_postprocessing') } + + # Build multi-series config for MultiDongle + multi_series_config = self._build_multi_series_config() + if multi_series_config: + config['multi_series_config'] = multi_series_config + + return config else: # Single-series configuration return { @@ -265,6 +304,67 @@ class ExactModelNode(BaseNode): 'upload_fw': self.get_property('upload_fw', True) } + def _build_multi_series_config(self): + """Build multi-series configuration for MultiDongle""" + try: + enabled_series = self.get_property('enabled_series') or [] + assets_folder = self.get_property('assets_folder') or '' + + if not enabled_series: + return None + + multi_series_config = {} + + for series in enabled_series: + # Get port IDs for this series + port_ids_str = self.get_property(f'kl{series}_port_ids') or '' + if not port_ids_str.strip(): + continue # Skip series without port IDs + + # Parse port IDs (comma-separated string to list of integers) + try: + port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()] + if not port_ids: + continue + except ValueError: + print(f"Warning: Invalid port IDs for KL{series}: {port_ids_str}") + continue + + # Build series configuration + series_config = { + "port_ids": port_ids + } + + # Add model path if assets folder is configured + if assets_folder: + import os + model_folder = os.path.join(assets_folder, 'Models', f'KL{series}') + if os.path.exists(model_folder): + # Look for .nef files in the model folder + nef_files = [f for f in os.listdir(model_folder) if f.endswith('.nef')] + if nef_files: + series_config["model_path"] = os.path.join(model_folder, nef_files[0]) + + # Add firmware paths if available + firmware_folder = os.path.join(assets_folder, 'Firmware', f'KL{series}') + if os.path.exists(firmware_folder): + scpu_path = os.path.join(firmware_folder, 'fw_scpu.bin') + ncpu_path = os.path.join(firmware_folder, 'fw_ncpu.bin') + + if os.path.exists(scpu_path) and os.path.exists(ncpu_path): + series_config["firmware_paths"] = { + "scpu": scpu_path, + "ncpu": ncpu_path + } + + multi_series_config[f'KL{series}'] = series_config + + return multi_series_config if multi_series_config else None + + except Exception as e: + print(f"Error building multi-series config: {e}") + return None + def get_hardware_requirements(self): """Get hardware requirements for this node""" if not NODEGRAPH_AVAILABLE: @@ -429,6 +529,80 @@ class ExactModelNode(BaseNode): except Exception as e: return {'status': 'error', 'message': f'Error reading assets folder: {e}'} + def validate_configuration(self) -> tuple[bool, str]: + """ + Validate the current node configuration. + + Returns: + Tuple of (is_valid, error_message) + """ + if not NODEGRAPH_AVAILABLE: + return True, "" + + try: + multi_series_mode = self.get_property('multi_series_mode') + + if multi_series_mode: + # Multi-series validation + enabled_series = self.get_property('enabled_series') + if not enabled_series: + return False, "No series enabled in multi-series mode" + + # Check if at least one series has port IDs configured + has_valid_series = False + for series in enabled_series: + port_ids_str = self.get_property(f'kl{series}_port_ids', '') + if port_ids_str and port_ids_str.strip(): + # Validate port ID format + try: + port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()] + if port_ids: + has_valid_series = True + print(f"Valid series config found for KL{series}: ports {port_ids}") + except ValueError: + print(f"Warning: Invalid port ID format for KL{series}: {port_ids_str}") + continue + + if not has_valid_series: + return False, "At least one series must have valid port IDs configured" + + # Assets folder validation (optional for multi-series) + assets_folder = self.get_property('assets_folder') + if assets_folder: + if not os.path.exists(assets_folder): + print(f"Warning: Assets folder does not exist: {assets_folder}") + else: + # Validate assets folder structure if provided + assets_info = self.get_assets_folder_info() + if assets_info.get('status') == 'error': + print(f"Warning: Assets folder issue: {assets_info.get('message', 'Unknown error')}") + + print("Multi-series mode validation passed") + return True, "" + else: + # Single-series validation (legacy) + model_path = self.get_property('model_path') + if not model_path: + return False, "Model path is required" + + if not os.path.exists(model_path): + return False, f"Model file does not exist: {model_path}" + + # Check dongle series + dongle_series = self.get_property('dongle_series') + if dongle_series not in ['520', '720', '1080', 'Custom']: + return False, f"Invalid dongle series: {dongle_series}" + + # Check number of dongles + num_dongles = self.get_property('num_dongles') + if not isinstance(num_dongles, int) or num_dongles < 1: + return False, "Number of dongles must be at least 1" + + return True, "" + + except Exception as e: + return False, f"Validation error: {str(e)}" + class ExactPreprocessNode(BaseNode): """Preprocessing node - exact match to original.""" diff --git a/ui/dialogs/deployment.py b/ui/dialogs/deployment.py index e39e9c8..157582b 100644 --- a/ui/dialogs/deployment.py +++ b/ui/dialogs/deployment.py @@ -622,10 +622,32 @@ Stage Configurations: for i, stage_config in enumerate(config.stage_configs, 1): analysis_text += f"\nStage {i}: {stage_config.stage_id}\n" - analysis_text += f" Port IDs: {stage_config.port_ids}\n" - analysis_text += f" Model Path: {stage_config.model_path}\n" - analysis_text += f" SCPU Firmware: {stage_config.scpu_fw_path}\n" - analysis_text += f" NCPU Firmware: {stage_config.ncpu_fw_path}\n" + + # Check if this is multi-series configuration + if stage_config.multi_series_config: + analysis_text += f" Mode: Multi-Series\n" + analysis_text += f" Series Configured: {list(stage_config.multi_series_config.keys())}\n" + + # Show details for each series + for series_name, series_config in stage_config.multi_series_config.items(): + analysis_text += f" \n {series_name} Configuration:\n" + analysis_text += f" Port IDs: {series_config.get('port_ids', [])}\n" + model_path = series_config.get('model_path', 'Not specified') + analysis_text += f" Model: {model_path}\n" + firmware_paths = series_config.get('firmware_paths', {}) + if firmware_paths: + analysis_text += f" SCPU Firmware: {firmware_paths.get('scpu', 'Not specified')}\n" + analysis_text += f" NCPU Firmware: {firmware_paths.get('ncpu', 'Not specified')}\n" + else: + analysis_text += f" Firmware: Not specified\n" + else: + # Single-series (legacy) configuration + analysis_text += f" Mode: Single-Series\n" + analysis_text += f" Port IDs: {stage_config.port_ids}\n" + analysis_text += f" Model Path: {stage_config.model_path}\n" + analysis_text += f" SCPU Firmware: {stage_config.scpu_fw_path}\n" + analysis_text += f" NCPU Firmware: {stage_config.ncpu_fw_path}\n" + analysis_text += f" Upload Firmware: {stage_config.upload_fw}\n" analysis_text += f" Max Queue Size: {stage_config.max_queue_size}\n" @@ -663,23 +685,66 @@ Stage Configurations: stage_group = QGroupBox(f"Stage {i}: {stage_config.stage_id}") stage_layout = QFormLayout(stage_group) - # Create read-only fields for stage configuration - model_path_edit = QLineEdit(stage_config.model_path) - model_path_edit.setReadOnly(True) - stage_layout.addRow("Model Path:", model_path_edit) - - scpu_fw_edit = QLineEdit(stage_config.scpu_fw_path) - scpu_fw_edit.setReadOnly(True) - stage_layout.addRow("SCPU Firmware:", scpu_fw_edit) - - ncpu_fw_edit = QLineEdit(stage_config.ncpu_fw_path) - ncpu_fw_edit.setReadOnly(True) - stage_layout.addRow("NCPU Firmware:", ncpu_fw_edit) - - port_ids_edit = QLineEdit(str(stage_config.port_ids)) - port_ids_edit.setReadOnly(True) - stage_layout.addRow("Port IDs:", port_ids_edit) + # Check if this is multi-series configuration + if stage_config.multi_series_config: + # Multi-series configuration display + mode_edit = QLineEdit("Multi-Series") + mode_edit.setReadOnly(True) + stage_layout.addRow("Mode:", mode_edit) + + series_edit = QLineEdit(str(list(stage_config.multi_series_config.keys()))) + series_edit.setReadOnly(True) + stage_layout.addRow("Series:", series_edit) + + # Show details for each series + for series_name, series_config in stage_config.multi_series_config.items(): + series_label = QLabel(f"--- {series_name} ---") + series_label.setStyleSheet("font-weight: bold; color: #89b4fa;") + stage_layout.addRow(series_label) + + port_ids_edit = QLineEdit(str(series_config.get('port_ids', []))) + port_ids_edit.setReadOnly(True) + stage_layout.addRow(f"{series_name} Port IDs:", port_ids_edit) + + model_path = series_config.get('model_path', 'Not specified') + model_path_edit = QLineEdit(model_path) + model_path_edit.setReadOnly(True) + stage_layout.addRow(f"{series_name} Model:", model_path_edit) + + firmware_paths = series_config.get('firmware_paths', {}) + if firmware_paths: + scpu_path = firmware_paths.get('scpu', 'Not specified') + scpu_fw_edit = QLineEdit(scpu_path) + scpu_fw_edit.setReadOnly(True) + stage_layout.addRow(f"{series_name} SCPU FW:", scpu_fw_edit) + + ncpu_path = firmware_paths.get('ncpu', 'Not specified') + ncpu_fw_edit = QLineEdit(ncpu_path) + ncpu_fw_edit.setReadOnly(True) + stage_layout.addRow(f"{series_name} NCPU FW:", ncpu_fw_edit) + else: + # Single-series configuration display + mode_edit = QLineEdit("Single-Series") + mode_edit.setReadOnly(True) + stage_layout.addRow("Mode:", mode_edit) + + model_path_edit = QLineEdit(stage_config.model_path) + model_path_edit.setReadOnly(True) + stage_layout.addRow("Model Path:", model_path_edit) + + scpu_fw_edit = QLineEdit(stage_config.scpu_fw_path) + scpu_fw_edit.setReadOnly(True) + stage_layout.addRow("SCPU Firmware:", scpu_fw_edit) + + ncpu_fw_edit = QLineEdit(stage_config.ncpu_fw_path) + ncpu_fw_edit.setReadOnly(True) + stage_layout.addRow("NCPU Firmware:", ncpu_fw_edit) + + port_ids_edit = QLineEdit(str(stage_config.port_ids)) + port_ids_edit.setReadOnly(True) + stage_layout.addRow("Port IDs:", port_ids_edit) + # Common fields queue_size_spin = QSpinBox() queue_size_spin.setValue(stage_config.max_queue_size) queue_size_spin.setReadOnly(True)