Add Kneron device auto-detection and connection features

- Add scan_devices() method using kp.core.scan_devices() for device discovery
- Add connect_auto_detected_devices() for automatic device connection
- Add device series detection (KL520, KL720, KL630, KL730, KL540, etc.)
- Add auto_detect parameter to MultiDongle constructor
- Add get_device_info() and print_device_info() methods to display port IDs and series
- Update connection logic to use kp.core.connect_devices() per official docs
- Add device_detection_example.py with usage examples
- Maintain backward compatibility with manual port specification

Features display dongle series and port ID as requested for better device management.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Masonmason 2025-07-16 21:13:33 +08:00
parent f5e017b099
commit 9020be5e7a
2 changed files with 350 additions and 3 deletions

View File

@ -78,16 +78,147 @@ class MultiDongle:
# 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR, # 'YCBCR422_Y0CBY1CR': kp.ImageFormat.KP_IMAGE_FORMAT_Y0CBY1CR,
} }
def __init__(self, port_id: list, scpu_fw_path: str, ncpu_fw_path: str, model_path: str, upload_fw: bool = False): @staticmethod
def scan_devices():
"""
Scan for available Kneron devices and return their information.
Returns:
List[Dict]: List of device information containing port_id, series, and device_descriptor
"""
try:
print('[Scanning Devices]')
device_descriptors = kp.core.scan_devices()
if not device_descriptors:
print(' - No devices found')
return []
devices_info = []
print(f' - Found {len(device_descriptors)} device(s):')
for i, device_desc in enumerate(device_descriptors):
# Get device series (e.g., KL520, KL720, etc.)
series = MultiDongle._get_device_series(device_desc)
port_id = device_desc.port_id
device_info = {
'port_id': port_id,
'series': series,
'device_descriptor': device_desc
}
devices_info.append(device_info)
print(f' [{i+1}] Port ID: {port_id}, Series: {series}')
return devices_info
except kp.ApiKPException as exception:
print(f'Error: scan devices fail, error msg: [{str(exception)}]')
return []
@staticmethod
def _get_device_series(device_descriptor):
"""
Extract device series from device descriptor.
Args:
device_descriptor: Device descriptor from scan_devices()
Returns:
str: Device series (e.g., 'KL520', 'KL720', etc.)
"""
try:
# Try to get the chip from device descriptor
if hasattr(device_descriptor, 'chip'):
chip = device_descriptor.chip
if chip == kp.ModelNefDescriptor.KP_CHIP_KL520:
return 'KL520'
elif chip == kp.ModelNefDescriptor.KP_CHIP_KL720:
return 'KL720'
elif chip == kp.ModelNefDescriptor.KP_CHIP_KL630:
return 'KL630'
elif chip == kp.ModelNefDescriptor.KP_CHIP_KL730:
return 'KL730'
elif chip == kp.ModelNefDescriptor.KP_CHIP_KL540:
return 'KL540'
elif chip == kp.ModelNefDescriptor.KP_CHIP_KL630_LEGACY:
return 'KL630_LEGACY'
elif chip == kp.ModelNefDescriptor.KP_CHIP_KL720_LEGACY:
return 'KL720_LEGACY'
elif chip == kp.ModelNefDescriptor.KP_CHIP_KL520_LEGACY:
return 'KL520_LEGACY'
# Fallback: try to get from device descriptor attributes
if hasattr(device_descriptor, 'product_name'):
return device_descriptor.product_name
return 'Unknown'
except Exception as e:
print(f'Warning: Unable to determine device series: {str(e)}')
return 'Unknown'
@staticmethod
def connect_auto_detected_devices(device_count: int = None):
"""
Auto-detect and connect to available Kneron devices.
Args:
device_count: Number of devices to connect. If None, connect to all available devices.
Returns:
Tuple[kp.DeviceGroup, List[Dict]]: Device group and list of connected device info
"""
devices_info = MultiDongle.scan_devices()
if not devices_info:
raise Exception("No Kneron devices found")
# Determine how many devices to connect
if device_count is None:
device_count = len(devices_info)
else:
device_count = min(device_count, len(devices_info))
# Get port IDs for connection
port_ids = [devices_info[i]['port_id'] for i in range(device_count)]
try:
print(f'[Connecting to {device_count} device(s)]')
device_group = kp.core.connect_devices(usb_port_ids=port_ids)
print(' - Success')
connected_devices = devices_info[:device_count]
return device_group, connected_devices
except kp.ApiKPException as exception:
raise Exception(f'Failed to connect devices: {str(exception)}')
def __init__(self, port_id: list = None, scpu_fw_path: str = None, ncpu_fw_path: str = None, model_path: str = None, upload_fw: bool = False, auto_detect: bool = False):
""" """
Initialize the MultiDongle class. Initialize the MultiDongle class.
:param port_id: List of USB port IDs for the same layer's devices. :param port_id: List of USB port IDs for the same layer's devices. If None and auto_detect=True, will auto-detect devices.
:param scpu_fw_path: Path to the SCPU firmware file. :param scpu_fw_path: Path to the SCPU firmware file.
:param ncpu_fw_path: Path to the NCPU firmware file. :param ncpu_fw_path: Path to the NCPU firmware file.
:param model_path: Path to the model file. :param model_path: Path to the model file.
:param upload_fw: Flag to indicate whether to upload firmware. :param upload_fw: Flag to indicate whether to upload firmware.
:param auto_detect: Flag to auto-detect and connect to available devices.
""" """
self.port_id = port_id self.auto_detect = auto_detect
self.connected_devices_info = []
if auto_detect:
# Auto-detect devices
devices_info = self.scan_devices()
if devices_info:
self.port_id = [device['port_id'] for device in devices_info]
self.connected_devices_info = devices_info
else:
raise Exception("No Kneron devices found for auto-detection")
else:
self.port_id = port_id or []
self.upload_fw = upload_fw self.upload_fw = upload_fw
# Check if the firmware is needed # Check if the firmware is needed
@ -393,6 +524,91 @@ class MultiDongle:
except queue.Empty: except queue.Empty:
return None return None
def get_device_info(self):
"""
Get information about connected devices including port IDs and series.
Returns:
List[Dict]: List of device information with port_id and series
"""
if self.auto_detect and self.connected_devices_info:
return self.connected_devices_info
# If not auto-detected, try to get info from device group
if self.device_group:
try:
device_info_list = []
# Get device group content
device_group_content = self.device_group.content
# Iterate through devices in the group
for i, port_id in enumerate(self.port_id):
device_info = {
'port_id': port_id,
'series': 'Unknown', # We'll try to determine this
'device_descriptor': None
}
# Try to get device series from device group
try:
# This is a simplified approach - you might need to adjust
# based on the actual device group structure
if hasattr(device_group_content, 'devices') and i < len(device_group_content.devices):
device = device_group_content.devices[i]
if hasattr(device, 'chip_id'):
device_info['series'] = self._chip_id_to_series(device.chip_id)
except:
# If we can't get series info, keep as 'Unknown'
pass
device_info_list.append(device_info)
return device_info_list
except Exception as e:
print(f"Warning: Could not get device info from device group: {str(e)}")
# Fallback: return basic info based on port_id
return [{'port_id': port_id, 'series': 'Unknown', 'device_descriptor': None} for port_id in self.port_id]
def _chip_id_to_series(self, chip_id):
"""
Convert chip ID to series name.
Args:
chip_id: Chip ID from device
Returns:
str: Device series name
"""
chip_mapping = {
'kl520': 'KL520',
'kl720': 'KL720',
'kl630': 'KL630',
'kl730': 'KL730',
'kl540': 'KL540',
}
if isinstance(chip_id, str):
return chip_mapping.get(chip_id.lower(), 'Unknown')
return 'Unknown'
def print_device_info(self):
"""
Print detailed information about connected devices.
"""
devices_info = self.get_device_info()
if not devices_info:
print("No device information available")
return
print(f"\n[Connected Devices - {len(devices_info)} device(s)]")
for i, device_info in enumerate(devices_info):
print(f" [{i+1}] Port ID: {device_info['port_id']}, Series: {device_info['series']}")
def __del__(self): def __del__(self):
"""Ensure resources are released when the object is garbage collected.""" """Ensure resources are released when the object is garbage collected."""
self.stop() self.stop()

View File

@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Example script demonstrating Kneron device auto-detection functionality.
This script shows how to scan for devices and connect to them automatically.
"""
import sys
import os
# Add the core functions path to sys.path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'core', 'functions'))
def example_device_scan():
"""
Example 1: Scan for available devices without connecting
"""
print("=== Example 1: Device Scanning ===")
try:
from Multidongle import MultiDongle
# Scan for available devices
devices = MultiDongle.scan_devices()
if not devices:
print("No Kneron devices found")
return
print(f"Found {len(devices)} device(s):")
for i, device in enumerate(devices):
print(f" [{i+1}] Port ID: {device['port_id']}, Series: {device['series']}")
except Exception as e:
print(f"Error during device scan: {str(e)}")
def example_auto_connect():
"""
Example 2: Auto-connect to all available devices
"""
print("\n=== Example 2: Auto-Connect to Devices ===")
try:
from Multidongle import MultiDongle
# Connect to all available devices automatically
device_group, connected_devices = MultiDongle.connect_auto_detected_devices()
print(f"Successfully connected to {len(connected_devices)} device(s):")
for i, device in enumerate(connected_devices):
print(f" [{i+1}] Port ID: {device['port_id']}, Series: {device['series']}")
# Disconnect devices
import kp
kp.core.disconnect_devices(device_group=device_group)
print("Devices disconnected")
except Exception as e:
print(f"Error during auto-connect: {str(e)}")
def example_multidongle_with_auto_detect():
"""
Example 3: Use MultiDongle with auto-detection
"""
print("\n=== Example 3: MultiDongle with Auto-Detection ===")
try:
from Multidongle import MultiDongle
# Create MultiDongle instance with auto-detection
# Note: You'll need to provide firmware and model paths for full initialization
multidongle = MultiDongle(
auto_detect=True,
scpu_fw_path="path/to/fw_scpu.bin", # Update with actual path
ncpu_fw_path="path/to/fw_ncpu.bin", # Update with actual path
model_path="path/to/model.nef", # Update with actual path
upload_fw=False # Set to True if you want to upload firmware
)
# Print device information
multidongle.print_device_info()
# Get device info programmatically
device_info = multidongle.get_device_info()
print("\nDevice details:")
for device in device_info:
print(f" Port ID: {device['port_id']}, Series: {device['series']}")
except Exception as e:
print(f"Error during MultiDongle auto-detection: {str(e)}")
def example_connect_specific_count():
"""
Example 4: Connect to specific number of devices
"""
print("\n=== Example 4: Connect to Specific Number of Devices ===")
try:
from Multidongle import MultiDongle
# Connect to only 2 devices (or all available if less than 2)
device_group, connected_devices = MultiDongle.connect_auto_detected_devices(device_count=2)
print(f"Connected to {len(connected_devices)} device(s):")
for i, device in enumerate(connected_devices):
print(f" [{i+1}] Port ID: {device['port_id']}, Series: {device['series']}")
# Disconnect devices
import kp
kp.core.disconnect_devices(device_group=device_group)
print("Devices disconnected")
except Exception as e:
print(f"Error during specific count connect: {str(e)}")
if __name__ == "__main__":
print("Kneron Device Auto-Detection Examples")
print("=" * 50)
# Run examples
example_device_scan()
example_auto_connect()
example_multidongle_with_auto_detect()
example_connect_specific_count()
print("\n" + "=" * 50)
print("Examples completed!")
print("\nUsage Notes:")
print("- Make sure Kneron devices are connected via USB")
print("- Update firmware and model paths in example 3")
print("- The examples require the Kneron SDK to be properly installed")