From 9950bf7232d80f9195a0ab910d99952f0d192c37 Mon Sep 17 00:00:00 2001 From: jim800121chen Date: Mon, 9 Mar 2026 21:31:35 +0800 Subject: [PATCH] fix: add retry logic for KL520 USB connect and better error hints - Retry connect up to 3 times with 2s delay and re-scan between attempts - Re-scan refreshes USB device handles which can resolve timing issues - Add Windows-specific hint about Zadig/WinUSB driver in error message - Firmware reload reconnect also gets retry + fallback to without_check Co-Authored-By: Claude Opus 4.6 --- .../server/scripts/kneron_bridge.py | 81 ++++++++++++++----- 1 file changed, 63 insertions(+), 18 deletions(-) diff --git a/edge-ai-platform/server/scripts/kneron_bridge.py b/edge-ai-platform/server/scripts/kneron_bridge.py index 90de33d..5638b1b 100644 --- a/edge-ai-platform/server/scripts/kneron_bridge.py +++ b/edge-ai-platform/server/scripts/kneron_bridge.py @@ -698,18 +698,48 @@ def handle_connect(params): # In these cases, connect_devices_without_check() works and we can # still load firmware afterwards. use_without_check = (_device_chip == "KL720") or (not target_dev.is_connectable) - if use_without_check: - _log(f"{_device_chip}: Using connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable})...") - _device_group = kp.core.connect_devices_without_check( - usb_port_ids=[target_dev.usb_port_id] - ) - _log(f"connect_devices_without_check succeeded") - else: - _log(f"Calling kp.core.connect_devices(usb_port_id={target_dev.usb_port_id})...") - _device_group = kp.core.connect_devices( - usb_port_ids=[target_dev.usb_port_id] - ) - _log(f"connect_devices succeeded") + + max_retries = 3 + last_err = None + for attempt in range(max_retries): + try: + if use_without_check: + _log(f"{_device_chip}: connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable}) attempt {attempt+1}/{max_retries}...") + _device_group = kp.core.connect_devices_without_check( + usb_port_ids=[target_dev.usb_port_id] + ) + else: + _log(f"connect_devices(usb_port_id={target_dev.usb_port_id}) attempt {attempt+1}/{max_retries}...") + _device_group = kp.core.connect_devices( + usb_port_ids=[target_dev.usb_port_id] + ) + _log(f"connect succeeded on attempt {attempt+1}") + last_err = None + break + except Exception as conn_err: + last_err = conn_err + _log(f"connect attempt {attempt+1} failed: {conn_err}") + if attempt < max_retries - 1: + time.sleep(2) + # Re-scan to refresh device handle + try: + descs = kp.core.scan_devices() + for i in range(descs.device_descriptor_number): + dev = descs.device_descriptor_list[i] + if port and str(dev.usb_port_id) == port: + target_dev = dev + break + elif not port: + target_dev = descs.device_descriptor_list[0] + break + except Exception: + pass + + if last_err is not None: + hint = "" + if sys.platform == "win32": + hint = " On Windows, ensure the WinUSB driver is installed for this device (use Zadig: https://zadig.akeo.ie)." + raise RuntimeError(f"Failed to connect after {max_retries} attempts: {last_err}.{hint}") # KL720 needs longer timeout for large NEF transfers (12MB+ over USB) _timeout_ms = 60000 if _device_chip == "KL720" else 10000 @@ -732,12 +762,27 @@ def handle_connect(params): _log("Firmware loaded, waiting for reboot...") time.sleep(5) - # Reconnect after firmware load - descs = kp.core.scan_devices() - target_dev = descs.device_descriptor_list[0] - _device_group = kp.core.connect_devices( - usb_port_ids=[target_dev.usb_port_id] - ) + # Reconnect after firmware load (with retry) + _device_group = None + for retry in range(3): + try: + descs = kp.core.scan_devices() + target_dev = descs.device_descriptor_list[0] + try: + _device_group = kp.core.connect_devices( + usb_port_ids=[target_dev.usb_port_id] + ) + except Exception: + _device_group = kp.core.connect_devices_without_check( + usb_port_ids=[target_dev.usb_port_id] + ) + break + except Exception as re_err: + _log(f"Reconnect attempt {retry+1} failed: {re_err}") + if retry < 2: + time.sleep(3) + if _device_group is None: + return {"error": "Device not found after firmware load. Unplug and re-plug the device."} kp.core.set_timeout( device_group=_device_group, milliseconds=_timeout_ms )