fix: add pyusb fallback scan to kneron_bridge.py

The bridge script was missing pyusb import and _scan_with_pyusb()
fallback, so scan returned empty on systems without the KneronPLUS
SDK (kp module). Now falls back to pyusb for device detection.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
jim800121chen 2026-03-25 18:46:04 +08:00
parent 559d0fdb8a
commit 99dc2f2a34

View File

@ -21,9 +21,15 @@ import numpy as np
try:
import kp
HAS_KP = True
except ImportError:
except (ImportError, AttributeError, Exception):
HAS_KP = False
try:
import usb.core
HAS_PYUSB = True
except ImportError:
HAS_PYUSB = False
try:
import cv2
HAS_CV2 = True
@ -558,25 +564,72 @@ def _parse_classification_output(result, num_classes=1000):
# ── Command handlers ─────────────────────────────────────────────────
def handle_scan():
"""Scan for connected Kneron devices."""
if not HAS_KP:
return {"devices": [], "error_detail": "kp module not available"}
"""Scan for connected Kneron devices.
Tries Kneron PLUS SDK first (provides firmware info, kn_number, etc.).
Falls back to pyusb if the SDK is unavailable (e.g. macOS missing .dylib).
"""
if HAS_KP:
try:
descs = kp.core.scan_devices()
devices = []
for i in range(descs.device_descriptor_number):
dev = descs.device_descriptor_list[i]
devices.append({
"port": str(dev.usb_port_id),
"firmware": str(dev.firmware),
"kn_number": f"0x{dev.kn_number:08X}",
"product_id": f"0x{dev.product_id:04X}",
"connectable": dev.is_connectable,
})
return {"devices": devices}
except Exception as e:
_log(f"kp.core.scan_devices failed: {e}, trying pyusb fallback")
# Fallback: use pyusb (same approach as kneron_detect.py)
if HAS_PYUSB:
return _scan_with_pyusb()
return {"devices": [], "error_detail": "neither kp nor pyusb available"}
# Known Kneron product IDs (same as kneron_detect.py)
_KNERON_VENDOR_ID = 0x3231
_KNOWN_PRODUCTS = {
0x0100: "KL520",
0x0200: "KL720",
0x0720: "KL720",
0x0530: "KL530",
0x0630: "KL630",
0x0730: "KL730",
}
def _scan_with_pyusb():
"""Scan for Kneron devices using pyusb (libusb backend)."""
try:
descs = kp.core.scan_devices()
usb_devices = list(usb.core.find(find_all=True, idVendor=_KNERON_VENDOR_ID))
devices = []
for i in range(descs.device_descriptor_number):
dev = descs.device_descriptor_list[i]
for dev in usb_devices:
product_id = f"0x{dev.idProduct:04X}"
chip = _KNOWN_PRODUCTS.get(dev.idProduct, f"Unknown-{product_id}")
# pyusb port_id: bus-address
port = f"{dev.bus}-{dev.address}"
firmware = "unknown"
try:
firmware = dev.product or "unknown"
except Exception:
pass
devices.append({
"port": str(dev.usb_port_id),
"firmware": str(dev.firmware),
"kn_number": f"0x{dev.kn_number:08X}",
"product_id": f"0x{dev.product_id:04X}",
"connectable": dev.is_connectable,
"port": port,
"firmware": firmware,
"kn_number": "0x00000000",
"product_id": product_id,
"connectable": True,
})
return {"devices": devices}
except Exception as e:
return {"devices": [], "error_detail": str(e)}
return {"devices": [], "error_detail": f"pyusb scan failed: {e}"}
def handle_connect(params):
@ -931,6 +984,22 @@ def handle_inference(params):
img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
if img is None:
return {"error": "failed to decode image"}
h, w = img.shape[:2]
# KL520 NPU requires input image dimensions >= model input size
# and both width/height must be even numbers.
min_dim = _model_input_size
if w < min_dim or h < min_dim or w % 2 != 0 or h % 2 != 0:
if w < min_dim or h < min_dim:
scale = max(min_dim / w, min_dim / h)
new_w = int(w * scale)
new_h = int(h * scale)
else:
new_w, new_h = w, h
# Ensure even dimensions (NPU requirement)
new_w = (new_w + 1) & ~1
new_h = (new_h + 1) & ~1
img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
_log(f"Inference image resized: {w}x{h} -> {new_w}x{new_h} (min_dim={min_dim})")
# Convert BGR to BGR565
img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
else:
@ -999,9 +1068,23 @@ def handle_inference(params):
def main():
"""Main loop: read JSON commands from stdin, write responses to stdout."""
# The Kneron C SDK may write ANSI-colored warnings directly to fd 1
# (stdout), which corrupts our JSON-RPC protocol. To prevent this we
# dup the real stdout fd, then redirect fd 1 to stderr so any C-level
# writes go to stderr. Our JSON responses use the duped fd.
_real_stdout_fd = os.dup(1) # duplicate fd 1
os.dup2(2, 1) # fd 1 now points to stderr
_real_stdout = os.fdopen(_real_stdout_fd, "w")
sys.stdout = sys.stderr # Python-level redirect too
def _respond(obj):
"""Write a JSON response to the real stdout (not stderr)."""
_real_stdout.write(json.dumps(obj) + "\n")
_real_stdout.flush()
# Signal readiness
print(json.dumps({"status": "ready"}), flush=True)
_log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})")
_respond({"status": "ready"})
_log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, pyusb={'yes' if HAS_PYUSB else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})")
for line in sys.stdin:
line = line.strip()
@ -1024,9 +1107,9 @@ def main():
result = handle_inference(cmd)
else:
result = {"error": f"unknown command: {action}"}
print(json.dumps(result), flush=True)
_respond(result)
except Exception as e:
print(json.dumps({"error": str(e)}), flush=True)
_respond({"error": str(e)})
if __name__ == "__main__":