From 9020be5e7a64fd6b9a61ad65536d8d779396ec5d Mon Sep 17 00:00:00 2001 From: Masonmason Date: Wed, 16 Jul 2025 21:13:33 +0800 Subject: [PATCH] Add Kneron device auto-detection and connection features MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add scan_devices() method using kp.core.scan_devices() for device discovery - Add connect_auto_detected_devices() for automatic device connection - Add device series detection (KL520, KL720, KL630, KL730, KL540, etc.) - Add auto_detect parameter to MultiDongle constructor - Add get_device_info() and print_device_info() methods to display port IDs and series - Update connection logic to use kp.core.connect_devices() per official docs - Add device_detection_example.py with usage examples - Maintain backward compatibility with manual port specification Features display dongle series and port ID as requested for better device management. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cluster4npu_ui/core/functions/Multidongle.py | 222 ++++++++++++++++++- cluster4npu_ui/device_detection_example.py | 131 +++++++++++ 2 files changed, 350 insertions(+), 3 deletions(-) create mode 100644 cluster4npu_ui/device_detection_example.py diff --git a/cluster4npu_ui/core/functions/Multidongle.py b/cluster4npu_ui/core/functions/Multidongle.py index bdacb7c..0c66606 100644 --- a/cluster4npu_ui/core/functions/Multidongle.py +++ b/cluster4npu_ui/core/functions/Multidongle.py @@ -78,16 +78,147 @@ class MultiDongle: # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR, } - def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False): + @staticmethod + def scan_devices(): + """ + Scan for available Kneron devices and return their information. + + Returns: + List[Dict]: List of device information containing port_id, series, and device_descriptor + """ + try: + print('[Scanning Devices]') + device_descriptors = kp.core.scan_devices() + + if not device_descriptors: + print(' - No devices found') + return [] + + devices_info = [] + print(f' - Found {len(device_descriptors)} device(s):') + + for i, device_desc in enumerate(device_descriptors): + # Get device series (e.g., KL520, KL720, etc.) + series = MultiDongle._get_device_series(device_desc) + port_id = device_desc.port_id + + device_info = { + 'port_id': port_id, + 'series': series, + 'device_descriptor': device_desc + } + devices_info.append(device_info) + + print(f' [{i+1}] Port ID: {port_id}, Series: {series}') + + return devices_info + + except kp.ApiKPException as exception: + print(f'Error: scan devices fail, error msg: [{str(exception)}]') + return [] + + @staticmethod + def _get_device_series(device_descriptor): + """ + Extract device series from device descriptor. + + Args: + device_descriptor: Device descriptor from scan_devices() + + Returns: + str: Device series (e.g., 'KL520', 'KL720', etc.) + """ + try: + # Try to get the chip from device descriptor + if hasattr(device_descriptor, 'chip'): + chip = device_descriptor.chip + if chip == kp.ModelNefDescriptor.KP_CHIP_KL520: + return 'KL520' + elif chip == kp.ModelNefDescriptor.KP_CHIP_KL720: + return 'KL720' + elif chip == kp.ModelNefDescriptor.KP_CHIP_KL630: + return 'KL630' + elif chip == kp.ModelNefDescriptor.KP_CHIP_KL730: + return 'KL730' + elif chip == kp.ModelNefDescriptor.KP_CHIP_KL540: + return 'KL540' + elif chip == kp.ModelNefDescriptor.KP_CHIP_KL630_LEGACY: + return 'KL630_LEGACY' + elif chip == kp.ModelNefDescriptor.KP_CHIP_KL720_LEGACY: + return 'KL720_LEGACY' + elif chip == kp.ModelNefDescriptor.KP_CHIP_KL520_LEGACY: + return 'KL520_LEGACY' + + # Fallback: try to get from device descriptor attributes + if hasattr(device_descriptor, 'product_name'): + return device_descriptor.product_name + + return 'Unknown' + + except Exception as e: + print(f'Warning: Unable to determine device series: {str(e)}') + return 'Unknown' + + @staticmethod + def connect_auto_detected_devices(device_count: int = None): + """ + Auto-detect and connect to available Kneron devices. + + Args: + device_count: Number of devices to connect. If None, connect to all available devices. + + Returns: + Tuple[kp.DeviceGroup, List[Dict]]: Device group and list of connected device info + """ + devices_info = MultiDongle.scan_devices() + + if not devices_info: + raise Exception("No Kneron devices found") + + # Determine how many devices to connect + if device_count is None: + device_count = len(devices_info) + else: + device_count = min(device_count, len(devices_info)) + + # Get port IDs for connection + port_ids = [devices_info[i]['port_id'] for i in range(device_count)] + + try: + print(f'[Connecting to {device_count} device(s)]') + device_group = kp.core.connect_devices(usb_port_ids=port_ids) + print(' - Success') + + connected_devices = devices_info[:device_count] + return device_group, connected_devices + + except kp.ApiKPException as exception: + raise Exception(f'Failed to connect devices: {str(exception)}') + + def __init__(self, port_id: list = None, scpu_fw_path: str = None, ncpu_fw_path: str = None, model_path: str = None, upload_fw: bool = False, auto_detect: bool = False): """ Initialize the MultiDongle class. - :param port_id: List of USB port IDs for the same layer's devices. + :param port_id: List of USB port IDs for the same layer's devices. If None and auto_detect=True, will auto-detect devices. :param scpu_fw_path: Path to the SCPU firmware file. :param ncpu_fw_path: Path to the NCPU firmware file. :param model_path: Path to the model file. :param upload_fw: Flag to indicate whether to upload firmware. + :param auto_detect: Flag to auto-detect and connect to available devices. """ - self.port_id = port_id + self.auto_detect = auto_detect + self.connected_devices_info = [] + + if auto_detect: + # Auto-detect devices + devices_info = self.scan_devices() + if devices_info: + self.port_id = [device['port_id'] for device in devices_info] + self.connected_devices_info = devices_info + else: + raise Exception("No Kneron devices found for auto-detection") + else: + self.port_id = port_id or [] + self.upload_fw = upload_fw # Check if the firmware is needed @@ -393,6 +524,91 @@ class MultiDongle: except queue.Empty: return None + def get_device_info(self): + """ + Get information about connected devices including port IDs and series. + + Returns: + List[Dict]: List of device information with port_id and series + """ + if self.auto_detect and self.connected_devices_info: + return self.connected_devices_info + + # If not auto-detected, try to get info from device group + if self.device_group: + try: + device_info_list = [] + + # Get device group content + device_group_content = self.device_group.content + + # Iterate through devices in the group + for i, port_id in enumerate(self.port_id): + device_info = { + 'port_id': port_id, + 'series': 'Unknown', # We'll try to determine this + 'device_descriptor': None + } + + # Try to get device series from device group + try: + # This is a simplified approach - you might need to adjust + # based on the actual device group structure + if hasattr(device_group_content, 'devices') and i < len(device_group_content.devices): + device = device_group_content.devices[i] + if hasattr(device, 'chip_id'): + device_info['series'] = self._chip_id_to_series(device.chip_id) + except: + # If we can't get series info, keep as 'Unknown' + pass + + device_info_list.append(device_info) + + return device_info_list + + except Exception as e: + print(f"Warning: Could not get device info from device group: {str(e)}") + + # Fallback: return basic info based on port_id + return [{'port_id': port_id, 'series': 'Unknown', 'device_descriptor': None} for port_id in self.port_id] + + def _chip_id_to_series(self, chip_id): + """ + Convert chip ID to series name. + + Args: + chip_id: Chip ID from device + + Returns: + str: Device series name + """ + chip_mapping = { + 'kl520': 'KL520', + 'kl720': 'KL720', + 'kl630': 'KL630', + 'kl730': 'KL730', + 'kl540': 'KL540', + } + + if isinstance(chip_id, str): + return chip_mapping.get(chip_id.lower(), 'Unknown') + + return 'Unknown' + + def print_device_info(self): + """ + Print detailed information about connected devices. + """ + devices_info = self.get_device_info() + + if not devices_info: + print("No device information available") + return + + print(f"\n[Connected Devices - {len(devices_info)} device(s)]") + for i, device_info in enumerate(devices_info): + print(f" [{i+1}] Port ID: {device_info['port_id']}, Series: {device_info['series']}") + def __del__(self): """Ensure resources are released when the object is garbage collected.""" self.stop() diff --git a/cluster4npu_ui/device_detection_example.py b/cluster4npu_ui/device_detection_example.py new file mode 100644 index 0000000..6a2d730 --- /dev/null +++ b/cluster4npu_ui/device_detection_example.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +Example script demonstrating Kneron device auto-detection functionality. +This script shows how to scan for devices and connect to them automatically. +""" + +import sys +import os + +# Add the core functions path to sys.path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'core', 'functions')) + +def example_device_scan(): + """ + Example 1: Scan for available devices without connecting + """ + print("=== Example 1: Device Scanning ===") + + try: + from Multidongle import MultiDongle + + # Scan for available devices + devices = MultiDongle.scan_devices() + + if not devices: + print("No Kneron devices found") + return + + print(f"Found {len(devices)} device(s):") + for i, device in enumerate(devices): + print(f" [{i+1}] Port ID: {device['port_id']}, Series: {device['series']}") + + except Exception as e: + print(f"Error during device scan: {str(e)}") + +def example_auto_connect(): + """ + Example 2: Auto-connect to all available devices + """ + print("\n=== Example 2: Auto-Connect to Devices ===") + + try: + from Multidongle import MultiDongle + + # Connect to all available devices automatically + device_group, connected_devices = MultiDongle.connect_auto_detected_devices() + + print(f"Successfully connected to {len(connected_devices)} device(s):") + for i, device in enumerate(connected_devices): + print(f" [{i+1}] Port ID: {device['port_id']}, Series: {device['series']}") + + # Disconnect devices + import kp + kp.core.disconnect_devices(device_group=device_group) + print("Devices disconnected") + + except Exception as e: + print(f"Error during auto-connect: {str(e)}") + +def example_multidongle_with_auto_detect(): + """ + Example 3: Use MultiDongle with auto-detection + """ + print("\n=== Example 3: MultiDongle with Auto-Detection ===") + + try: + from Multidongle import MultiDongle + + # Create MultiDongle instance with auto-detection + # Note: You'll need to provide firmware and model paths for full initialization + multidongle = MultiDongle( + auto_detect=True, + scpu_fw_path="path/to/fw_scpu.bin", # Update with actual path + ncpu_fw_path="path/to/fw_ncpu.bin", # Update with actual path + model_path="path/to/model.nef", # Update with actual path + upload_fw=False # Set to True if you want to upload firmware + ) + + # Print device information + multidongle.print_device_info() + + # Get device info programmatically + device_info = multidongle.get_device_info() + + print("\nDevice details:") + for device in device_info: + print(f" Port ID: {device['port_id']}, Series: {device['series']}") + + except Exception as e: + print(f"Error during MultiDongle auto-detection: {str(e)}") + +def example_connect_specific_count(): + """ + Example 4: Connect to specific number of devices + """ + print("\n=== Example 4: Connect to Specific Number of Devices ===") + + try: + from Multidongle import MultiDongle + + # Connect to only 2 devices (or all available if less than 2) + device_group, connected_devices = MultiDongle.connect_auto_detected_devices(device_count=2) + + print(f"Connected to {len(connected_devices)} device(s):") + for i, device in enumerate(connected_devices): + print(f" [{i+1}] Port ID: {device['port_id']}, Series: {device['series']}") + + # Disconnect devices + import kp + kp.core.disconnect_devices(device_group=device_group) + print("Devices disconnected") + + except Exception as e: + print(f"Error during specific count connect: {str(e)}") + +if __name__ == "__main__": + print("Kneron Device Auto-Detection Examples") + print("=" * 50) + + # Run examples + example_device_scan() + example_auto_connect() + example_multidongle_with_auto_detect() + example_connect_specific_count() + + print("\n" + "=" * 50) + print("Examples completed!") + print("\nUsage Notes:") + print("- Make sure Kneron devices are connected via USB") + print("- Update firmware and model paths in example 3") + print("- The examples require the Kneron SDK to be properly installed") \ No newline at end of file