diff --git a/edge-ai-platform/server/scripts/kneron_bridge.py b/edge-ai-platform/server/scripts/kneron_bridge.py index 4526b0d..0e2120b 100644 --- a/edge-ai-platform/server/scripts/kneron_bridge.py +++ b/edge-ai-platform/server/scripts/kneron_bridge.py @@ -21,9 +21,15 @@ import numpy as np try: import kp HAS_KP = True -except ImportError: +except (ImportError, AttributeError, Exception): HAS_KP = False +try: + import usb.core + HAS_PYUSB = True +except ImportError: + HAS_PYUSB = False + try: import cv2 HAS_CV2 = True @@ -558,25 +564,72 @@ def _parse_classification_output(result, num_classes=1000): # ── Command handlers ───────────────────────────────────────────────── def handle_scan(): - """Scan for connected Kneron devices.""" - if not HAS_KP: - return {"devices": [], "error_detail": "kp module not available"} + """Scan for connected Kneron devices. + Tries Kneron PLUS SDK first (provides firmware info, kn_number, etc.). + Falls back to pyusb if the SDK is unavailable (e.g. macOS missing .dylib). + """ + if HAS_KP: + try: + descs = kp.core.scan_devices() + devices = [] + for i in range(descs.device_descriptor_number): + dev = descs.device_descriptor_list[i] + devices.append({ + "port": str(dev.usb_port_id), + "firmware": str(dev.firmware), + "kn_number": f"0x{dev.kn_number:08X}", + "product_id": f"0x{dev.product_id:04X}", + "connectable": dev.is_connectable, + }) + return {"devices": devices} + except Exception as e: + _log(f"kp.core.scan_devices failed: {e}, trying pyusb fallback") + + # Fallback: use pyusb (same approach as kneron_detect.py) + if HAS_PYUSB: + return _scan_with_pyusb() + + return {"devices": [], "error_detail": "neither kp nor pyusb available"} + + +# Known Kneron product IDs (same as kneron_detect.py) +_KNERON_VENDOR_ID = 0x3231 +_KNOWN_PRODUCTS = { + 0x0100: "KL520", + 0x0200: "KL720", + 0x0720: "KL720", + 0x0530: "KL530", + 0x0630: "KL630", + 0x0730: "KL730", +} + + +def _scan_with_pyusb(): + """Scan for Kneron devices using pyusb (libusb backend).""" try: - descs = kp.core.scan_devices() + usb_devices = list(usb.core.find(find_all=True, idVendor=_KNERON_VENDOR_ID)) devices = [] - for i in range(descs.device_descriptor_number): - dev = descs.device_descriptor_list[i] + for dev in usb_devices: + product_id = f"0x{dev.idProduct:04X}" + chip = _KNOWN_PRODUCTS.get(dev.idProduct, f"Unknown-{product_id}") + # pyusb port_id: bus-address + port = f"{dev.bus}-{dev.address}" + firmware = "unknown" + try: + firmware = dev.product or "unknown" + except Exception: + pass devices.append({ - "port": str(dev.usb_port_id), - "firmware": str(dev.firmware), - "kn_number": f"0x{dev.kn_number:08X}", - "product_id": f"0x{dev.product_id:04X}", - "connectable": dev.is_connectable, + "port": port, + "firmware": firmware, + "kn_number": "0x00000000", + "product_id": product_id, + "connectable": True, }) return {"devices": devices} except Exception as e: - return {"devices": [], "error_detail": str(e)} + return {"devices": [], "error_detail": f"pyusb scan failed: {e}"} def handle_connect(params): @@ -931,6 +984,22 @@ def handle_inference(params): img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img is None: return {"error": "failed to decode image"} + h, w = img.shape[:2] + # KL520 NPU requires input image dimensions >= model input size + # and both width/height must be even numbers. + min_dim = _model_input_size + if w < min_dim or h < min_dim or w % 2 != 0 or h % 2 != 0: + if w < min_dim or h < min_dim: + scale = max(min_dim / w, min_dim / h) + new_w = int(w * scale) + new_h = int(h * scale) + else: + new_w, new_h = w, h + # Ensure even dimensions (NPU requirement) + new_w = (new_w + 1) & ~1 + new_h = (new_h + 1) & ~1 + img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR) + _log(f"Inference image resized: {w}x{h} -> {new_w}x{new_h} (min_dim={min_dim})") # Convert BGR to BGR565 img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565) else: @@ -999,9 +1068,23 @@ def handle_inference(params): def main(): """Main loop: read JSON commands from stdin, write responses to stdout.""" + # The Kneron C SDK may write ANSI-colored warnings directly to fd 1 + # (stdout), which corrupts our JSON-RPC protocol. To prevent this we + # dup the real stdout fd, then redirect fd 1 to stderr so any C-level + # writes go to stderr. Our JSON responses use the duped fd. + _real_stdout_fd = os.dup(1) # duplicate fd 1 + os.dup2(2, 1) # fd 1 now points to stderr + _real_stdout = os.fdopen(_real_stdout_fd, "w") + sys.stdout = sys.stderr # Python-level redirect too + + def _respond(obj): + """Write a JSON response to the real stdout (not stderr).""" + _real_stdout.write(json.dumps(obj) + "\n") + _real_stdout.flush() + # Signal readiness - print(json.dumps({"status": "ready"}), flush=True) - _log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})") + _respond({"status": "ready"}) + _log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, pyusb={'yes' if HAS_PYUSB else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})") for line in sys.stdin: line = line.strip() @@ -1024,9 +1107,9 @@ def main(): result = handle_inference(cmd) else: result = {"error": f"unknown command: {action}"} - print(json.dumps(result), flush=True) + _respond(result) except Exception as e: - print(json.dumps({"error": str(e)}), flush=True) + _respond({"error": str(e)}) if __name__ == "__main__":