#!/usr/bin/env python3 """Kneron Bridge - JSON-RPC over stdin/stdout This script acts as a bridge between the Go backend and the Kneron PLUS Python SDK. It reads JSON commands from stdin and writes JSON responses to stdout. Supports: - KL520 (USB Boot mode - firmware must be loaded each session) - KL720 (flash-based - firmware pre-installed, models freely reloadable) """ import sys import json import base64 import time import os import io import numpy as np def _preload_kneron_dylibs_macos(): """macOS 專用:用絕對路徑預先 dlopen wheel 內的 libusb + libkplus。 背景: - KneronPLUS wheel 把 libusb-1.0.0.dylib + libkplus.dylib 放在 kp/lib/。 - macOS dyld 在載入 libkplus 時會去找它的相依 libusb-1.0.0.dylib。 預設搜尋路徑(/usr/local/lib、/usr/lib)在 bundled Python 環境下通常 找不到(我們沒有 brew libusb),於是 `import kp` 就拋 OSError → HAS_KP=False → scan 回空陣列。 - macOS hardened runtime 會剝掉 DYLD_LIBRARY_PATH 等環境變數,所以 改從 Go 端注入 env 也不保險;最穩的做法是在 Python 這端用 ctypes 以絕對路徑先載入,後續 `import kp` 時 dyld 會重用已載入的映像。 Windows / Linux 不走這支 — 各自機制已在 Go 端處理(Windows 靠 PATH、 Linux 靠 wheel 自帶的 libusb.so.1.0.0 + LD_LIBRARY_PATH)。 """ if sys.platform != "darwin": return try: import ctypes import importlib.util spec = importlib.util.find_spec("kp") if spec is None or not spec.submodule_search_locations: return kp_dir = spec.submodule_search_locations[0] lib_dir = os.path.join(kp_dir, "lib") # 載入順序:先 libusb,再 libkplus(libkplus 相依 libusb) for name in ("libusb-1.0.0.dylib", "libkplus.dylib"): path = os.path.join(lib_dir, name) if os.path.isfile(path): try: ctypes.CDLL(path, mode=ctypes.RTLD_GLOBAL) except OSError: pass except Exception: pass _preload_kneron_dylibs_macos() try: import kp HAS_KP = True except (ImportError, AttributeError, Exception): HAS_KP = False try: import usb.core HAS_PYUSB = True except ImportError: HAS_PYUSB = False try: import cv2 HAS_CV2 = True except ImportError: HAS_CV2 = False # ── Global state ────────────────────────────────────────────────────── _device_group = None def _clear_device_group(): """Safely disconnect and clear the global _device_group. KneronPLUS SDK's DeviceGroup.__del__ calls kp_disconnect_devices on the native handle, but if the handle is already invalid (failed connect / stale state) it causes 'OSError: access violation'. By explicitly disconnecting before setting None, __del__ becomes a no-op on an already-disconnected handle. All errors are silenced — this is best-effort cleanup. """ global _device_group if _device_group is not None: try: kp.core.disconnect_devices(_device_group) except Exception: pass _device_group = None _model_id = None _model_nef = None _model_input_size = 224 # updated on model load _model_type = "tiny_yolov3" # updated on model load based on model_id / nef name _firmware_loaded = False _device_chip = "KL520" # updated on connect from product_id / device_type # COCO 80-class labels COCO_CLASSES = [ "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch", "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush" ] # Anchor boxes per model type (each list entry = one output head) ANCHORS_TINY_YOLOV3 = [ [(81, 82), (135, 169), (344, 319)], # 7×7 head (large objects) [(10, 14), (23, 27), (37, 58)], # 14×14 head (small objects) ] # YOLOv5s anchors (Kneron model 20005, no-upsample variant for KL520) ANCHORS_YOLOV5S = [ [(116, 90), (156, 198), (373, 326)], # P5/32 (large) [(30, 61), (62, 45), (59, 119)], # P4/16 (medium) [(10, 13), (16, 30), (33, 23)], # P3/8 (small) ] CONF_THRESHOLD = 0.25 NMS_IOU_THRESHOLD = 0.45 # Known Kneron model IDs → (model_type, input_size) KNOWN_MODELS = { # Tiny YOLO v3 (default KL520 model) 0: ("tiny_yolov3", 224), # ResNet18 classification (model 20001) 20001: ("resnet18", 224), # FCOS DarkNet53s detection (model 20004) 20004: ("fcos", 512), # YOLOv5s no-upsample (model 20005) 20005: ("yolov5s", 640), } def _log(msg): """Write log messages to stderr (stdout is reserved for JSON-RPC).""" print(f"[kneron_bridge] {msg}", file=sys.stderr, flush=True) def _resolve_firmware_paths(chip="KL520"): """Resolve firmware paths relative to this script's directory. Returns (scpu_path, ncpu_path) tuple for backward compat with existing handle_connect() callers. Use _resolve_firmware_paths_full(chip) to get loader path additionally (only KL520 has fw_loader.bin in A 階段). """ base = os.path.dirname(os.path.abspath(__file__)) fw_dir = os.path.join(base, "firmware", chip) scpu = os.path.join(fw_dir, "fw_scpu.bin") ncpu = os.path.join(fw_dir, "fw_ncpu.bin") if os.path.exists(scpu) and os.path.exists(ncpu): return scpu, ncpu # Fallback: check KNERON_FW_DIR env var fw_dir = os.environ.get("KNERON_FW_DIR", "") if fw_dir: scpu = os.path.join(fw_dir, "fw_scpu.bin") ncpu = os.path.join(fw_dir, "fw_ncpu.bin") if os.path.exists(scpu) and os.path.exists(ncpu): return scpu, ncpu return None, None _FW_ALLOWED_CHIPS = ("KL520", "KL720") # A 階段範圍、Reviewer m1 雙重防護用 def _resolve_firmware_paths_full(chip="KL520"): """Resolve scpu / ncpu / loader paths. A 階段:只有 KL520 有 fw_loader.bin(用於 KDP1 legacy → KDP2 升級的 SDK loader stage)。KL720 不需要 loader(不走 SDK loader path、直接 ctypes 呼叫 kp_update_kdp_firmware_from_files 也不需要 loader 檔)。 Reviewer m1:對 chip 參數做雙重 allow-list 防護。chip 來自 JSON-RPC stdin、 雖然 caller (handle_firmware_upgrade) 已 enforce allow-list、但這裡再過一道 避免未來 caller 拓寬時破防。額外拒絕含 path separator / 父目錄 / 絕對路徑 的非法輸入、確保 os.path.join 絕不 traverse。 Returns: dict: {"scpu": , "ncpu": , "loader": , "version": } 若 scpu/ncpu 任一缺檔、scpu/ncpu 為 None。 """ # 雙重 allow-list 防護(caller 已過一次、這裡再過一次防 path traversal) if not isinstance(chip, str) or chip not in _FW_ALLOWED_CHIPS: return {"scpu": None, "ncpu": None, "loader": None, "version": None} # 額外字元防護(即使 _FW_ALLOWED_CHIPS 拓寬到不安全字串也擋) if "/" in chip or "\\" in chip or ".." in chip or os.path.isabs(chip): return {"scpu": None, "ncpu": None, "loader": None, "version": None} base = os.path.dirname(os.path.abspath(__file__)) fw_dir = os.path.join(base, "firmware", chip) scpu = os.path.join(fw_dir, "fw_scpu.bin") ncpu = os.path.join(fw_dir, "fw_ncpu.bin") loader = os.path.join(fw_dir, "fw_loader.bin") version_file = os.path.join(fw_dir, "VERSION") result = {"scpu": None, "ncpu": None, "loader": None, "version": None} if os.path.exists(scpu) and os.path.exists(ncpu): result["scpu"] = scpu result["ncpu"] = ncpu if os.path.exists(loader): result["loader"] = loader if os.path.exists(version_file): try: with open(version_file, "r", encoding="utf-8") as f: result["version"] = f.read().strip() except Exception: pass # Fallback: KNERON_FW_DIR env var if result["scpu"] is None or result["ncpu"] is None: env_dir = os.environ.get("KNERON_FW_DIR", "") if env_dir: scpu2 = os.path.join(env_dir, "fw_scpu.bin") ncpu2 = os.path.join(env_dir, "fw_ncpu.bin") if os.path.exists(scpu2) and os.path.exists(ncpu2): result["scpu"] = scpu2 result["ncpu"] = ncpu2 loader2 = os.path.join(env_dir, "fw_loader.bin") if os.path.exists(loader2): result["loader"] = loader2 return result def _detect_model_type(model_id, nef_path): """Detect model type and input size from model ID or .nef filename.""" global _model_type, _model_input_size # Check known model IDs if model_id in KNOWN_MODELS: _model_type, _model_input_size = KNOWN_MODELS[model_id] _log(f"Model type detected by ID {model_id}: {_model_type} ({_model_input_size}x{_model_input_size})") return # Fallback: try to infer from filename basename = os.path.basename(nef_path).lower() if nef_path else "" if "yolov5" in basename: _model_type = "yolov5s" # Try to parse input size from filename like w640h640 _model_input_size = _parse_size_from_name(basename, default=640) elif "fcos" in basename: _model_type = "fcos" _model_input_size = _parse_size_from_name(basename, default=512) elif "ssd" in basename: _model_type = "ssd" _model_input_size = _parse_size_from_name(basename, default=320) elif "resnet" in basename or "classification" in basename: _model_type = "resnet18" _model_input_size = _parse_size_from_name(basename, default=224) elif "tiny_yolo" in basename or "tinyyolo" in basename: _model_type = "tiny_yolov3" _model_input_size = _parse_size_from_name(basename, default=224) else: # Default: assume YOLO-like detection _model_type = "tiny_yolov3" _model_input_size = 224 _log(f"Model type detected by filename '{basename}': {_model_type} ({_model_input_size}x{_model_input_size})") def _parse_size_from_name(name, default=224): """Extract input size from filename like 'w640h640' or 'w512h512'.""" import re m = re.search(r'w(\d+)h(\d+)', name) if m: return int(m.group(1)) return default # ── Post-processing ────────────────────────────────────────────────── def _sigmoid(x): return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500))) def _nms(detections, iou_threshold=NMS_IOU_THRESHOLD): """Non-Maximum Suppression.""" detections.sort(key=lambda d: d["confidence"], reverse=True) keep = [] for d in detections: skip = False for k in keep: if d["class_id"] != k["class_id"]: continue x1 = max(d["bbox"]["x"], k["bbox"]["x"]) y1 = max(d["bbox"]["y"], k["bbox"]["y"]) x2 = min(d["bbox"]["x"] + d["bbox"]["width"], k["bbox"]["x"] + k["bbox"]["width"]) y2 = min(d["bbox"]["y"] + d["bbox"]["height"], k["bbox"]["y"] + k["bbox"]["height"]) inter = max(0, x2 - x1) * max(0, y2 - y1) a1 = d["bbox"]["width"] * d["bbox"]["height"] a2 = k["bbox"]["width"] * k["bbox"]["height"] if inter / (a1 + a2 - inter + 1e-6) > iou_threshold: skip = True break if not skip: keep.append(d) return keep def _get_preproc_info(result): """Extract letterbox padding info from the inference result. Kneron SDK applies letterbox resize (aspect-ratio-preserving + zero padding) before inference. The hw_pre_proc_info tells us how to reverse it. Returns (pad_left, pad_top, resize_w, resize_h, model_w, model_h) or None. """ try: info = result.header.hw_pre_proc_info_list[0] return { "pad_left": info.pad_left if hasattr(info, 'pad_left') else 0, "pad_top": info.pad_top if hasattr(info, 'pad_top') else 0, "resized_w": info.resized_img_width if hasattr(info, 'resized_img_width') else 0, "resized_h": info.resized_img_height if hasattr(info, 'resized_img_height') else 0, "model_w": info.model_input_width if hasattr(info, 'model_input_width') else 0, "model_h": info.model_input_height if hasattr(info, 'model_input_height') else 0, "img_w": info.img_width if hasattr(info, 'img_width') else 0, "img_h": info.img_height if hasattr(info, 'img_height') else 0, } except Exception: return None def _correct_bbox_for_letterbox(x, y, w, h, preproc, model_size): """Remove letterbox padding offset from normalized bbox coordinates. Input (x, y, w, h) is in model-input-space normalized to 0-1. Output is re-normalized to the original image aspect ratio (still 0-1). For KP_PADDING_CORNER (default): image is at top-left, padding at bottom/right. """ if preproc is None: return x, y, w, h model_w = preproc["model_w"] or model_size model_h = preproc["model_h"] or model_size pad_left = preproc["pad_left"] pad_top = preproc["pad_top"] resized_w = preproc["resized_w"] or model_w resized_h = preproc["resized_h"] or model_h # If no padding was applied, skip correction if pad_left == 0 and pad_top == 0 and resized_w == model_w and resized_h == model_h: return x, y, w, h # Convert from normalized (0-1 of model input) to pixel coords in model space px = x * model_w py = y * model_h pw = w * model_w ph = h * model_h # Subtract padding offset px -= pad_left py -= pad_top # Re-normalize to the resized (un-padded) image dimensions nx = px / resized_w ny = py / resized_h nw = pw / resized_w nh = ph / resized_h # Clip to 0-1 nx = max(0.0, min(1.0, nx)) ny = max(0.0, min(1.0, ny)) nw = min(1.0 - nx, nw) nh = min(1.0 - ny, nh) return nx, ny, nw, nh def _parse_yolo_output(result, anchors, input_size, num_classes=80): """Parse YOLO (v3/v5) raw output into detection results. Works for both Tiny YOLOv3 and YOLOv5 — the tensor layout is the same: (num_anchors * (5 + num_classes), grid_h, grid_w) The key differences are: - anchor values - input_size used for anchor normalization - number of output heads Bounding boxes are corrected for letterbox padding so coordinates are relative to the original image (normalized 0-1). """ detections = [] entry_size = 5 + num_classes # 85 for COCO 80 classes # Get letterbox padding info preproc = _get_preproc_info(result) if preproc: _log(f"Preproc info: pad=({preproc['pad_left']},{preproc['pad_top']}), " f"resized=({preproc['resized_w']}x{preproc['resized_h']}), " f"model=({preproc['model_w']}x{preproc['model_h']}), " f"img=({preproc['img_w']}x{preproc['img_h']})") for head_idx in range(result.header.num_output_node): output = kp.inference.generic_inference_retrieve_float_node( node_idx=head_idx, generic_raw_result=result, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW ) arr = output.ndarray[0] # (C, H, W) channels, grid_h, grid_w = arr.shape # Determine number of anchors for this head num_anchors = channels // entry_size if num_anchors < 1: _log(f"Head {head_idx}: unexpected shape {arr.shape}, skipping") continue # Use the correct anchor set for this head if head_idx < len(anchors): head_anchors = anchors[head_idx] else: _log(f"Head {head_idx}: no anchors defined, skipping") continue for a_idx in range(min(num_anchors, len(head_anchors))): off = a_idx * entry_size for cy in range(grid_h): for cx in range(grid_w): obj_conf = _sigmoid(arr[off + 4, cy, cx]) if obj_conf < CONF_THRESHOLD: continue cls_scores = _sigmoid(arr[off + 5:off + entry_size, cy, cx]) cls_id = int(np.argmax(cls_scores)) cls_conf = float(cls_scores[cls_id]) conf = float(obj_conf * cls_conf) if conf < CONF_THRESHOLD: continue bx = (_sigmoid(arr[off, cy, cx]) + cx) / grid_w by = (_sigmoid(arr[off + 1, cy, cx]) + cy) / grid_h aw, ah = head_anchors[a_idx] bw = (np.exp(min(float(arr[off + 2, cy, cx]), 10)) * aw) / input_size bh = (np.exp(min(float(arr[off + 3, cy, cx]), 10)) * ah) / input_size # Convert center x,y,w,h to corner x,y,w,h (normalized to model input) x = max(0.0, bx - bw / 2) y = max(0.0, by - bh / 2) w = min(1.0, bx + bw / 2) - x h = min(1.0, by + bh / 2) - y # Correct for letterbox padding x, y, w, h = _correct_bbox_for_letterbox(x, y, w, h, preproc, input_size) label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}" detections.append({ "label": label, "class_id": cls_id, "confidence": conf, "bbox": {"x": x, "y": y, "width": w, "height": h}, }) detections = _nms(detections) # Remove internal class_id before returning for d in detections: del d["class_id"] return detections def _parse_ssd_output(result, input_size=320, num_classes=2): """Parse SSD face detection output. SSD typically outputs two tensors: - locations: (num_boxes, 4) — bounding box coordinates - confidences: (num_boxes, num_classes) — class scores For the KL520 SSD face detection model (kl520_ssd_fd_lm.nef), the output contains face detections with landmarks. """ detections = [] preproc = _get_preproc_info(result) try: # Retrieve all output nodes num_outputs = result.header.num_output_node outputs = [] for i in range(num_outputs): output = kp.inference.generic_inference_retrieve_float_node( node_idx=i, generic_raw_result=result, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW ) outputs.append(output.ndarray[0]) if num_outputs < 2: _log(f"SSD: expected >=2 output nodes, got {num_outputs}") return detections # Heuristic: the larger tensor is locations, smaller is confidences # Or: first output = locations, second = confidences locations = outputs[0] confidences = outputs[1] # Flatten if needed if locations.ndim > 2: locations = locations.reshape(-1, 4) if confidences.ndim > 2: confidences = confidences.reshape(-1, confidences.shape[-1]) num_boxes = min(locations.shape[0], confidences.shape[0]) for i in range(num_boxes): # SSD confidence: class 0 = background, class 1 = face if confidences.shape[-1] > 1: conf = float(confidences[i, 1]) # face class else: conf = float(_sigmoid(confidences[i, 0])) if conf < CONF_THRESHOLD: continue # SSD outputs are typically [x_min, y_min, x_max, y_max] normalized x_min = float(np.clip(locations[i, 0], 0.0, 1.0)) y_min = float(np.clip(locations[i, 1], 0.0, 1.0)) x_max = float(np.clip(locations[i, 2], 0.0, 1.0)) y_max = float(np.clip(locations[i, 3], 0.0, 1.0)) w = x_max - x_min h = y_max - y_min if w <= 0 or h <= 0: continue # Correct for letterbox padding x_min, y_min, w, h = _correct_bbox_for_letterbox( x_min, y_min, w, h, preproc, input_size) detections.append({ "label": "face", "class_id": 0, "confidence": conf, "bbox": {"x": x_min, "y": y_min, "width": w, "height": h}, }) detections = _nms(detections) for d in detections: del d["class_id"] except Exception as e: _log(f"SSD parse error: {e}") return detections def _parse_fcos_output(result, input_size=512, num_classes=80): """Parse FCOS (Fully Convolutional One-Stage) detection output. FCOS outputs per feature level: - classification: (num_classes, H, W) - centerness: (1, H, W) - regression: (4, H, W) — distances from each pixel to box edges (l, t, r, b) The outputs come in groups of 3 per feature level. """ detections = [] preproc = _get_preproc_info(result) try: num_outputs = result.header.num_output_node outputs = [] for i in range(num_outputs): output = kp.inference.generic_inference_retrieve_float_node( node_idx=i, generic_raw_result=result, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW ) outputs.append(output.ndarray[0]) # FCOS typically has 5 feature levels × 3 outputs = 15 output nodes # Or fewer for simplified models. Group by 3: (cls, centerness, reg) # If we can't determine the grouping, try a simpler approach. strides = [8, 16, 32, 64, 128] num_levels = num_outputs // 3 for level in range(num_levels): cls_out = outputs[level * 3] # (num_classes, H, W) cnt_out = outputs[level * 3 + 1] # (1, H, W) reg_out = outputs[level * 3 + 2] # (4, H, W) stride = strides[level] if level < len(strides) else (8 * (2 ** level)) h, w = cls_out.shape[1], cls_out.shape[2] for cy in range(h): for cx in range(w): cls_scores = _sigmoid(cls_out[:, cy, cx]) cls_id = int(np.argmax(cls_scores)) cls_conf = float(cls_scores[cls_id]) centerness = float(_sigmoid(cnt_out[0, cy, cx])) conf = cls_conf * centerness if conf < CONF_THRESHOLD: continue # Regression: distances from pixel center to box edges px = (cx + 0.5) * stride py = (cy + 0.5) * stride l = float(np.exp(min(reg_out[0, cy, cx], 10))) * stride t = float(np.exp(min(reg_out[1, cy, cx], 10))) * stride r = float(np.exp(min(reg_out[2, cy, cx], 10))) * stride b = float(np.exp(min(reg_out[3, cy, cx], 10))) * stride x_min = max(0.0, (px - l) / input_size) y_min = max(0.0, (py - t) / input_size) x_max = min(1.0, (px + r) / input_size) y_max = min(1.0, (py + b) / input_size) bw = x_max - x_min bh = y_max - y_min if bw <= 0 or bh <= 0: continue # Correct for letterbox padding x_min, y_min, bw, bh = _correct_bbox_for_letterbox( x_min, y_min, bw, bh, preproc, input_size) label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}" detections.append({ "label": label, "class_id": cls_id, "confidence": conf, "bbox": {"x": x_min, "y": y_min, "width": bw, "height": bh}, }) detections = _nms(detections) for d in detections: del d["class_id"] except Exception as e: _log(f"FCOS parse error: {e}") return detections def _parse_classification_output(result, num_classes=1000): """Parse classification model output (e.g., ResNet18 ImageNet).""" try: output = kp.inference.generic_inference_retrieve_float_node( node_idx=0, generic_raw_result=result, channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW ) scores = output.ndarray.flatten() # Apply softmax exp_scores = np.exp(scores - np.max(scores)) probs = exp_scores / exp_scores.sum() # Top-5 top_indices = np.argsort(probs)[::-1][:5] classifications = [] for idx in top_indices: label = COCO_CLASSES[idx] if idx < len(COCO_CLASSES) else f"class_{idx}" classifications.append({ "label": label, "confidence": float(probs[idx]), }) return classifications except Exception as e: _log(f"Classification parse error: {e}") return [] # ── Command handlers ───────────────────────────────────────────────── def handle_scan(): """Scan for connected Kneron devices. Tries Kneron PLUS SDK first (provides firmware info, kn_number, etc.). Falls back to pyusb if the SDK is unavailable (e.g. macOS missing .dylib). """ if HAS_KP: try: descs = kp.core.scan_devices() devices = [] for i in range(descs.device_descriptor_number): dev = descs.device_descriptor_list[i] devices.append({ "port": str(dev.usb_port_id), "firmware": str(dev.firmware), "kn_number": f"0x{dev.kn_number:08X}", "product_id": f"0x{dev.product_id:04X}", "connectable": dev.is_connectable, }) return {"devices": devices} except Exception as e: _log(f"kp.core.scan_devices failed: {e}, trying pyusb fallback") # Fallback: use pyusb (same approach as kneron_detect.py) if HAS_PYUSB: return _scan_with_pyusb() return {"devices": [], "error_detail": "neither kp nor pyusb available"} # Known Kneron product IDs (same as kneron_detect.py) _KNERON_VENDOR_ID = 0x3231 _KNOWN_PRODUCTS = { 0x0100: "KL520", 0x0200: "KL720", 0x0720: "KL720", 0x0530: "KL530", 0x0630: "KL630", 0x0730: "KL730", } def _scan_with_pyusb(): """Scan for Kneron devices using pyusb (libusb backend).""" try: usb_devices = list(usb.core.find(find_all=True, idVendor=_KNERON_VENDOR_ID)) devices = [] for dev in usb_devices: product_id = f"0x{dev.idProduct:04X}" chip = _KNOWN_PRODUCTS.get(dev.idProduct, f"Unknown-{product_id}") # pyusb port_id: bus-address port = f"{dev.bus}-{dev.address}" firmware = "unknown" try: firmware = dev.product or "unknown" except Exception: pass devices.append({ "port": port, "firmware": firmware, "kn_number": "0x00000000", "product_id": product_id, "connectable": True, }) return {"devices": devices} except Exception as e: return {"devices": [], "error_detail": f"pyusb scan failed: {e}"} def handle_connect(params): """Connect to a Kneron device and load firmware if needed. KL520: USB Boot mode — firmware MUST be uploaded every session. KL720 (KDP2, pid=0x0720): Flash-based — firmware pre-installed. KL720 (KDP legacy, pid=0x0200): Old firmware — needs connect_without_check + firmware load to RAM before normal operation. """ global _device_group, _firmware_loaded, _device_chip if not HAS_KP: return {"error": "kp module not available"} try: port = params.get("port", "") device_type = params.get("device_type", "") # Scan to find device descs = kp.core.scan_devices() if descs.device_descriptor_number == 0: return {"error": "no Kneron device found"} # Find device by port or use first one target_dev = None for i in range(descs.device_descriptor_number): dev = descs.device_descriptor_list[i] if port and str(dev.usb_port_id) == port: target_dev = dev break if target_dev is None: target_dev = descs.device_descriptor_list[0] # Note: KL520 in USB Boot mode has is_connectable=False, which is # normal — it becomes connectable after firmware is loaded. KL720 KDP # legacy (pid=0x0200) is also not connectable until firmware load. # So we do NOT reject is_connectable=False here; instead we attempt # connection and firmware load as appropriate. # Determine chip type from device_type param or product_id pid = target_dev.product_id if "kl720" in device_type.lower(): _device_chip = "KL720" elif "kl520" in device_type.lower(): _device_chip = "KL520" elif pid in (0x0200, 0x0720): _device_chip = "KL720" else: _device_chip = "KL520" fw_str = str(target_dev.firmware) is_kdp_legacy = (_device_chip == "KL720" and pid == 0x0200) _log(f"Chip type: {_device_chip} (product_id=0x{pid:04X}, device_type={device_type}, fw={fw_str})") # ── KL720 KDP Legacy (pid=0x0200): old firmware, incompatible with SDK ── if is_kdp_legacy: _log(f"KL720 has legacy KDP firmware (pid=0x0200). Using connect_devices_without_check...") _device_group = kp.core.connect_devices_without_check( usb_port_ids=[target_dev.usb_port_id] ) kp.core.set_timeout(device_group=_device_group, milliseconds=60000) # Load KDP2 firmware to RAM so the device can operate with this SDK scpu_path, ncpu_path = _resolve_firmware_paths("KL720") if scpu_path and ncpu_path: _log(f"KL720: Loading KDP2 firmware to RAM: {scpu_path}") kp.core.load_firmware_from_file( _device_group, scpu_path, ncpu_path ) _firmware_loaded = True _log("KL720: Firmware loaded to RAM, waiting for reboot...") time.sleep(5) # Reconnect — device should now be running KDP2 in RAM descs = kp.core.scan_devices() reconnected = False for i in range(descs.device_descriptor_number): dev = descs.device_descriptor_list[i] if dev.product_id in (0x0200, 0x0720): target_dev = dev reconnected = True break if not reconnected: return {"error": "KL720 not found after firmware load. Unplug and re-plug."} # Try normal connect first, fallback to without_check try: _device_group = kp.core.connect_devices( usb_port_ids=[target_dev.usb_port_id] ) except Exception as conn_err: _log(f"KL720: Normal reconnect failed ({conn_err}), using without_check...") _device_group = kp.core.connect_devices_without_check( usb_port_ids=[target_dev.usb_port_id] ) kp.core.set_timeout(device_group=_device_group, milliseconds=10000) fw_str = str(target_dev.firmware) _log(f"KL720: Reconnected after firmware load, pid=0x{target_dev.product_id:04X}, fw={fw_str}") else: _log("WARNING: KL720 firmware files not found. Cannot operate with KDP legacy device.") _clear_device_group() return {"error": "KL720 has legacy KDP firmware but KDP2 firmware files not found. " "Run update_kl720_firmware.py to flash KDP2 permanently."} return { "status": "connected", "firmware": fw_str, "kn_number": f"0x{target_dev.kn_number:08X}", "chip": _device_chip, "kdp_legacy": True, } # ── Normal connection (KL520 or KL720 KDP2) ── # Use connect_devices_without_check when: # - KL720 KDP2: connect_devices() often fails with Error 28 # - KL520 USB Boot: is_connectable=False, connect_devices() rejects it # In these cases, connect_devices_without_check() works and we can # still load firmware afterwards. use_without_check = (_device_chip == "KL720") or (not target_dev.is_connectable) max_retries = 3 last_err = None for attempt in range(max_retries): try: # Clear any stale device group from previous failed attempt. _clear_device_group() if use_without_check: _log(f"{_device_chip}: connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable}) attempt {attempt+1}/{max_retries}...") _device_group = kp.core.connect_devices_without_check( usb_port_ids=[target_dev.usb_port_id] ) else: _log(f"connect_devices(usb_port_id={target_dev.usb_port_id}) attempt {attempt+1}/{max_retries}...") _device_group = kp.core.connect_devices( usb_port_ids=[target_dev.usb_port_id] ) _log(f"connect succeeded on attempt {attempt+1}") last_err = None break except Exception as conn_err: _clear_device_group() last_err = conn_err _log(f"connect attempt {attempt+1} failed: {conn_err}") if attempt < max_retries - 1: time.sleep(2) # Re-scan to refresh device handle try: descs = kp.core.scan_devices() for i in range(descs.device_descriptor_number): dev = descs.device_descriptor_list[i] if port and str(dev.usb_port_id) == port: target_dev = dev break elif not port: target_dev = descs.device_descriptor_list[0] break except Exception: pass if last_err is not None: hint = "" if sys.platform == "win32": hint = (" On Windows, ensure the WinUSB driver is installed for this device." " Re-run the installer or use Zadig (https://zadig.akeo.ie).") raise RuntimeError(f"Failed to connect after {max_retries} attempts: {last_err}.{hint}") # KL720 needs longer timeout for large NEF transfers (12MB+ over USB) _timeout_ms = 60000 if _device_chip == "KL720" else 10000 _log(f"Calling set_timeout(milliseconds={_timeout_ms})...") kp.core.set_timeout(device_group=_device_group, milliseconds=_timeout_ms) _log(f"set_timeout succeeded") # Firmware handling — chip-dependent. # fresh_firmware_loaded is used by Go driver to decide whether to # skip the post-connect reset (freshly loaded firmware is already # in a clean state — reset would just waste 30-60s reloading it). fresh_firmware_loaded = False if "Loader" in fw_str: # Device is in USB Boot (Loader) mode and needs firmware if _device_chip == "KL720": _log(f"WARNING: {_device_chip} is in Loader mode (unusual). Attempting firmware load...") scpu_path, ncpu_path = _resolve_firmware_paths(_device_chip) if scpu_path and ncpu_path: _log(f"{_device_chip}: Loading firmware: {scpu_path}") kp.core.load_firmware_from_file( _device_group, scpu_path, ncpu_path ) _firmware_loaded = True _log("Firmware loaded, waiting for reboot...") time.sleep(5) # Reconnect after firmware load (with retry) _clear_device_group() for retry in range(3): try: descs = kp.core.scan_devices() target_dev = descs.device_descriptor_list[0] try: _device_group = kp.core.connect_devices( usb_port_ids=[target_dev.usb_port_id] ) except Exception: _device_group = kp.core.connect_devices_without_check( usb_port_ids=[target_dev.usb_port_id] ) break except Exception as re_err: _log(f"Reconnect attempt {retry+1} failed: {re_err}") if retry < 2: time.sleep(3) if _device_group is None: return {"error": "Device not found after firmware load. Unplug and re-plug the device."} kp.core.set_timeout( device_group=_device_group, milliseconds=_timeout_ms ) fw_str = str(target_dev.firmware) fresh_firmware_loaded = True _log(f"Reconnected after firmware load, firmware: {fw_str}") else: _log(f"WARNING: {_device_chip} firmware files not found, skipping firmware load") else: # Not in Loader mode — firmware already present from a previous # session. This is the state that triggers Error 15 on inference # without reset, per observed bug. _log(f"{_device_chip}: firmware already present (normal). fw={fw_str}") return { "status": "connected", "firmware": fw_str, "kn_number": f"0x{target_dev.kn_number:08X}", "chip": _device_chip, "fresh_firmware_loaded": fresh_firmware_loaded, } except Exception as e: _clear_device_group() return {"error": str(e)} def handle_disconnect(params): """Disconnect from the current device.""" global _device_group, _model_id, _model_nef, _firmware_loaded global _model_type, _model_input_size, _device_chip _clear_device_group() _model_id = None _model_nef = None _model_type = "tiny_yolov3" _model_input_size = 224 _firmware_loaded = False _device_chip = "KL520" return {"status": "disconnected"} def handle_reset(params): """Reset the device back to USB Boot (Loader) state. This forces the device to drop its firmware and any loaded models. After reset the device will re-enumerate on USB, so the caller must wait and issue a fresh 'connect' command. """ global _device_group, _model_id, _model_nef, _firmware_loaded global _model_type, _model_input_size, _device_chip if _device_group is None: return {"error": "device not connected"} try: _log("Resetting device (kp.core.reset_device KP_RESET_REBOOT)...") kp.core.reset_device( device_group=_device_group, reset_mode=kp.ResetMode.KP_RESET_REBOOT, ) _log("Device reset command sent successfully") except Exception as e: _log(f"reset_device raised: {e}") # Even if it throws, the device usually does reset. # Clear all state — the device is gone until it re-enumerates. _clear_device_group() _model_id = None _model_nef = None _model_type = "tiny_yolov3" _model_input_size = 224 _firmware_loaded = False _device_chip = "KL520" return {"status": "reset"} def handle_load_model(params): """Load a model file onto the device. KL520 USB Boot mode limitation: only one model can be loaded per USB session. If error 40 occurs, the error is returned to the Go driver which handles it by restarting the entire Python bridge. """ global _model_id, _model_nef if _device_group is None: return {"error": "device not connected"} path = params.get("path", "") if not path or not os.path.exists(path): return {"error": f"model file not found: {path}"} try: _model_nef = kp.core.load_model_from_file( device_group=_device_group, file_path=path ) except Exception as e: return {"error": str(e)} try: model = _model_nef.models[0] _model_id = model.id # Detect model type and input size _detect_model_type(_model_id, path) _log(f"Model loaded: id={_model_id}, type={_model_type}, " f"input={_model_input_size}, target={_model_nef.target_chip}") return { "status": "loaded", "model_id": _model_id, "model_type": _model_type, "input_size": _model_input_size, "model_path": path, "target_chip": str(_model_nef.target_chip), } except Exception as e: return {"error": str(e)} def handle_inference(params): """Run inference on the provided image data.""" if _device_group is None: return {"error": "device not connected"} if _model_id is None: return {"error": "no model loaded"} image_b64 = params.get("image_base64", "") try: t0 = time.time() if image_b64: # Decode base64 image img_bytes = base64.b64decode(image_b64) if HAS_CV2: # Decode image with OpenCV img_array = np.frombuffer(img_bytes, dtype=np.uint8) img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img is None: return {"error": "failed to decode image"} h, w = img.shape[:2] # KL520 NPU requires input image dimensions >= model input size # and both width/height must be even numbers. min_dim = _model_input_size if w < min_dim or h < min_dim or w % 2 != 0 or h % 2 != 0: if w < min_dim or h < min_dim: scale = max(min_dim / w, min_dim / h) new_w = int(w * scale) new_h = int(h * scale) else: new_w, new_h = w, h new_w = (new_w + 1) & ~1 new_h = (new_h + 1) & ~1 img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR) _log(f"Inference image resized: {w}x{h} -> {new_w}x{new_h} (min_dim={min_dim})") # Convert BGR to BGR565 img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565) else: img_bgr565 = np.frombuffer(img_bytes, dtype=np.uint8) else: return {"error": "no image data provided"} # Create inference config (original: pass numpy ndarray, SDK reads shape) inf_config = kp.GenericImageInferenceDescriptor( model_id=_model_id, inference_number=0, input_node_image_list=[ kp.GenericInputNodeImage( image=img_bgr565, image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565, ) ] ) # Send and receive _log(f"Inference: sending to NPU (model_type={_model_type}, input_size={_model_input_size})") kp.inference.generic_image_inference_send(_device_group, inf_config) result = kp.inference.generic_image_inference_receive(_device_group) _log(f"Inference: receive complete, parsing...") elapsed_ms = (time.time() - t0) * 1000 # Parse output based on model type detections = [] classifications = [] task_type = "detection" if _model_type == "resnet18": task_type = "classification" classifications = _parse_classification_output(result) elif _model_type == "ssd": detections = _parse_ssd_output(result, input_size=_model_input_size) elif _model_type == "fcos": detections = _parse_fcos_output(result, input_size=_model_input_size) elif _model_type == "yolov5s": detections = _parse_yolo_output( result, anchors=ANCHORS_YOLOV5S, input_size=_model_input_size, ) else: # Default: Tiny YOLOv3 detections = _parse_yolo_output( result, anchors=ANCHORS_TINY_YOLOV3, input_size=_model_input_size, ) _log(f"Inference: parse done, detections={len(detections)}, classifications={len(classifications)}, elapsed={elapsed_ms:.1f}ms") return { "taskType": task_type, "timestamp": int(time.time() * 1000), "latencyMs": round(elapsed_ms, 1), "detections": detections, "classifications": classifications, } except Exception as e: import traceback _log(f"Inference EXCEPTION: {type(e).__name__}: {e}\n{traceback.format_exc()}") return {"error": str(e)} # ── Firmware upgrade (A 階段 M9-1) ─────────────────────────────────── # # 對應 TDD v2/firmware-management.md §5.1 / §6.1: # - 自動升級 KDP1 legacy → KDP2,含 KL520(USB Boot mode + loader stage) # 與 KL720(含 KDP legacy pid=0x0200)。 # - Stage 命名採 Design:preparing / loading / flashing / verifying / done / error # (TDD §4.3 為 source of truth)。 # - 失敗 reason enum(TDD §3.4):scan_not_found / connect_failed / # loader_write_failed / upgrade_mid_failed / disconnect_during_op / # timeout / verify_mismatch / verify_not_found。 # # 為什麼走 ctypes:KneronPLUS Python wrapper 沒 export # `kp_update_kdp_firmware_from_files`(見 research-kl520-fw-management/ # 56-m9-6-strong-validation-result.md 附帶發現 1),warrenchen reference # 實作 `LocalAPI/legacy_plus121_runner.py` 直接 ctypes 打 C symbol,本檔 # 沿用該模式。 KDP_MAGIC_CONNECTION_PASS = 536173391 # 與 warrenchen reference 一致 KP_SUCCESS = 0 USB_WAIT_AFTER_REBOOT_MS = 2000 # SDK loader 階段 reboot 等待 USB_WAIT_AFTER_UPGRADE_MS = 5000 # AC-FW-1.6:升級後 5-8s USB stable USB_WAIT_RETRY_CONNECT_MS = 200 MAX_RECONNECT_RETRIES = 15 # 5s sleep + 15 * 200ms = 8s 上界 KL520_UPGRADE_TIMEOUT_S = 60 # AC-FW-1.7 KL720_UPGRADE_TIMEOUT_S = 200 # AC-FW-1.7 # 進度事件 stage % 對照(TDD §4.3) _FW_STAGE_PERCENT = { "preparing": 5, "loading": 20, "flashing": 50, "verifying": 90, "done": 100, "error": -1, } # 升級進行中旗標(SIGTERM handler 用、AC-FW-1.9 graceful shutdown 拒絕) # Reviewer m4:原本還有 _firmware_upgrade_start_ts 全域變數、與 SIGTERM handler # closure capture 的 start_ts 重複、容易未來 desync → 砍掉、單一 source of truth # 走 closure。 _firmware_upgrade_in_progress = False def _fw_normalize_code(code): """Convert int8-like unsigned (e.g. 253 for -3) to signed. 與 warrenchen reference 一致:某些 legacy 路徑回 unsigned int8 值。 """ try: c = int(code) except Exception: return code if c > 127: return c - 256 return c def _fw_emit_progress(stage, message="", elapsed_ms=0, eta_ms=0, extra=None): """Push a progress event to stderr as a JSON-RPC notification line. Go driver 抓 stderr line-by-line、轉成 WebSocket FirmwareProgress 給前端。 Schema 對齊 TDD §4.2 `FirmwareProgress`: {"event": "firmware_progress", "percent": int, "stage": str, "message": str, "elapsed_ms": int, "eta_ms": int, ...} Stage `error` 時 caller 應 push 額外 reason / raw_error / before_version 透過 extra dict。 """ payload = { "event": "firmware_progress", "percent": _FW_STAGE_PERCENT.get(stage, 0), "stage": stage, "message": message, "elapsed_ms": int(elapsed_ms), "eta_ms": int(eta_ms), } if extra: payload.update(extra) try: # 寫到 stderr、與既有 _log() 同 fd、但用 JSON 格式(不加 [kneron_bridge] prefix) # 方便 Go driver 區分「progress event JSON」vs「自由文字 log」。 print(json.dumps(payload), file=sys.stderr, flush=True) except Exception: # progress emit 失敗不該影響升級流程本身 pass def _fw_load_libkplus(): """Load libkplus shared library via ctypes、bind needed C symbol signatures. 跨平台:macOS .dylib / Linux .so / Windows .dll。優先用 `kp` module 已載 入的 lib path(避免重複載入造成 mismatch),fallback 到 wheel 內 lib/ 目錄。 Raises: RuntimeError: 若 libkplus 找不到或符號 binding 失敗。 """ import ctypes import importlib.util spec = importlib.util.find_spec("kp") if spec is None or not spec.submodule_search_locations: raise RuntimeError("kp module spec not found") kp_dir = spec.submodule_search_locations[0] lib_dir = os.path.join(kp_dir, "lib") # 平台對應的 lib filename if sys.platform == "darwin": lib_name = "libkplus.dylib" elif sys.platform == "win32": lib_name = "libkplus.dll" else: lib_name = "libkplus.so" lib_path = os.path.join(lib_dir, lib_name) if not os.path.isfile(lib_path): # Windows 可能用其他命名(warrenchen reference 是 libkplus.dll) # 嘗試找任何 libkplus* 檔案 # Reviewer m2:sort() 確保 deterministic 順序、不依賴 os.listdir 回傳次序 candidates = sorted( f for f in os.listdir(lib_dir) if f.startswith("libkplus") ) if not candidates: raise RuntimeError(f"libkplus not found in {lib_dir}") lib_path = os.path.join(lib_dir, candidates[0]) _log(f"WARNING: libkplus fallback using {candidates[0]} (primary {lib_name} not found)") # Windows: add_dll_directory 確保相依 dll 可解析 if sys.platform == "win32" and hasattr(os, "add_dll_directory"): try: os.add_dll_directory(lib_dir) except Exception: pass lib = ctypes.CDLL(lib_path) # Bind C symbol signatures(與 warrenchen reference 完全一致) lib.kp_connect_devices.argtypes = [ ctypes.c_int, # num_devices ctypes.POINTER(ctypes.c_int), # usb_port_ids ctypes.POINTER(ctypes.c_int), # status_out ] lib.kp_connect_devices.restype = ctypes.c_void_p # device_group handle lib.kp_set_timeout.argtypes = [ctypes.c_void_p, ctypes.c_int] lib.kp_set_timeout.restype = None lib.kp_load_firmware_from_file.argtypes = [ ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p ] lib.kp_load_firmware_from_file.restype = ctypes.c_int lib.kp_update_kdp_firmware_from_files.argtypes = [ ctypes.c_void_p, # device_group ctypes.c_char_p, # scpu_or_loader path ctypes.c_char_p, # ncpu path or NULL ctypes.c_bool, # auto_reboot ] lib.kp_update_kdp_firmware_from_files.restype = ctypes.c_int lib.kp_disconnect_devices.argtypes = [ctypes.c_void_p] lib.kp_disconnect_devices.restype = ctypes.c_int if hasattr(lib, "kp_error_string"): lib.kp_error_string.argtypes = [ctypes.c_int] lib.kp_error_string.restype = ctypes.c_char_p return lib def _fw_errstr(lib, code): """Decode kp error code → string via kp_error_string()。 與 warrenchen 一致:先試 raw code、若無回應再試 signed normalize 後值。 """ signed = _fw_normalize_code(code) if hasattr(lib, "kp_error_string"): try: msg = lib.kp_error_string(int(code)) if not msg and signed != code: msg = lib.kp_error_string(int(signed)) if msg: return msg.decode("utf-8", errors="replace") except Exception: pass return f"code={code}" def _fw_connect_with_magic(lib, port_id): """Connect with magic pass = 536173391 (允許 KDP1 legacy device 連線)。 Returns: device_group handle (c_void_p int). Raises: RuntimeError("connect_failed: ...") on failure. """ import ctypes port_ids = (ctypes.c_int * 1)(int(port_id)) status = ctypes.c_int(KDP_MAGIC_CONNECTION_PASS) dg = lib.kp_connect_devices(1, port_ids, ctypes.byref(status)) if not dg or status.value != KP_SUCCESS: signed = _fw_normalize_code(status.value) raise RuntimeError( f"connect_failed: raw_code={status.value}, signed={signed}, " f"msg={_fw_errstr(lib, status.value)}" ) return dg def _fw_scan_target(port): """Scan devices via kp.core.scan_devices() and find target by usb_port_id. Returns: descriptor or None. """ try: descs = kp.core.scan_devices() except Exception as e: _log(f"fw_scan_target: scan_devices failed: {e}") return None if descs.device_descriptor_number == 0: return None for i in range(descs.device_descriptor_number): dev = descs.device_descriptor_list[i] if port and str(dev.usb_port_id) == str(port): return dev return None def _fw_rescan_and_wait(port, max_wait_s=8.0, initial_sleep_s=5.0): """等 USB re-enumerate stable → rescan 找回 target by port (AC-FW-1.6)。 Args: port: 原 usb_port_id(升級後 re-enumerate 通常保留同 port)。 max_wait_s: 從 initial_sleep_s 過後再加 max_wait_s - initial_sleep_s 秒輪詢上界。實測 5 秒已穩、保留上界 8 秒(AC-FW-1.6)。 initial_sleep_s: 第一次 rescan 前固定等的秒數。 Returns: (descriptor or None, total_wait_s). """ time.sleep(initial_sleep_s) waited = initial_sleep_s target = _fw_scan_target(port) if target is not None: return target, waited # 多輪 short-poll poll_step = 0.5 while waited < max_wait_s: time.sleep(poll_step) waited += poll_step target = _fw_scan_target(port) if target is not None: return target, waited return None, waited def _fw_classify_legacy(firmware_str, product_id): """判斷 device 是否為 KDP1 legacy state(需走 loader stage)。 KL520 legacy 訊號:firmware 字串為 "KDP"、"KDP1"、"KDP1.x"、"USB Boot"、 "USB Boot Loader"、"LOADER" 等 legacy state、或空字串 (某些 USB Boot state 不回 firmware string)。 KL720 legacy 訊號:product_id == 0x0200 (KP_DEVICE_KL720_LEGACY)。 Reviewer M3 + s3:原本只用 substring match `"KDP" in fw and "KDP2" not in fw` 對 KDP3(未來 firmware)會誤判 legacy → 改用顯式 prefix 比對表 + 已知字串 enumeration、確保覆蓋 KDP1 各種 firmware 字串變體、forward-compat KDP3+。 Returns True if needs SDK loader stage、False if can short-circuit to flashing. """ if product_id == 0x0200: return True # KL720 KDP1 legacy(pid 明示、不靠 firmware 字串) fw = (firmware_str or "").strip().upper() # 已知 KDP1 legacy firmware 字串完整列舉(明示比對、不靠 substring) legacy_exact = { "", # 某些 USB Boot state 不回 firmware string "KDP", "KDP1", "USB BOOT", "USB BOOT LOADER", "LOADER", "BOOTLOADER", } if fw in legacy_exact: return True # KDP1.x(KDP1.0 / KDP1.5 等版本字串) if fw.startswith("KDP1.") or fw.startswith("KDP1 "): return True # 明示放行 KDP2 / KDP3+(forward-compat、避免 substring match 對未來 firmware 誤判) # KDP2.x / KDP3.x / KDP4.x ... 皆為 modern firmware、不需走 loader for prefix in ("KDP2", "KDP3", "KDP4", "KDP5", "KDP6", "KDP7", "KDP8", "KDP9"): if fw.startswith(prefix): return False # 未知 firmware 字串:保守 default = 不走 loader(避免誤觸 loader stage brick device) # 例:未來 firmware 用全新命名("NEF"、"K3"、等)→ 假設是 modern firmware # 若這判斷錯了、verify 階段會 detect verify_mismatch、不致 brick return False def _fw_eta_ms(chip, current_stage): """估算剩餘 ms(給前端顯示 ~X 秒、非精確)。 依 TDD §4.2:UI 顯示「~X 秒 remaining」、精度低可接受。 """ # 各 stage 預估完成時刻(以升級開始為 0): if chip == "KL520": total_ms = 30000 # AC-FW-1.7 預估 30s cum = {"preparing": 2000, "loading": 8000, "flashing": 22000, "verifying": 28000} else: # KL720 total_ms = 180000 # AC-FW-1.7 預估 180s cum = {"preparing": 5000, "loading": 30000, "flashing": 160000, "verifying": 175000} done_at = cum.get(current_stage, total_ms) return max(0, total_ms - done_at) # ── Firmware upgrade exceptions + failure handler ──────────────────── # # Reviewer M1:原本 _FwError / _FwTimeoutError / _fw_handle_failure 宣告位於 # handle_firmware_upgrade **之後**(語法上 Python module load 時會先掃完整個檔 # 才走 handler、所以 happy-path 不會炸 NameError、但 readability 差、且若有人 # 在 handler 中間插入 module-level code 觸發呼叫就會炸)。 # 移到 handler 之前、讓讀者從上而下能理解 error flow。 class _FwError(Exception): """Internal exception carrying (stage, reason, message) for firmware ops.""" def __init__(self, stage, reason, message): super().__init__(message) self.stage = stage self.reason = reason self.message = message class _FwTimeoutError(Exception): """Raised when total upgrade duration exceeds chip timeout.""" def __init__(self, stage): super().__init__(f"timeout at stage={stage}") self.stage = stage def _fw_handle_failure(stage, reason, message, before_fw, start_ts, dg, lib, raw=""): """彙整失敗 progress event + return 給 caller 的 error dict。 對齊 TDD §6.1 失敗回傳格式: {"error":, "stage":, "reason":, "raw_error":} Reviewer m3:原本此 helper 內 disconnect、caller 的 finally 也 disconnect、 雙重 disconnect 對 SDK 行為未定。改成「single owner of disconnect」原則: 本 helper 不再 disconnect、由 caller 的 finally 統一處理。本函式只負責 emit progress event + 組裝 error dict。 """ elapsed = int((time.monotonic() - start_ts) * 1000) _log(f"firmware_upgrade FAILED: stage={stage}, reason={reason}, " f"message={message}, elapsed_ms={elapsed}") _fw_emit_progress( "error", message=message, elapsed_ms=elapsed, eta_ms=0, extra={ "error": message, "reason": reason, "raw_error": raw or message, "before_version": before_fw, }, ) return { "error": message, "stage": stage, "reason": reason, "raw_error": raw or message, } def handle_firmware_upgrade(params): """A 階段 M9-1:自動升級 KDP1 → KDP2、KL520 與 KL720。 對應 TDD §6.1 表 + §5.1 流程: Input: {"port": "", "chip": "KL520" | "KL720"} Output (success): {"status":"upgraded", "before_firmware":, "after_firmware":, "method":"ctypes_kp_update_kdp_firmware_from_files", "duration_ms":} Output (failure): {"error":, "stage":, "reason":, "raw_error":} 每進入一個 stage 透過 _fw_emit_progress() 推 progress event 到 stderr, Go driver 抓 stderr line-by-line 轉成 WebSocket FirmwareProgress 給前端。 """ global _firmware_upgrade_in_progress if not HAS_KP: return {"error": "kp module not available", "stage": "preparing", "reason": "scan_not_found", "raw_error": "kp not available"} chip = params.get("chip", "KL520") port = str(params.get("port", "")) if chip not in ("KL520", "KL720"): return {"error": f"unsupported chip for A 階段: {chip}", "stage": "preparing", "reason": "scan_not_found", "raw_error": f"chip={chip} not in (KL520, KL720)"} timeout_s = KL520_UPGRADE_TIMEOUT_S if chip == "KL520" else KL720_UPGRADE_TIMEOUT_S start_ts = time.monotonic() def elapsed_ms(): return int((time.monotonic() - start_ts) * 1000) def check_timeout(current_stage): if (time.monotonic() - start_ts) > timeout_s: raise _FwTimeoutError(current_stage) # ── AC-FW-1.9 graceful shutdown 拒絕:標記升級進行中 ── # Reviewer m4:原本還寫 _firmware_upgrade_start_ts 全域、與 SIGTERM handler # closure 重複、已移除、改由 closure capture start_ts 為 single source。 _firmware_upgrade_in_progress = True # 在升降版進入 critical section 期間註冊 SIGTERM handler # (收 SIGTERM 不立即退、改 log warning event;實際 server 端 lock # 由 M9-2 Go driver / M9-3 service 實作、bridge.py 只負責「正在跑時 # 拒絕被 kill」) _fw_register_sigterm_handler(start_ts) method = "ctypes_kp_update_kdp_firmware_from_files" before_fw = "" lib = None dg = None try: # ── preparing:scan + connect ──────────────────────────────── _fw_emit_progress( "preparing", message=f"scanning {chip} on port {port}", elapsed_ms=elapsed_ms(), eta_ms=_fw_eta_ms(chip, "preparing"), ) check_timeout("preparing") # 先 disconnect 既有 _device_group(若有)、避免 handle 衝突 _clear_device_group() target = _fw_scan_target(port) if target is None: raise _FwError( "preparing", "scan_not_found", f"device with port_id={port} not found in scan", ) before_fw = str(target.firmware) target_port_id = int(target.usb_port_id) target_pid = int(target.product_id) _log(f"firmware_upgrade: chip={chip}, port={target_port_id}, " f"pid=0x{target_pid:04X}, firmware='{before_fw}'") # ── 解析 firmware 檔路徑 ───────────────────────────────────── fw_paths = _resolve_firmware_paths_full(chip) if fw_paths["scpu"] is None or fw_paths["ncpu"] is None: raise _FwError( "preparing", "scan_not_found", f"firmware files not found for {chip} " f"(scpu/ncpu missing in server/scripts/firmware/{chip}/)", ) # ── 載入 libkplus + ctypes binding ────────────────────────── try: lib = _fw_load_libkplus() except Exception as e: raise _FwError( "preparing", "connect_failed", f"libkplus load failed: {e}", ) # ── connect with magic(allow KDP1 legacy device)─────────── try: dg = _fw_connect_with_magic(lib, target_port_id) except RuntimeError as e: raise _FwError("preparing", "connect_failed", str(e)) # set timeout for SDK operations(注意:不是整體 upgrade timeout、 # 是單一 SDK call 的 timeout、避免單個 kp_load/update call 卡住) lib.kp_set_timeout(dg, int(timeout_s * 1000)) # ── 判斷是否走 SDK loader stage ────────────────────────────── # Reviewer M2:原本控制流隱式(`if needs_loader: if loader_path is None: ...` # nested)、讀者不易看清「實際會跑 loading stage」的條件。改為三個顯式 bool: # # needs_loader = device 處於 KDP1 legacy state(_fw_classify_legacy) # should_run_loader_stage = 實際會跑 loading stage(loader.bin 存在 + needs_loader) # loader_required_but_missing = KL520 KDP1 legacy 但缺 loader.bin(必失敗) # # 三個情境的流程: # 1. KL520 KDP1 legacy + loader.bin 存在 → loading → flashing(SDK load) # → verifying → done (should_run_loader_stage=True) # 2. KL520 KDP1 legacy + loader.bin 缺 → fail at loading (loader_write_failed) # 3. KL720 KDP1 legacy + loader.bin 缺 → skip loading、直接 flashing(warrenchen 模式) # → verifying → done (should_run_loader_stage=False) # 4. already KDP2(KL520/KL720)→ skip loading、直接 flashing(warrenchen 模式) # → verifying → done (should_run_loader_stage=False) needs_loader = _fw_classify_legacy(before_fw, target_pid) loader_path = fw_paths["loader"] should_run_loader_stage = needs_loader and loader_path is not None loader_required_but_missing = ( needs_loader and loader_path is None and chip == "KL520" ) _log(f"firmware_upgrade: needs_loader={needs_loader}, " f"should_run_loader_stage={should_run_loader_stage}, " f"loader_required_but_missing={loader_required_but_missing}, " f"legacy={'yes' if needs_loader else 'no'}") # ── 情境 2:KL520 KDP1 legacy 但缺 loader.bin → 直接失敗 ───── if loader_required_but_missing: check_timeout("loading") raise _FwError( "loading", "loader_write_failed", f"fw_loader.bin not found for {chip} but device is in " f"KDP1 legacy state (firmware='{before_fw}')", ) # ── 情境 1:跑 loading stage(KL520 KDP1 legacy + loader.bin)── if should_run_loader_stage: check_timeout("loading") _fw_emit_progress( "loading", message="writing USB Boot loader firmware", elapsed_ms=elapsed_ms(), eta_ms=_fw_eta_ms(chip, "loading"), ) ret = lib.kp_update_kdp_firmware_from_files( dg, loader_path.encode("utf-8"), None, # loader stage: ncpu = NULL True, # auto_reboot ) if ret != KP_SUCCESS: raise _FwError( "loading", "loader_write_failed", f"kp_update_kdp_firmware_from_files(loader) ret={ret} " f"({_fw_errstr(lib, ret)})", ) # auto_reboot 後 disconnect 可能失敗(USB re-enumerate)容忍 try: lib.kp_disconnect_devices(dg) except Exception: pass # disconnect 完設 dg=None、避免 finally double-disconnect 已 freed handle dg = None # 等 device reboot 完進 USB Boot mode(Loader firmware loaded) time.sleep(USB_WAIT_AFTER_REBOOT_MS / 1000.0) # rescan + reconnect with magic target = _fw_scan_target(port) if target is None: raise _FwError( "loading", "disconnect_during_op", f"device disappeared after loader write, port={port}", ) try: dg = _fw_connect_with_magic(lib, int(target.usb_port_id)) except RuntimeError as e: raise _FwError( "loading", "connect_failed", f"reconnect after loader failed: {e}", ) lib.kp_set_timeout(dg, int(timeout_s * 1000)) elif needs_loader: # 情境 3:KL720 KDP1 legacy 沒 loader.bin → 跳過 loading、直接 flashing # warrenchen 模式:kp_update_kdp_firmware_from_files(scpu, ncpu, True) 一次寫 _log(f"firmware_upgrade: {chip} legacy without loader.bin、" f"skipping loading stage, will go directly to flashing") # ── flashing:寫入 KDP2 firmware(scpu + ncpu)───────────── check_timeout("flashing") _fw_emit_progress( "flashing", message="writing KDP2 firmware (scpu + ncpu)", elapsed_ms=elapsed_ms(), eta_ms=_fw_eta_ms(chip, "flashing"), ) if should_run_loader_stage: # 情境 1:device 已透過 loader stage 進 Loader mode、用 # kp_load_firmware_from_file 載 scpu + ncpu 到 RAM ret = lib.kp_load_firmware_from_file( dg, fw_paths["scpu"].encode("utf-8"), fw_paths["ncpu"].encode("utf-8"), ) if ret != KP_SUCCESS: raise _FwError( "flashing", "upgrade_mid_failed", f"kp_load_firmware_from_file ret={ret} " f"({_fw_errstr(lib, ret)})", ) else: # 情境 3 / 4:沒走 loader stage(KL720 legacy without loader.bin、 # 或 already KDP2)→ warrenchen 模式:直接 # kp_update_kdp_firmware_from_files(scpu, ncpu, True) 一次寫 ret = lib.kp_update_kdp_firmware_from_files( dg, fw_paths["scpu"].encode("utf-8"), fw_paths["ncpu"].encode("utf-8"), True, # auto_reboot ) if ret != KP_SUCCESS: raise _FwError( "flashing", "upgrade_mid_failed", f"kp_update_kdp_firmware_from_files ret={ret} " f"({_fw_errstr(lib, ret)})", ) # disconnect after upgrade:auto_reboot 後 disconnect 失敗預期、容忍 try: lib.kp_disconnect_devices(dg) except Exception: pass dg = None # ── verifying:等 USB re-enumerate → rescan → 驗 firmware 字串 ── check_timeout("verifying") _fw_emit_progress( "verifying", message="waiting USB re-enumerate and verifying firmware version", elapsed_ms=elapsed_ms(), eta_ms=_fw_eta_ms(chip, "verifying"), ) # AC-FW-1.6: 等 5-8 秒 USB stable target_after, waited = _fw_rescan_and_wait( port, max_wait_s=USB_WAIT_AFTER_UPGRADE_MS / 1000.0 + 3.0, # 5 + 3 = 8s 上界 initial_sleep_s=USB_WAIT_AFTER_UPGRADE_MS / 1000.0, ) if target_after is None: raise _FwError( "verifying", "verify_not_found", f"device not found after upgrade (waited {waited:.1f}s)、" f"USB may still be re-enumerating, please re-plug", ) after_fw = str(target_after.firmware) after_pid = int(target_after.product_id) # 驗證 firmware 字串已升到 KDP2(不再是 KDP1 legacy) if _fw_classify_legacy(after_fw, after_pid): raise _FwError( "verifying", "verify_mismatch", f"firmware after upgrade still appears legacy: " f"firmware='{after_fw}', pid=0x{after_pid:04X}", ) # ── done ── duration_ms = elapsed_ms() _fw_emit_progress( "done", message=f"upgraded from '{before_fw}' to '{after_fw}'", elapsed_ms=duration_ms, eta_ms=0, ) return { "status": "upgraded", "before_firmware": before_fw, "after_firmware": after_fw, "method": method, "duration_ms": duration_ms, } except _FwTimeoutError as e: return _fw_handle_failure( e.stage, "timeout", f"upgrade exceeded {timeout_s}s timeout at stage={e.stage}", before_fw, start_ts, dg, lib, raw=str(e), ) except _FwError as e: return _fw_handle_failure( e.stage, e.reason, e.message, before_fw, start_ts, dg, lib, raw=str(e), ) except Exception as e: import traceback tb = traceback.format_exc() _log(f"firmware_upgrade UNEXPECTED EXCEPTION: {type(e).__name__}: {e}\n{tb}") return _fw_handle_failure( "flashing", "upgrade_mid_failed", f"unexpected: {type(e).__name__}: {e}", before_fw, start_ts, dg, lib, raw=tb, ) finally: _firmware_upgrade_in_progress = False # Reviewer m3:disconnect 的 single owner = 此 finally block。 # _fw_handle_failure 已改為「不在裡面 disconnect」、避免 double-disconnect。 # success path 在 1810 行已 disconnect 並設 dg=None、此處 if dg is not None # 會 short-circuit 跳過、不會 double。 # fail path:dg 可能還持有 handle、由本 finally 統一收尾。 if dg is not None and lib is not None: try: lib.kp_disconnect_devices(dg) except Exception: pass dg = None # 確保不會被外部誤用 _fw_unregister_sigterm_handler() # ── SIGTERM handler (AC-FW-1.9 graceful shutdown rejection) ────────── # # 升級進行中收到 SIGTERM 時,不立即退出、改在 stderr push warning event。 # 實際的 server-side lock 機制由 M9-2 / M9-3 實作(progress.md「未解決問題」 # 註記為依賴)。本處 bridge.py 端的責任:「正在跑時拒絕被 kill」。 # # Windows 沒有 SIGTERM 概念、改用 atexit。Linux/macOS 用 signal handler。 _fw_original_sigterm_handler = None def _fw_register_sigterm_handler(start_ts): """註冊 SIGTERM handler:升級進行中時拒絕並 log warning。""" global _fw_original_sigterm_handler if sys.platform == "win32": return # Windows 沒 SIGTERM try: import signal def handler(signum, frame): if _firmware_upgrade_in_progress: elapsed = int((time.monotonic() - start_ts) * 1000) try: print( json.dumps({ "event": "shutdown_rejected", "reason": "firmware_upgrade_in_progress", "task": "firmware_upgrade", "elapsed_ms": elapsed, }), file=sys.stderr, flush=True, ) except Exception: pass # 拒絕 SIGTERM:不呼叫 sys.exit、不 raise、繼續執行升級 return # 沒升級進行中、走預設行為 if callable(_fw_original_sigterm_handler): _fw_original_sigterm_handler(signum, frame) else: sys.exit(0) _fw_original_sigterm_handler = signal.signal(signal.SIGTERM, handler) except Exception as e: _log(f"SIGTERM handler registration failed: {e}") def _fw_unregister_sigterm_handler(): """還原 SIGTERM handler 為 install 前狀態。""" global _fw_original_sigterm_handler if sys.platform == "win32": return try: import signal if _fw_original_sigterm_handler is not None: signal.signal(signal.SIGTERM, _fw_original_sigterm_handler) _fw_original_sigterm_handler = None else: signal.signal(signal.SIGTERM, signal.SIG_DFL) except Exception: pass # ── Main loop ──────────────────────────────────────────────────────── def main(): """Main loop: read JSON commands from stdin, write responses to stdout.""" # The Kneron C SDK may write ANSI-colored warnings directly to fd 1 # (stdout), which corrupts our JSON-RPC protocol. To prevent this we # dup the real stdout fd, then redirect fd 1 to stderr so any C-level # writes go to stderr. Our JSON responses use the duped fd. _real_stdout_fd = os.dup(1) # duplicate fd 1 os.dup2(2, 1) # fd 1 now points to stderr _real_stdout = os.fdopen(_real_stdout_fd, "w") sys.stdout = sys.stderr # Python-level redirect too def _respond(obj): """Write a JSON response to the real stdout (not stderr).""" _real_stdout.write(json.dumps(obj) + "\n") _real_stdout.flush() # Signal readiness _respond({"status": "ready"}) _log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, pyusb={'yes' if HAS_PYUSB else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})") for line in sys.stdin: line = line.strip() if not line: continue try: cmd = json.loads(line) action = cmd.get("cmd", "") if action == "scan": result = handle_scan() elif action == "connect": result = handle_connect(cmd) elif action == "disconnect": result = handle_disconnect(cmd) elif action == "reset": result = handle_reset(cmd) elif action == "load_model": result = handle_load_model(cmd) elif action == "inference": result = handle_inference(cmd) elif action == "firmware_upgrade": result = handle_firmware_upgrade(cmd) else: result = {"error": f"unknown command: {action}"} _respond(result) except Exception as e: _respond({"error": str(e)}) def _cleanup(): """Explicitly disconnect and clear _device_group before Python GC runs. KneronPLUS SDK's DeviceGroup.__del__ calls kp_disconnect_devices on a native handle that may already be freed when the interpreter is shutting down, causing 'OSError: access violation reading 0x00...'. By doing a clean disconnect + setting the global to None here, __del__ becomes a no-op (None has no __del__). """ global _device_group if _device_group is not None: try: kp.core.disconnect_devices(_device_group) except Exception: pass _device_group = None if __name__ == "__main__": import atexit atexit.register(_cleanup) main() _cleanup() # also call synchronously in case atexit doesn't fire