#!/usr/bin/env python3
"""Kneron Bridge - JSON-RPC over stdin/stdout

This script acts as a bridge between the Go backend and the Kneron PLUS
Python SDK. It reads JSON commands from stdin and writes JSON responses
to stdout.

Supports:
  - KL520 (USB Boot mode - firmware must be loaded each session)
  - KL720 (flash-based - firmware pre-installed, models freely reloadable)
"""
import sys
import json
import base64
import time
import os
import io
import re

import numpy as np


def _preload_kneron_dylibs_macos():
    """macOS only: pre-load the wheel's libusb + libkplus by absolute path.

    Background:
      - The KneronPLUS wheel ships libusb-1.0.0.dylib and libkplus.dylib in
        kp/lib/.
      - When macOS dyld loads libkplus it looks for its dependency
        libusb-1.0.0.dylib. The default search paths (/usr/local/lib,
        /usr/lib) usually do not contain it in the bundled Python
        environment (there is no brew libusb), so `import kp` raises
        OSError -> HAS_KP=False -> scan returns an empty list.
      - The macOS hardened runtime strips DYLD_LIBRARY_PATH and similar
        variables, so injecting the environment from the Go side is not
        reliable either. The most robust approach is to load the libraries
        with ctypes by absolute path here; a later `import kp` then makes
        dyld reuse the already-loaded images.

    Windows / Linux do not take this path; each has its own mechanism
    handled on the Go side (Windows relies on PATH, Linux relies on the
    wheel's bundled libusb.so.1.0.0 + LD_LIBRARY_PATH).
    """
    if sys.platform != "darwin":
        return
    try:
        import ctypes
        import importlib.util

        spec = importlib.util.find_spec("kp")
        if spec is None or not spec.submodule_search_locations:
            return
        kp_dir = spec.submodule_search_locations[0]
        lib_dir = os.path.join(kp_dir, "lib")
        # Load order matters: libusb first, then libkplus (which depends on it).
        for name in ("libusb-1.0.0.dylib", "libkplus.dylib"):
            path = os.path.join(lib_dir, name)
            if os.path.isfile(path):
                try:
                    ctypes.CDLL(path, mode=ctypes.RTLD_GLOBAL)
                except OSError:
                    # Best effort: `import kp` below reports the real failure.
                    pass
    except Exception:
        # Deliberately silent: preloading is an optimisation, never fatal.
        pass


_preload_kneron_dylibs_macos()

try:
    import kp
    HAS_KP = True
except Exception:
    # The SDK can fail with ImportError, AttributeError or OSError (missing
    # native library); any of them simply means "SDK unavailable".
    HAS_KP = False

try:
    import usb.core
    HAS_PYUSB = True
except ImportError:
    HAS_PYUSB = False

try:
    import cv2
    HAS_CV2 = True
except ImportError:
    HAS_CV2 = False

# ── Global state ──────────────────────────────────────────────────────
_device_group = None


def _clear_device_group():
    """Safely disconnect and clear the global _device_group.

    KneronPLUS SDK's DeviceGroup.__del__ calls kp_disconnect_devices on the
    native handle, but if the handle is already invalid (failed connect /
    stale state) it causes 'OSError: access violation'. By explicitly
    disconnecting before setting None, __del__ becomes a no-op on an
    already-disconnected handle.

    All errors are silenced — this is best-effort cleanup.
    """
    global _device_group
    if _device_group is not None:
        try:
            kp.core.disconnect_devices(_device_group)
        except Exception:
            pass
        _device_group = None


_model_id = None
_model_nef = None
_model_input_size = 224  # updated on model load
_model_type = "tiny_yolov3"  # updated on model load based on model_id / nef name
_firmware_loaded = False
_device_chip = "KL520"  # updated on connect from product_id / device_type

# COCO 80-class labels
COCO_CLASSES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train",
    "truck", "boat", "traffic light", "fire hydrant", "stop sign",
    "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow",
    "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella",
    "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard",
    "sports ball", "kite", "baseball bat", "baseball glove", "skateboard",
    "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork",
    "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange",
    "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair",
    "couch", "potted plant", "bed", "dining table", "toilet", "tv",
    "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave",
    "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase",
    "scissors", "teddy bear", "hair drier", "toothbrush"
]

# Anchor boxes per model type (each list entry = one output head)
ANCHORS_TINY_YOLOV3 = [
    [(81, 82), (135, 169), (344, 319)],  # 7×7 head (large objects)
    [(10, 14), (23, 27), (37, 58)],      # 14×14 head (small objects)
]

# YOLOv5s anchors (Kneron model 20005, no-upsample variant for KL520)
ANCHORS_YOLOV5S = [
    [(116, 90), (156, 198), (373, 326)],  # P5/32 (large)
    [(30, 61), (62, 45), (59, 119)],      # P4/16 (medium)
    [(10, 13), (16, 30), (33, 23)],       # P3/8 (small)
]

CONF_THRESHOLD = 0.25
NMS_IOU_THRESHOLD = 0.45

# Known Kneron model IDs → (model_type, input_size)
KNOWN_MODELS = {
    # Tiny YOLO v3 (default KL520 model)
    0: ("tiny_yolov3", 224),
    # ResNet18 classification (model 20001)
    20001: ("resnet18", 224),
    # FCOS DarkNet53s detection (model 20004)
    20004: ("fcos", 512),
    # YOLOv5s no-upsample (model 20005)
    20005: ("yolov5s", 640),
}

# Matches the 'w<width>h<height>' fragment used in Kneron .nef file names.
_SIZE_IN_NAME_RE = re.compile(r'w(\d+)h(\d+)')

# (result key, SDK attribute) pairs copied out of hw_pre_proc_info.
_PREPROC_FIELDS = (
    ("pad_left", "pad_left"),
    ("pad_top", "pad_top"),
    ("resized_w", "resized_img_width"),
    ("resized_h", "resized_img_height"),
    ("model_w", "model_input_width"),
    ("model_h", "model_input_height"),
    ("img_w", "img_width"),
    ("img_h", "img_height"),
)


def _log(msg):
    """Write log messages to stderr (stdout is reserved for JSON-RPC)."""
    print(f"[kneron_bridge] {msg}", file=sys.stderr, flush=True)


def _resolve_firmware_paths(chip="KL520"):
    """Locate the SCPU/NCPU firmware images for *chip*.

    Looks in ``<script dir>/firmware/<chip>/`` first, then in the directory
    named by the ``KNERON_FW_DIR`` environment variable.

    Returns:
        ``(scpu_path, ncpu_path)`` when both files exist in one of the
        locations, otherwise ``(None, None)``.
    """
    base = os.path.dirname(os.path.abspath(__file__))
    fw_dir = os.path.join(base, "firmware", chip)
    scpu = os.path.join(fw_dir, "fw_scpu.bin")
    ncpu = os.path.join(fw_dir, "fw_ncpu.bin")
    if os.path.exists(scpu) and os.path.exists(ncpu):
        return scpu, ncpu

    # Fallback: check KNERON_FW_DIR env var
    fw_dir = os.environ.get("KNERON_FW_DIR", "")
    if fw_dir:
        scpu = os.path.join(fw_dir, "fw_scpu.bin")
        ncpu = os.path.join(fw_dir, "fw_ncpu.bin")
        if os.path.exists(scpu) and os.path.exists(ncpu):
            return scpu, ncpu
    return None, None


def _detect_model_type(model_id, nef_path):
    """Detect model type and input size from model ID or .nef filename.

    Updates the module globals ``_model_type`` and ``_model_input_size``.
    Known model IDs win; otherwise keywords in the file name are used, and
    Tiny YOLOv3 at 224x224 is the final default.
    """
    global _model_type, _model_input_size

    # Check known model IDs
    if model_id in KNOWN_MODELS:
        _model_type, _model_input_size = KNOWN_MODELS[model_id]
        _log(f"Model type detected by ID {model_id}: {_model_type} ({_model_input_size}x{_model_input_size})")
        return

    # Fallback: try to infer from filename
    basename = os.path.basename(nef_path).lower() if nef_path else ""
    if "yolov5" in basename:
        _model_type = "yolov5s"
        # Try to parse input size from filename like w640h640
        _model_input_size = _parse_size_from_name(basename, default=640)
    elif "fcos" in basename:
        _model_type = "fcos"
        _model_input_size = _parse_size_from_name(basename, default=512)
    elif "ssd" in basename:
        _model_type = "ssd"
        _model_input_size = _parse_size_from_name(basename, default=320)
    elif "resnet" in basename or "classification" in basename:
        _model_type = "resnet18"
        _model_input_size = _parse_size_from_name(basename, default=224)
    elif "tiny_yolo" in basename or "tinyyolo" in basename:
        _model_type = "tiny_yolov3"
        _model_input_size = _parse_size_from_name(basename, default=224)
    else:
        # Default: assume YOLO-like detection
        _model_type = "tiny_yolov3"
        _model_input_size = 224
    _log(f"Model type detected by filename '{basename}': {_model_type} ({_model_input_size}x{_model_input_size})")


def _parse_size_from_name(name, default=224):
    """Extract input size from filename like 'w640h640' or 'w512h512'.

    Only the width is returned (the bridge assumes square model inputs);
    *default* is returned when the pattern is absent.
    """
    m = _SIZE_IN_NAME_RE.search(name)
    if m:
        return int(m.group(1))
    return default


# ── Post-processing ──────────────────────────────────────────────────

def _sigmoid(x):
    """Numerically safe logistic function for scalars or arrays.

    The computation is done in float64: with float32 inputs, exp(500)
    overflows to inf and emits a RuntimeWarning even after clipping to
    +/-500, whereas float64 represents it exactly enough.
    """
    x64 = np.asarray(x, dtype=np.float64)
    return 1.0 / (1.0 + np.exp(-np.clip(x64, -500, 500)))


def _nms(detections, iou_threshold=NMS_IOU_THRESHOLD):
    """Per-class Non-Maximum Suppression.

    Args:
        detections: list of dicts with ``class_id``, ``confidence`` and a
            ``bbox`` dict (``x``, ``y``, ``width``, ``height``).
        iou_threshold: boxes of the same class whose IoU with an already
            kept box exceeds this value are dropped.

    Returns:
        A new list of the kept detections, highest confidence first. The
        input list is not reordered.
    """
    ordered = sorted(detections, key=lambda d: d["confidence"], reverse=True)
    keep = []
    for d in ordered:
        skip = False
        for k in keep:
            if d["class_id"] != k["class_id"]:
                continue
            x1 = max(d["bbox"]["x"], k["bbox"]["x"])
            y1 = max(d["bbox"]["y"], k["bbox"]["y"])
            x2 = min(d["bbox"]["x"] + d["bbox"]["width"],
                     k["bbox"]["x"] + k["bbox"]["width"])
            y2 = min(d["bbox"]["y"] + d["bbox"]["height"],
                     k["bbox"]["y"] + k["bbox"]["height"])
            inter = max(0, x2 - x1) * max(0, y2 - y1)
            a1 = d["bbox"]["width"] * d["bbox"]["height"]
            a2 = k["bbox"]["width"] * k["bbox"]["height"]
            # 1e-6 guards against division by zero for degenerate boxes.
            if inter / (a1 + a2 - inter + 1e-6) > iou_threshold:
                skip = True
                break
        if not skip:
            keep.append(d)
    return keep


def _get_preproc_info(result):
    """Extract letterbox padding info from the inference result.

    Kneron SDK applies letterbox resize (aspect-ratio-preserving + zero
    padding) before inference. The hw_pre_proc_info tells us how to
    reverse it.

    Returns:
        A dict with keys ``pad_left``, ``pad_top``, ``resized_w``,
        ``resized_h``, ``model_w``, ``model_h``, ``img_w``, ``img_h``
        (missing SDK attributes become 0), or ``None`` when the result
        carries no pre-processing info.
    """
    try:
        info = result.header.hw_pre_proc_info_list[0]
        return {key: getattr(info, attr, 0) for key, attr in _PREPROC_FIELDS}
    except Exception:
        return None


def _clip01(value):
    """Clamp *value* to the closed interval [0.0, 1.0]."""
    return max(0.0, min(1.0, value))


def _correct_bbox_for_letterbox(x, y, w, h, preproc, model_size):
    """Remove letterbox padding offset from normalized bbox coordinates.

    Input (x, y, w, h) is in model-input-space normalized to 0-1.
    Output is re-normalized to the original image aspect ratio (still 0-1).

    For KP_PADDING_CORNER (default): image is at top-left, padding at
    bottom/right.

    Both box corners are clipped to the image, and the size is derived
    from the clipped corners, so a box that starts inside the padding
    shrinks instead of keeping its full width/height.

    Args:
        x, y, w, h: top-left corner and size, normalized to the model input.
        preproc: dict from ``_get_preproc_info`` or ``None`` (no correction).
        model_size: fallback model input size when *preproc* reports 0.

    Returns:
        ``(x, y, w, h)`` normalized to the un-padded image.
    """
    if preproc is None:
        return x, y, w, h

    model_w = preproc["model_w"] or model_size
    model_h = preproc["model_h"] or model_size
    pad_left = preproc["pad_left"]
    pad_top = preproc["pad_top"]
    resized_w = preproc["resized_w"] or model_w
    resized_h = preproc["resized_h"] or model_h

    # If no padding was applied, skip correction
    if pad_left == 0 and pad_top == 0 and resized_w == model_w and resized_h == model_h:
        return x, y, w, h

    # Model-space pixels -> subtract padding -> normalize to resized image.
    left = (x * model_w - pad_left) / resized_w
    top = (y * model_h - pad_top) / resized_h
    right = left + (w * model_w) / resized_w
    bottom = top + (h * model_h) / resized_h

    # Clip both corners to 0-1, then rebuild the size from them.
    nx = _clip01(left)
    ny = _clip01(top)
    nw = max(0.0, _clip01(right) - nx)
    nh = max(0.0, _clip01(bottom) - ny)
    return nx, ny, nw, nh


def _parse_yolo_output(result, anchors, input_size, num_classes=80):
    """Parse YOLO (v3/v5) raw output into detection results.

    Works for both Tiny YOLOv3 and YOLOv5 — the tensor layout is the same:
    (num_anchors * (5 + num_classes), grid_h, grid_w)

    The key differences are:
      - anchor values
      - input_size used for anchor normalization
      - number of output heads

    Bounding boxes are corrected for letterbox padding so coordinates are
    relative to the original image (normalized 0-1).

    Returns:
        List of dicts with ``label``, ``confidence`` and ``bbox`` after NMS.
    """
    detections = []
    entry_size = 5 + num_classes  # 85 for COCO 80 classes

    # Get letterbox padding info
    preproc = _get_preproc_info(result)
    if preproc:
        _log(f"Preproc info: pad=({preproc['pad_left']},{preproc['pad_top']}), "
             f"resized=({preproc['resized_w']}x{preproc['resized_h']}), "
             f"model=({preproc['model_w']}x{preproc['model_h']}), "
             f"img=({preproc['img_w']}x{preproc['img_h']})")

    for head_idx in range(result.header.num_output_node):
        output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=head_idx,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        arr = output.ndarray[0]  # (C, H, W)
        channels, grid_h, grid_w = arr.shape

        # Determine number of anchors for this head
        num_anchors = channels // entry_size
        if num_anchors < 1:
            _log(f"Head {head_idx}: unexpected shape {arr.shape}, skipping")
            continue

        # Use the correct anchor set for this head
        if head_idx < len(anchors):
            head_anchors = anchors[head_idx]
        else:
            _log(f"Head {head_idx}: no anchors defined, skipping")
            continue

        for a_idx in range(min(num_anchors, len(head_anchors))):
            off = a_idx * entry_size
            aw, ah = head_anchors[a_idx]
            for cy in range(grid_h):
                for cx in range(grid_w):
                    obj_conf = _sigmoid(arr[off + 4, cy, cx])
                    if obj_conf < CONF_THRESHOLD:
                        continue

                    cls_scores = _sigmoid(arr[off + 5:off + entry_size, cy, cx])
                    cls_id = int(np.argmax(cls_scores))
                    cls_conf = float(cls_scores[cls_id])
                    conf = float(obj_conf * cls_conf)
                    if conf < CONF_THRESHOLD:
                        continue

                    bx = (_sigmoid(arr[off, cy, cx]) + cx) / grid_w
                    by = (_sigmoid(arr[off + 1, cy, cx]) + cy) / grid_h
                    # Exponent capped at 10 to keep box sizes finite.
                    bw = (np.exp(min(float(arr[off + 2, cy, cx]), 10)) * aw) / input_size
                    bh = (np.exp(min(float(arr[off + 3, cy, cx]), 10)) * ah) / input_size

                    # Convert center x,y,w,h to corner x,y,w,h (normalized to model input)
                    x = max(0.0, bx - bw / 2)
                    y = max(0.0, by - bh / 2)
                    w = min(1.0, bx + bw / 2) - x
                    h = min(1.0, by + bh / 2) - y

                    # Correct for letterbox padding
                    x, y, w, h = _correct_bbox_for_letterbox(x, y, w, h, preproc, input_size)

                    label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}"
                    detections.append({
                        "label": label,
                        "class_id": cls_id,
                        "confidence": conf,
                        "bbox": {"x": x, "y": y, "width": w, "height": h},
                    })

    detections = _nms(detections)

    # Remove internal class_id before returning
    for d in detections:
        del d["class_id"]
    return detections


def _parse_ssd_output(result, input_size=320, num_classes=2):
    """Parse SSD face detection output.

    SSD typically outputs two tensors:
      - locations: (num_boxes, 4) — bounding box coordinates
      - confidences: (num_boxes, num_classes) — class scores

    For the KL520 SSD face detection model (kl520_ssd_fd_lm.nef), the
    output contains face detections with landmarks.

    Any parsing error is logged and whatever was collected so far is
    returned.
    """
    detections = []
    preproc = _get_preproc_info(result)
    try:
        # Retrieve all output nodes
        num_outputs = result.header.num_output_node
        outputs = []
        for i in range(num_outputs):
            output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=i,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            outputs.append(output.ndarray[0])

        if num_outputs < 2:
            _log(f"SSD: expected >=2 output nodes, got {num_outputs}")
            return detections

        # Heuristic: the larger tensor is locations, smaller is confidences
        # Or: first output = locations, second = confidences
        locations = outputs[0]
        confidences = outputs[1]

        # Flatten if needed
        if locations.ndim > 2:
            locations = locations.reshape(-1, 4)
        if confidences.ndim > 2:
            confidences = confidences.reshape(-1, confidences.shape[-1])

        num_boxes = min(locations.shape[0], confidences.shape[0])
        for i in range(num_boxes):
            # SSD confidence: class 0 = background, class 1 = face
            if confidences.shape[-1] > 1:
                conf = float(confidences[i, 1])  # face class
            else:
                conf = float(_sigmoid(confidences[i, 0]))
            if conf < CONF_THRESHOLD:
                continue

            # SSD outputs are typically [x_min, y_min, x_max, y_max] normalized
            x_min = float(np.clip(locations[i, 0], 0.0, 1.0))
            y_min = float(np.clip(locations[i, 1], 0.0, 1.0))
            x_max = float(np.clip(locations[i, 2], 0.0, 1.0))
            y_max = float(np.clip(locations[i, 3], 0.0, 1.0))
            w = x_max - x_min
            h = y_max - y_min
            if w <= 0 or h <= 0:
                continue

            # Correct for letterbox padding
            x_min, y_min, w, h = _correct_bbox_for_letterbox(
                x_min, y_min, w, h, preproc, input_size)

            detections.append({
                "label": "face",
                "class_id": 0,
                "confidence": conf,
                "bbox": {"x": x_min, "y": y_min, "width": w, "height": h},
            })

        detections = _nms(detections)
        for d in detections:
            del d["class_id"]
    except Exception as e:
        _log(f"SSD parse error: {e}")
    return detections


def _parse_fcos_output(result, input_size=512, num_classes=80):
    """Parse FCOS (Fully Convolutional One-Stage) detection output.

    FCOS outputs per feature level:
      - classification: (num_classes, H, W)
      - centerness: (1, H, W)
      - regression: (4, H, W) — distances from each pixel to box edges (l, t, r, b)

    The outputs come in groups of 3 per feature level.

    Any parsing error is logged and whatever was collected so far is
    returned.
    """
    detections = []
    preproc = _get_preproc_info(result)
    try:
        num_outputs = result.header.num_output_node
        outputs = []
        for i in range(num_outputs):
            output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=i,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            outputs.append(output.ndarray[0])

        # FCOS typically has 5 feature levels × 3 outputs = 15 output nodes
        # Or fewer for simplified models. Group by 3: (cls, centerness, reg)
        # If we can't determine the grouping, try a simpler approach.
        strides = [8, 16, 32, 64, 128]
        num_levels = num_outputs // 3
        for level in range(num_levels):
            cls_out = outputs[level * 3]      # (num_classes, H, W)
            cnt_out = outputs[level * 3 + 1]  # (1, H, W)
            reg_out = outputs[level * 3 + 2]  # (4, H, W)
            stride = strides[level] if level < len(strides) else (8 * (2 ** level))
            h, w = cls_out.shape[1], cls_out.shape[2]

            for cy in range(h):
                for cx in range(w):
                    cls_scores = _sigmoid(cls_out[:, cy, cx])
                    cls_id = int(np.argmax(cls_scores))
                    cls_conf = float(cls_scores[cls_id])
                    centerness = float(_sigmoid(cnt_out[0, cy, cx]))
                    conf = cls_conf * centerness
                    if conf < CONF_THRESHOLD:
                        continue

                    # Regression: distances from pixel center to box edges
                    px = (cx + 0.5) * stride
                    py = (cy + 0.5) * stride
                    l = float(np.exp(min(reg_out[0, cy, cx], 10))) * stride
                    t = float(np.exp(min(reg_out[1, cy, cx], 10))) * stride
                    r = float(np.exp(min(reg_out[2, cy, cx], 10))) * stride
                    b = float(np.exp(min(reg_out[3, cy, cx], 10))) * stride

                    x_min = max(0.0, (px - l) / input_size)
                    y_min = max(0.0, (py - t) / input_size)
                    x_max = min(1.0, (px + r) / input_size)
                    y_max = min(1.0, (py + b) / input_size)
                    bw = x_max - x_min
                    bh = y_max - y_min
                    if bw <= 0 or bh <= 0:
                        continue

                    # Correct for letterbox padding
                    x_min, y_min, bw, bh = _correct_bbox_for_letterbox(
                        x_min, y_min, bw, bh, preproc, input_size)

                    label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}"
                    detections.append({
                        "label": label,
                        "class_id": cls_id,
                        "confidence": conf,
                        "bbox": {"x": x_min, "y": y_min, "width": bw, "height": bh},
                    })

        detections = _nms(detections)
        for d in detections:
            del d["class_id"]
    except Exception as e:
        _log(f"FCOS parse error: {e}")
    return detections


def _top_classifications(scores, top_k=5):
    """Softmax *scores* and return the *top_k* entries as label/confidence dicts.

    COCO names are only used when the model emits exactly
    ``len(COCO_CLASSES)`` scores. For any other class count (e.g. a
    1000-class ImageNet head) the COCO table does not describe the
    classes, so a neutral ``class_<index>`` label is emitted instead of a
    misleading name.
    """
    flat = np.asarray(scores).ravel()
    if flat.size == 0:
        return []

    # Subtracting the max keeps exp() from overflowing.
    exp_scores = np.exp(flat - np.max(flat))
    probs = exp_scores / exp_scores.sum()

    labels_match = flat.size == len(COCO_CLASSES)
    top = []
    for idx in np.argsort(probs)[::-1][:top_k]:
        idx = int(idx)
        label = COCO_CLASSES[idx] if labels_match else f"class_{idx}"
        top.append({"label": label, "confidence": float(probs[idx])})
    return top


def _parse_classification_output(result, num_classes=1000):
    """Parse classification model output (e.g., ResNet18 ImageNet).

    Returns the top-5 classes as ``{"label", "confidence"}`` dicts, or an
    empty list when the output cannot be read (the error is logged).
    """
    try:
        output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=0,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        return _top_classifications(output.ndarray.flatten())
    except Exception as e:
        _log(f"Classification parse error: {e}")
        return []


# ── Command handlers ─────────────────────────────────────────────────

def handle_scan():
    """Scan for connected Kneron devices.

    Tries Kneron PLUS SDK first (provides firmware info, kn_number, etc.).
    Falls back to pyusb if the SDK is unavailable (e.g. macOS missing .dylib).
    """
    if HAS_KP:
        try:
            descs = kp.core.scan_devices()
            devices = []
            for i in range(descs.device_descriptor_number):
                dev = descs.device_descriptor_list[i]
                devices.append({
                    "port": str(dev.usb_port_id),
                    "firmware": str(dev.firmware),
                    "kn_number": f"0x{dev.kn_number:08X}",
                    "product_id": f"0x{dev.product_id:04X}",
                    "connectable": dev.is_connectable,
                })
            return {"devices": devices}
        except Exception as e:
            _log(f"kp.core.scan_devices failed: {e}, trying pyusb fallback")

    # Fallback: use pyusb (same approach as kneron_detect.py)
    if HAS_PYUSB:
        return _scan_with_pyusb()
    return {"devices": [], "error_detail": "neither kp nor pyusb available"}


# Known Kneron product IDs (same as kneron_detect.py)
_KNERON_VENDOR_ID = 0x3231
_KNOWN_PRODUCTS = {
    0x0100: "KL520",
    0x0200: "KL720",
    0x0720: "KL720",
    0x0530: "KL530",
    0x0630: "KL630",
    0x0730: "KL730",
}


def _scan_with_pyusb():
    """Scan for Kneron devices using pyusb (libusb backend).

    The SDK-only fields are filled with placeholders: ``kn_number`` is
    all zeros and ``connectable`` is always True.
    """
    try:
        usb_devices = list(usb.core.find(find_all=True, idVendor=_KNERON_VENDOR_ID))
        devices = []
        for dev in usb_devices:
            product_id = f"0x{dev.idProduct:04X}"
            # pyusb port_id: bus-address
            port = f"{dev.bus}-{dev.address}"
            firmware = "unknown"
            try:
                # Reading the product string needs device access and may fail.
                firmware = dev.product or "unknown"
            except Exception:
                pass
            devices.append({
                "port": port,
                "firmware": firmware,
                "kn_number": "0x00000000",
                "product_id": product_id,
                "connectable": True,
            })
        return {"devices": devices}
    except Exception as e:
        return {"devices": [], "error_detail": f"pyusb scan failed: {e}"}


def handle_connect(params):
    """Connect to a Kneron device and load firmware if needed.

    KL520: USB Boot mode — firmware MUST be uploaded every session.
    KL720 (KDP2, pid=0x0720): Flash-based — firmware pre-installed.
    KL720 (KDP legacy, pid=0x0200): Old firmware — needs
    connect_without_check + firmware load to RAM before normal operation.

    Args:
        params: command dict; optional ``port`` (USB port id as string) and
            ``device_type`` (substring ``kl520`` / ``kl720`` overrides the
            product-id based chip detection).

    Returns:
        ``{"status": "connected", ...}`` on success, ``{"error": ...}``
        otherwise. On any failure the global device group is cleared.
    """
    global _device_group, _firmware_loaded, _device_chip

    if not HAS_KP:
        return {"error": "kp module not available"}

    try:
        port = params.get("port", "")
        device_type = params.get("device_type", "")

        # Scan to find device
        descs = kp.core.scan_devices()
        if descs.device_descriptor_number == 0:
            return {"error": "no Kneron device found"}

        # Find device by port or use first one
        target_dev = None
        for i in range(descs.device_descriptor_number):
            dev = descs.device_descriptor_list[i]
            if port and str(dev.usb_port_id) == port:
                target_dev = dev
                break
        if target_dev is None:
            target_dev = descs.device_descriptor_list[0]

        # Note: KL520 in USB Boot mode has is_connectable=False, which is
        # normal — it becomes connectable after firmware is loaded. KL720 KDP
        # legacy (pid=0x0200) is also not connectable until firmware load.
        # So we do NOT reject is_connectable=False here; instead we attempt
        # connection and firmware load as appropriate.

        # Determine chip type from device_type param or product_id
        pid = target_dev.product_id
        if "kl720" in device_type.lower():
            _device_chip = "KL720"
        elif "kl520" in device_type.lower():
            _device_chip = "KL520"
        elif pid in (0x0200, 0x0720):
            _device_chip = "KL720"
        else:
            _device_chip = "KL520"

        fw_str = str(target_dev.firmware)
        is_kdp_legacy = (_device_chip == "KL720" and pid == 0x0200)
        _log(f"Chip type: {_device_chip} (product_id=0x{pid:04X}, device_type={device_type}, fw={fw_str})")

        # ── KL720 KDP Legacy (pid=0x0200): old firmware, incompatible with SDK ──
        if is_kdp_legacy:
            _log("KL720 has legacy KDP firmware (pid=0x0200). Using connect_devices_without_check...")
            # Drop any previous group explicitly so its __del__ never runs
            # against a stale native handle (same rule as the normal path).
            _clear_device_group()
            _device_group = kp.core.connect_devices_without_check(
                usb_port_ids=[target_dev.usb_port_id]
            )
            kp.core.set_timeout(device_group=_device_group, milliseconds=60000)

            # Load KDP2 firmware to RAM so the device can operate with this SDK
            scpu_path, ncpu_path = _resolve_firmware_paths("KL720")
            if scpu_path and ncpu_path:
                _log(f"KL720: Loading KDP2 firmware to RAM: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("KL720: Firmware loaded to RAM, waiting for reboot...")
                time.sleep(5)

                # The pre-reboot handle is stale now; release it before
                # reconnecting (mirrors the Loader-mode path below).
                _clear_device_group()

                # Reconnect — device should now be running KDP2 in RAM
                descs = kp.core.scan_devices()
                reconnected = False
                for i in range(descs.device_descriptor_number):
                    dev = descs.device_descriptor_list[i]
                    if dev.product_id in (0x0200, 0x0720):
                        target_dev = dev
                        reconnected = True
                        break
                if not reconnected:
                    return {"error": "KL720 not found after firmware load. Unplug and re-plug."}

                # Try normal connect first, fallback to without_check
                try:
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                except Exception as conn_err:
                    _log(f"KL720: Normal reconnect failed ({conn_err}), using without_check...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                kp.core.set_timeout(device_group=_device_group, milliseconds=10000)
                fw_str = str(target_dev.firmware)
                _log(f"KL720: Reconnected after firmware load, pid=0x{target_dev.product_id:04X}, fw={fw_str}")
            else:
                _log("WARNING: KL720 firmware files not found. Cannot operate with KDP legacy device.")
                _clear_device_group()
                return {"error": "KL720 has legacy KDP firmware but KDP2 firmware files not found. "
                                 "Run update_kl720_firmware.py to flash KDP2 permanently."}

            return {
                "status": "connected",
                "firmware": fw_str,
                "kn_number": f"0x{target_dev.kn_number:08X}",
                "chip": _device_chip,
                "kdp_legacy": True,
            }

        # ── Normal connection (KL520 or KL720 KDP2) ──
        # Use connect_devices_without_check when:
        #   - KL720 KDP2: connect_devices() often fails with Error 28
        #   - KL520 USB Boot: is_connectable=False, connect_devices() rejects it
        # In these cases, connect_devices_without_check() works and we can
        # still load firmware afterwards.
        use_without_check = (_device_chip == "KL720") or (not target_dev.is_connectable)
        max_retries = 3
        last_err = None
        for attempt in range(max_retries):
            try:
                # Clear any stale device group from previous failed attempt.
                _clear_device_group()
                if use_without_check:
                    _log(f"{_device_chip}: connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                else:
                    _log(f"connect_devices(usb_port_id={target_dev.usb_port_id}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                _log(f"connect succeeded on attempt {attempt+1}")
                last_err = None
                break
            except Exception as conn_err:
                _clear_device_group()
                last_err = conn_err
                _log(f"connect attempt {attempt+1} failed: {conn_err}")
                if attempt < max_retries - 1:
                    time.sleep(2)
                    # Re-scan to refresh device handle
                    try:
                        descs = kp.core.scan_devices()
                        for i in range(descs.device_descriptor_number):
                            dev = descs.device_descriptor_list[i]
                            if port and str(dev.usb_port_id) == port:
                                target_dev = dev
                                break
                            elif not port:
                                target_dev = descs.device_descriptor_list[0]
                                break
                    except Exception:
                        # Keep the previous descriptor; the next attempt
                        # will surface any persistent problem.
                        pass

        if last_err is not None:
            hint = ""
            if sys.platform == "win32":
                hint = (" On Windows, ensure the WinUSB driver is installed for this device."
                        " Re-run the installer or use Zadig (https://zadig.akeo.ie).")
            raise RuntimeError(f"Failed to connect after {max_retries} attempts: {last_err}.{hint}")

        # KL720 needs longer timeout for large NEF transfers (12MB+ over USB)
        _timeout_ms = 60000 if _device_chip == "KL720" else 10000
        _log(f"Calling set_timeout(milliseconds={_timeout_ms})...")
        kp.core.set_timeout(device_group=_device_group, milliseconds=_timeout_ms)
        _log("set_timeout succeeded")

        # Firmware handling — chip-dependent
        if "Loader" in fw_str:
            # Device is in USB Boot (Loader) mode and needs firmware
            if _device_chip == "KL720":
                _log(f"WARNING: {_device_chip} is in Loader mode (unusual). Attempting firmware load...")
            scpu_path, ncpu_path = _resolve_firmware_paths(_device_chip)
            if scpu_path and ncpu_path:
                _log(f"{_device_chip}: Loading firmware: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("Firmware loaded, waiting for reboot...")
                time.sleep(5)

                # Reconnect after firmware load (with retry)
                _clear_device_group()
                for retry in range(3):
                    try:
                        descs = kp.core.scan_devices()
                        target_dev = descs.device_descriptor_list[0]
                        try:
                            _device_group = kp.core.connect_devices(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        except Exception:
                            _device_group = kp.core.connect_devices_without_check(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        break
                    except Exception as re_err:
                        _log(f"Reconnect attempt {retry+1} failed: {re_err}")
                        if retry < 2:
                            time.sleep(3)

                if _device_group is None:
                    return {"error": "Device not found after firmware load. Unplug and re-plug the device."}

                kp.core.set_timeout(
                    device_group=_device_group, milliseconds=_timeout_ms
                )
                fw_str = str(target_dev.firmware)
                _log(f"Reconnected after firmware load, firmware: {fw_str}")
            else:
                _log(f"WARNING: {_device_chip} firmware files not found, skipping firmware load")
        else:
            # Not in Loader mode — firmware already present
            _log(f"{_device_chip}: firmware already present (normal). fw={fw_str}")

        return {
            "status": "connected",
            "firmware": fw_str,
            "kn_number": f"0x{target_dev.kn_number:08X}",
            "chip": _device_chip,
        }
    except Exception as e:
        _clear_device_group()
        return {"error": str(e)}


def _reset_session_state():
    """Disconnect the device and restore every per-session global to its default."""
    global _model_id, _model_nef, _firmware_loaded
    global _model_type, _model_input_size, _device_chip

    _clear_device_group()
    _model_id = None
    _model_nef = None
    _model_type = "tiny_yolov3"
    _model_input_size = 224
    _firmware_loaded = False
    _device_chip = "KL520"


def handle_disconnect(params):
    """Disconnect from the current device and forget the loaded model."""
    _reset_session_state()
    return {"status": "disconnected"}


def handle_reset(params):
    """Reset the device back to USB Boot (Loader) state.

    This forces the device to drop its firmware and any loaded models.
    After reset the device will re-enumerate on USB, so the caller must
    wait and issue a fresh 'connect' command.
    """
    if _device_group is None:
        return {"error": "device not connected"}

    try:
        _log("Resetting device (kp.core.reset_device KP_RESET_REBOOT)...")
        kp.core.reset_device(
            device_group=_device_group,
            reset_mode=kp.ResetMode.KP_RESET_REBOOT,
        )
        _log("Device reset command sent successfully")
    except Exception as e:
        _log(f"reset_device raised: {e}")
        # Even if it throws, the device usually does reset.

    # Clear all state — the device is gone until it re-enumerates.
    _reset_session_state()
    return {"status": "reset"}


def handle_load_model(params):
    """Load a model file onto the device.

    KL520 USB Boot mode limitation: only one model can be loaded per USB
    session. If error 40 occurs, the error is returned to the Go driver
    which handles it by restarting the entire Python bridge.

    Args:
        params: command dict; ``path`` is the .nef file to upload.
    """
    global _model_id, _model_nef

    if _device_group is None:
        return {"error": "device not connected"}

    path = params.get("path", "")
    if not path or not os.path.exists(path):
        return {"error": f"model file not found: {path}"}

    try:
        _model_nef = kp.core.load_model_from_file(
            device_group=_device_group,
            file_path=path
        )
    except Exception as e:
        return {"error": str(e)}

    try:
        model = _model_nef.models[0]
        _model_id = model.id

        # Detect model type and input size
        _detect_model_type(_model_id, path)
        _log(f"Model loaded: id={_model_id}, type={_model_type}, "
             f"input={_model_input_size}, target={_model_nef.target_chip}")
        return {
            "status": "loaded",
            "model_id": _model_id,
            "model_type": _model_type,
            "input_size": _model_input_size,
            "model_path": path,
            "target_chip": str(_model_nef.target_chip),
        }
    except Exception as e:
        return {"error": str(e)}


def handle_inference(params):
    """Run inference on the provided image data.

    Args:
        params: command dict; ``image_base64`` holds the encoded image.

    Returns:
        Dict with ``taskType``, ``timestamp``, ``latencyMs``,
        ``detections`` and ``classifications``, or ``{"error": ...}``.
    """
    if _device_group is None:
        return {"error": "device not connected"}
    if _model_id is None:
        return {"error": "no model loaded"}

    image_b64 = params.get("image_base64", "")
    try:
        t0 = time.time()
        if image_b64:
            # Decode base64 image
            img_bytes = base64.b64decode(image_b64)
            if HAS_CV2:
                # Decode image with OpenCV
                img_array = np.frombuffer(img_bytes, dtype=np.uint8)
                img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
                if img is None:
                    return {"error": "failed to decode image"}

                h, w = img.shape[:2]
                # KL520 NPU requires input image dimensions >= model input size
                # and both width/height must be even numbers.
                min_dim = _model_input_size
                if w < min_dim or h < min_dim or w % 2 != 0 or h % 2 != 0:
                    if w < min_dim or h < min_dim:
                        scale = max(min_dim / w, min_dim / h)
                        new_w = int(w * scale)
                        new_h = int(h * scale)
                    else:
                        new_w, new_h = w, h
                    # Round up to the next even number.
                    new_w = (new_w + 1) & ~1
                    new_h = (new_h + 1) & ~1
                    img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
                    _log(f"Inference image resized: {w}x{h} -> {new_w}x{new_h} (min_dim={min_dim})")

                # Convert BGR to BGR565
                img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
            else:
                # NOTE(review): without OpenCV the still-encoded bytes are
                # forwarded as a flat uint8 array — confirm the SDK accepts
                # this, it looks like it would be misread as pixel data.
                img_bgr565 = np.frombuffer(img_bytes, dtype=np.uint8)
        else:
            return {"error": "no image data provided"}

        # Create inference config (original: pass numpy ndarray, SDK reads shape)
        inf_config = kp.GenericImageInferenceDescriptor(
            model_id=_model_id,
            inference_number=0,
            input_node_image_list=[
                kp.GenericInputNodeImage(
                    image=img_bgr565,
                    image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                )
            ]
        )

        # Send and receive
        _log(f"Inference: sending to NPU (model_type={_model_type}, input_size={_model_input_size})")
        kp.inference.generic_image_inference_send(_device_group, inf_config)
        result = kp.inference.generic_image_inference_receive(_device_group)
        _log("Inference: receive complete, parsing...")
        elapsed_ms = (time.time() - t0) * 1000

        # Parse output based on model type
        detections = []
        classifications = []
        task_type = "detection"
        if _model_type == "resnet18":
            task_type = "classification"
            classifications = _parse_classification_output(result)
        elif _model_type == "ssd":
            detections = _parse_ssd_output(result, input_size=_model_input_size)
        elif _model_type == "fcos":
            detections = _parse_fcos_output(result, input_size=_model_input_size)
        elif _model_type == "yolov5s":
            detections = _parse_yolo_output(
                result,
                anchors=ANCHORS_YOLOV5S,
                input_size=_model_input_size,
            )
        else:
            # Default: Tiny YOLOv3
            detections = _parse_yolo_output(
                result,
                anchors=ANCHORS_TINY_YOLOV3,
                input_size=_model_input_size,
            )

        _log(f"Inference: parse done, detections={len(detections)}, "
             f"classifications={len(classifications)}, elapsed={elapsed_ms:.1f}ms")
        return {
            "taskType": task_type,
            "timestamp": int(time.time() * 1000),
            "latencyMs": round(elapsed_ms, 1),
            "detections": detections,
            "classifications": classifications,
        }
    except Exception as e:
        import traceback
        _log(f"Inference EXCEPTION: {type(e).__name__}: {e}\n{traceback.format_exc()}")
        return {"error": str(e)}


# ── Main loop ────────────────────────────────────────────────────────

def main():
    """Main loop: read JSON commands from stdin, write responses to stdout."""
    # The Kneron C SDK may write ANSI-colored warnings directly to fd 1
    # (stdout), which corrupts our JSON-RPC protocol. To prevent this we
    # dup the real stdout fd, then redirect fd 1 to stderr so any C-level
    # writes go to stderr. Our JSON responses use the duped fd.
    _real_stdout_fd = os.dup(1)  # duplicate fd 1
    os.dup2(2, 1)                # fd 1 now points to stderr
    _real_stdout = os.fdopen(_real_stdout_fd, "w")
    sys.stdout = sys.stderr      # Python-level redirect too

    def _respond(obj):
        """Write a JSON response to the real stdout (not stderr)."""
        _real_stdout.write(json.dumps(obj) + "\n")
        _real_stdout.flush()

    # Command name -> handler taking the decoded command dict.
    handlers = {
        "scan": lambda _cmd: handle_scan(),
        "connect": handle_connect,
        "disconnect": handle_disconnect,
        "reset": handle_reset,
        "load_model": handle_load_model,
        "inference": handle_inference,
    }

    # Signal readiness
    _respond({"status": "ready"})
    _log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, pyusb={'yes' if HAS_PYUSB else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})")

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        try:
            cmd = json.loads(line)
            action = cmd.get("cmd", "")
            handler = handlers.get(action)
            if handler is None:
                result = {"error": f"unknown command: {action}"}
            else:
                result = handler(cmd)
            _respond(result)
        except Exception as e:
            # Protocol boundary: every request gets exactly one response.
            _respond({"error": str(e)})


def _cleanup():
    """Explicitly disconnect and clear _device_group before Python GC runs.

    KneronPLUS SDK's DeviceGroup.__del__ calls kp_disconnect_devices on a
    native handle that may already be freed when the interpreter is
    shutting down, causing 'OSError: access violation reading 0x00...'.
    By doing a clean disconnect + setting the global to None here, __del__
    becomes a no-op (None has no __del__).
    """
    _clear_device_group()


if __name__ == "__main__":
    import atexit
    atexit.register(_cleanup)
    main()
    _cleanup()  # also call synchronously in case atexit doesn't fire