fix: add retry logic for KL520 USB connect and better error hints

- Retry connect up to 3 times with 2s delay and re-scan between attempts
- Re-scan refreshes USB device handles which can resolve timing issues
- Add Windows-specific hint about Zadig/WinUSB driver in error message
- Firmware reload reconnect also gets retry + fallback to without_check

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
jim800121chen 2026-03-09 21:31:35 +08:00
parent 4ad39d2f17
commit 9950bf7232

View File

@ -698,18 +698,48 @@ def handle_connect(params):
# In these cases, connect_devices_without_check() works and we can
# still load firmware afterwards.
use_without_check = (_device_chip == "KL720") or (not target_dev.is_connectable)
if use_without_check:
_log(f"{_device_chip}: Using connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable})...")
_device_group = kp.core.connect_devices_without_check(
usb_port_ids=[target_dev.usb_port_id]
)
_log(f"connect_devices_without_check succeeded")
else:
_log(f"Calling kp.core.connect_devices(usb_port_id={target_dev.usb_port_id})...")
_device_group = kp.core.connect_devices(
usb_port_ids=[target_dev.usb_port_id]
)
_log(f"connect_devices succeeded")
max_retries = 3
last_err = None
for attempt in range(max_retries):
try:
if use_without_check:
_log(f"{_device_chip}: connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable}) attempt {attempt+1}/{max_retries}...")
_device_group = kp.core.connect_devices_without_check(
usb_port_ids=[target_dev.usb_port_id]
)
else:
_log(f"connect_devices(usb_port_id={target_dev.usb_port_id}) attempt {attempt+1}/{max_retries}...")
_device_group = kp.core.connect_devices(
usb_port_ids=[target_dev.usb_port_id]
)
_log(f"connect succeeded on attempt {attempt+1}")
last_err = None
break
except Exception as conn_err:
last_err = conn_err
_log(f"connect attempt {attempt+1} failed: {conn_err}")
if attempt < max_retries - 1:
time.sleep(2)
# Re-scan to refresh device handle
try:
descs = kp.core.scan_devices()
for i in range(descs.device_descriptor_number):
dev = descs.device_descriptor_list[i]
if port and str(dev.usb_port_id) == port:
target_dev = dev
break
elif not port:
target_dev = descs.device_descriptor_list[0]
break
except Exception:
pass
if last_err is not None:
hint = ""
if sys.platform == "win32":
hint = " On Windows, ensure the WinUSB driver is installed for this device (use Zadig: https://zadig.akeo.ie)."
raise RuntimeError(f"Failed to connect after {max_retries} attempts: {last_err}.{hint}")
# KL720 needs longer timeout for large NEF transfers (12MB+ over USB)
_timeout_ms = 60000 if _device_chip == "KL720" else 10000
@ -732,12 +762,27 @@ def handle_connect(params):
_log("Firmware loaded, waiting for reboot...")
time.sleep(5)
# Reconnect after firmware load
descs = kp.core.scan_devices()
target_dev = descs.device_descriptor_list[0]
_device_group = kp.core.connect_devices(
usb_port_ids=[target_dev.usb_port_id]
)
# Reconnect after firmware load (with retry)
_device_group = None
for retry in range(3):
try:
descs = kp.core.scan_devices()
target_dev = descs.device_descriptor_list[0]
try:
_device_group = kp.core.connect_devices(
usb_port_ids=[target_dev.usb_port_id]
)
except Exception:
_device_group = kp.core.connect_devices_without_check(
usb_port_ids=[target_dev.usb_port_id]
)
break
except Exception as re_err:
_log(f"Reconnect attempt {retry+1} failed: {re_err}")
if retry < 2:
time.sleep(3)
if _device_group is None:
return {"error": "Device not found after firmware load. Unplug and re-plug the device."}
kp.core.set_timeout(
device_group=_device_group, milliseconds=_timeout_ms
)