Compare commits


5 Commits

Author SHA1 Message Date
1781a05269 feat: Add multi-series configuration testing and debugging tools
- Add comprehensive test scripts for multi-series dongle configuration
- Add debugging tools for deployment and flow testing
- Add configuration verification and guide utilities
- Fix stdout/stderr handling in deployment dialog for PyInstaller builds
- Includes port ID configuration tests and multi-series config validation

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-21 00:31:45 +08:00
c4090b2420 perf: Optimize multi-series dongle performance and prevent bottlenecks
Key improvements:
- Add timeout mechanism (2s) for result ordering to prevent slow devices from blocking pipeline
- Implement performance-biased load balancing with 2x penalty for low-GOPS devices (< 10 GOPS)
- Adjust KL520 GOPS from 3 to 2 for more accurate performance representation
- Remove KL540 references to focus on available hardware
- Add intelligent sequence skipping with timeout results for better throughput

This resolves the issue where multi-series mode had lower FPS than single KL720
due to KL520 devices creating bottlenecks in the result ordering queue.

Performance impact:
- Reduces KL520 task allocation from ~12.5% to ~5-8%
- Prevents pipeline stalls from slow inference results
- Maintains result ordering integrity with timeout fallback

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-14 17:15:39 +08:00
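As a worked illustration of this commit's scoring (an editorial sketch, not the shipped code; the device counts — two KL520s plus one KL720 — are assumptions):

# Sketch of the GOPS-weighted scoring described above; device counts are assumed.
specs = {"KL520": {"gops": 2, "devices": 2}, "KL720": {"gops": 28, "devices": 1}}
total = sum(s["gops"] * s["devices"] for s in specs.values())      # 4 + 28 = 32
weights = {k: s["gops"] * s["devices"] / total for k, s in specs.items()}
# weights == {"KL520": 0.125, "KL720": 0.875} -> the old ~12.5% KL520 share

def score(series, load):
    penalty = 2.0 if specs[series]["gops"] < 10 else 1.0  # 2x penalty under 10 GOPS
    return (load / weights[series]) * penalty

# At equal load, score("KL520", 1) == 16.0 while score("KL720", 1) is about 1.14,
# so the dispatcher routes most frames to the KL720 and KL520's share falls
# toward the ~5-8% cited above.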
2fea1eceec fix: Resolve multi-series initialization and validation issues
- Fix mflow_converter to properly handle multi-series configuration creation
- Update InferencePipeline to correctly initialize MultiDongle with multi-series config
- Add comprehensive multi-series configuration validation in mflow_converter
- Enhance deployment dialog to display multi-series configuration details
- Improve analysis and configuration tabs to show proper multi-series info

This resolves the issue where multi-series mode was falling back to single-series
during inference initialization, ensuring proper multi-series dongle support.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-14 16:33:22 +08:00
Mason
ec940c3f2f Improve assets folder selection and fix macOS tkinter crash
- Replace tkinter with PyQt5 QFileDialog as primary folder selector to fix macOS crashes
- Add specialized assets_folder property handling in dashboard with validation
- Integrate improved folder dialog utility with ExactModelNode
- Provide detailed validation feedback and user-friendly tooltips
- Maintain backward compatibility with tkinter as fallback

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-14 11:26:23 +08:00
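The picker described here likely follows the common PyQt5-first pattern sketched below (the helper name select_folder and its exact structure are assumptions; the repo's real utility is not shown in this diff):

def select_folder(title="Select Assets Folder"):
    try:
        from PyQt5.QtWidgets import QApplication, QFileDialog
        app = QApplication.instance() or QApplication([])  # reuse a running app if present
        return QFileDialog.getExistingDirectory(None, title) or None
    except ImportError:
        # tkinter fallback, kept for backward compatibility; can crash on some macOS builds
        import tkinter as tk
        from tkinter import filedialog
        root = tk.Tk()
        root.withdraw()
        folder = filedialog.askdirectory(title=title)
        root.destroy()
        return folder or None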
48acae9c74 feat: Implement multi-series dongle support and improve app stability 2025-08-13 22:03:42 +08:00
25 changed files with 3435 additions and 686 deletions

View File

@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""
Check current multi-series configuration in saved .mflow files
"""
import json
import os
import glob
def check_mflow_files():
"""Check .mflow files for multi-series configuration"""
# Look for .mflow files in common locations
search_paths = [
"*.mflow",
"flows/*.mflow",
"examples/*.mflow",
"../*.mflow"
]
mflow_files = []
for pattern in search_paths:
mflow_files.extend(glob.glob(pattern))
if not mflow_files:
print("No .mflow files found in current directory")
return
print(f"Found {len(mflow_files)} .mflow file(s):")
for mflow_file in mflow_files:
print(f"\n=== Checking {mflow_file} ===")
try:
with open(mflow_file, 'r') as f:
data = json.load(f)
# Look for nodes with type "Model" or "ExactModelNode"
nodes = data.get('nodes', [])
model_nodes = [node for node in nodes if node.get('type') in ['Model', 'ExactModelNode']]
if not model_nodes:
print(" No Model nodes found")
continue
for i, node in enumerate(model_nodes):
print(f"\n Model Node {i+1}:")
print(f" Name: {node.get('name', 'Unnamed')}")
# Check both custom_properties and properties for multi-series config
custom_properties = node.get('custom_properties', {})
properties = node.get('properties', {})
# Multi-series config is typically in custom_properties
config_props = custom_properties if custom_properties else properties
# Check multi-series configuration
multi_series_mode = config_props.get('multi_series_mode', False)
enabled_series = config_props.get('enabled_series', [])
print(f" multi_series_mode: {multi_series_mode}")
print(f" enabled_series: {enabled_series}")
if multi_series_mode:
print(" Multi-series port configurations:")
for series in ['520', '720', '630', '730', '540']:
port_ids = config_props.get(f'kl{series}_port_ids', '')
if port_ids:
print(f" kl{series}_port_ids: '{port_ids}'")
assets_folder = config_props.get('assets_folder', '')
if assets_folder:
print(f" assets_folder: '{assets_folder}'")
else:
print(" assets_folder: (not set)")
else:
print(" Multi-series mode is DISABLED")
print(" Current single-series configuration:")
port_ids = properties.get('port_ids', [])
model_path = properties.get('model_path', '')
print(f" port_ids: {port_ids}")
print(f" model_path: '{model_path}'")
except Exception as e:
print(f" Error reading file: {e}")
def print_configuration_guide():
"""Print guide for setting up multi-series configuration"""
print("\n" + "="*60)
print("MULTI-SERIES CONFIGURATION GUIDE")
print("="*60)
print()
print("To enable multi-series inference, set these properties in your Model Node:")
print()
print("1. multi_series_mode = True")
print("2. enabled_series = ['520', '720']")
print("3. kl520_port_ids = '28,32'")
print("4. kl720_port_ids = '4'")
print("5. assets_folder = (optional, for auto model/firmware detection)")
print()
print("Expected devices found:")
print(" KL520 devices on ports: 28, 32")
print(" KL720 device on port: 4")
print()
print("If multi_series_mode is False or not set, the system will use")
print("single-series mode with only the first available device.")
if __name__ == "__main__":
check_mflow_files()
print_configuration_guide()
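
For reference, a Model node saved with these properties might look like the following .mflow excerpt (a hypothetical minimal example; the key names match what check_mflow_files() reads):

{
  "nodes": [
    {
      "type": "ExactModelNode",
      "name": "Fire Detection Model",
      "custom_properties": {
        "multi_series_mode": true,
        "enabled_series": ["520", "720"],
        "kl520_port_ids": "28,32",
        "kl720_port_ids": "4",
        "assets_folder": "/path/to/assets"
      }
    }
  ]
}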

View File

@ -19,6 +19,8 @@ class StageConfig:
    model_path: str
    upload_fw: bool
    max_queue_size: int = 50
    # Multi-series support
    multi_series_config: Optional[Dict[str, Any]] = None  # For multi-series mode
    # Inter-stage processing
    input_preprocessor: Optional[PreProcessor] = None  # Before this stage
    output_postprocessor: Optional[PostProcessor] = None  # After this stage
@ -43,15 +45,25 @@ class PipelineStage:
        self.stage_id = config.stage_id
        # Initialize MultiDongle for this stage
        if config.multi_series_config:
            # Multi-series mode
            self.multidongle = MultiDongle(
                multi_series_config=config.multi_series_config,
                max_queue_size=config.max_queue_size
            )
            print(f"[Stage {self.stage_id}] Initialized in multi-series mode with config: {list(config.multi_series_config.keys())}")
        else:
            # Single-series mode (legacy)
            self.multidongle = MultiDongle(
                port_id=config.port_ids,
                scpu_fw_path=config.scpu_fw_path,
                ncpu_fw_path=config.ncpu_fw_path,
                model_path=config.model_path,
                upload_fw=config.upload_fw,
                auto_detect=config.auto_detect if hasattr(config, 'auto_detect') else False,
                max_queue_size=config.max_queue_size
            )
            print(f"[Stage {self.stage_id}] Initialized in single-series mode")
        # Store preprocessor and postprocessor for later use
        self.stage_preprocessor = config.stage_preprocessor

View File

@ -10,7 +10,39 @@ import kp
import cv2
import time
from abc import ABC, abstractmethod
from typing import Callable, Optional, Any, Dict, List
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class InferenceTask:
sequence_id: int
image_data: np.ndarray
image_format: Any # kp.ImageFormat
timestamp: float
@dataclass
class InferenceResult:
sequence_id: int
result: Any
series_name: str
timestamp: float
class DongleSeriesSpec:
"""Dongle series specifications with GOPS capacity for load balancing"""
KL520_GOPS = 2
KL720_GOPS = 28
SERIES_SPECS = {
"KL520": {"product_id": 0x100, "gops": KL520_GOPS},
"KL720": {"product_id": 0x720, "gops": KL720_GOPS},
"KL630": {"product_id": 0x630, "gops": 400},
"KL730": {"product_id": 0x730, "gops": 1600},
# "KL540": {"product_id": 0x540, "gops": 800}
}
class DataProcessor(ABC):
@ -83,7 +115,7 @@ class MultiDongle:
        "0x720": "KL720",
        "0x630": "KL630",
        "0x730": "KL730",
        # "0x540": "KL540",
    }
    @staticmethod
@ -176,8 +208,8 @@ class MultiDongle:
            return 'KL630'
        elif chip == kp.ModelNefDescriptor.KP_CHIP_KL730:
            return 'KL730'
        # elif chip == kp.ModelNefDescriptor.KP_CHIP_KL540:
        #     return 'KL540'
        # Final fallback
        return 'Unknown'
@ -222,17 +254,111 @@ class MultiDongle:
        except kp.ApiKPException as exception:
            raise Exception(f'Failed to connect devices: {str(exception)}')
    def __init__(self, port_id: list = None, scpu_fw_path: str = None, ncpu_fw_path: str = None,
                 model_path: str = None, upload_fw: bool = False, auto_detect: bool = False,
                 max_queue_size: int = 0, multi_series_config: dict = None):
        """
        Initialize the MultiDongle class with support for both single and multi-series configurations.

        :param port_id: List of USB port IDs for single-series (legacy). If None and auto_detect=True, will auto-detect.
        :param scpu_fw_path: Path to the SCPU firmware file for single-series (legacy).
        :param ncpu_fw_path: Path to the NCPU firmware file for single-series (legacy).
        :param model_path: Path to the model file for single-series (legacy).
        :param upload_fw: Flag to indicate whether to upload firmware for single-series (legacy).
        :param auto_detect: Flag to auto-detect and connect to available devices for single-series (legacy).
        :param max_queue_size: Maximum size for internal queues. If 0, unlimited queues are used.
:param multi_series_config: Multi-series configuration dict. Format:
{
"KL520": {
"port_ids": [28, 32],
"model_path": "path/to/kl520_model.nef",
"firmware_paths": { # Optional
"scpu": "path/to/kl520_scpu.bin",
"ncpu": "path/to/kl520_ncpu.bin"
}
}
}
""" """
# Determine if we're using multi-series mode
self.multi_series_mode = multi_series_config is not None
if self.multi_series_mode:
# Multi-series initialization
self._init_multi_series(multi_series_config, max_queue_size)
else:
# Legacy single-series initialization
self._init_single_series(port_id, scpu_fw_path, ncpu_fw_path, model_path,
upload_fw, auto_detect, max_queue_size)
def _init_multi_series(self, multi_series_config: dict, max_queue_size: int):
"""Initialize multi-series configuration"""
self.series_config = multi_series_config
self.series_groups = {} # series_name -> config
self.device_groups = {} # series_name -> device_group
self.model_descriptors = {} # series_name -> model descriptor
self.gops_weights = {} # series_name -> normalized weight
self.current_loads = {} # series_name -> current queue size
# Set up series groups and calculate weights
total_gops = 0
for series_name, config in multi_series_config.items():
if series_name not in DongleSeriesSpec.SERIES_SPECS:
raise ValueError(f"Unknown series: {series_name}")
self.series_groups[series_name] = config
self.current_loads[series_name] = 0
# Calculate effective GOPS (series GOPS * number of devices)
port_count = len(config.get("port_ids", []))
series_gops = DongleSeriesSpec.SERIES_SPECS[series_name]["gops"]
effective_gops = series_gops * port_count
total_gops += effective_gops
# Calculate normalized weights
for series_name, config in multi_series_config.items():
port_count = len(config.get("port_ids", []))
series_gops = DongleSeriesSpec.SERIES_SPECS[series_name]["gops"]
effective_gops = series_gops * port_count
self.gops_weights[series_name] = effective_gops / total_gops if total_gops > 0 else 0
# Multi-series threading and queues
if max_queue_size > 0:
self._input_queue = queue.Queue(maxsize=max_queue_size)
self._ordered_output_queue = queue.Queue(maxsize=max_queue_size)
else:
self._input_queue = queue.Queue()
self._ordered_output_queue = queue.Queue()
# Create output queue for legacy compatibility
self._output_queue = self._ordered_output_queue # Point to the same queue
self.result_queues = {} # series_name -> queue
for series_name in multi_series_config.keys():
self.result_queues[series_name] = queue.Queue()
# Sequence management for ordered results
self.sequence_counter = 0
self.sequence_lock = threading.Lock()
self.pending_results = {} # sequence_id -> InferenceResult
self.next_output_sequence = 0
# Threading
self._stop_event = threading.Event()
self.dispatcher_thread = None
self.send_threads = {} # series_name -> thread
self.receive_threads = {} # series_name -> thread
self.result_ordering_thread = None
# Legacy attributes for compatibility
self.port_id = []
self.device_group = None
self.model_nef_descriptor = None
self.generic_inference_input_descriptor = None
self._inference_counter = 0
def _init_single_series(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str,
model_path: str, upload_fw: bool, auto_detect: bool, max_queue_size: int):
"""Initialize legacy single-series configuration"""
        self.auto_detect = auto_detect
        self.connected_devices_info = []
@ -258,8 +384,8 @@ class MultiDongle:
        # generic_inference_input_descriptor will be prepared in initialize
        self.model_nef_descriptor = None
        self.generic_inference_input_descriptor = None
        # Queues for data
        if max_queue_size > 0:
            self._input_queue = queue.Queue(maxsize=max_queue_size)
            self._output_queue = queue.Queue(maxsize=max_queue_size)
@ -270,15 +396,137 @@ class MultiDongle:
        # Threading attributes
        self._send_thread = None
        self._receive_thread = None
        self._stop_event = threading.Event()
        self._inference_counter = 0
# Convert single-series to multi-series format internally for unified processing
self._convert_single_to_multi_series()
def _convert_single_to_multi_series(self):
"""
Convert single-series configuration to multi-series format internally
This allows unified processing regardless of initialization mode
"""
if not self.port_id:
# No ports specified, create empty structure
self.series_groups = {}
self.gops_weights = {}
self.current_loads = {}
return
# Detect series from connected devices or use default
detected_series = self._detect_series_from_ports(self.port_id)
# Create multi-series config format
self.series_groups = {
detected_series: {
"port_ids": self.port_id.copy(),
"model_path": self.model_path,
"firmware_paths": {
"scpu": self.scpu_fw_path,
"ncpu": self.ncpu_fw_path
} if self.scpu_fw_path and self.ncpu_fw_path else {}
}
}
# Calculate GOPS weights (100% since it's single series)
self.gops_weights = {detected_series: 1.0}
# Initialize load tracking
self.current_loads = {detected_series: 0}
print(f"Single-series config converted to multi-series format: {detected_series}")
def _detect_series_from_ports(self, port_ids: List[int]) -> str:
"""
Detect series from port IDs by scanning connected devices
Falls back to KL520 if unable to detect
"""
try:
# Try to scan devices and match port IDs
devices_info = self.scan_devices()
for device_info in devices_info:
if device_info['port_id'] in port_ids:
series = device_info.get('series', 'Unknown')
if series != 'Unknown':
return series
# If scanning didn't work, try to auto-detect from the first available device
if self.auto_detect and self.connected_devices_info:
for device_info in self.connected_devices_info:
series = device_info.get('series', 'Unknown')
if series != 'Unknown':
return series
except Exception as e:
print(f"Warning: Could not detect series from devices: {e}")
# Fallback to KL520 (most common series)
print("Warning: Could not detect device series, defaulting to KL520")
return "KL520"
def _select_optimal_series(self) -> Optional[str]:
"""
Select optimal series based on current load and GOPS capacity with performance bias
Returns the series name with the best load/capacity ratio, favoring high-performance dongles
"""
if not self.multi_series_mode or not self.series_groups:
return None
best_score = float('inf')
selected_series = None
# Get series GOPS values for performance bias
series_gops = {}
for series_name in self.series_groups.keys():
# Extract GOPS from DongleSeriesSpec
for spec_name, spec_info in DongleSeriesSpec.SERIES_SPECS.items():
if spec_name == series_name:
series_gops[series_name] = spec_info["gops"]
break
for series_name in self.series_groups.keys():
current_load = self.current_loads.get(series_name, 0)
weight = self.gops_weights.get(series_name, 0)
gops = series_gops.get(series_name, 1)
if weight <= 0:
continue
# Calculate load ratio (lower is better)
load_ratio = current_load / weight
# Add performance bias: penalize low-GOPS devices more heavily
# This encourages using high-performance dongles even if they have slightly higher load
if gops < 10: # Low-performance threshold (like KL520 with 2 GOPS)
performance_penalty = 2.0 # 2x penalty for slow devices
else:
performance_penalty = 1.0
# Combined score considers both load and performance
combined_score = load_ratio * performance_penalty
if combined_score < best_score:
best_score = combined_score
selected_series = series_name
return selected_series
    def initialize(self):
        """
        Connect devices, upload firmware (if upload_fw is True), and upload model.
        Must be called before start().
        """
if self.multi_series_mode:
# Multi-series initialization
self._initialize_multi_series()
else:
# Legacy single-series initialization
self._initialize_single_series()
def _initialize_single_series(self):
"""Initialize single-series (legacy) mode"""
        # Connect device and assign to self.device_group
        try:
            print('[Connect Device]')
@ -342,6 +590,102 @@ class MultiDongle:
            print("Warning: Could not get generic inference input descriptor from model.")
            self.generic_inference_input_descriptor = None
def _initialize_multi_series(self):
"""Initialize multi-series mode"""
print('[Multi-Series Initialization]')
# Initialize each series separately
for series_name, config in self.series_config.items():
print(f'[Initializing {series_name}]')
# Get port IDs for this series
port_ids = config.get('port_ids', [])
if not port_ids:
print(f'Warning: No port IDs configured for {series_name}, skipping')
continue
# Connect devices for this series
try:
print(f' [Connect Devices] Port IDs: {port_ids}')
device_group = kp.core.connect_devices(usb_port_ids=port_ids)
self.device_groups[series_name] = device_group
print(f' - Success ({len(port_ids)} devices)')
except kp.ApiKPException as exception:
print(f'Error: connect devices failed for {series_name}, port IDs = {port_ids}, error = {str(exception)}')
continue
# Upload firmware if available
firmware_paths = config.get('firmware_paths')
if firmware_paths and 'scpu' in firmware_paths and 'ncpu' in firmware_paths:
try:
print(f' [Upload Firmware]')
kp.core.load_firmware_from_file(
device_group=device_group,
scpu_fw_path=firmware_paths['scpu'],
ncpu_fw_path=firmware_paths['ncpu']
)
print(f' - Success')
except kp.ApiKPException as exception:
print(f'Error: upload firmware failed for {series_name}, error = {str(exception)}')
continue
else:
print(f' [Upload Firmware] - Skipped (no firmware paths configured)')
# Upload model
model_path = config.get('model_path')
if model_path:
try:
print(f' [Upload Model]')
model_descriptor = kp.core.load_model_from_file(
device_group=device_group,
file_path=model_path
)
self.model_descriptors[series_name] = model_descriptor
print(f' - Success')
# Extract model input dimensions for this series
if model_descriptor and model_descriptor.models:
model = model_descriptor.models[0]
if hasattr(model, 'input_nodes') and model.input_nodes:
input_node = model.input_nodes[0]
shape = input_node.tensor_shape_info.data.shape_npu
model_input_shape = (shape[3], shape[2]) # (width, height)
model_input_channels = shape[1] # 3 for RGB
print(f' Model input shape: {model_input_shape}, channels: {model_input_channels}')
# Store series-specific model info
self.series_groups[series_name]['model_input_shape'] = model_input_shape
self.series_groups[series_name]['model_input_channels'] = model_input_channels
except kp.ApiKPException as exception:
print(f'Error: upload model failed for {series_name}, error = {str(exception)}')
continue
else:
print(f' [Upload Model] - Skipped (no model path configured)')
print('[Multi-Series Initialization Complete]')
# Set up legacy compatibility attributes using the first series
if self.device_groups:
first_series = next(iter(self.device_groups.keys()))
self.device_group = self.device_groups[first_series]
self.model_nef_descriptor = self.model_descriptors.get(first_series)
# Set up generic inference descriptor from first series
if self.model_nef_descriptor:
self.generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
model_id=self.model_nef_descriptor.models[0].id,
)
# Set model input shape from first series
if first_series in self.series_groups:
series_info = self.series_groups[first_series]
self.model_input_shape = series_info.get('model_input_shape', (640, 640))
self.model_input_channels = series_info.get('model_input_channels', 3)
else:
self.model_input_shape = (640, 640)
self.model_input_channels = 3
    def preprocess_frame(self, frame: np.ndarray, target_format: str = 'BGR565') -> np.ndarray:
        """
        Preprocess frame for inference
@ -491,6 +835,13 @@ class MultiDongle:
        Start the send and receive threads.
        Must be called after initialize().
        """
if self.multi_series_mode:
self._start_multi_series()
else:
self._start_single_series()
def _start_single_series(self):
"""Start single-series (legacy) mode"""
        if self.device_group is None:
            raise RuntimeError("MultiDongle not initialized. Call initialize() first.")
@ -505,11 +856,62 @@ class MultiDongle:
            self._receive_thread.start()
            print("Receive thread started.")
def _start_multi_series(self):
"""Start multi-series mode"""
if not self.device_groups:
raise RuntimeError("MultiDongle not initialized. Call initialize() first.")
print("[Starting Multi-Series Threads]")
self._stop_event.clear()
# Start dispatcher thread
if self.dispatcher_thread is None or not self.dispatcher_thread.is_alive():
self.dispatcher_thread = threading.Thread(target=self._dispatcher_thread_func, daemon=True)
self.dispatcher_thread.start()
print("Dispatcher thread started.")
# Start send/receive threads for each series
for series_name in self.device_groups.keys():
# Start send thread for this series
if series_name not in self.send_threads or not self.send_threads[series_name].is_alive():
send_thread = threading.Thread(
target=self._multi_series_send_thread_func,
args=(series_name,),
daemon=True
)
self.send_threads[series_name] = send_thread
send_thread.start()
print(f"Send thread started for {series_name}.")
# Start receive thread for this series
if series_name not in self.receive_threads or not self.receive_threads[series_name].is_alive():
receive_thread = threading.Thread(
target=self._multi_series_receive_thread_func,
args=(series_name,),
daemon=True
)
self.receive_threads[series_name] = receive_thread
receive_thread.start()
print(f"Receive thread started for {series_name}.")
# Start result ordering thread
if self.result_ordering_thread is None or not self.result_ordering_thread.is_alive():
self.result_ordering_thread = threading.Thread(target=self._result_ordering_thread_func, daemon=True)
self.result_ordering_thread.start()
print("Result ordering thread started.")
    def stop(self):
        """Improved stop method with better cleanup"""
        if self._stop_event.is_set():
            return  # Already stopping
if self.multi_series_mode:
self._stop_multi_series()
else:
self._stop_single_series()
def _stop_single_series(self):
"""Stop single-series (legacy) mode"""
print("Stopping threads...") print("Stopping threads...")
self._stop_event.set() self._stop_event.set()
@ -539,6 +941,248 @@ class MultiDongle:
print(f"Error disconnecting device group: {e}") print(f"Error disconnecting device group: {e}")
self.device_group = None self.device_group = None
def _stop_multi_series(self):
"""Stop multi-series mode"""
print("[Stopping Multi-Series Threads]")
self._stop_event.set()
# Clear input queue to unblock dispatcher
while not self._input_queue.empty():
try:
self._input_queue.get_nowait()
except queue.Empty:
break
# Signal dispatcher thread to wake up
self._input_queue.put(None)
# Clear series result queues
for series_name, result_queue in self.result_queues.items():
while not result_queue.empty():
try:
result_queue.get_nowait()
except queue.Empty:
break
# Stop all send threads
for series_name, send_thread in self.send_threads.items():
if send_thread and send_thread.is_alive():
send_thread.join(timeout=2.0)
if send_thread.is_alive():
print(f"Warning: Send thread for {series_name} didn't stop cleanly")
# Stop all receive threads
for series_name, receive_thread in self.receive_threads.items():
if receive_thread and receive_thread.is_alive():
receive_thread.join(timeout=2.0)
if receive_thread.is_alive():
print(f"Warning: Receive thread for {series_name} didn't stop cleanly")
# Stop dispatcher thread
if self.dispatcher_thread and self.dispatcher_thread.is_alive():
self.dispatcher_thread.join(timeout=2.0)
if self.dispatcher_thread.is_alive():
print("Warning: Dispatcher thread didn't stop cleanly")
# Stop result ordering thread
if self.result_ordering_thread and self.result_ordering_thread.is_alive():
self.result_ordering_thread.join(timeout=2.0)
if self.result_ordering_thread.is_alive():
print("Warning: Result ordering thread didn't stop cleanly")
# Disconnect all device groups
print("Disconnecting device groups...")
for series_name, device_group in self.device_groups.items():
try:
kp.core.disconnect_devices(device_group=device_group)
print(f"Device group for {series_name} disconnected successfully.")
except kp.ApiKPException as e:
print(f"Error disconnecting device group for {series_name}: {e}")
self.device_groups.clear()
def _dispatcher_thread_func(self):
"""Dispatcher thread: assigns tasks to dongles based on load balancing"""
print("Dispatcher thread started")
while not self._stop_event.is_set():
try:
task = self._input_queue.get(timeout=0.1)
if task is None: # Sentinel value
continue
# Select optimal dongle based on current load and capacity
selected_series = self._select_optimal_series()
if selected_series is None:
print("Warning: No series available for task dispatch")
continue
# Enqueue to selected series
self.result_queues[selected_series].put(task)
self.current_loads[selected_series] += 1
except queue.Empty:
continue
except Exception as e:
print(f"Error in dispatcher: {e}")
if not self._stop_event.is_set():
self._stop_event.set()
print("Dispatcher thread stopped")
def _multi_series_send_thread_func(self, series_name: str):
"""Send thread for specific dongle series - with tuple handling fix"""
print(f"Send worker started for {series_name}")
device_group = self.device_groups[series_name]
result_queue = self.result_queues[series_name]
model_descriptor = self.model_descriptors[series_name]
while not self._stop_event.is_set():
try:
task = result_queue.get(timeout=0.1)
if task is None:
continue
# Handle both tuple and dict formats
if isinstance(task, tuple):
# Legacy single-series format: (image_data, image_format)
image_data, image_format = task
sequence_id = getattr(self, '_inference_counter', 0)
self._inference_counter = sequence_id + 1
elif isinstance(task, dict):
# Multi-series format: dict with keys
image_data = task.get('image_data')
image_format = task.get('image_format', kp.ImageFormat.KP_IMAGE_FORMAT_RGB565)
sequence_id = task.get('sequence_id', 0)
else:
print(f"Error: Unknown task format: {type(task)}")
continue
if image_data is None:
print(f"Error: No image data in task")
continue
# Create inference descriptor for this task
inference_descriptor = kp.GenericImageInferenceDescriptor(
model_id=model_descriptor.models[0].id,
)
inference_descriptor.inference_number = sequence_id
inference_descriptor.input_node_image_list = [
kp.GenericInputNodeImage(
image=image_data,
image_format=image_format,
resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
)
]
# Send inference
kp.inference.generic_image_inference_send(
device_group=device_group,
generic_inference_input_descriptor=inference_descriptor
)
except queue.Empty:
continue
except kp.ApiKPException as e:
print(f"Error in {series_name} send worker: {e}")
if not self._stop_event.is_set():
self._stop_event.set()
except Exception as e:
print(f"Unexpected error in {series_name} send worker: {e}")
if not self._stop_event.is_set():
self._stop_event.set()
print(f"Send worker stopped for {series_name}")
def _multi_series_receive_thread_func(self, series_name: str):
"""Receive thread for specific dongle series"""
print(f"Receive worker started for {series_name}")
device_group = self.device_groups[series_name]
while not self._stop_event.is_set():
try:
# Receive inference result
raw_result = kp.inference.generic_image_inference_receive(device_group=device_group)
# Create result object
result = {
'sequence_id': raw_result.header.inference_number,
'result': raw_result,
'dongle_series': series_name,
'timestamp': time.time()
}
# Add to pending results for ordering
self.pending_results[result['sequence_id']] = result
self.current_loads[series_name] = max(0, self.current_loads[series_name] - 1)
except kp.ApiKPException as e:
if not self._stop_event.is_set():
print(f"Error in {series_name} receive worker: {e}")
self._stop_event.set()
except Exception as e:
print(f"Unexpected error in {series_name} receive worker: {e}")
print(f"Receive worker stopped for {series_name}")
def _result_ordering_thread_func(self):
"""Result ordering thread: ensures results are output in sequence order"""
print("Result ordering worker started")
# Track when we started waiting for each sequence
sequence_wait_times = {}
MAX_WAIT_TIME = 2.0 # Maximum wait time for slow sequences (seconds)
while not self._stop_event.is_set():
current_time = time.time()
# Check if next expected result is available
if self.next_output_sequence in self.pending_results:
result = self.pending_results.pop(self.next_output_sequence)
self._ordered_output_queue.put(result)
# Remove from wait tracking
sequence_wait_times.pop(self.next_output_sequence, None)
self.next_output_sequence += 1
# Clean up old pending results to prevent memory bloat
if len(self.pending_results) > 1000: # result_buffer_size
oldest_sequences = sorted(self.pending_results.keys())[:500]
for seq_id in oldest_sequences:
if seq_id < self.next_output_sequence:
self.pending_results.pop(seq_id, None)
else:
# Track how long we've been waiting for this sequence
if self.next_output_sequence not in sequence_wait_times:
sequence_wait_times[self.next_output_sequence] = current_time
# Check if we've been waiting too long
wait_time = current_time - sequence_wait_times[self.next_output_sequence]
if wait_time > MAX_WAIT_TIME:
print(f"Warning: Skipping sequence {self.next_output_sequence} after {wait_time:.2f}s timeout")
# Create a timeout result
timeout_result = {
'sequence_id': self.next_output_sequence,
'result': {'error': 'timeout', 'probability': 0.0, 'result_string': 'Timeout'},
'dongle_series': 'timeout',
'timestamp': current_time
}
self._ordered_output_queue.put(timeout_result)
# Remove from wait tracking and advance sequence
sequence_wait_times.pop(self.next_output_sequence, None)
self.next_output_sequence += 1
else:
time.sleep(0.001) # Small delay to prevent busy waiting
print("Result ordering worker stopped")
    def put_input(self, image: Union[str, np.ndarray], format: str, target_size: Tuple[int, int] = None):
        """
        Put an image into the input queue with flexible preprocessing
@ -560,7 +1204,22 @@ class MultiDongle:
        else:
            raise ValueError(f"Unsupported format: {format}")
        if self.multi_series_mode:
# In multi-series mode, create a task with sequence ID
with self.sequence_lock:
sequence_id = self.sequence_counter
self.sequence_counter += 1
task = {
'sequence_id': sequence_id,
'image_data': image_data,
'image_format': image_format_enum,
'timestamp': time.time()
}
self._input_queue.put(task)
else:
# In single-series mode, use the original format
self._input_queue.put((image_data, image_format_enum))
    def get_output(self, timeout: float = None):
        """
@ -570,7 +1229,15 @@ class MultiDongle:
        :return: Received data (e.g., kp.GenericInferenceOutputDescriptor) or None if no data available within timeout.
        """
        try:
            if self.multi_series_mode:
# In multi-series mode, use the ordered output queue
result = self._ordered_output_queue.get(block=timeout is not None, timeout=timeout)
if result and isinstance(result, dict):
return result.get('result') # Extract the actual inference result
return result
else:
# In single-series mode, use the regular output queue
return self._output_queue.get(block=timeout is not None, timeout=timeout)
        except queue.Empty:
            return None
@ -637,7 +1304,7 @@ class MultiDongle:
            'kl720': 'KL720',
            'kl630': 'KL630',
            'kl730': 'KL730',
            # 'kl540': 'KL540',
        }
        if isinstance(chip_id, str):
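
Putting the new API together, a caller would drive multi-series inference roughly as below (a minimal sketch; the paths, port IDs, and the 'BGR' format string are illustrative assumptions):

import numpy as np

config = {
    "KL520": {"port_ids": [28, 32], "model_path": "assets/Models/KL520/model.nef"},
    "KL720": {"port_ids": [4], "model_path": "assets/Models/KL720/model.nef"},
}
md = MultiDongle(multi_series_config=config, max_queue_size=100)
md.initialize()                       # connect each series group and upload its model
md.start()                            # dispatcher, per-series send/receive, result ordering
frame = np.zeros((480, 640, 3), dtype=np.uint8)   # stand-in for a camera frame
md.put_input(frame, format='BGR')     # tagged with a sequence ID internally
result = md.get_output(timeout=1.0)   # results come back in submission order, or None
md.stop()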

View File

@ -1,375 +0,0 @@
#!/usr/bin/env python3
"""
Intelligent topology sorting algorithm demo (standalone version)
Does not depend on external modules; purely showcases the core functionality of the topological sorting algorithm
"""
import json
from typing import List, Dict, Any, Tuple
from collections import deque
class TopologyDemo:
"""演示拓撲排序算法的類別"""
def __init__(self):
self.stage_order = []
def analyze_pipeline(self, pipeline_data: Dict[str, Any]):
"""分析pipeline並執行拓撲排序"""
print("Starting intelligent pipeline topology analysis...")
        # Extract model nodes
model_nodes = [node for node in pipeline_data.get('nodes', [])
if 'model' in node.get('type', '').lower()]
connections = pipeline_data.get('connections', [])
if not model_nodes:
print(" Warning: No model nodes found!")
return []
        # Build the dependency graph
dependency_graph = self._build_dependency_graph(model_nodes, connections)
        # Detect cycles
cycles = self._detect_cycles(dependency_graph)
if cycles:
print(f" Warning: Found {len(cycles)} cycles!")
dependency_graph = self._resolve_cycles(dependency_graph, cycles)
        # Perform topological sort
sorted_stages = self._topological_sort_with_optimization(dependency_graph, model_nodes)
        # Compute metrics
metrics = self._calculate_pipeline_metrics(sorted_stages, dependency_graph)
self._display_pipeline_analysis(sorted_stages, metrics)
return sorted_stages
def _build_dependency_graph(self, model_nodes: List[Dict], connections: List[Dict]) -> Dict[str, Dict]:
"""建立依賴圖"""
print(" Building dependency graph...")
graph = {}
for node in model_nodes:
graph[node['id']] = {
'node': node,
'dependencies': set(),
'dependents': set(),
'depth': 0
}
        # Analyze connections
for conn in connections:
output_node_id = conn.get('output_node')
input_node_id = conn.get('input_node')
if output_node_id in graph and input_node_id in graph:
graph[input_node_id]['dependencies'].add(output_node_id)
graph[output_node_id]['dependents'].add(input_node_id)
dep_count = sum(len(data['dependencies']) for data in graph.values())
print(f" Graph built: {len(graph)} nodes, {dep_count} dependencies")
return graph
def _detect_cycles(self, graph: Dict[str, Dict]) -> List[List[str]]:
"""檢測循環"""
print(" Checking for dependency cycles...")
cycles = []
visited = set()
rec_stack = set()
def dfs_cycle_detect(node_id, path):
if node_id in rec_stack:
cycle_start = path.index(node_id)
cycle = path[cycle_start:] + [node_id]
cycles.append(cycle)
return True
if node_id in visited:
return False
visited.add(node_id)
rec_stack.add(node_id)
path.append(node_id)
for dependent in graph[node_id]['dependents']:
if dfs_cycle_detect(dependent, path):
return True
path.pop()
rec_stack.remove(node_id)
return False
for node_id in graph:
if node_id not in visited:
dfs_cycle_detect(node_id, [])
if cycles:
print(f" Warning: Found {len(cycles)} cycles")
else:
print(" No cycles detected")
return cycles
def _resolve_cycles(self, graph: Dict[str, Dict], cycles: List[List[str]]) -> Dict[str, Dict]:
"""解決循環"""
print(" Resolving dependency cycles...")
for cycle in cycles:
node_names = [graph[nid]['node']['name'] for nid in cycle]
print(f" Breaking cycle: {''.join(node_names)}")
if len(cycle) >= 2:
node_to_break = cycle[-2]
dependent_to_break = cycle[-1]
graph[dependent_to_break]['dependencies'].discard(node_to_break)
graph[node_to_break]['dependents'].discard(dependent_to_break)
print(f" Broke dependency: {graph[node_to_break]['node']['name']}{graph[dependent_to_break]['node']['name']}")
return graph
def _topological_sort_with_optimization(self, graph: Dict[str, Dict], model_nodes: List[Dict]) -> List[Dict]:
"""執行優化的拓撲排序"""
print(" Performing optimized topological sort...")
        # Compute depth levels
self._calculate_depth_levels(graph)
        # Group by depth
depth_groups = self._group_by_depth(graph)
        # Sort
sorted_nodes = []
for depth in sorted(depth_groups.keys()):
group_nodes = depth_groups[depth]
group_nodes.sort(key=lambda nid: (
len(graph[nid]['dependencies']),
-len(graph[nid]['dependents']),
graph[nid]['node']['name']
))
for node_id in group_nodes:
sorted_nodes.append(graph[node_id]['node'])
print(f" Sorted {len(sorted_nodes)} stages into {len(depth_groups)} execution levels")
return sorted_nodes
def _calculate_depth_levels(self, graph: Dict[str, Dict]):
"""計算深度層級"""
print(" Calculating execution depth levels...")
no_deps = [nid for nid, data in graph.items() if not data['dependencies']]
queue = deque([(nid, 0) for nid in no_deps])
while queue:
node_id, depth = queue.popleft()
if graph[node_id]['depth'] < depth:
graph[node_id]['depth'] = depth
for dependent in graph[node_id]['dependents']:
queue.append((dependent, depth + 1))
def _group_by_depth(self, graph: Dict[str, Dict]) -> Dict[int, List[str]]:
"""按深度分組"""
depth_groups = {}
for node_id, data in graph.items():
depth = data['depth']
if depth not in depth_groups:
depth_groups[depth] = []
depth_groups[depth].append(node_id)
return depth_groups
def _calculate_pipeline_metrics(self, sorted_stages: List[Dict], graph: Dict[str, Dict]) -> Dict[str, Any]:
"""計算指標"""
print(" Calculating pipeline metrics...")
total_stages = len(sorted_stages)
max_depth = max([data['depth'] for data in graph.values()]) + 1 if graph else 1
depth_distribution = {}
for data in graph.values():
depth = data['depth']
depth_distribution[depth] = depth_distribution.get(depth, 0) + 1
max_parallel = max(depth_distribution.values()) if depth_distribution else 1
critical_path = self._find_critical_path(graph)
return {
'total_stages': total_stages,
'pipeline_depth': max_depth,
'max_parallel_stages': max_parallel,
'parallelization_efficiency': (total_stages / max_depth) if max_depth > 0 else 1.0,
'critical_path_length': len(critical_path),
'critical_path': critical_path
}
def _find_critical_path(self, graph: Dict[str, Dict]) -> List[str]:
"""找出關鍵路徑"""
longest_path = []
def dfs_longest_path(node_id, current_path):
nonlocal longest_path
current_path.append(node_id)
if not graph[node_id]['dependents']:
if len(current_path) > len(longest_path):
longest_path = current_path.copy()
else:
for dependent in graph[node_id]['dependents']:
dfs_longest_path(dependent, current_path)
current_path.pop()
for node_id, data in graph.items():
if not data['dependencies']:
dfs_longest_path(node_id, [])
return longest_path
def _display_pipeline_analysis(self, sorted_stages: List[Dict], metrics: Dict[str, Any]):
"""顯示分析結果"""
print("\n" + "="*60)
print("INTELLIGENT PIPELINE TOPOLOGY ANALYSIS COMPLETE")
print("="*60)
print(f"Pipeline Metrics:")
print(f" Total Stages: {metrics['total_stages']}")
print(f" Pipeline Depth: {metrics['pipeline_depth']} levels")
print(f" Max Parallel Stages: {metrics['max_parallel_stages']}")
print(f" Parallelization Efficiency: {metrics['parallelization_efficiency']:.1%}")
print(f"\nOptimized Execution Order:")
for i, stage in enumerate(sorted_stages, 1):
print(f" {i:2d}. {stage['name']} (ID: {stage['id'][:8]}...)")
if metrics['critical_path']:
print(f"\nCritical Path ({metrics['critical_path_length']} stages):")
critical_names = []
for node_id in metrics['critical_path']:
node_name = next((stage['name'] for stage in sorted_stages if stage['id'] == node_id), 'Unknown')
critical_names.append(node_name)
print(f" {''.join(critical_names)}")
print(f"\nPerformance Insights:")
if metrics['parallelization_efficiency'] > 0.8:
print(" Excellent parallelization potential!")
elif metrics['parallelization_efficiency'] > 0.6:
print(" Good parallelization opportunities available")
else:
print(" Limited parallelization - consider pipeline redesign")
if metrics['pipeline_depth'] <= 3:
print(" Low latency pipeline - great for real-time applications")
elif metrics['pipeline_depth'] <= 6:
print(" Balanced pipeline depth - good throughput/latency trade-off")
else:
print(" Deep pipeline - optimized for maximum throughput")
print("="*60 + "\n")
def create_demo_pipelines():
"""創建演示用的pipeline"""
# Demo 1: 簡單線性pipeline
simple_pipeline = {
"project_name": "Simple Linear Pipeline",
"nodes": [
{"id": "model_001", "name": "Object Detection", "type": "ExactModelNode"},
{"id": "model_002", "name": "Fire Classification", "type": "ExactModelNode"},
{"id": "model_003", "name": "Result Verification", "type": "ExactModelNode"}
],
"connections": [
{"output_node": "model_001", "input_node": "model_002"},
{"output_node": "model_002", "input_node": "model_003"}
]
}
    # Demo 2: Parallel pipeline
parallel_pipeline = {
"project_name": "Parallel Processing Pipeline",
"nodes": [
{"id": "model_001", "name": "RGB Processor", "type": "ExactModelNode"},
{"id": "model_002", "name": "IR Processor", "type": "ExactModelNode"},
{"id": "model_003", "name": "Depth Processor", "type": "ExactModelNode"},
{"id": "model_004", "name": "Fusion Engine", "type": "ExactModelNode"}
],
"connections": [
{"output_node": "model_001", "input_node": "model_004"},
{"output_node": "model_002", "input_node": "model_004"},
{"output_node": "model_003", "input_node": "model_004"}
]
}
    # Demo 3: Complex multi-stage pipeline
complex_pipeline = {
"project_name": "Advanced Multi-Stage Fire Detection Pipeline",
"nodes": [
{"id": "model_rgb_001", "name": "RGB Feature Extractor", "type": "ExactModelNode"},
{"id": "model_edge_002", "name": "Edge Feature Extractor", "type": "ExactModelNode"},
{"id": "model_thermal_003", "name": "Thermal Feature Extractor", "type": "ExactModelNode"},
{"id": "model_fusion_004", "name": "Feature Fusion", "type": "ExactModelNode"},
{"id": "model_attention_005", "name": "Attention Mechanism", "type": "ExactModelNode"},
{"id": "model_classifier_006", "name": "Fire Classifier", "type": "ExactModelNode"}
],
"connections": [
{"output_node": "model_rgb_001", "input_node": "model_fusion_004"},
{"output_node": "model_edge_002", "input_node": "model_fusion_004"},
{"output_node": "model_thermal_003", "input_node": "model_attention_005"},
{"output_node": "model_fusion_004", "input_node": "model_classifier_006"},
{"output_node": "model_attention_005", "input_node": "model_classifier_006"}
]
}
    # Demo 4: Pipeline with a cycle (tests cycle detection)
cycle_pipeline = {
"project_name": "Pipeline with Cycles (Testing)",
"nodes": [
{"id": "model_A", "name": "Model A", "type": "ExactModelNode"},
{"id": "model_B", "name": "Model B", "type": "ExactModelNode"},
{"id": "model_C", "name": "Model C", "type": "ExactModelNode"}
],
"connections": [
{"output_node": "model_A", "input_node": "model_B"},
{"output_node": "model_B", "input_node": "model_C"},
{"output_node": "model_C", "input_node": "model_A"} # 創建循環!
]
}
return [simple_pipeline, parallel_pipeline, complex_pipeline, cycle_pipeline]
def main():
"""主演示函數"""
print("INTELLIGENT PIPELINE TOPOLOGY SORTING DEMONSTRATION")
print("="*60)
print("This demo showcases our advanced pipeline analysis capabilities:")
print("• Automatic dependency resolution")
print("• Parallel execution optimization")
print("• Cycle detection and prevention")
print("• Critical path analysis")
print("• Performance metrics calculation")
print("="*60 + "\n")
demo = TopologyDemo()
pipelines = create_demo_pipelines()
demo_names = ["Simple Linear", "Parallel Processing", "Complex Multi-Stage", "Cycle Detection"]
for i, (pipeline, name) in enumerate(zip(pipelines, demo_names), 1):
print(f"DEMO {i}: {name} Pipeline")
print("="*50)
demo.analyze_pipeline(pipeline)
print("\n")
print("ALL DEMONSTRATIONS COMPLETED SUCCESSFULLY!")
print("Ready for production deployment and progress reporting!")
if __name__ == "__main__":
main()

View File

@ -463,6 +463,72 @@ class MFlowConverter:
print("="*60 + "\n") print("="*60 + "\n")
def _build_multi_series_config_from_properties(self, properties: Dict[str, Any]) -> Dict[str, Any]:
"""Build multi-series configuration from node properties"""
try:
enabled_series = properties.get('enabled_series', [])
assets_folder = properties.get('assets_folder', '')
if not enabled_series:
print("Warning: No enabled_series found in multi-series mode")
return {}
multi_series_config = {}
for series in enabled_series:
# Get port IDs for this series
port_ids_str = properties.get(f'kl{series}_port_ids', '')
if not port_ids_str or not port_ids_str.strip():
print(f"Warning: No port IDs configured for KL{series}")
continue
# Parse port IDs (comma-separated string to list of integers)
try:
port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()]
if not port_ids:
continue
except ValueError:
print(f"Warning: Invalid port IDs for KL{series}: {port_ids_str}")
continue
# Build series configuration
series_config = {
"port_ids": port_ids
}
# Add model path if assets folder is configured
if assets_folder:
import os
model_folder = os.path.join(assets_folder, 'Models', f'KL{series}')
if os.path.exists(model_folder):
# Look for .nef files in the model folder
nef_files = [f for f in os.listdir(model_folder) if f.endswith('.nef')]
if nef_files:
series_config["model_path"] = os.path.join(model_folder, nef_files[0])
print(f"Found model for KL{series}: {series_config['model_path']}")
# Add firmware paths if available
firmware_folder = os.path.join(assets_folder, 'Firmware', f'KL{series}')
if os.path.exists(firmware_folder):
scpu_path = os.path.join(firmware_folder, 'fw_scpu.bin')
ncpu_path = os.path.join(firmware_folder, 'fw_ncpu.bin')
if os.path.exists(scpu_path) and os.path.exists(ncpu_path):
series_config["firmware_paths"] = {
"scpu": scpu_path,
"ncpu": ncpu_path
}
print(f"Found firmware for KL{series}: scpu={scpu_path}, ncpu={ncpu_path}")
multi_series_config[f'KL{series}'] = series_config
print(f"Configured KL{series} with {len(port_ids)} devices on ports {port_ids}")
return multi_series_config if multi_series_config else {}
except Exception as e:
print(f"Error building multi-series config from properties: {e}")
return {}
    def _create_stage_configs(self, model_nodes: List[Dict], preprocess_nodes: List[Dict],
                              postprocess_nodes: List[Dict], connections: List[Dict]) -> List[StageConfig]:
        """Create StageConfig objects for each model node"""
@ -502,16 +568,38 @@ class MFlowConverter:
            # Queue size
            max_queue_size = properties.get('max_queue_size', 50)

            # Check if multi-series mode is enabled
            multi_series_mode = properties.get('multi_series_mode', False)
            multi_series_config = None

            if multi_series_mode:
                # Build multi-series config from node properties
                multi_series_config = self._build_multi_series_config_from_properties(properties)
                print(f"Multi-series config for {stage_id}: {multi_series_config}")

                # Create StageConfig for multi-series mode
                stage_config = StageConfig(
                    stage_id=stage_id,
                    port_ids=[],        # Will be handled by multi_series_config
                    scpu_fw_path='',    # Will be handled by multi_series_config
                    ncpu_fw_path='',    # Will be handled by multi_series_config
                    model_path='',      # Will be handled by multi_series_config
                    upload_fw=upload_fw,
                    max_queue_size=max_queue_size,
                    multi_series_config=multi_series_config
                )
            else:
                # Create StageConfig for single-series mode (legacy)
                stage_config = StageConfig(
                    stage_id=stage_id,
                    port_ids=port_ids,
                    scpu_fw_path=scpu_fw_path,
                    ncpu_fw_path=ncpu_fw_path,
                    model_path=model_path,
                    upload_fw=upload_fw,
                    max_queue_size=max_queue_size,
                    multi_series_config=None
                )

            stage_configs.append(stage_config)
@ -625,22 +713,87 @@ class MFlowConverter:
"""Validate individual stage configuration""" """Validate individual stage configuration"""
errors = [] errors = []
# Check model path # Check if this is multi-series configuration
if not stage_config.model_path: if stage_config.multi_series_config:
errors.append(f"Stage {stage_num}: Model path is required") # Multi-series validation
elif not os.path.exists(stage_config.model_path): errors.extend(self._validate_multi_series_config(stage_config.multi_series_config, stage_num))
errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}") else:
# Single-series validation (legacy)
# Check model path
if not stage_config.model_path:
errors.append(f"Stage {stage_num}: Model path is required")
elif not os.path.exists(stage_config.model_path):
errors.append(f"Stage {stage_num}: Model file not found: {stage_config.model_path}")
# Check firmware paths if upload_fw is True # Check firmware paths if upload_fw is True
if stage_config.upload_fw: if stage_config.upload_fw:
if not os.path.exists(stage_config.scpu_fw_path): if not os.path.exists(stage_config.scpu_fw_path):
errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}") errors.append(f"Stage {stage_num}: SCPU firmware not found: {stage_config.scpu_fw_path}")
if not os.path.exists(stage_config.ncpu_fw_path): if not os.path.exists(stage_config.ncpu_fw_path):
errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}") errors.append(f"Stage {stage_num}: NCPU firmware not found: {stage_config.ncpu_fw_path}")
# Check port IDs # Check port IDs
if not stage_config.port_ids: if not stage_config.port_ids:
errors.append(f"Stage {stage_num}: At least one port ID is required") errors.append(f"Stage {stage_num}: At least one port ID is required")
return errors
def _validate_multi_series_config(self, multi_series_config: Dict[str, Any], stage_num: int) -> List[str]:
"""Validate multi-series configuration"""
errors = []
if not multi_series_config:
errors.append(f"Stage {stage_num}: Multi-series configuration is empty")
return errors
print(f"Validating multi-series config for stage {stage_num}: {list(multi_series_config.keys())}")
# Check each series configuration
for series_name, series_config in multi_series_config.items():
if not isinstance(series_config, dict):
errors.append(f"Stage {stage_num}: Invalid configuration for {series_name}")
continue
# Check port IDs
port_ids = series_config.get('port_ids', [])
if not port_ids:
errors.append(f"Stage {stage_num}: {series_name} has no port IDs configured")
continue
if not isinstance(port_ids, list) or not all(isinstance(p, int) for p in port_ids):
errors.append(f"Stage {stage_num}: {series_name} port IDs must be a list of integers")
continue
print(f" {series_name}: {len(port_ids)} ports configured")
# Check model path
model_path = series_config.get('model_path')
if model_path:
if not os.path.exists(model_path):
errors.append(f"Stage {stage_num}: {series_name} model file not found: {model_path}")
else:
print(f" {series_name}: Model validated: {model_path}")
else:
print(f" {series_name}: No model path specified (optional for multi-series)")
# Check firmware paths if specified
firmware_paths = series_config.get('firmware_paths')
if firmware_paths and isinstance(firmware_paths, dict):
scpu_path = firmware_paths.get('scpu')
ncpu_path = firmware_paths.get('ncpu')
if scpu_path and not os.path.exists(scpu_path):
errors.append(f"Stage {stage_num}: {series_name} SCPU firmware not found: {scpu_path}")
elif scpu_path:
print(f" {series_name}: SCPU firmware validated: {scpu_path}")
if ncpu_path and not os.path.exists(ncpu_path):
errors.append(f"Stage {stage_num}: {series_name} NCPU firmware not found: {ncpu_path}")
elif ncpu_path:
print(f" {series_name}: NCPU firmware validated: {ncpu_path}")
if not errors:
print(f"Stage {stage_num}: Multi-series configuration validation passed")
        return errors
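
Given the lookup logic above, the assets folder it scans must be laid out along these lines (only the KL-series subfolder names, a *.nef file, and the fw_scpu.bin/fw_ncpu.bin filenames are actually searched for; the file names shown are illustrative):

assets_folder/
    Models/
        KL520/detector_520.nef
        KL720/detector_720.nef
    Firmware/
        KL520/fw_scpu.bin
        KL520/fw_ncpu.bin
        KL720/fw_scpu.bin
        KL720/fw_ncpu.bin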

View File

@ -5,6 +5,8 @@ This module provides node implementations that exactly match the original
properties and behavior from the monolithic UI.py file.
"""

import os

try:
    from NodeGraphQt import BaseNode
    NODEGRAPH_AVAILABLE = True
@ -115,20 +117,60 @@ class ExactModelNode(BaseNode):
        self.create_property('port_id', '')
        self.create_property('upload_fw', True)
# Multi-series properties
self.create_property('multi_series_mode', False)
self.create_property('assets_folder', '')
self.create_property('enabled_series', ['520', '720'])
# Series-specific port ID configurations
self.create_property('kl520_port_ids', '')
self.create_property('kl720_port_ids', '')
self.create_property('kl630_port_ids', '')
self.create_property('kl730_port_ids', '')
# self.create_property('kl540_port_ids', '')
self.create_property('max_queue_size', 100)
self.create_property('result_buffer_size', 1000)
self.create_property('batch_size', 1)
self.create_property('enable_preprocessing', False)
self.create_property('enable_postprocessing', False)
# Original property options - exact match
self._property_options = {
'dongle_series': ['520', '720'],
'num_dongles': {'min': 1, 'max': 16},
'model_path': {'type': 'file_path', 'filter': 'NEF Model files (*.nef)'},
'scpu_fw_path': {'type': 'file_path', 'filter': 'SCPU Firmware files (*.bin)'},
'ncpu_fw_path': {'type': 'file_path', 'filter': 'NCPU Firmware files (*.bin)'},
'port_id': {'placeholder': 'e.g., 8080 or auto'},
'upload_fw': {'type': 'bool', 'default': True, 'description': 'Upload firmware to dongle if needed'},
# Multi-series property options
'multi_series_mode': {'type': 'bool', 'default': False, 'description': 'Enable multi-series dongle support'},
'assets_folder': {'type': 'file_path', 'filter': 'Folder', 'mode': 'directory'},
'enabled_series': {'type': 'list', 'options': ['520', '720', '630', '730', '540'], 'default': ['520', '720']},
# Series-specific port ID options
'kl520_port_ids': {'placeholder': 'e.g., 28,32 (comma-separated port IDs for KL520)', 'description': 'Port IDs for KL520 dongles'},
'kl720_port_ids': {'placeholder': 'e.g., 30,34 (comma-separated port IDs for KL720)', 'description': 'Port IDs for KL720 dongles'},
'kl630_port_ids': {'placeholder': 'e.g., 36,38 (comma-separated port IDs for KL630)', 'description': 'Port IDs for KL630 dongles'},
'kl730_port_ids': {'placeholder': 'e.g., 40,42 (comma-separated port IDs for KL730)', 'description': 'Port IDs for KL730 dongles'},
# 'kl540_port_ids': {'placeholder': 'e.g., 44,46 (comma-separated port IDs for KL540)', 'description': 'Port IDs for KL540 dongles'},
'max_queue_size': {'min': 1, 'max': 1000, 'default': 100},
'result_buffer_size': {'min': 100, 'max': 10000, 'default': 1000},
'batch_size': {'min': 1, 'max': 32, 'default': 1},
'enable_preprocessing': {'type': 'bool', 'default': False},
'enable_postprocessing': {'type': 'bool', 'default': False}
}
# Create custom properties dictionary for UI compatibility
self._populate_custom_properties()
# Set up custom property handlers for folder selection
if NODEGRAPH_AVAILABLE:
self._setup_custom_property_handlers()
def _populate_custom_properties(self):
"""Populate the custom properties dictionary for UI compatibility."""
if not NODEGRAPH_AVAILABLE:
@ -166,8 +208,400 @@ class ExactModelNode(BaseNode):
def get_display_properties(self):
"""Return properties that should be displayed in the UI panel."""
if not NODEGRAPH_AVAILABLE:
return []
# Base properties that are always shown
base_props = ['multi_series_mode']
try:
# Check if we're in multi-series mode
multi_series_mode = self.get_property('multi_series_mode')
if multi_series_mode:
# Multi-series mode: show multi-series specific properties
multi_props = ['assets_folder', 'enabled_series']
# Add port ID configurations for enabled series
try:
enabled_series = self.get_property('enabled_series') or []
for series in enabled_series:
port_prop = f'kl{series}_port_ids'
if port_prop not in multi_props: # Avoid duplicates
multi_props.append(port_prop)
except:
pass # If can't get enabled_series, just show basic properties
# Add other multi-series properties
multi_props.extend([
'max_queue_size', 'result_buffer_size', 'batch_size',
'enable_preprocessing', 'enable_postprocessing'
])
return base_props + multi_props
else:
# Single-series mode: show traditional properties
return base_props + [
'model_path', 'scpu_fw_path', 'ncpu_fw_path',
'dongle_series', 'num_dongles', 'port_id', 'upload_fw'
]
except:
# Fallback to single-series mode if property access fails
return base_props + [
'model_path', 'scpu_fw_path', 'ncpu_fw_path',
'dongle_series', 'num_dongles', 'port_id', 'upload_fw'
]
def get_inference_config(self):
"""Get configuration for inference pipeline"""
if not NODEGRAPH_AVAILABLE:
return {}
try:
multi_series_mode = self.get_property('multi_series_mode')
if multi_series_mode:
# Multi-series configuration with series-specific port IDs
config = {
'multi_series_mode': True,
'assets_folder': self.get_property('assets_folder'),
'enabled_series': self.get_property('enabled_series'),
'max_queue_size': self.get_property('max_queue_size'),
'result_buffer_size': self.get_property('result_buffer_size'),
'batch_size': self.get_property('batch_size'),
'enable_preprocessing': self.get_property('enable_preprocessing'),
'enable_postprocessing': self.get_property('enable_postprocessing')
}
# Build multi-series config for MultiDongle
multi_series_config = self._build_multi_series_config()
if multi_series_config:
config['multi_series_config'] = multi_series_config
return config
else:
# Single-series configuration
return {
'multi_series_mode': False,
'model_path': self.get_property('model_path'),
'scpu_fw_path': self.get_property('scpu_fw_path'),
'ncpu_fw_path': self.get_property('ncpu_fw_path'),
'dongle_series': self.get_property('dongle_series'),
'num_dongles': self.get_property('num_dongles'),
'port_id': self.get_property('port_id'),
'upload_fw': self.get_property('upload_fw')
}
except:
# Fallback to single-series configuration
return {
'multi_series_mode': False,
'model_path': self.get_property('model_path', ''),
'scpu_fw_path': self.get_property('scpu_fw_path', ''),
'ncpu_fw_path': self.get_property('ncpu_fw_path', ''),
'dongle_series': self.get_property('dongle_series', '520'),
'num_dongles': self.get_property('num_dongles', 1),
'port_id': self.get_property('port_id', ''),
'upload_fw': self.get_property('upload_fw', True)
}
def _build_multi_series_config(self):
"""Build multi-series configuration for MultiDongle"""
try:
enabled_series = self.get_property('enabled_series') or []
assets_folder = self.get_property('assets_folder') or ''
if not enabled_series:
return None
multi_series_config = {}
for series in enabled_series:
# Get port IDs for this series
port_ids_str = self.get_property(f'kl{series}_port_ids') or ''
if not port_ids_str.strip():
continue # Skip series without port IDs
# Parse port IDs (comma-separated string to list of integers)
try:
port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()]
if not port_ids:
continue
except ValueError:
print(f"Warning: Invalid port IDs for KL{series}: {port_ids_str}")
continue
# Build series configuration
series_config = {
"port_ids": port_ids
}
# Add model path if assets folder is configured
if assets_folder:
import os
model_folder = os.path.join(assets_folder, 'Models', f'KL{series}')
if os.path.exists(model_folder):
# Look for .nef files in the model folder
nef_files = [f for f in os.listdir(model_folder) if f.endswith('.nef')]
if nef_files:
series_config["model_path"] = os.path.join(model_folder, nef_files[0])
# Add firmware paths if available
firmware_folder = os.path.join(assets_folder, 'Firmware', f'KL{series}')
if os.path.exists(firmware_folder):
scpu_path = os.path.join(firmware_folder, 'fw_scpu.bin')
ncpu_path = os.path.join(firmware_folder, 'fw_ncpu.bin')
if os.path.exists(scpu_path) and os.path.exists(ncpu_path):
series_config["firmware_paths"] = {
"scpu": scpu_path,
"ncpu": ncpu_path
}
multi_series_config[f'KL{series}'] = series_config
return multi_series_config if multi_series_config else None
except Exception as e:
print(f"Error building multi-series config: {e}")
return None
def get_hardware_requirements(self):
"""Get hardware requirements for this node"""
if not NODEGRAPH_AVAILABLE:
return {}
try:
multi_series_mode = self.get_property('multi_series_mode')
if multi_series_mode:
enabled_series = self.get_property('enabled_series')
return {
'multi_series_mode': True,
'required_series': enabled_series,
'estimated_dongles': len(enabled_series) * 2 # Assume 2 dongles per series
}
else:
dongle_series = self.get_property('dongle_series')
num_dongles = self.get_property('num_dongles')
return {
'multi_series_mode': False,
'required_series': [f'KL{dongle_series}'],
'estimated_dongles': num_dongles
}
except:
return {'multi_series_mode': False, 'required_series': ['KL520'], 'estimated_dongles': 1}
def _setup_custom_property_handlers(self):
"""Setup custom property handlers, especially for folder selection."""
try:
# For assets_folder, we want to trigger folder selection dialog
# This might require custom widget or property handling
# For now, we'll use the standard approach but add validation
# You can override the property widget here if needed
# This is a placeholder for custom folder selection implementation
pass
except Exception as e:
print(f"Warning: Could not setup custom property handlers: {e}")
def select_assets_folder(self):
"""Method to open folder selection dialog for assets folder using improved utility."""
if not NODEGRAPH_AVAILABLE:
return ""
try:
from utils.folder_dialog import select_assets_folder
# Get current folder path as initial directory
current_folder = ""
try:
current_folder = self.get_property('assets_folder') or ""
except:
pass
# Use the specialized assets folder dialog with validation
result = select_assets_folder(initial_dir=current_folder)
if result['path']:
# Set the property
if NODEGRAPH_AVAILABLE:
self.set_property('assets_folder', result['path'])
# Print validation results
if result['valid']:
print(f"✓ Valid Assets folder set to: {result['path']}")
if 'details' in result and 'available_series' in result['details']:
series = result['details']['available_series']
print(f" Available series: {', '.join(series)}")
else:
print(f"⚠ Assets folder set to: {result['path']}")
print(f" Warning: {result['message']}")
print(" Expected structure: Assets/Firmware/ and Assets/Models/ with series subfolders")
return result['path']
else:
print("No folder selected")
return ""
except ImportError:
print("utils.folder_dialog not available, falling back to simple input")
# Fallback to manual input
folder_path = input("Enter Assets folder path: ").strip()
if folder_path and NODEGRAPH_AVAILABLE:
self.set_property('assets_folder', folder_path)
return folder_path
except Exception as e:
print(f"Error selecting assets folder: {e}")
return ""
def _validate_assets_folder(self, folder_path):
"""Validate that the assets folder has the expected structure."""
try:
import os
# Check if Firmware and Models folders exist
firmware_path = os.path.join(folder_path, 'Firmware')
models_path = os.path.join(folder_path, 'Models')
has_firmware = os.path.exists(firmware_path) and os.path.isdir(firmware_path)
has_models = os.path.exists(models_path) and os.path.isdir(models_path)
if not (has_firmware and has_models):
return False
# Check for at least one series subfolder
expected_series = ['KL520', 'KL720', 'KL630', 'KL730']
firmware_series = [d for d in os.listdir(firmware_path)
if os.path.isdir(os.path.join(firmware_path, d)) and d in expected_series]
models_series = [d for d in os.listdir(models_path)
if os.path.isdir(os.path.join(models_path, d)) and d in expected_series]
# At least one series should exist in both firmware and models
return len(firmware_series) > 0 and len(models_series) > 0
except Exception as e:
print(f"Error validating assets folder: {e}")
return False
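The folder layout this check (and the assets scan in _build_multi_series_config) expects can be sketched as follows; series subfolders are only needed for the series you enable:

Assets/
    Firmware/
        KL520/
            fw_scpu.bin
            fw_ncpu.bin
        KL720/
            fw_scpu.bin
            fw_ncpu.bin
    Models/
        KL520/
            example_kl520.nef
        KL720/
            example_kl720.nef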
def get_assets_folder_info(self):
"""Get information about the configured assets folder."""
if not NODEGRAPH_AVAILABLE:
return {}
try:
folder_path = self.get_property('assets_folder')
if not folder_path:
return {'status': 'not_set', 'message': 'No assets folder selected'}
if not os.path.exists(folder_path):
return {'status': 'invalid', 'message': 'Selected folder does not exist'}
info = {'status': 'valid', 'path': folder_path, 'series': []}
# Get available series
firmware_path = os.path.join(folder_path, 'Firmware')
models_path = os.path.join(folder_path, 'Models')
if os.path.exists(firmware_path):
firmware_series = [d for d in os.listdir(firmware_path)
if os.path.isdir(os.path.join(firmware_path, d))]
info['firmware_series'] = firmware_series
if os.path.exists(models_path):
models_series = [d for d in os.listdir(models_path)
if os.path.isdir(os.path.join(models_path, d))]
info['models_series'] = models_series
# Find common series
if 'firmware_series' in info and 'models_series' in info:
common_series = list(set(info['firmware_series']) & set(info['models_series']))
info['available_series'] = common_series
if not common_series:
info['status'] = 'incomplete'
info['message'] = 'No series found with both firmware and models'
return info
except Exception as e:
return {'status': 'error', 'message': f'Error reading assets folder: {e}'}
def validate_configuration(self) -> tuple[bool, str]:
"""
Validate the current node configuration.
Returns:
Tuple of (is_valid, error_message)
"""
if not NODEGRAPH_AVAILABLE:
return True, ""
try:
multi_series_mode = self.get_property('multi_series_mode')
if multi_series_mode:
# Multi-series validation
enabled_series = self.get_property('enabled_series')
if not enabled_series:
return False, "No series enabled in multi-series mode"
# Check if at least one series has port IDs configured
has_valid_series = False
for series in enabled_series:
port_ids_str = self.get_property(f'kl{series}_port_ids', '')
if port_ids_str and port_ids_str.strip():
# Validate port ID format
try:
port_ids = [int(pid.strip()) for pid in port_ids_str.split(',') if pid.strip()]
if port_ids:
has_valid_series = True
print(f"Valid series config found for KL{series}: ports {port_ids}")
except ValueError:
print(f"Warning: Invalid port ID format for KL{series}: {port_ids_str}")
continue
if not has_valid_series:
return False, "At least one series must have valid port IDs configured"
# Assets folder validation (optional for multi-series)
assets_folder = self.get_property('assets_folder')
if assets_folder:
if not os.path.exists(assets_folder):
print(f"Warning: Assets folder does not exist: {assets_folder}")
else:
# Validate assets folder structure if provided
assets_info = self.get_assets_folder_info()
if assets_info.get('status') == 'error':
print(f"Warning: Assets folder issue: {assets_info.get('message', 'Unknown error')}")
print("Multi-series mode validation passed")
return True, ""
else:
# Single-series validation (legacy)
model_path = self.get_property('model_path')
if not model_path:
return False, "Model path is required"
if not os.path.exists(model_path):
return False, f"Model file does not exist: {model_path}"
# Check dongle series
dongle_series = self.get_property('dongle_series')
if dongle_series not in ['520', '720', '1080', 'Custom']:
return False, f"Invalid dongle series: {dongle_series}"
# Check number of dongles
num_dongles = self.get_property('num_dongles')
if not isinstance(num_dongles, int) or num_dongles < 1:
return False, "Number of dongles must be at least 1"
return True, ""
except Exception as e:
return False, f"Validation error: {str(e)}"
class ExactPreprocessNode(BaseNode):

debug_deployment.py (new file, 58 lines)

@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""
Debug deployment error
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def simulate_deployment():
"""Simulate the deployment process to find the Optional error"""
try:
print("Testing export_pipeline_data equivalent...")
# Simulate creating a node and getting properties
from core.nodes.exact_nodes import ExactModelNode
# This would be similar to what dashboard does
node = ExactModelNode()
print("Node created")
# Check if node has get_business_properties
if hasattr(node, 'get_business_properties'):
print("Node has get_business_properties")
try:
props = node.get_business_properties()
print(f"Properties extracted: {type(props)}")
except Exception as e:
print(f"Error in get_business_properties: {e}")
import traceback
traceback.print_exc()
# Test the mflow converter directly
print("\nTesting MFlowConverter...")
from core.functions.mflow_converter import MFlowConverter
converter = MFlowConverter(default_fw_path='.')
print("MFlowConverter created successfully")
# Test multi-series config building
test_props = {
'multi_series_mode': True,
'enabled_series': ['520', '720'],
'kl520_port_ids': '28,32',
'kl720_port_ids': '4'
}
config = converter._build_multi_series_config_from_properties(test_props)
print(f"Multi-series config: {config}")
print("All tests passed!")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
simulate_deployment()


@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
Debug the multi-series configuration flow
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
def test_full_flow():
"""Test the complete multi-series configuration flow"""
print("=== Testing Multi-Series Configuration Flow ===")
# Simulate node properties as they would appear in the UI
mock_node_properties = {
'multi_series_mode': True,
'enabled_series': ['520', '720'],
'kl520_port_ids': '28,32',
'kl720_port_ids': '4',
'assets_folder': '',
'max_queue_size': 100
}
print(f"1. Mock node properties: {mock_node_properties}")
# Test the mflow converter building multi-series config
try:
from core.functions.mflow_converter import MFlowConverter
converter = MFlowConverter(default_fw_path='.')
config = converter._build_multi_series_config_from_properties(mock_node_properties)
print(f"2. Multi-series config built: {config}")
if config:
print(" [OK] Multi-series config successfully built")
# Test StageConfig creation
from core.functions.InferencePipeline import StageConfig
stage_config = StageConfig(
stage_id="test_stage",
port_ids=[], # Not used in multi-series
scpu_fw_path='',
ncpu_fw_path='',
model_path='',
upload_fw=False,
multi_series_mode=True,
multi_series_config=config
)
print(f"3. StageConfig created with multi_series_mode: {stage_config.multi_series_mode}")
print(f" Multi-series config: {stage_config.multi_series_config}")
# Test what would happen in PipelineStage initialization
print("4. Testing PipelineStage initialization logic:")
if stage_config.multi_series_mode and stage_config.multi_series_config:
print(" [OK] Would initialize MultiDongle with multi_series_config")
print(f" MultiDongle(multi_series_config={stage_config.multi_series_config})")
else:
print(" [ERROR] Would fall back to single-series mode")
else:
print(" [ERROR] Multi-series config is None - this is the problem!")
except Exception as e:
print(f"Error in flow test: {e}")
import traceback
traceback.print_exc()
def test_node_direct():
"""Test creating a node directly and getting its inference config"""
print("\n=== Testing Node Direct Configuration ===")
try:
from core.nodes.exact_nodes import ExactModelNode
# This won't work without NodeGraphQt, but let's see what happens
node = ExactModelNode()
print("Node created (mock mode)")
# Test the get_business_properties method that would be called during export
props = node.get_business_properties()
print(f"Business properties: {props}")
except Exception as e:
print(f"Error in node test: {e}")
if __name__ == "__main__":
test_full_flow()
test_node_direct()

force_cleanup.py (new file, 142 lines)

@ -0,0 +1,142 @@
"""
Force cleanup of all app data and processes
"""
import psutil
import os
import sys
import time
import tempfile
def kill_all_python_processes():
"""Force kill ALL Python processes (use with caution)"""
killed_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if 'python' in proc.info['name'].lower():
print(f"Killing Python process: {proc.info['pid']} - {proc.info['name']}")
proc.kill()
killed_processes.append(proc.info['pid'])
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
if killed_processes:
print(f"Killed {len(killed_processes)} Python processes")
time.sleep(3) # Give more time for cleanup
else:
print("No Python processes found")
def clear_shared_memory():
"""Clear Qt shared memory"""
try:
from PyQt5.QtCore import QSharedMemory
app_names = ["Cluster4NPU", "cluster4npu", "main"]
for app_name in app_names:
shared_mem = QSharedMemory(app_name)
if shared_mem.attach():
shared_mem.detach()
print(f"Cleared shared memory for: {app_name}")
except Exception as e:
print(f"Could not clear shared memory: {e}")
def clean_all_temp_files():
"""Remove all possible lock and temp files"""
possible_files = [
'app.lock',
'.app.lock',
'cluster4npu.lock',
'.cluster4npu.lock',
'main.lock',
'.main.lock'
]
# Check in current directory
current_dir_files = []
for filename in possible_files:
filepath = os.path.join(os.getcwd(), filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
current_dir_files.append(filepath)
print(f"Removed: {filepath}")
except Exception as e:
print(f"Could not remove {filepath}: {e}")
# Check in temp directory
temp_dir = tempfile.gettempdir()
temp_files = []
for filename in possible_files:
filepath = os.path.join(temp_dir, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
temp_files.append(filepath)
print(f"Removed: {filepath}")
except Exception as e:
print(f"Could not remove {filepath}: {e}")
# Check in user home directory
home_dir = os.path.expanduser('~')
home_files = []
for filename in possible_files:
filepath = os.path.join(home_dir, filename)
if os.path.exists(filepath):
try:
os.remove(filepath)
home_files.append(filepath)
print(f"Removed: {filepath}")
except Exception as e:
print(f"Could not remove {filepath}: {e}")
total_removed = len(current_dir_files) + len(temp_files) + len(home_files)
if total_removed == 0:
print("No lock files found")
def force_unlock_files():
"""Try to unlock any locked files"""
try:
# On Windows, try to reset file handles
import subprocess
result = subprocess.run(['tasklist', '/FI', 'IMAGENAME eq python.exe'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
for line in lines[3:]: # Skip header lines
if 'python.exe' in line:
parts = line.split()
if len(parts) >= 2:
pid = parts[1]
try:
subprocess.run(['taskkill', '/F', '/PID', pid], timeout=5)
print(f"Force killed PID: {pid}")
except:
pass
except Exception as e:
print(f"Could not force unlock files: {e}")
if __name__ == '__main__':
print("FORCE CLEANUP - This will kill ALL Python processes!")
print("=" * 60)
response = input("Are you sure? This will close ALL Python programs (y/N): ")
if response.lower() in ['y', 'yes']:
print("\n1. Killing all Python processes...")
kill_all_python_processes()
print("\n2. Clearing shared memory...")
clear_shared_memory()
print("\n3. Removing lock files...")
clean_all_temp_files()
print("\n4. Force unlocking files...")
force_unlock_files()
print("\n" + "=" * 60)
print("FORCE CLEANUP COMPLETE!")
print("All Python processes killed and lock files removed.")
print("You can now start the app with 'python main.py'")
else:
print("Cleanup cancelled.")

gentle_cleanup.py (new file, 121 lines)

@ -0,0 +1,121 @@
"""
Gentle cleanup of app data (safer approach)
"""
import psutil
import os
import sys
import time
def find_and_kill_app_processes():
"""Find and kill only the Cluster4NPU app processes"""
killed_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'cwd']):
try:
if 'python' in proc.info['name'].lower():
cmdline = proc.info['cmdline']
cwd = proc.info['cwd']
# Check if this is our app
if (cmdline and
(any('main.py' in arg for arg in cmdline) or
any('cluster4npu' in arg.lower() for arg in cmdline) or
(cwd and 'cluster4npu' in cwd.lower()))):
print(f"Found app process: {proc.info['pid']}")
print(f" Command: {' '.join(cmdline) if cmdline else 'N/A'}")
print(f" Working dir: {cwd}")
# Try gentle termination first
proc.terminate()
time.sleep(2)
# If still running, force kill
if proc.is_running():
proc.kill()
print(f" Force killed: {proc.info['pid']}")
else:
print(f" Gently terminated: {proc.info['pid']}")
killed_processes.append(proc.info['pid'])
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
if killed_processes:
print(f"\nKilled {len(killed_processes)} app processes")
time.sleep(2)
else:
print("No app processes found")
def clear_app_locks():
"""Remove only app-specific lock files"""
app_specific_locks = [
'cluster4npu.lock',
'.cluster4npu.lock',
'Cluster4NPU.lock',
'main.lock',
'.main.lock'
]
locations = [
os.getcwd(), # Current directory
os.path.expanduser('~'), # User home
os.path.join(os.path.expanduser('~'), '.cluster4npu'), # App data dir
'C:\\temp' if os.name == 'nt' else '/tmp', # System temp
]
removed_files = []
for location in locations:
if not os.path.exists(location):
continue
for lock_name in app_specific_locks:
lock_path = os.path.join(location, lock_name)
if os.path.exists(lock_path):
try:
os.remove(lock_path)
removed_files.append(lock_path)
print(f"Removed lock: {lock_path}")
except Exception as e:
print(f"Could not remove {lock_path}: {e}")
if not removed_files:
print("No lock files found")
def reset_shared_memory():
"""Reset Qt shared memory for the app"""
try:
from PyQt5.QtCore import QSharedMemory
shared_mem = QSharedMemory("Cluster4NPU")
if shared_mem.attach():
print("Found shared memory, detaching...")
shared_mem.detach()
# Try to create and destroy to fully reset
if shared_mem.create(1):
shared_mem.detach()
print("Reset shared memory")
except Exception as e:
print(f"Could not reset shared memory: {e}")
if __name__ == '__main__':
print("Gentle App Cleanup")
print("=" * 30)
print("\n1. Looking for app processes...")
find_and_kill_app_processes()
print("\n2. Clearing app locks...")
clear_app_locks()
print("\n3. Resetting shared memory...")
reset_shared_memory()
print("\n" + "=" * 30)
print("Cleanup complete!")
print("You can now start the app with 'python main.py'")

kill_app_processes.py (new file, 66 lines)

@ -0,0 +1,66 @@
"""
Kill any running app processes and clean up locks
"""
import psutil
import os
import sys
import time
def kill_python_processes():
"""Kill any Python processes that might be running the app"""
killed_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
# Check if it's a Python process
if 'python' in proc.info['name'].lower():
cmdline = proc.info['cmdline']
if cmdline and any('main.py' in arg for arg in cmdline):
print(f"Killing process: {proc.info['pid']} - {' '.join(cmdline)}")
proc.kill()
killed_processes.append(proc.info['pid'])
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
if killed_processes:
print(f"Killed {len(killed_processes)} Python processes")
time.sleep(2) # Give processes time to cleanup
else:
print("No running app processes found")
def clean_lock_files():
"""Remove any lock files that might prevent app startup"""
possible_lock_files = [
'app.lock',
'.app.lock',
'cluster4npu.lock',
os.path.expanduser('~/.cluster4npu.lock'),
'/tmp/cluster4npu.lock',
'C:\\temp\\cluster4npu.lock'
]
removed_files = []
for lock_file in possible_lock_files:
try:
if os.path.exists(lock_file):
os.remove(lock_file)
removed_files.append(lock_file)
print(f"Removed lock file: {lock_file}")
except Exception as e:
print(f"Could not remove {lock_file}: {e}")
if removed_files:
print(f"Removed {len(removed_files)} lock files")
else:
print("No lock files found")
if __name__ == '__main__':
print("Cleaning up app processes and lock files...")
print("=" * 50)
kill_python_processes()
clean_lock_files()
print("=" * 50)
print("Cleanup complete! You can now start the app with 'python main.py'")

main.py (modified, 247 changed lines)

@ -41,60 +41,194 @@ from ui.windows.login import DashboardLogin
class SingleInstance:
"""Enhanced single instance handler with better error recovery."""
def __init__(self, app_name="Cluster4NPU"):
self.app_name = app_name
self.shared_memory = QSharedMemory(app_name)
self.lock_file = None
self.lock_fd = None
self.process_check_enabled = True
def is_running(self):
"""Check if another instance is already running with recovery mechanisms."""
# First, try to detect and clean up stale instances
if self._detect_and_cleanup_stale_instances():
print("Cleaned up stale application instances")
# Try shared memory approach
if self._check_shared_memory():
return True
# Try file locking approach
if self._check_file_lock():
return True
return False
def _detect_and_cleanup_stale_instances(self):
"""Detect and clean up stale instances that might have crashed."""
cleaned_up = False
try:
import psutil
# Check if there are any actual running processes
app_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'create_time']):
try:
if 'python' in proc.info['name'].lower():
cmdline = proc.info['cmdline']
if cmdline and any('main.py' in arg for arg in cmdline):
app_processes.append(proc)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
# If no actual app processes are running, clean up stale locks
if not app_processes:
cleaned_up = self._force_cleanup_locks()
except ImportError:
# psutil not available, try basic cleanup
cleaned_up = self._force_cleanup_locks()
except Exception as e:
print(f"Warning: Could not detect stale instances: {e}")
return cleaned_up
def _force_cleanup_locks(self):
"""Force cleanup of stale locks."""
cleaned_up = False
# Try to clean up shared memory
try:
if self.shared_memory.attach():
self.shared_memory.detach()
cleaned_up = True
except:
pass
# Try to clean up lock file
try:
lock_file = os.path.join(tempfile.gettempdir(), f"{self.app_name}.lock")
if os.path.exists(lock_file):
os.unlink(lock_file)
cleaned_up = True
except:
pass
return cleaned_up
def _check_shared_memory(self):
"""Check shared memory for running instance."""
try:
# Try to attach to existing shared memory
if self.shared_memory.attach():
# Check if the shared memory is actually valid
try:
# Try to read from it to verify it's not corrupted
data = self.shared_memory.data()
if data is not None:
return True # Valid instance found
else:
# Corrupted shared memory, clean it up
self.shared_memory.detach()
except:
# Error reading, clean up
self.shared_memory.detach()
# Try to create new shared memory
if not self.shared_memory.create(1):
# Could not create, but attachment failed too - might be corruption
return False
except Exception as e:
print(f"Warning: Shared memory check failed: {e}")
return False
return False
def _check_file_lock(self):
"""Check file lock for running instance."""
try:
self.lock_file = os.path.join(tempfile.gettempdir(), f"{self.app_name}.lock")
if HAS_FCNTL:
# Unix-like systems
try:
self.lock_fd = os.open(self.lock_file, os.O_CREAT | os.O_EXCL | os.O_RDWR)
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
return False # Successfully locked, no other instance
except (OSError, IOError):
return True # Could not lock, another instance exists
else:
# Windows
try:
self.lock_fd = os.open(self.lock_file, os.O_CREAT | os.O_EXCL | os.O_RDWR)
return False # Successfully created, no other instance
except (OSError, IOError):
# File exists, but check if the process that created it is still running
if self._is_lock_file_stale():
# Stale lock file, remove it and try again
try:
os.unlink(self.lock_file)
self.lock_fd = os.open(self.lock_file, os.O_CREAT | os.O_EXCL | os.O_RDWR)
return False
except:
pass
return True
except Exception as e:
print(f"Warning: File lock check failed: {e}")
return False
def _is_lock_file_stale(self):
"""Check if the lock file is from a stale process."""
try:
if not os.path.exists(self.lock_file):
return True
# Check file age - if older than 5 minutes, consider it stale
import time
file_age = time.time() - os.path.getmtime(self.lock_file)
if file_age > 300: # 5 minutes
return True
# On Windows, we can't easily check if the process is still running
# without additional information, so we rely on age check
return False
except:
return True # If we can't check, assume it's stale
def cleanup(self):
"""Enhanced cleanup with better error handling."""
try:
if self.shared_memory.isAttached():
self.shared_memory.detach()
except Exception as e:
print(f"Warning: Could not detach shared memory: {e}")
try:
if self.lock_fd is not None:
if HAS_FCNTL:
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
os.close(self.lock_fd)
self.lock_fd = None
except Exception as e:
print(f"Warning: Could not close lock file descriptor: {e}")
try:
if self.lock_file and os.path.exists(self.lock_file):
os.unlink(self.lock_file)
except Exception as e:
print(f"Warning: Could not remove lock file: {e}")
def force_cleanup(self):
"""Force cleanup of all locks (use when app crashed)."""
print("Force cleaning up application locks...")
self._force_cleanup_locks()
print("Force cleanup completed")
def setup_application():
@ -125,6 +259,23 @@ def setup_application():
def main():
"""Main application entry point."""
# Check for command line arguments
if '--force-cleanup' in sys.argv or '--cleanup' in sys.argv:
print("Force cleanup mode enabled")
single_instance = SingleInstance()
single_instance.force_cleanup()
print("Cleanup completed. You can now start the application normally.")
sys.exit(0)
# Check for help argument
if '--help' in sys.argv or '-h' in sys.argv:
print("Cluster4NPU Application")
print("Usage: python main.py [options]")
print("Options:")
print(" --force-cleanup, --cleanup Force cleanup of stale application locks")
print(" --help, -h Show this help message")
sys.exit(0)
# Create a minimal QApplication first for the message box
temp_app = QApplication(sys.argv) if not QApplication.instance() else QApplication.instance()
@ -132,12 +283,32 @@ def main():
single_instance = SingleInstance()
if single_instance.is_running():
reply = QMessageBox.question(
None,
"Application Already Running",
"Cluster4NPU is already running. \n\n"
"Would you like to:\n"
"• Click 'Yes' to force cleanup and restart\n"
"• Click 'No' to cancel startup",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.Yes:
print("User requested force cleanup...")
single_instance.force_cleanup()
print("Cleanup completed, proceeding with startup...")
# Create a new instance checker after cleanup
single_instance = SingleInstance()
if single_instance.is_running():
QMessageBox.critical(
None,
"Cleanup Failed",
"Could not clean up the existing instance. Please restart your computer."
)
sys.exit(1)
else:
sys.exit(0)
try:
# Setup the full application


@ -1,193 +0,0 @@
import kp
from collections import defaultdict
from typing import Union
import os
import sys
import argparse
import time
import threading
import queue
import numpy as np
import cv2
# PWD = os.path.dirname(os.path.abspath(__file__))
# sys.path.insert(1, os.path.join(PWD, '..'))
IMAGE_FILE_PATH = r"c:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\images\people_talk_in_street_640x640.bmp"
LOOP_TIME = 100
def _image_send_function(_device_group: kp.DeviceGroup,
_loop_time: int,
_generic_inference_input_descriptor: kp.GenericImageInferenceDescriptor,
_image: Union[bytes, np.ndarray],
_image_format: kp.ImageFormat) -> None:
for _loop in range(_loop_time):
try:
_generic_inference_input_descriptor.inference_number = _loop
_generic_inference_input_descriptor.input_node_image_list = [kp.GenericInputNodeImage(
image=_image,
image_format=_image_format,
resize_mode=kp.ResizeMode.KP_RESIZE_ENABLE,
padding_mode=kp.PaddingMode.KP_PADDING_CORNER,
normalize_mode=kp.NormalizeMode.KP_NORMALIZE_KNERON
)]
kp.inference.generic_image_inference_send(device_group=device_groups[1],
generic_inference_input_descriptor=_generic_inference_input_descriptor)
except kp.ApiKPException as exception:
print(' - Error: inference failed, error = {}'.format(exception))
exit(0)
def _result_receive_function(_device_group: kp.DeviceGroup,
_loop_time: int,
_result_queue: queue.Queue) -> None:
_generic_raw_result = None
for _loop in range(_loop_time):
try:
_generic_raw_result = kp.inference.generic_image_inference_receive(device_group=device_groups[1])
if _generic_raw_result.header.inference_number != _loop:
print(' - Error: incorrect inference_number {} at frame {}'.format(
_generic_raw_result.header.inference_number, _loop))
print('.', end='', flush=True)
except kp.ApiKPException as exception:
print(' - Error: inference failed, error = {}'.format(exception))
exit(0)
_result_queue.put(_generic_raw_result)
model_path = ["C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\models\\KL520\\yolov5-noupsample_w640h640_kn-model-zoo\\kl520_20005_yolov5-noupsample_w640h640.nef", r"C:\Users\mason\Downloads\kneron_plus_v3.1.2\kneron_plus\res\models\KL720\yolov5-noupsample_w640h640_kn-model-zoo\kl720_20005_yolov5-noupsample_w640h640.nef"]
SCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_scpu.bin"
NCPU_FW_PATH_520 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL520\\fw_ncpu.bin"
SCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_scpu.bin"
NCPU_FW_PATH_720 = "C:\\Users\\mason\\Downloads\\kneron_plus_v3.1.2\\kneron_plus\\res\\firmware\\KL720\\fw_ncpu.bin"
device_list = kp.core.scan_devices()
grouped_devices = defaultdict(list)
for device in device_list.device_descriptor_list:
grouped_devices[device.product_id].append(device.usb_port_id)
print(f"Found device groups: {dict(grouped_devices)}")
device_groups = []
for product_id, usb_port_id in grouped_devices.items():
try:
group = kp.core.connect_devices(usb_port_id)
device_groups.append(group)
print(f"Successfully connected to group for product ID {product_id} with ports{usb_port_id}")
except kp.ApiKPException as e:
print(f"Failed to connect to group for product ID {product_id}: {e}")
print(device_groups)
print('[Set Device Timeout]')
kp.core.set_timeout(device_group=device_groups[0], milliseconds=5000)
kp.core.set_timeout(device_group=device_groups[1], milliseconds=5000)
print(' - Success')
try:
print('[Upload Firmware]')
kp.core.load_firmware_from_file(device_group=device_groups[0],
scpu_fw_path=SCPU_FW_PATH_520,
ncpu_fw_path=NCPU_FW_PATH_520)
kp.core.load_firmware_from_file(device_group=device_groups[1],
scpu_fw_path=SCPU_FW_PATH_720,
ncpu_fw_path=NCPU_FW_PATH_720)
print(' - Success')
except kp.ApiKPException as exception:
print('Error: upload firmware failed, error = \'{}\''.format(str(exception)))
exit(0)
print('[Upload Model]')
model_nef_descriptors = []
# for group in device_groups:
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[0], file_path=model_path[0])
model_nef_descriptors.append(model_nef_descriptor)
model_nef_descriptor = kp.core.load_model_from_file(device_group=device_groups[1], file_path=model_path[1])
model_nef_descriptors.append(model_nef_descriptor)
print(' - Success')
"""
prepare the image
"""
print('[Read Image]')
img = cv2.imread(filename=IMAGE_FILE_PATH)
img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
print(' - Success')
"""
prepare generic image inference input descriptor
"""
print(model_nef_descriptors)
generic_inference_input_descriptor = kp.GenericImageInferenceDescriptor(
model_id=model_nef_descriptors[1].models[0].id,
)
"""
starting inference work
"""
print('[Starting Inference Work]')
print(' - Starting inference loop {} times'.format(LOOP_TIME))
print(' - ', end='')
result_queue = queue.Queue()
send_thread = threading.Thread(target=_image_send_function, args=(device_groups[1],
LOOP_TIME,
generic_inference_input_descriptor,
img_bgr565,
kp.ImageFormat.KP_IMAGE_FORMAT_RGB565))
receive_thread = threading.Thread(target=_result_receive_function, args=(device_groups[1],
LOOP_TIME,
result_queue))
start_inference_time = time.time()
send_thread.start()
receive_thread.start()
try:
while send_thread.is_alive():
send_thread.join(1)
while receive_thread.is_alive():
receive_thread.join(1)
except (KeyboardInterrupt, SystemExit):
print('\n - Received keyboard interrupt, quitting threads.')
exit(0)
end_inference_time = time.time()
time_spent = end_inference_time - start_inference_time
try:
generic_raw_result = result_queue.get(timeout=3)
except Exception as exception:
print('Error: Result queue is empty !')
exit(0)
print()
print('[Result]')
print(" - Total inference {} images".format(LOOP_TIME))
print(" - Time spent: {:.2f} secs, FPS = {:.1f}".format(time_spent, LOOP_TIME / time_spent))
"""
retrieve inference node output
"""
print('[Retrieve Inference Node Output ]')
inf_node_output_list = []
for node_idx in range(generic_raw_result.header.num_output_node):
inference_float_node_output = kp.inference.generic_inference_retrieve_float_node(node_idx=node_idx,
generic_raw_result=generic_raw_result,
channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW)
inf_node_output_list.append(inference_float_node_output)
print(' - Success')
print('[Result]')
print(inf_node_output_list)

simple_test.py (new file, 37 lines)

@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""
Simple test for port ID configuration
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from core.nodes.exact_nodes import ExactModelNode
def main():
print("Creating ExactModelNode...")
node = ExactModelNode()
print("Testing property options...")
if hasattr(node, '_property_options'):
port_props = [k for k in node._property_options.keys() if 'port_ids' in k]
print(f"Found port ID properties: {port_props}")
else:
print("No _property_options found")
print("Testing _build_multi_series_config method...")
if hasattr(node, '_build_multi_series_config'):
print("Method exists")
try:
config = node._build_multi_series_config()
print(f"Config result: {config}")
except Exception as e:
print(f"Error calling method: {e}")
else:
print("Method does not exist")
print("Test completed!")
if __name__ == "__main__":
main()

test_folder_selection.py (new file, 69 lines)

@ -0,0 +1,69 @@
"""
Test tkinter folder selection functionality
"""
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from utils.folder_dialog import select_folder, select_assets_folder
def test_basic_folder_selection():
"""Test basic folder selection"""
print("Testing basic folder selection...")
folder = select_folder("Select any folder for testing")
if folder:
print(f"Selected folder: {folder}")
print(f" Exists: {os.path.exists(folder)}")
print(f" Is directory: {os.path.isdir(folder)}")
return True
else:
print("No folder selected")
return False
def test_assets_folder_selection():
"""Test Assets folder selection with validation"""
print("\nTesting Assets folder selection...")
result = select_assets_folder()
print(f"Selected path: {result['path']}")
print(f"Valid: {result['valid']}")
print(f"Message: {result['message']}")
if 'details' in result:
details = result['details']
print(f"Details:")
print(f" Has Firmware folder: {details.get('has_firmware_folder', False)}")
print(f" Has Models folder: {details.get('has_models_folder', False)}")
print(f" Firmware series: {details.get('firmware_series', [])}")
print(f" Models series: {details.get('models_series', [])}")
print(f" Available series: {details.get('available_series', [])}")
print(f" Series with files: {details.get('series_with_files', [])}")
return result['valid']
if __name__ == "__main__":
print("Testing Folder Selection Dialog")
print("=" * 40)
# Test basic functionality
basic_works = test_basic_folder_selection()
# Test Assets folder functionality
assets_works = test_assets_folder_selection()
print("\n" + "=" * 40)
print("Test Results:")
print(f"Basic folder selection: {'PASS' if basic_works else 'FAIL'}")
print(f"Assets folder selection: {'PASS' if assets_works else 'FAIL'}")
if basic_works:
print("\ntkinter folder selection is working!")
print("You can now use this in your ExactModelNode.")
else:
print("\ntkinter might not be available or there's an issue.")
print("Consider using PyQt5 QFileDialog as fallback.")

test_multi_series_fix.py (new file, 134 lines)

@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
Test script to verify multi-series configuration fix
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Test the mflow_converter functionality
def test_multi_series_config_building():
"""Test building multi-series config from properties"""
print("Testing multi-series config building...")
from core.functions.mflow_converter import MFlowConverter
# Create converter instance
converter = MFlowConverter(default_fw_path='.')
# Mock properties data that would come from a node
test_properties = {
'multi_series_mode': True,
'enabled_series': ['520', '720'],
'kl520_port_ids': '28,32',
'kl720_port_ids': '4',
'assets_folder': '', # Empty for this test
'max_queue_size': 100
}
# Test building config
config = converter._build_multi_series_config_from_properties(test_properties)
print(f"Generated config: {config}")
if config:
# Verify structure
assert 'KL520' in config, "KL520 should be in config"
assert 'KL720' in config, "KL720 should be in config"
# Check KL520 config
kl520_config = config['KL520']
assert 'port_ids' in kl520_config, "KL520 should have port_ids"
assert kl520_config['port_ids'] == [28, 32], f"KL520 port_ids should be [28, 32], got {kl520_config['port_ids']}"
# Check KL720 config
kl720_config = config['KL720']
assert 'port_ids' in kl720_config, "KL720 should have port_ids"
assert kl720_config['port_ids'] == [4], f"KL720 port_ids should be [4], got {kl720_config['port_ids']}"
print("[OK] Multi-series config structure is correct")
else:
print("[ERROR] Config building returned None")
return False
# Test with invalid port IDs
invalid_properties = {
'multi_series_mode': True,
'enabled_series': ['520'],
'kl520_port_ids': 'invalid,port,ids',
'assets_folder': ''
}
invalid_config = converter._build_multi_series_config_from_properties(invalid_properties)
assert invalid_config is None, "Invalid port IDs should result in None config"
print("[OK] Invalid port IDs handled correctly")
return True
def test_stage_config():
"""Test StageConfig with multi-series support"""
print("\\nTesting StageConfig with multi-series...")
from core.functions.InferencePipeline import StageConfig
# Test creating StageConfig with multi-series
multi_series_config = {
"KL520": {"port_ids": [28, 32]},
"KL720": {"port_ids": [4]}
}
stage_config = StageConfig(
stage_id="test_stage",
port_ids=[], # Not used in multi-series mode
scpu_fw_path='',
ncpu_fw_path='',
model_path='',
upload_fw=False,
multi_series_mode=True,
multi_series_config=multi_series_config
)
print(f"Created StageConfig with multi_series_mode: {stage_config.multi_series_mode}")
print(f"Multi-series config: {stage_config.multi_series_config}")
assert stage_config.multi_series_mode == True, "multi_series_mode should be True"
assert stage_config.multi_series_config == multi_series_config, "multi_series_config should match"
print("[OK] StageConfig supports multi-series configuration")
return True
def main():
"""Run all tests"""
print("Testing Multi-Series Configuration Fix")
print("=" * 50)
try:
# Test config building
if not test_multi_series_config_building():
print("[ERROR] Config building test failed")
return False
# Test StageConfig
if not test_stage_config():
print("[ERROR] StageConfig test failed")
return False
print("\\n" + "=" * 50)
print("[SUCCESS] All tests passed!")
print("\\nThe fix should now properly:")
print("1. Detect multi_series_mode from node properties")
print("2. Build multi_series_config from series-specific port IDs")
print("3. Pass the config to MultiDongle for true multi-series operation")
return True
except Exception as e:
print(f"[ERROR] Test failed with exception: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)


@ -0,0 +1,203 @@
"""
Final Integration Test for Multi-Series Multidongle
Comprehensive test suite for the completed multi-series integration
"""
import unittest
import sys
import os
# Add project root to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'core', 'functions'))
from Multidongle import MultiDongle, DongleSeriesSpec
class TestMultiSeriesIntegration(unittest.TestCase):
def setUp(self):
"""Set up test fixtures"""
self.multi_series_config = {
"KL520": {
"port_ids": [28, 32],
"model_path": "/path/to/kl520_model.nef",
"firmware_paths": {
"scpu": "/path/to/kl520_scpu.bin",
"ncpu": "/path/to/kl520_ncpu.bin"
}
},
"KL720": {
"port_ids": [40, 44],
"model_path": "/path/to/kl720_model.nef",
"firmware_paths": {
"scpu": "/path/to/kl720_scpu.bin",
"ncpu": "/path/to/kl720_ncpu.bin"
}
}
}
def test_multi_series_initialization_success(self):
"""Test that multi-series initialization works correctly"""
multidongle = MultiDongle(multi_series_config=self.multi_series_config)
# Should be in multi-series mode
self.assertTrue(multidongle.multi_series_mode)
# Should have series groups configured
self.assertIsNotNone(multidongle.series_groups)
self.assertIn("KL520", multidongle.series_groups)
self.assertIn("KL720", multidongle.series_groups)
# Should have correct configuration for each series
kl520_config = multidongle.series_groups["KL520"]
self.assertEqual(kl520_config["port_ids"], [28, 32])
self.assertEqual(kl520_config["model_path"], "/path/to/kl520_model.nef")
kl720_config = multidongle.series_groups["KL720"]
self.assertEqual(kl720_config["port_ids"], [40, 44])
self.assertEqual(kl720_config["model_path"], "/path/to/kl720_model.nef")
# Should have GOPS weights calculated
self.assertIsNotNone(multidongle.gops_weights)
self.assertIn("KL520", multidongle.gops_weights)
self.assertIn("KL720", multidongle.gops_weights)
# KL720 should have higher weight due to higher GOPS (28 vs 3 GOPS)
# But since both have 2 devices: KL520=3*2=6 total GOPS, KL720=28*2=56 total GOPS
# Total = 62 GOPS, so KL520 weight = 6/62 ≈ 0.097, KL720 weight = 56/62 ≈ 0.903
self.assertGreater(multidongle.gops_weights["KL720"],
multidongle.gops_weights["KL520"])
# Weights should sum to 1.0
total_weight = sum(multidongle.gops_weights.values())
self.assertAlmostEqual(total_weight, 1.0, places=5)
print("Multi-series initialization test passed")
def test_single_series_to_multi_series_conversion_success(self):
"""Test that single-series config gets converted to multi-series internally"""
# Legacy single-series initialization
multidongle = MultiDongle(
port_id=[28, 32],
scpu_fw_path="/path/to/scpu.bin",
ncpu_fw_path="/path/to/ncpu.bin",
model_path="/path/to/model.nef",
upload_fw=True
)
# Should NOT be in explicit multi-series mode (legacy mode)
self.assertFalse(multidongle.multi_series_mode)
        # But should internally convert to multi-series format
        self.assertIsNotNone(multidongle.series_groups)
        self.assertEqual(len(multidongle.series_groups), 1)

        # Should auto-detect the series (KL520, based on available devices or the fallback)
        series_keys = list(multidongle.series_groups.keys())
        self.assertEqual(len(series_keys), 1)
        detected_series = series_keys[0]
        self.assertIn(detected_series, DongleSeriesSpec.SERIES_SPECS.keys())

        # Should have the correct port configuration
        series_config = multidongle.series_groups[detected_series]
        self.assertEqual(series_config["port_ids"], [28, 32])
        self.assertEqual(series_config["model_path"], "/path/to/model.nef")

        # Should carry 100% of the GOPS weight, since it is the only series
        self.assertEqual(multidongle.gops_weights[detected_series], 1.0)

        print(f"Single-to-multi-series conversion test passed (detected: {detected_series})")

    def test_load_balancing_success(self):
        """Test that load balancing works based on GOPS weights"""
        multidongle = MultiDongle(multi_series_config=self.multi_series_config)

        # Should have a load balancing method
        optimal_series = multidongle._select_optimal_series()
        self.assertIsNotNone(optimal_series)
        self.assertIn(optimal_series, ["KL520", "KL720"])

        # With zero load, it should select the series with the highest weight (KL720)
        self.assertEqual(optimal_series, "KL720")

        # Test load balancing under different conditions: simulate high load on KL720
        multidongle.current_loads["KL720"] = 100
        multidongle.current_loads["KL520"] = 0

        # Now it should prefer KL520, despite its lower GOPS, because of the lower load
        optimal_series_with_load = multidongle._select_optimal_series()
        self.assertEqual(optimal_series_with_load, "KL520")

        print("Load balancing test passed")

    def test_backward_compatibility_maintained(self):
        """Test that the existing single-series API still works"""
        # This should work exactly as before
        multidongle = MultiDongle(
            port_id=[28, 32],
            scpu_fw_path="/path/to/scpu.bin",
            ncpu_fw_path="/path/to/ncpu.bin",
            model_path="/path/to/model.nef"
        )

        # Legacy properties should still exist and work
        self.assertIsNotNone(multidongle.port_id)
        self.assertEqual(multidongle.port_id, [28, 32])
        self.assertEqual(multidongle.model_path, "/path/to/model.nef")
        self.assertEqual(multidongle.scpu_fw_path, "/path/to/scpu.bin")
        self.assertEqual(multidongle.ncpu_fw_path, "/path/to/ncpu.bin")

        # Legacy attributes should be available
        self.assertIsNotNone(multidongle.device_group)
        self.assertIsNotNone(multidongle._input_queue)
        self.assertIsNotNone(multidongle._output_queue)

        print("Backward compatibility test passed")

    def test_series_specs_are_correct(self):
        """Test that the series specifications match the expected values"""
        specs = DongleSeriesSpec.SERIES_SPECS

        # Check that all expected series are present
        expected_series = ["KL520", "KL720", "KL630", "KL730", "KL540"]
        for series in expected_series:
            self.assertIn(series, specs)

        # Check the GOPS values against the expected constants
        self.assertEqual(specs["KL520"]["gops"], 3)
        self.assertEqual(specs["KL720"]["gops"], 28)
        self.assertEqual(specs["KL630"]["gops"], 400)
        self.assertEqual(specs["KL730"]["gops"], 1600)
        self.assertEqual(specs["KL540"]["gops"], 800)

        print("Series specifications test passed")

    def test_edge_cases(self):
        """Test various edge cases and error handling"""
        # Test with an empty port list (single-series)
        multidongle_empty = MultiDongle(port_id=[])
        self.assertEqual(len(multidongle_empty.series_groups), 0)

        # Test with an unknown series (should raise an error)
        with self.assertRaises(ValueError):
            MultiDongle(multi_series_config={"UNKNOWN_SERIES": {"port_ids": [1, 2]}})

        # Test with no port IDs in the multi-series config
        config_no_ports = {
            "KL520": {
                "port_ids": [],
                "model_path": "/path/to/model.nef"
            }
        }
        multidongle_no_ports = MultiDongle(multi_series_config=config_no_ports)
        self.assertEqual(multidongle_no_ports.gops_weights["KL520"], 0.0)  # zero weight: no devices

        print("Edge cases test passed")


if __name__ == '__main__':
    print("Running Multi-Series Integration Tests")
    print("=" * 50)
    unittest.main(verbosity=2)
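The two load-balancing assertions above pin down the selection behavior: the highest GOPS weight wins when all series are idle, and work shifts toward the lighter-loaded series under load. A minimal standalone sketch of such a selector follows — only the method name _select_optimal_series and the gops_weights / current_loads attributes come from the tests; the scoring formula itself is an assumption:

def select_optimal_series(gops_weights, current_loads):
    """Pick the series with the best weight-to-load ratio (hypothetical scoring)."""
    best_series, best_score = None, float("-inf")
    for series, weight in gops_weights.items():
        load = current_loads.get(series, 0)
        score = weight / (1 + load)  # idle series keep their full GOPS weight
        if score > best_score:
            best_series, best_score = series, score
    return best_series

# Reproduces the expectations from test_load_balancing_success:
weights = {"KL520": 3 / 31, "KL720": 28 / 31}  # GOPS normalized over 3 + 28
print(select_optimal_series(weights, {"KL520": 0, "KL720": 0}))    # KL720
print(select_optimal_series(weights, {"KL520": 0, "KL720": 100}))  # KL520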

View File

@ -0,0 +1,170 @@
"""
Test Multi-Series Integration for Multidongle
Testing the integration of multi-series functionality into the existing Multidongle class
following TDD principles.
"""
import unittest
import sys
import os
from unittest.mock import Mock, patch, MagicMock
# Add project root to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'core', 'functions'))
from Multidongle import MultiDongle
class TestMultiSeriesMultidongle(unittest.TestCase):
def setUp(self):
"""Set up test fixtures"""
self.multi_series_config = {
"KL520": {
"port_ids": [28, 32],
"model_path": "/path/to/kl520_model.nef",
"firmware_paths": {
"scpu": "/path/to/kl520_scpu.bin",
"ncpu": "/path/to/kl520_ncpu.bin"
}
},
"KL720": {
"port_ids": [40, 44],
"model_path": "/path/to/kl720_model.nef",
"firmware_paths": {
"scpu": "/path/to/kl720_scpu.bin",
"ncpu": "/path/to/kl720_ncpu.bin"
}
}
}
def test_multi_series_initialization_should_fail(self):
"""
Test that multi-series initialization accepts config and sets up series groups
This should FAIL initially since the functionality doesn't exist yet
"""
# This should work but will fail initially
try:
multidongle = MultiDongle(multi_series_config=self.multi_series_config)
# Should have series groups configured
self.assertIsNotNone(multidongle.series_groups)
self.assertIn("KL520", multidongle.series_groups)
self.assertIn("KL720", multidongle.series_groups)
# Should have GOPS weights calculated
self.assertIsNotNone(multidongle.gops_weights)
self.assertIn("KL520", multidongle.gops_weights)
self.assertIn("KL720", multidongle.gops_weights)
# KL720 should have higher weight due to higher GOPS
self.assertGreater(multidongle.gops_weights["KL720"],
multidongle.gops_weights["KL520"])
self.fail("Multi-series initialization should not work yet - test should fail")
except (AttributeError, TypeError) as e:
# Expected to fail at this stage
print(f"Expected failure: {e}")
self.assertTrue(True, "Multi-series initialization correctly fails (not implemented yet)")
def test_single_series_to_multi_series_conversion_should_fail(self):
"""
Test that single-series config gets converted to multi-series internally
This should FAIL initially
"""
try:
# Legacy single-series initialization
multidongle = MultiDongle(
port_id=[28, 32],
scpu_fw_path="/path/to/scpu.bin",
ncpu_fw_path="/path/to/ncpu.bin",
model_path="/path/to/model.nef",
upload_fw=True
)
# Should internally convert to multi-series format
self.assertIsNotNone(multidongle.series_groups)
self.assertEqual(len(multidongle.series_groups), 1)
# Should auto-detect series from device scan or use default
series_keys = list(multidongle.series_groups.keys())
self.assertEqual(len(series_keys), 1)
self.fail("Single to multi-series conversion should not work yet")
except (AttributeError, TypeError) as e:
# Expected to fail at this stage
print(f"Expected failure: {e}")
self.assertTrue(True, "Single-series conversion correctly fails (not implemented yet)")
def test_load_balancing_should_fail(self):
"""
Test that load balancing works based on GOPS weights
This should FAIL initially
"""
try:
multidongle = MultiDongle(multi_series_config=self.multi_series_config)
# Should have load balancing method
optimal_series = multidongle._select_optimal_series()
self.assertIsNotNone(optimal_series)
self.assertIn(optimal_series, ["KL520", "KL720"])
self.fail("Load balancing should not work yet")
except (AttributeError, TypeError) as e:
# Expected to fail at this stage
print(f"Expected failure: {e}")
self.assertTrue(True, "Load balancing correctly fails (not implemented yet)")
def test_backward_compatibility_should_work(self):
"""
Test that existing single-series API still works
This should PASS (existing functionality)
"""
# This should still work with existing code
try:
multidongle = MultiDongle(
port_id=[28, 32],
scpu_fw_path="/path/to/scpu.bin",
ncpu_fw_path="/path/to/ncpu.bin",
model_path="/path/to/model.nef"
)
# Basic properties should still exist
self.assertIsNotNone(multidongle.port_id)
self.assertEqual(multidongle.port_id, [28, 32])
self.assertEqual(multidongle.model_path, "/path/to/model.nef")
print("Backward compatibility test passed")
except Exception as e:
self.fail(f"Backward compatibility should work: {e}")
def test_multi_series_device_grouping_should_fail(self):
"""
Test that devices are properly grouped by series
This should FAIL initially
"""
try:
multidongle = MultiDongle(multi_series_config=self.multi_series_config)
multidongle.initialize()
# Should have device groups for each series
self.assertIsNotNone(multidongle.device_groups)
self.assertEqual(len(multidongle.device_groups), 2)
# Each series should have its device group
for series_name, config in self.multi_series_config.items():
self.assertIn(series_name, multidongle.device_groups)
self.fail("Multi-series device grouping should not work yet")
except (AttributeError, TypeError) as e:
# Expected to fail
print(f"Expected failure: {e}")
self.assertTrue(True, "Device grouping correctly fails (not implemented yet)")
if __name__ == '__main__':
unittest.main()
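As a companion to test_single_series_to_multi_series_conversion_should_fail, here is a sketch of the conversion those tests eventually expect to pass. The detected_series default and the exact dict shape beyond port_ids / model_path / firmware_paths are assumptions drawn from the setUp fixture above:

def to_multi_series_config(port_id, model_path, scpu_fw_path, ncpu_fw_path,
                           detected_series="KL520"):
    """Wrap legacy single-series arguments in the multi-series config format."""
    return {
        detected_series: {
            "port_ids": list(port_id),
            "model_path": model_path,
            "firmware_paths": {"scpu": scpu_fw_path, "ncpu": ncpu_fw_path},
        }
    }

config = to_multi_series_config([28, 32], "/path/to/model.nef",
                                "/path/to/scpu.bin", "/path/to/ncpu.bin")
# Yields one series group holding 100% of the GOPS weight, matching the assertions above.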

46
test_multidongle_start.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python3
"""
Test MultiDongle start/stop functionality
"""
import sys
import os

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))


def test_multidongle_start():
    """Test the MultiDongle start method"""
    try:
        from core.functions.Multidongle import MultiDongle

        # Test multi-series configuration
        multi_series_config = {
            "KL520": {"port_ids": [28, 32]},
            "KL720": {"port_ids": [4]}
        }

        print("Creating MultiDongle with multi-series config...")
        multidongle = MultiDongle(multi_series_config=multi_series_config)

        print(f"Multi-series mode: {multidongle.multi_series_mode}")
        print(f"Has _start_multi_series method: {hasattr(multidongle, '_start_multi_series')}")
        print(f"Has _stop_multi_series method: {hasattr(multidongle, '_stop_multi_series')}")
        print("MultiDongle created successfully!")

        # Test that the required attributes exist
        expected_attrs = ['send_threads', 'receive_threads', 'dispatcher_thread', 'result_ordering_thread']
        for attr in expected_attrs:
            if hasattr(multidongle, attr):
                print(f"[OK] Has attribute: {attr}")
            else:
                print(f"[ERROR] Missing attribute: {attr}")

        print("Test completed successfully!")
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    test_multidongle_start()
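The result_ordering_thread checked above is what keeps out-of-order inference results flowing in sequence. A minimal sketch of such an ordering loop — an assumed design, not the actual MultiDongle internals: results are buffered by sequence number and emitted in order, and if nothing arrives within the timeout a None placeholder is emitted so one slow device cannot stall the whole pipeline:

import queue

def order_results(raw_results, ordered_out, stop_event, timeout_s=2.0):
    pending = {}   # seq -> result, buffered until its turn
    next_seq = 0
    while not stop_event.is_set():
        try:
            seq, result = raw_results.get(timeout=timeout_s)
            pending[seq] = result
        except queue.Empty:
            # Nothing arrived in time: give up on next_seq and move on.
            ordered_out.put((next_seq, None))
            next_seq += 1
        # Emit everything that is now in order.
        while next_seq in pending:
            ordered_out.put((next_seq, pending.pop(next_seq)))
            next_seq += 1

A production version would track per-sequence deadlines rather than a single queue timeout, but the skip-ahead idea is the same.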

201
test_port_id_config.py Normal file
View File

@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
Test script for the new series-specific port ID configuration functionality
"""
import sys
import os

# Add the project root to the Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

try:
    from core.nodes.exact_nodes import ExactModelNode
    print("[OK] Successfully imported ExactModelNode")
except ImportError as e:
    print(f"[ERROR] Failed to import ExactModelNode: {e}")
    sys.exit(1)


def test_port_id_properties():
    """Test that the new port ID properties are created correctly"""
    print("\n=== Testing Port ID Properties Creation ===")
    try:
        node = ExactModelNode()

        # Test that all series port ID properties exist
        series_properties = ['kl520_port_ids', 'kl720_port_ids', 'kl630_port_ids', 'kl730_port_ids', 'kl540_port_ids']
        for prop in series_properties:
            if hasattr(node, 'get_property'):
                try:
                    value = node.get_property(prop)
                    print(f"[OK] Property {prop} exists with value: '{value}'")
                except Exception:
                    print(f"[ERROR] Property {prop} does not exist or cannot be accessed")
            else:
                print("[WARN] Node does not have get_property method (NodeGraphQt not available)")
                break

        # Test property options
        if hasattr(node, '_property_options'):
            for prop in series_properties:
                if prop in node._property_options:
                    options = node._property_options[prop]
                    print(f"[OK] Property options for {prop}: {options}")
                else:
                    print(f"[ERROR] No property options found for {prop}")
        else:
            print("[WARN] Node does not have _property_options")
    except Exception as e:
        print(f"[ERROR] Error testing port ID properties: {e}")


def test_display_properties():
    """Test that the display properties work correctly"""
    print("\n=== Testing Display Properties ===")
    try:
        node = ExactModelNode()

        if not hasattr(node, 'get_display_properties'):
            print("[WARN] Node does not have get_display_properties method (NodeGraphQt not available)")
            return

        if hasattr(node, 'set_property'):
            # Test single-series mode
            node.set_property('multi_series_mode', False)
            single_props = node.get_display_properties()
            print(f"[OK] Single-series display properties: {single_props}")

            # Test multi-series mode
            node.set_property('multi_series_mode', True)
            node.set_property('enabled_series', ['520', '720'])
            multi_props = node.get_display_properties()
            print(f"[OK] Multi-series display properties: {multi_props}")

            # Check whether the port ID properties are included
            expected_port_props = ['kl520_port_ids', 'kl720_port_ids']
            found_port_props = [prop for prop in multi_props if prop in expected_port_props]
            print(f"[OK] Found port ID properties in display: {found_port_props}")

            # Test with a different set of enabled series
            node.set_property('enabled_series', ['630', '730'])
            multi_props_2 = node.get_display_properties()
            print(f"[OK] Display properties with KL630/730: {multi_props_2}")
        else:
            print("[WARN] Node does not have set_property method (NodeGraphQt not available)")
    except Exception as e:
        print(f"[ERROR] Error testing display properties: {e}")


def test_multi_series_config():
    """Test multi-series configuration building"""
    print("\n=== Testing Multi-Series Config Building ===")
    try:
        node = ExactModelNode()

        if not hasattr(node, '_build_multi_series_config'):
            print("[ERROR] Node does not have _build_multi_series_config method")
            return

        if not hasattr(node, 'set_property'):
            print("[WARN] Node does not have set_property method (NodeGraphQt not available)")
            return

        # Test with a sample configuration
        node.set_property('enabled_series', ['520', '720'])
        node.set_property('kl520_port_ids', '28,32')
        node.set_property('kl720_port_ids', '30,34')
        node.set_property('assets_folder', '/fake/assets/path')

        # Build the multi-series config
        config = node._build_multi_series_config()
        print(f"[OK] Generated multi-series config: {config}")

        # Verify the structure
        if config:
            expected_keys = ['KL520', 'KL720']
            for key in expected_keys:
                if key in config:
                    series_config = config[key]
                    print(f"[OK] {key} config: {series_config}")
                    if 'port_ids' in series_config:
                        print(f"  - Port IDs: {series_config['port_ids']}")
                    else:
                        print(f"  [ERROR] Missing port_ids in {key} config")
                else:
                    print(f"[ERROR] Missing {key} in config")
        else:
            print("[ERROR] Generated config is None or empty")

        # Test with invalid port IDs
        node.set_property('kl520_port_ids', 'invalid,port,ids')
        config_invalid = node._build_multi_series_config()
        print(f"[OK] Config with invalid port IDs: {config_invalid}")
    except Exception as e:
        print(f"[ERROR] Error testing multi-series config: {e}")


def test_inference_config():
    """Test the inference configuration"""
    print("\n=== Testing Inference Config ===")
    try:
        node = ExactModelNode()

        if not hasattr(node, 'get_inference_config'):
            print("[ERROR] Node does not have get_inference_config method")
            return

        if not hasattr(node, 'set_property'):
            print("[WARN] Node does not have set_property method (NodeGraphQt not available)")
            return

        # Test the multi-series inference config
        node.set_property('multi_series_mode', True)
        node.set_property('enabled_series', ['520', '720'])
        node.set_property('kl520_port_ids', '28,32')
        node.set_property('kl720_port_ids', '30,34')
        node.set_property('assets_folder', '/fake/assets')
        node.set_property('max_queue_size', 50)

        inference_config = node.get_inference_config()
        print(f"[OK] Inference config: {inference_config}")

        # Check whether multi_series_config is included
        if 'multi_series_config' in inference_config:
            ms_config = inference_config['multi_series_config']
            print(f"[OK] Multi-series config included: {ms_config}")
        else:
            print("[WARN] Multi-series config not found in inference config")

        # Test single-series mode
        node.set_property('multi_series_mode', False)
        node.set_property('model_path', '/fake/model.nef')
        node.set_property('port_id', '28')

        single_config = node.get_inference_config()
        print(f"[OK] Single-series config: {single_config}")
    except Exception as e:
        print(f"[ERROR] Error testing inference config: {e}")


def main():
    """Run all tests"""
    print("Testing Series-Specific Port ID Configuration")
    print("=" * 50)

    test_port_id_properties()
    test_display_properties()
    test_multi_series_config()
    test_inference_config()

    print("\n" + "=" * 50)
    print("Test completed!")


if __name__ == "__main__":
    main()
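The script above feeds _build_multi_series_config both a well-formed port ID string ('28,32') and a malformed one ('invalid,port,ids'). A hypothetical parse helper showing the tolerant behavior the test implies — whether ExactModelNode actually skips or rejects bad entries is an assumption:

def parse_port_ids(raw):
    """Parse a comma-separated port ID string like '28,32' into [28, 32].

    Non-numeric entries are skipped rather than raising, so malformed input
    degrades to an empty list instead of crashing the config build.
    """
    ports = []
    for part in raw.split(","):
        part = part.strip()
        if part.isdigit():
            ports.append(int(part))
    return ports

print(parse_port_ids("28,32"))             # [28, 32]
print(parse_port_ids("invalid,port,ids"))  # []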

View File

@ -79,8 +79,10 @@ class StdoutCapture:
     def write(self, text):
         # Write to original stdout/stderr (so it still appears in terminal)
-        self.original.write(text)
-        self.original.flush()
+        # Check if original exists (it might be None in PyInstaller builds)
+        if self.original is not None:
+            self.original.write(text)
+            self.original.flush()

         # Capture for GUI if it's a substantial message and not already emitting
         if text.strip() and not self._emitting:
@ -91,7 +93,9 @@ class StdoutCapture:
             self._emitting = False

     def flush(self):
-        self.original.flush()
+        # Check if original exists before calling flush
+        if self.original is not None:
+            self.original.flush()

         # Replace stdout and stderr with our tee writers
         sys.stdout = TeeWriter(self.original_stdout, self.captured_output, self.signal_emitter)
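For context on why this guard is needed: in a PyInstaller build of a windowed (no-console) app, sys.stdout and sys.stderr can be None, so any unguarded write() or flush() raises AttributeError. A minimal sketch of the same defensive pattern outside the dialog code:

import io
import sys

# In windowed PyInstaller builds there is no console, so these may be None.
if sys.stdout is None:
    sys.stdout = io.StringIO()
if sys.stderr is None:
    sys.stderr = io.StringIO()

print("safe to print now")  # no AttributeError even without a console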
@ -622,10 +626,32 @@ Stage Configurations:
         for i, stage_config in enumerate(config.stage_configs, 1):
             analysis_text += f"\nStage {i}: {stage_config.stage_id}\n"
-            analysis_text += f"  Port IDs: {stage_config.port_ids}\n"
-            analysis_text += f"  Model Path: {stage_config.model_path}\n"
-            analysis_text += f"  SCPU Firmware: {stage_config.scpu_fw_path}\n"
-            analysis_text += f"  NCPU Firmware: {stage_config.ncpu_fw_path}\n"
+
+            # Check if this is multi-series configuration
+            if stage_config.multi_series_config:
+                analysis_text += f"  Mode: Multi-Series\n"
+                analysis_text += f"  Series Configured: {list(stage_config.multi_series_config.keys())}\n"
+
+                # Show details for each series
+                for series_name, series_config in stage_config.multi_series_config.items():
+                    analysis_text += f"  \n  {series_name} Configuration:\n"
+                    analysis_text += f"    Port IDs: {series_config.get('port_ids', [])}\n"
+                    model_path = series_config.get('model_path', 'Not specified')
+                    analysis_text += f"    Model: {model_path}\n"
+                    firmware_paths = series_config.get('firmware_paths', {})
+                    if firmware_paths:
+                        analysis_text += f"    SCPU Firmware: {firmware_paths.get('scpu', 'Not specified')}\n"
+                        analysis_text += f"    NCPU Firmware: {firmware_paths.get('ncpu', 'Not specified')}\n"
+                    else:
+                        analysis_text += f"    Firmware: Not specified\n"
+            else:
+                # Single-series (legacy) configuration
+                analysis_text += f"  Mode: Single-Series\n"
+                analysis_text += f"  Port IDs: {stage_config.port_ids}\n"
+                analysis_text += f"  Model Path: {stage_config.model_path}\n"
+                analysis_text += f"  SCPU Firmware: {stage_config.scpu_fw_path}\n"
+                analysis_text += f"  NCPU Firmware: {stage_config.ncpu_fw_path}\n"

             analysis_text += f"  Upload Firmware: {stage_config.upload_fw}\n"
             analysis_text += f"  Max Queue Size: {stage_config.max_queue_size}\n"
@ -663,23 +689,66 @@ Stage Configurations:
             stage_group = QGroupBox(f"Stage {i}: {stage_config.stage_id}")
             stage_layout = QFormLayout(stage_group)

-            # Create read-only fields for stage configuration
-            model_path_edit = QLineEdit(stage_config.model_path)
-            model_path_edit.setReadOnly(True)
-            stage_layout.addRow("Model Path:", model_path_edit)
-
-            scpu_fw_edit = QLineEdit(stage_config.scpu_fw_path)
-            scpu_fw_edit.setReadOnly(True)
-            stage_layout.addRow("SCPU Firmware:", scpu_fw_edit)
-
-            ncpu_fw_edit = QLineEdit(stage_config.ncpu_fw_path)
-            ncpu_fw_edit.setReadOnly(True)
-            stage_layout.addRow("NCPU Firmware:", ncpu_fw_edit)
-
-            port_ids_edit = QLineEdit(str(stage_config.port_ids))
-            port_ids_edit.setReadOnly(True)
-            stage_layout.addRow("Port IDs:", port_ids_edit)
+            # Check if this is multi-series configuration
+            if stage_config.multi_series_config:
+                # Multi-series configuration display
+                mode_edit = QLineEdit("Multi-Series")
+                mode_edit.setReadOnly(True)
+                stage_layout.addRow("Mode:", mode_edit)
+
+                series_edit = QLineEdit(str(list(stage_config.multi_series_config.keys())))
+                series_edit.setReadOnly(True)
+                stage_layout.addRow("Series:", series_edit)
+
+                # Show details for each series
+                for series_name, series_config in stage_config.multi_series_config.items():
+                    series_label = QLabel(f"--- {series_name} ---")
+                    series_label.setStyleSheet("font-weight: bold; color: #89b4fa;")
+                    stage_layout.addRow(series_label)
+
+                    port_ids_edit = QLineEdit(str(series_config.get('port_ids', [])))
+                    port_ids_edit.setReadOnly(True)
+                    stage_layout.addRow(f"{series_name} Port IDs:", port_ids_edit)
+
+                    model_path = series_config.get('model_path', 'Not specified')
+                    model_path_edit = QLineEdit(model_path)
+                    model_path_edit.setReadOnly(True)
+                    stage_layout.addRow(f"{series_name} Model:", model_path_edit)
+
+                    firmware_paths = series_config.get('firmware_paths', {})
+                    if firmware_paths:
+                        scpu_path = firmware_paths.get('scpu', 'Not specified')
+                        scpu_fw_edit = QLineEdit(scpu_path)
+                        scpu_fw_edit.setReadOnly(True)
+                        stage_layout.addRow(f"{series_name} SCPU FW:", scpu_fw_edit)
+
+                        ncpu_path = firmware_paths.get('ncpu', 'Not specified')
+                        ncpu_fw_edit = QLineEdit(ncpu_path)
+                        ncpu_fw_edit.setReadOnly(True)
+                        stage_layout.addRow(f"{series_name} NCPU FW:", ncpu_fw_edit)
+            else:
+                # Single-series configuration display
+                mode_edit = QLineEdit("Single-Series")
+                mode_edit.setReadOnly(True)
+                stage_layout.addRow("Mode:", mode_edit)
+
+                model_path_edit = QLineEdit(stage_config.model_path)
+                model_path_edit.setReadOnly(True)
+                stage_layout.addRow("Model Path:", model_path_edit)
+
+                scpu_fw_edit = QLineEdit(stage_config.scpu_fw_path)
+                scpu_fw_edit.setReadOnly(True)
+                stage_layout.addRow("SCPU Firmware:", scpu_fw_edit)
+
+                ncpu_fw_edit = QLineEdit(stage_config.ncpu_fw_path)
+                ncpu_fw_edit.setReadOnly(True)
+                stage_layout.addRow("NCPU Firmware:", ncpu_fw_edit)
+
+                port_ids_edit = QLineEdit(str(stage_config.port_ids))
+                port_ids_edit.setReadOnly(True)
+                stage_layout.addRow("Port IDs:", port_ids_edit)
+
+            # Common fields
             queue_size_spin = QSpinBox()
             queue_size_spin.setValue(stage_config.max_queue_size)
             queue_size_spin.setReadOnly(True)

View File

@ -43,6 +43,7 @@ except ImportError:
 from config.theme import HARMONIOUS_THEME_STYLESHEET
 from config.settings import get_settings
+from utils.folder_dialog import select_assets_folder

 try:
     from core.nodes import (
         InputNode, ModelNode, PreprocessNode, PostprocessNode, OutputNode,
@ -1323,8 +1324,74 @@ class IntegratedPipelineDashboard(QMainWindow):
             if hasattr(node, '_property_options') and prop_name in node._property_options:
                 prop_options = node._property_options[prop_name]

-            # Check for file path properties first (from prop_options or name pattern)
-            if (prop_options and isinstance(prop_options, dict) and prop_options.get('type') == 'file_path') or \
+            # Special handling for assets_folder property
+            if prop_name == 'assets_folder':
+                # Assets folder property with validation and improved dialog
+                display_text = self.truncate_path_smart(str(prop_value)) if prop_value else 'Select Assets Folder...'
+                widget = QPushButton(display_text)
+
+                # Set fixed width and styling to prevent expansion
+                widget.setMaximumWidth(250)
+                widget.setMinimumWidth(200)
+                widget.setStyleSheet("""
+                    QPushButton {
+                        text-align: left;
+                        padding: 5px 8px;
+                        background-color: #45475a;
+                        color: #cdd6f4;
+                        border: 1px solid #585b70;
+                        border-radius: 4px;
+                        font-size: 10px;
+                    }
+                    QPushButton:hover {
+                        background-color: #585b70;
+                        border-color: #a6e3a1;
+                    }
+                    QPushButton:pressed {
+                        background-color: #313244;
+                    }
+                """)
+
+                # Store full path for tooltip and internal use
+                full_path = str(prop_value) if prop_value else ''
+                widget.setToolTip(f"Full path: {full_path}\n\nClick to browse for Assets folder\n(Should contain Firmware/ and Models/ subfolders)")
+
+                def browse_assets_folder():
+                    # Use the specialized assets folder dialog with validation
+                    result = select_assets_folder(initial_dir=full_path or '')
+                    if result['path']:
+                        # Update button text with truncated path
+                        truncated_text = self.truncate_path_smart(result['path'])
+                        widget.setText(truncated_text)
+
+                        # Create detailed tooltip with validation results
+                        tooltip_lines = [f"Full path: {result['path']}"]
+                        if result['valid']:
+                            tooltip_lines.append("✓ Valid Assets folder structure detected")
+                            if 'details' in result and 'available_series' in result['details']:
+                                series = result['details']['available_series']
+                                tooltip_lines.append(f"Available series: {', '.join(series)}")
+                        else:
+                            tooltip_lines.append(f"{result['message']}")
+                        tooltip_lines.append("\nClick to browse for Assets folder")
+                        widget.setToolTip('\n'.join(tooltip_lines))
+
+                        # Set property with full path
+                        if hasattr(node, 'set_property'):
+                            node.set_property(prop_name, result['path'])
+
+                        # Show validation message to user
+                        if not result['valid']:
+                            QMessageBox.warning(self, "Assets Folder Validation",
+                                f"Selected folder may not have the expected structure:\n\n{result['message']}\n\n"
+                                "Expected structure:\nAssets/\n├── Firmware/\n│   └── KL520/, KL720/, etc.\n└── Models/\n    └── KL520/, KL720/, etc.")
+
+                widget.clicked.connect(browse_assets_folder)
+
+            # Check for file path properties (from prop_options or name pattern)
+            elif (prop_options and isinstance(prop_options, dict) and prop_options.get('type') == 'file_path') or \
                 prop_name in ['model_path', 'source_path', 'destination']:
                 # File path property with smart truncation and width limits
                 display_text = self.truncate_path_smart(str(prop_value)) if prop_value else 'Select File...'

View File

@ -21,8 +21,12 @@ Usage:
 # Import utilities as they are implemented
 # from . import file_utils
 # from . import ui_utils
+from .folder_dialog import select_folder, select_assets_folder, validate_assets_folder_structure

 __all__ = [
     # "file_utils",
     # "ui_utils"
+    "select_folder",
+    "select_assets_folder",
+    "validate_assets_folder_structure"
 ]

252
utils/folder_dialog.py Normal file
View File

@ -0,0 +1,252 @@
"""
Folder selection utilities using PyQt5 as primary, tkinter as fallback
"""
import os
def select_folder(title="Select Folder", initial_dir="", must_exist=True):
"""
Open a folder selection dialog using PyQt5 (preferred) or tkinter (fallback)
Args:
title (str): Dialog window title
initial_dir (str): Initial directory to open
must_exist (bool): Whether the folder must already exist
Returns:
str: Selected folder path, or empty string if cancelled
"""
# Try PyQt5 first (more reliable on macOS)
try:
from PyQt5.QtWidgets import QApplication, QFileDialog
import sys
# Create QApplication if it doesn't exist
app = QApplication.instance()
if app is None:
app = QApplication(sys.argv)
# Set initial directory
if not initial_dir:
initial_dir = os.getcwd()
elif not os.path.exists(initial_dir):
initial_dir = os.getcwd()
# Open folder selection dialog
folder_path = QFileDialog.getExistingDirectory(
None,
title,
initial_dir,
QFileDialog.ShowDirsOnly | QFileDialog.DontResolveSymlinks
)
return folder_path if folder_path else ""
except ImportError:
print("PyQt5 not available, trying tkinter...")
# Fallback to tkinter
try:
import tkinter as tk
from tkinter import filedialog
# Create a root window but keep it hidden
root = tk.Tk()
root.withdraw() # Hide the main window
root.attributes('-topmost', True) # Bring dialog to front
# Set initial directory
if not initial_dir:
initial_dir = os.getcwd()
# Open folder selection dialog
folder_path = filedialog.askdirectory(
title=title,
initialdir=initial_dir,
mustexist=must_exist
)
# Destroy the root window
root.destroy()
return folder_path if folder_path else ""
except ImportError:
print("tkinter also not available")
return ""
except Exception as e:
print(f"Error opening tkinter folder dialog: {e}")
return ""
except Exception as e:
print(f"Error opening PyQt5 folder dialog: {e}")
return ""
def select_assets_folder(initial_dir=""):
"""
Specialized function for selecting Assets folder with validation
Args:
initial_dir (str): Initial directory to open
Returns:
dict: Result with 'path', 'valid', and 'message' keys
"""
folder_path = select_folder(
title="Select Assets Folder (containing Firmware/ and Models/)",
initial_dir=initial_dir
)
if not folder_path:
return {'path': '', 'valid': False, 'message': 'No folder selected'}
# Validate folder structure
validation_result = validate_assets_folder_structure(folder_path)
return {
'path': folder_path,
'valid': validation_result['valid'],
'message': validation_result['message'],
'details': validation_result.get('details', {})
}
def validate_assets_folder_structure(folder_path):
"""
Validate that a folder has the expected Assets structure
Expected structure:
Assets/
Firmware/
KL520/
fw_scpu.bin
fw_ncpu.bin
KL720/
fw_scpu.bin
fw_ncpu.bin
Models/
KL520/
model.nef
KL720/
model.nef
Args:
folder_path (str): Path to validate
Returns:
dict: Validation result with 'valid', 'message', and 'details' keys
"""
if not os.path.exists(folder_path):
return {'valid': False, 'message': 'Folder does not exist'}
if not os.path.isdir(folder_path):
return {'valid': False, 'message': 'Path is not a directory'}
details = {}
issues = []
# Check for Firmware and Models folders
firmware_path = os.path.join(folder_path, 'Firmware')
models_path = os.path.join(folder_path, 'Models')
has_firmware = os.path.exists(firmware_path) and os.path.isdir(firmware_path)
has_models = os.path.exists(models_path) and os.path.isdir(models_path)
details['has_firmware_folder'] = has_firmware
details['has_models_folder'] = has_models
if not has_firmware:
issues.append("Missing 'Firmware' folder")
if not has_models:
issues.append("Missing 'Models' folder")
if not (has_firmware and has_models):
return {
'valid': False,
'message': f"Invalid folder structure: {', '.join(issues)}",
'details': details
}
# Check for series subfolders
expected_series = ['KL520', 'KL720', 'KL630', 'KL730', 'KL540']
firmware_series = []
models_series = []
try:
firmware_dirs = [d for d in os.listdir(firmware_path)
if os.path.isdir(os.path.join(firmware_path, d))]
firmware_series = [d for d in firmware_dirs if d in expected_series]
models_dirs = [d for d in os.listdir(models_path)
if os.path.isdir(os.path.join(models_path, d))]
models_series = [d for d in models_dirs if d in expected_series]
except Exception as e:
return {
'valid': False,
'message': f"Error reading folder contents: {e}",
'details': details
}
details['firmware_series'] = firmware_series
details['models_series'] = models_series
# Find common series (have both firmware and models)
common_series = list(set(firmware_series) & set(models_series))
details['available_series'] = common_series
if not common_series:
return {
'valid': False,
'message': "No series found with both firmware and models folders",
'details': details
}
# Check for actual files in series folders
series_with_files = []
for series in common_series:
has_files = False
# Check firmware files
fw_series_path = os.path.join(firmware_path, series)
if os.path.exists(fw_series_path):
fw_files = [f for f in os.listdir(fw_series_path)
if f.endswith('.bin')]
if fw_files:
has_files = True
# Check model files
model_series_path = os.path.join(models_path, series)
if os.path.exists(model_series_path):
model_files = [f for f in os.listdir(model_series_path)
if f.endswith('.nef')]
if model_files and has_files:
series_with_files.append(series)
details['series_with_files'] = series_with_files
if not series_with_files:
return {
'valid': False,
'message': "No series found with actual firmware and model files",
'details': details
}
return {
'valid': True,
'message': f"Valid Assets folder with {len(series_with_files)} series: {', '.join(series_with_files)}",
'details': details
}
# Example usage
if __name__ == "__main__":
print("Testing folder selection...")
# Test basic folder selection
folder = select_folder("Select any folder")
print(f"Selected: {folder}")
# Test Assets folder selection with validation
result = select_assets_folder()
print(f"Assets folder result: {result}")

41
verify_properties.py Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
Verify that the properties are correctly set for multi-series mode
"""


def verify_properties():
    """Print a checklist of the expected multi-series properties"""
    print("Multi-Series Configuration Checklist:")
    print("=" * 50)

    print("\n1. In your Dashboard, the Model Node properties should have:")
    print("   ✓ multi_series_mode = True")
    print("   ✓ enabled_series = ['520', '720']")
    print("   ✓ kl520_port_ids = '28,32'")
    print("   ✓ kl720_port_ids = '4'")
    print("   ✓ assets_folder = (optional, for automatic model/firmware detection)")

    print("\n2. After setting these properties, the deployment output should show:")
    print("   '[stage_1_Model_Node] Using multi-series mode with config: ...'")
    print("   NOT: 'Single-series config converted to multi-series format'")

    print("\n3. If you still see single-series behavior:")
    print("   a) Double-check the property names (they should be lowercase)")
    print("   b) Make sure multi_series_mode is checked/enabled")
    print("   c) Verify the port IDs are comma-separated strings")
    print("   d) Save the .mflow file and re-deploy")

    print("\n4. Property format reference:")
    print("   - kl520_port_ids: '28,32' (string, comma-separated)")
    print("   - kl720_port_ids: '4' (string)")
    print("   - enabled_series: ['520', '720'] (list)")
    print("   - multi_series_mode: True (boolean)")

    print("\n" + "=" * 50)
    print("If the properties are set correctly, your deployment should use")
    print("true multi-series load balancing across the KL520 and KL720 dongles!")


if __name__ == "__main__":
    verify_properties()
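For reference, the mapping from those node properties to the multi_series_config dict consumed by MultiDongle looks roughly like this (an illustration derived from the formats above; firmware and model paths are resolved from assets_folder and omitted here):

properties = {
    "multi_series_mode": True,
    "enabled_series": ["520", "720"],
    "kl520_port_ids": "28,32",
    "kl720_port_ids": "4",
}

multi_series_config = {
    f"KL{series}": {
        "port_ids": [int(p) for p in properties[f"kl{series}_port_ids"].split(",")]
    }
    for series in properties["enabled_series"]
}
print(multi_series_config)
# {'KL520': {'port_ids': [28, 32]}, 'KL720': {'port_ids': [4]}}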