visionA/local-tool/server/scripts/kneron_bridge.py
jim800121chen b71ff4cd3c perf(local-tool): Windows KL520 cold-boot connect 106s → ~40s(跳過多餘 reset)
背景:
Windows 實測 KL520 首次 connect 耗時 106 秒,原因是 reset 流程內部重複
firmware load:
  1. 進來 Loader → load firmware (35s) → Comp/U
  2. reset 退回 Loader → bridge 重啟
  3. reconnect 進來又是 Loader → load firmware (30s) → Comp/U
  4. Loader reconnect 第一次常 fail(15s timeout)
總共 ~65s 花在「砍掉剛載好的 firmware、再載一次」的白工上。

根因:先前修的 needsReset 邏輯不管 firmware 新舊一律 reset。但 Error 15
只發生在「Comp/U 是上次 session 殘留」的情境;「本次 connect 內部剛載的
Comp/U」session 是乾淨的,不需要 reset。

修法(條件性 reset):
- server/scripts/kneron_bridge.py:connect handler 新增追蹤本次有無走
  firmware load flow,return 多帶 `fresh_firmware_loaded` bool
- server/internal/driver/kneron/kl720_driver.go:Connect 讀 flag,若為
  true 就 skipReset(firmware 剛載的,session 已乾淨)

驗證(2026-04-21):
- `/tmp/test_bridge.py` 拔插 USB 後跑 `connect (fw=Loader) →
  fresh_firmware_loaded=True → skip reset → load_model → inference`
  → 11 detections(person×8, tie×3, latency 332ms)
- Mac UI Comp/U 殘留路徑:reset → 11 bbox ✓
- Mac UI Loader cold-boot 路徑(拔插後):skip reset → 11 bbox ✓

預期效益:
- Windows cold-boot(常見):106s → ~40s(省 65s)
- Mac 跨 session(常見):~15-20s 不變
- 極少數(Windows device 未斷電但跨 server session):走完整 reset

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-21 11:09:25 +08:00

1208 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Kneron Bridge - JSON-RPC over stdin/stdout
This script acts as a bridge between the Go backend and the Kneron PLUS
Python SDK. It reads JSON commands from stdin and writes JSON responses
to stdout.
Supports:
- KL520 (USB Boot mode - firmware must be loaded each session)
- KL720 (flash-based - firmware pre-installed, models freely reloadable)
"""
import sys
import json
import base64
import time
import os
import io
import numpy as np
def _preload_kneron_dylibs_macos():
    """macOS only: dlopen the wheel's libusb + libkplus by absolute path.

    Background:
    - The KneronPLUS wheel ships libusb-1.0.0.dylib + libkplus.dylib under
      kp/lib/.
    - When dyld loads libkplus it resolves the dependent libusb-1.0.0.dylib
      via the default search paths (/usr/local/lib, /usr/lib), which a
      bundled Python environment usually cannot satisfy (no brew libusb),
      so `import kp` raises OSError -> HAS_KP=False -> scan returns an
      empty device list.
    - The macOS hardened runtime strips DYLD_LIBRARY_PATH and similar env
      vars, so injecting env from the Go side is also unreliable. The most
      robust fix is pre-loading here with ctypes by absolute path; the
      later `import kp` then lets dyld reuse the already-loaded images.

    Windows / Linux do not take this path — each is handled on the Go side
    (Windows via PATH, Linux via the wheel's bundled libusb.so.1.0.0 +
    LD_LIBRARY_PATH).
    """
    if sys.platform != "darwin":
        return
    try:
        import ctypes
        import importlib.util
        spec = importlib.util.find_spec("kp")
        if spec is None or not spec.submodule_search_locations:
            return
        kp_dir = spec.submodule_search_locations[0]
        lib_dir = os.path.join(kp_dir, "lib")
        # Load order matters: libusb first, then libkplus (libkplus depends
        # on libusb).
        for name in ("libusb-1.0.0.dylib", "libkplus.dylib"):
            path = os.path.join(lib_dir, name)
            if os.path.isfile(path):
                try:
                    ctypes.CDLL(path, mode=ctypes.RTLD_GLOBAL)
                except OSError:
                    # Best-effort: a failed dlopen just leaves the normal
                    # (possibly failing) `import kp` path to run.
                    pass
    except Exception:
        # Never let pre-loading break bridge startup.
        pass
_preload_kneron_dylibs_macos()
# Optional dependencies — a failed import only disables the matching feature.
try:
    # `import kp` can fail with ImportError, or with OSError/AttributeError
    # when the bundled native dylibs cannot be loaded, so catch broadly.
    # (The original `except (ImportError, AttributeError, Exception)` was
    # redundant: Exception already covers the other two.)
    import kp
    HAS_KP = True
except Exception:
    HAS_KP = False
try:
    import usb.core
    HAS_PYUSB = True
except ImportError:
    HAS_PYUSB = False
try:
    import cv2
    HAS_CV2 = True
except ImportError:
    HAS_CV2 = False
# ── Global state ──────────────────────────────────────────────────────
_device_group = None
def _clear_device_group():
    """Best-effort disconnect, then drop the global _device_group.

    KneronPLUS's DeviceGroup.__del__ calls kp_disconnect_devices on the
    native handle; with a stale/invalid handle (failed connect) that
    crashes with 'OSError: access violation'. Disconnecting explicitly
    first turns the later __del__ into a harmless no-op. All errors are
    deliberately swallowed — this is cleanup only.
    """
    global _device_group
    if _device_group is None:
        return
    try:
        kp.core.disconnect_devices(_device_group)
    except Exception:
        pass
    _device_group = None
# Per-session model state (reset by handle_disconnect / handle_reset).
_model_id = None
_model_nef = None
_model_input_size = 224  # updated on model load
_model_type = "tiny_yolov3"  # updated on model load based on model_id / nef name
_firmware_loaded = False
_device_chip = "KL520"  # updated on connect from product_id / device_type
# COCO 80-class labels
COCO_CLASSES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
    "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra",
    "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove",
    "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup",
    "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange",
    "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
    "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier",
    "toothbrush"
]
# Anchor boxes per model type (each list entry = one output head)
ANCHORS_TINY_YOLOV3 = [
    [(81, 82), (135, 169), (344, 319)],  # 7×7 head (large objects)
    [(10, 14), (23, 27), (37, 58)],  # 14×14 head (small objects)
]
# YOLOv5s anchors (Kneron model 20005, no-upsample variant for KL520)
ANCHORS_YOLOV5S = [
    [(116, 90), (156, 198), (373, 326)],  # P5/32 (large)
    [(30, 61), (62, 45), (59, 119)],  # P4/16 (medium)
    [(10, 13), (16, 30), (33, 23)],  # P3/8 (small)
]
# Detection post-processing thresholds (confidence cut-off and NMS IoU).
CONF_THRESHOLD = 0.25
NMS_IOU_THRESHOLD = 0.45
# Known Kneron model IDs → (model_type, input_size)
KNOWN_MODELS = {
    # Tiny YOLO v3 (default KL520 model)
    0: ("tiny_yolov3", 224),
    # ResNet18 classification (model 20001)
    20001: ("resnet18", 224),
    # FCOS DarkNet53s detection (model 20004)
    20004: ("fcos", 512),
    # YOLOv5s no-upsample (model 20005)
    20005: ("yolov5s", 640),
}
def _log(msg):
    """Emit one log line on stderr; stdout is reserved for JSON-RPC."""
    sys.stderr.write(f"[kneron_bridge] {msg}\n")
    sys.stderr.flush()
def _resolve_firmware_paths(chip="KL520"):
    """Locate fw_scpu.bin / fw_ncpu.bin for *chip*.

    Search order: firmware/<chip>/ next to this script, then the directory
    named by the KNERON_FW_DIR environment variable. Returns the
    (scpu, ncpu) path pair, or (None, None) when no complete pair exists.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    candidates = [os.path.join(script_dir, "firmware", chip)]
    env_dir = os.environ.get("KNERON_FW_DIR", "")
    if env_dir:
        candidates.append(env_dir)
    for fw_dir in candidates:
        scpu = os.path.join(fw_dir, "fw_scpu.bin")
        ncpu = os.path.join(fw_dir, "fw_ncpu.bin")
        # Both halves of the firmware must be present to be usable.
        if os.path.exists(scpu) and os.path.exists(ncpu):
            return scpu, ncpu
    return None, None
def _detect_model_type(model_id, nef_path):
    """Infer model type and input size from model ID or the .nef filename.

    Updates the module globals _model_type / _model_input_size. Known IDs
    win; otherwise the lowercase basename is matched against keyword rules,
    falling back to a YOLO-like 224x224 detector.
    """
    global _model_type, _model_input_size
    if model_id in KNOWN_MODELS:
        _model_type, _model_input_size = KNOWN_MODELS[model_id]
        _log(f"Model type detected by ID {model_id}: {_model_type} ({_model_input_size}x{_model_input_size})")
        return
    basename = os.path.basename(nef_path).lower() if nef_path else ""
    # (keywords, model_type, default input size) — first match wins.
    rules = (
        (("yolov5",), "yolov5s", 640),
        (("fcos",), "fcos", 512),
        (("ssd",), "ssd", 320),
        (("resnet", "classification"), "resnet18", 224),
        (("tiny_yolo", "tinyyolo"), "tiny_yolov3", 224),
    )
    for keywords, model_type, default_size in rules:
        if any(key in basename for key in keywords):
            _model_type = model_type
            # Filenames like ..._w640h640.nef override the default size.
            _model_input_size = _parse_size_from_name(basename, default=default_size)
            break
    else:
        # Unknown name: assume a YOLO-like detector at the default size.
        _model_type = "tiny_yolov3"
        _model_input_size = 224
    _log(f"Model type detected by filename '{basename}': {_model_type} ({_model_input_size}x{_model_input_size})")
def _parse_size_from_name(name, default=224):
    """Pull the input size out of a filename token like 'w640h640'.

    Returns the width component (height is ignored); *default* when the
    token is absent.
    """
    import re
    match = re.search(r'w(\d+)h(\d+)', name)
    return int(match.group(1)) if match else default
# ── Post-processing ──────────────────────────────────────────────────
def _sigmoid(x):
    """Numerically safe logistic function; input is clipped to ±500."""
    z = np.clip(x, -500, 500)
    return 1.0 / (1.0 + np.exp(-z))
def _bbox_iou(a, b):
    """Intersection-over-union of two {'x','y','width','height'} boxes."""
    x1 = max(a["x"], b["x"])
    y1 = max(a["y"], b["y"])
    x2 = min(a["x"] + a["width"], b["x"] + b["width"])
    y2 = min(a["y"] + a["height"], b["y"] + b["height"])
    inter = max(0, x2 - x1) * max(0, y2 - y1)
    area_a = a["width"] * a["height"]
    area_b = b["width"] * b["height"]
    # Epsilon guards against division by zero for degenerate boxes.
    return inter / (area_a + area_b - inter + 1e-6)
def _nms(detections, iou_threshold=None):
    """Greedy per-class Non-Maximum Suppression.

    Highest-confidence boxes are kept first; any later box of the same
    class whose IoU with a kept box exceeds *iou_threshold* (defaults to
    NMS_IOU_THRESHOLD) is dropped. Unlike the original implementation,
    the caller's list is NOT sorted in place.

    Each detection is a dict with 'class_id', 'confidence', and a 'bbox'
    of {'x','y','width','height'}. Returns the kept detections, ordered
    by descending confidence.
    """
    if iou_threshold is None:
        iou_threshold = NMS_IOU_THRESHOLD
    keep = []
    for det in sorted(detections, key=lambda d: d["confidence"], reverse=True):
        suppressed = any(
            det["class_id"] == kept["class_id"]
            and _bbox_iou(det["bbox"], kept["bbox"]) > iou_threshold
            for kept in keep
        )
        if not suppressed:
            keep.append(det)
    return keep
def _get_preproc_info(result):
    """Extract letterbox padding info from an inference result.

    The Kneron SDK letterboxes (aspect-preserving resize + zero padding)
    before inference; hw_pre_proc_info describes how to reverse it.
    Returns a dict of pad/resize/model/image dimensions (missing fields
    default to 0), or None when the info is unavailable.
    """
    try:
        info = result.header.hw_pre_proc_info_list[0]
        return {
            "pad_left": getattr(info, 'pad_left', 0),
            "pad_top": getattr(info, 'pad_top', 0),
            "resized_w": getattr(info, 'resized_img_width', 0),
            "resized_h": getattr(info, 'resized_img_height', 0),
            "model_w": getattr(info, 'model_input_width', 0),
            "model_h": getattr(info, 'model_input_height', 0),
            "img_w": getattr(info, 'img_width', 0),
            "img_h": getattr(info, 'img_height', 0),
        }
    except Exception:
        return None
def _correct_bbox_for_letterbox(x, y, w, h, preproc, model_size):
    """Map a bbox from model-input space back to original-image space.

    (x, y, w, h) come in normalized (0-1) relative to the model input;
    the result is normalized (0-1) relative to the un-padded image region.
    Handles KP_PADDING_CORNER (default): image at top-left, padding at
    bottom/right. With no preproc info or no padding, coordinates pass
    through unchanged.
    """
    if preproc is None:
        return x, y, w, h
    model_w = preproc["model_w"] or model_size
    model_h = preproc["model_h"] or model_size
    pad_left, pad_top = preproc["pad_left"], preproc["pad_top"]
    resized_w = preproc["resized_w"] or model_w
    resized_h = preproc["resized_h"] or model_h
    no_padding = (pad_left == 0 and pad_top == 0
                  and resized_w == model_w and resized_h == model_h)
    if no_padding:
        return x, y, w, h
    # Normalized -> model-space pixels, shift out the padding, then
    # re-normalize against the resized (content-only) dimensions.
    nx = (x * model_w - pad_left) / resized_w
    ny = (y * model_h - pad_top) / resized_h
    nw = (w * model_w) / resized_w
    nh = (h * model_h) / resized_h
    # Clamp into the unit square.
    nx = min(1.0, max(0.0, nx))
    ny = min(1.0, max(0.0, ny))
    nw = min(1.0 - nx, nw)
    nh = min(1.0 - ny, nh)
    return nx, ny, nw, nh
def _parse_yolo_output(result, anchors, input_size, num_classes=80):
    """Parse YOLO (v3/v5) raw output into detection results.

    Works for both Tiny YOLOv3 and YOLOv5 — the tensor layout is the same:
        (num_anchors * (5 + num_classes), grid_h, grid_w)
    The key differences are:
    - anchor values
    - input_size used for anchor normalization
    - number of output heads
    Bounding boxes are corrected for letterbox padding so coordinates
    are relative to the original image (normalized 0-1).

    Returns a list of {label, confidence, bbox} dicts after NMS.
    """
    detections = []
    entry_size = 5 + num_classes  # 85 for COCO 80 classes
    # Get letterbox padding info (None when the SDK didn't provide it).
    preproc = _get_preproc_info(result)
    if preproc:
        _log(f"Preproc info: pad=({preproc['pad_left']},{preproc['pad_top']}), "
             f"resized=({preproc['resized_w']}x{preproc['resized_h']}), "
             f"model=({preproc['model_w']}x{preproc['model_h']}), "
             f"img=({preproc['img_w']}x{preproc['img_h']})")
    for head_idx in range(result.header.num_output_node):
        output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=head_idx,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        arr = output.ndarray[0]  # (C, H, W)
        channels, grid_h, grid_w = arr.shape
        # Determine number of anchors for this head from the channel count.
        num_anchors = channels // entry_size
        if num_anchors < 1:
            _log(f"Head {head_idx}: unexpected shape {arr.shape}, skipping")
            continue
        # Use the correct anchor set for this head.
        if head_idx < len(anchors):
            head_anchors = anchors[head_idx]
        else:
            _log(f"Head {head_idx}: no anchors defined, skipping")
            continue
        for a_idx in range(min(num_anchors, len(head_anchors))):
            # Channel offset of this anchor's (x, y, w, h, obj, classes) slab.
            off = a_idx * entry_size
            for cy in range(grid_h):
                for cx in range(grid_w):
                    # Objectness gate first — skips the per-class work for
                    # the vast majority of cells.
                    obj_conf = _sigmoid(arr[off + 4, cy, cx])
                    if obj_conf < CONF_THRESHOLD:
                        continue
                    cls_scores = _sigmoid(arr[off + 5:off + entry_size, cy, cx])
                    cls_id = int(np.argmax(cls_scores))
                    cls_conf = float(cls_scores[cls_id])
                    conf = float(obj_conf * cls_conf)
                    if conf < CONF_THRESHOLD:
                        continue
                    # Decode center relative to the grid cell; width/height
                    # via exp() on anchor priors (exponent clamped at 10 to
                    # avoid overflow on garbage activations).
                    bx = (_sigmoid(arr[off, cy, cx]) + cx) / grid_w
                    by = (_sigmoid(arr[off + 1, cy, cx]) + cy) / grid_h
                    aw, ah = head_anchors[a_idx]
                    bw = (np.exp(min(float(arr[off + 2, cy, cx]), 10)) * aw) / input_size
                    bh = (np.exp(min(float(arr[off + 3, cy, cx]), 10)) * ah) / input_size
                    # Convert center x,y,w,h to corner x,y,w,h (normalized to model input)
                    x = max(0.0, bx - bw / 2)
                    y = max(0.0, by - bh / 2)
                    w = min(1.0, bx + bw / 2) - x
                    h = min(1.0, by + bh / 2) - y
                    # Correct for letterbox padding
                    x, y, w, h = _correct_bbox_for_letterbox(x, y, w, h, preproc, input_size)
                    label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}"
                    detections.append({
                        "label": label,
                        "class_id": cls_id,
                        "confidence": conf,
                        "bbox": {"x": x, "y": y, "width": w, "height": h},
                    })
    detections = _nms(detections)
    # Remove internal class_id before returning (not part of the RPC schema).
    for d in detections:
        del d["class_id"]
    return detections
def _parse_ssd_output(result, input_size=320, num_classes=2):
    """Parse SSD face detection output.

    SSD typically outputs two tensors:
    - locations: (num_boxes, 4) — bounding box coordinates
    - confidences: (num_boxes, num_classes) — class scores
    For the KL520 SSD face detection model (kl520_ssd_fd_lm.nef),
    the output contains face detections with landmarks.

    Returns a list of {label: 'face', confidence, bbox} dicts after NMS;
    empty list on any parse failure (errors are logged, not raised).
    """
    detections = []
    preproc = _get_preproc_info(result)
    try:
        # Retrieve all output nodes as float ndarrays.
        num_outputs = result.header.num_output_node
        outputs = []
        for i in range(num_outputs):
            output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=i,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            outputs.append(output.ndarray[0])
        if num_outputs < 2:
            _log(f"SSD: expected >=2 output nodes, got {num_outputs}")
            return detections
        # Heuristic: the larger tensor is locations, smaller is confidences
        # Or: first output = locations, second = confidences
        # NOTE(review): only the positional assumption is applied here —
        # confirm the node order for any new SSD model.
        locations = outputs[0]
        confidences = outputs[1]
        # Flatten extra leading dims to (num_boxes, ...) if needed.
        if locations.ndim > 2:
            locations = locations.reshape(-1, 4)
        if confidences.ndim > 2:
            confidences = confidences.reshape(-1, confidences.shape[-1])
        num_boxes = min(locations.shape[0], confidences.shape[0])
        for i in range(num_boxes):
            # SSD confidence: class 0 = background, class 1 = face
            if confidences.shape[-1] > 1:
                conf = float(confidences[i, 1])  # face class
            else:
                conf = float(_sigmoid(confidences[i, 0]))
            if conf < CONF_THRESHOLD:
                continue
            # SSD outputs are typically [x_min, y_min, x_max, y_max] normalized
            x_min = float(np.clip(locations[i, 0], 0.0, 1.0))
            y_min = float(np.clip(locations[i, 1], 0.0, 1.0))
            x_max = float(np.clip(locations[i, 2], 0.0, 1.0))
            y_max = float(np.clip(locations[i, 3], 0.0, 1.0))
            w = x_max - x_min
            h = y_max - y_min
            if w <= 0 or h <= 0:
                continue
            # Correct for letterbox padding
            x_min, y_min, w, h = _correct_bbox_for_letterbox(
                x_min, y_min, w, h, preproc, input_size)
            detections.append({
                "label": "face",
                "class_id": 0,
                "confidence": conf,
                "bbox": {"x": x_min, "y": y_min, "width": w, "height": h},
            })
        detections = _nms(detections)
        # Strip internal class_id (not part of the RPC schema).
        for d in detections:
            del d["class_id"]
    except Exception as e:
        _log(f"SSD parse error: {e}")
    return detections
def _parse_fcos_output(result, input_size=512, num_classes=80):
    """Parse FCOS (Fully Convolutional One-Stage) detection output.

    FCOS outputs per feature level:
    - classification: (num_classes, H, W)
    - centerness: (1, H, W)
    - regression: (4, H, W) — distances from each pixel to box edges (l, t, r, b)
    The outputs come in groups of 3 per feature level.

    Returns a list of {label, confidence, bbox} dicts after NMS; empty
    list on any parse failure (errors are logged, not raised).
    """
    detections = []
    preproc = _get_preproc_info(result)
    try:
        num_outputs = result.header.num_output_node
        outputs = []
        for i in range(num_outputs):
            output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=i,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            outputs.append(output.ndarray[0])
        # FCOS typically has 5 feature levels × 3 outputs = 15 output nodes
        # Or fewer for simplified models. Group by 3: (cls, centerness, reg)
        # If we can't determine the grouping, try a simpler approach.
        strides = [8, 16, 32, 64, 128]
        num_levels = num_outputs // 3
        for level in range(num_levels):
            cls_out = outputs[level * 3]      # (num_classes, H, W)
            cnt_out = outputs[level * 3 + 1]  # (1, H, W)
            reg_out = outputs[level * 3 + 2]  # (4, H, W)
            # Levels beyond the known table get an extrapolated stride.
            stride = strides[level] if level < len(strides) else (8 * (2 ** level))
            h, w = cls_out.shape[1], cls_out.shape[2]
            for cy in range(h):
                for cx in range(w):
                    cls_scores = _sigmoid(cls_out[:, cy, cx])
                    cls_id = int(np.argmax(cls_scores))
                    cls_conf = float(cls_scores[cls_id])
                    centerness = float(_sigmoid(cnt_out[0, cy, cx]))
                    # Final score is class score down-weighted by centerness.
                    conf = cls_conf * centerness
                    if conf < CONF_THRESHOLD:
                        continue
                    # Regression: distances from pixel center to box edges
                    # (exp clamped at 10 to avoid overflow on noise).
                    px = (cx + 0.5) * stride
                    py = (cy + 0.5) * stride
                    l = float(np.exp(min(reg_out[0, cy, cx], 10))) * stride
                    t = float(np.exp(min(reg_out[1, cy, cx], 10))) * stride
                    r = float(np.exp(min(reg_out[2, cy, cx], 10))) * stride
                    b = float(np.exp(min(reg_out[3, cy, cx], 10))) * stride
                    x_min = max(0.0, (px - l) / input_size)
                    y_min = max(0.0, (py - t) / input_size)
                    x_max = min(1.0, (px + r) / input_size)
                    y_max = min(1.0, (py + b) / input_size)
                    bw = x_max - x_min
                    bh = y_max - y_min
                    if bw <= 0 or bh <= 0:
                        continue
                    # Correct for letterbox padding
                    x_min, y_min, bw, bh = _correct_bbox_for_letterbox(
                        x_min, y_min, bw, bh, preproc, input_size)
                    label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}"
                    detections.append({
                        "label": label,
                        "class_id": cls_id,
                        "confidence": conf,
                        "bbox": {"x": x_min, "y": y_min, "width": bw, "height": bh},
                    })
        detections = _nms(detections)
        # Strip internal class_id (not part of the RPC schema).
        for d in detections:
            del d["class_id"]
    except Exception as e:
        _log(f"FCOS parse error: {e}")
    return detections
def _parse_classification_output(result, num_classes=1000):
    """Parse classification model output (e.g., ResNet18 ImageNet).

    Returns the top-5 [{label, confidence}] after softmax; empty list on
    any parse failure (errors are logged, not raised).

    NOTE(review): labels come from COCO_CLASSES (80 entries) while the
    default num_classes is 1000 — any index >= 80 falls back to
    'class_{idx}'. Confirm whether an ImageNet label table was intended.
    """
    try:
        output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=0,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        scores = output.ndarray.flatten()
        # Apply softmax (max-shifted for numerical stability).
        exp_scores = np.exp(scores - np.max(scores))
        probs = exp_scores / exp_scores.sum()
        # Top-5
        top_indices = np.argsort(probs)[::-1][:5]
        classifications = []
        for idx in top_indices:
            label = COCO_CLASSES[idx] if idx < len(COCO_CLASSES) else f"class_{idx}"
            classifications.append({
                "label": label,
                "confidence": float(probs[idx]),
            })
        return classifications
    except Exception as e:
        _log(f"Classification parse error: {e}")
        return []
# ── Command handlers ─────────────────────────────────────────────────
def handle_scan():
    """Scan for connected Kneron devices.

    Prefers the Kneron PLUS SDK (provides firmware string, kn_number,
    etc.); drops back to pyusb when the SDK is unavailable (e.g. macOS
    missing .dylib).
    """
    if HAS_KP:
        try:
            descs = kp.core.scan_devices()
            found = []
            for i in range(descs.device_descriptor_number):
                desc = descs.device_descriptor_list[i]
                found.append({
                    "port": str(desc.usb_port_id),
                    "firmware": str(desc.firmware),
                    "kn_number": f"0x{desc.kn_number:08X}",
                    "product_id": f"0x{desc.product_id:04X}",
                    "connectable": desc.is_connectable,
                })
            return {"devices": found}
        except Exception as e:
            _log(f"kp.core.scan_devices failed: {e}, trying pyusb fallback")
    # Fallback: use pyusb (same approach as kneron_detect.py)
    if HAS_PYUSB:
        return _scan_with_pyusb()
    return {"devices": [], "error_detail": "neither kp nor pyusb available"}
# Known Kneron product IDs (same as kneron_detect.py)
_KNERON_VENDOR_ID = 0x3231
_KNOWN_PRODUCTS = {
    0x0100: "KL520",
    0x0200: "KL720",  # KDP legacy firmware (see handle_connect)
    0x0720: "KL720",  # KDP2 firmware
    0x0530: "KL530",
    0x0630: "KL630",
    0x0730: "KL730",
}
def _scan_with_pyusb():
    """Scan for Kneron devices using pyusb (libusb backend).

    Returns {"devices": [...]} matching the SDK scan schema; kn_number is
    not available over raw USB and is reported as 0x00000000. On any
    failure returns an empty device list with an error_detail string.

    (Removed a `chip` local that was computed from _KNOWN_PRODUCTS but
    never used.)
    """
    try:
        devices = []
        for dev in usb.core.find(find_all=True, idVendor=_KNERON_VENDOR_ID):
            product_id = f"0x{dev.idProduct:04X}"
            # Reading the product string can fail without permissions.
            firmware = "unknown"
            try:
                firmware = dev.product or "unknown"
            except Exception:
                pass
            devices.append({
                # pyusb port_id: bus-address
                "port": f"{dev.bus}-{dev.address}",
                "firmware": firmware,
                "kn_number": "0x00000000",
                "product_id": product_id,
                "connectable": True,
            })
        return {"devices": devices}
    except Exception as e:
        return {"devices": [], "error_detail": f"pyusb scan failed: {e}"}
def handle_connect(params):
    """Connect to a Kneron device and load firmware if needed.

    KL520: USB Boot mode — firmware MUST be uploaded every session.
    KL720 (KDP2, pid=0x0720): Flash-based — firmware pre-installed.
    KL720 (KDP legacy, pid=0x0200): Old firmware — needs connect_without_check
    + firmware load to RAM before normal operation.

    Params: {"port": str (optional USB port id), "device_type": str}.
    Returns a {"status": "connected", ...} dict (including
    fresh_firmware_loaded for the non-legacy path) or {"error": ...}.
    """
    global _device_group, _firmware_loaded, _device_chip
    if not HAS_KP:
        return {"error": "kp module not available"}
    try:
        port = params.get("port", "")
        device_type = params.get("device_type", "")
        # Scan to find device
        descs = kp.core.scan_devices()
        if descs.device_descriptor_number == 0:
            return {"error": "no Kneron device found"}
        # Find device by port or use first one
        target_dev = None
        for i in range(descs.device_descriptor_number):
            dev = descs.device_descriptor_list[i]
            if port and str(dev.usb_port_id) == port:
                target_dev = dev
                break
        if target_dev is None:
            target_dev = descs.device_descriptor_list[0]
        # Note: KL520 in USB Boot mode has is_connectable=False, which is
        # normal — it becomes connectable after firmware is loaded. KL720 KDP
        # legacy (pid=0x0200) is also not connectable until firmware load.
        # So we do NOT reject is_connectable=False here; instead we attempt
        # connection and firmware load as appropriate.
        # Determine chip type from device_type param or product_id
        pid = target_dev.product_id
        if "kl720" in device_type.lower():
            _device_chip = "KL720"
        elif "kl520" in device_type.lower():
            _device_chip = "KL520"
        elif pid in (0x0200, 0x0720):
            _device_chip = "KL720"
        else:
            _device_chip = "KL520"
        fw_str = str(target_dev.firmware)
        is_kdp_legacy = (_device_chip == "KL720" and pid == 0x0200)
        _log(f"Chip type: {_device_chip} (product_id=0x{pid:04X}, device_type={device_type}, fw={fw_str})")
        # ── KL720 KDP Legacy (pid=0x0200): old firmware, incompatible with SDK ──
        if is_kdp_legacy:
            _log(f"KL720 has legacy KDP firmware (pid=0x0200). Using connect_devices_without_check...")
            _device_group = kp.core.connect_devices_without_check(
                usb_port_ids=[target_dev.usb_port_id]
            )
            kp.core.set_timeout(device_group=_device_group, milliseconds=60000)
            # Load KDP2 firmware to RAM so the device can operate with this SDK
            scpu_path, ncpu_path = _resolve_firmware_paths("KL720")
            if scpu_path and ncpu_path:
                _log(f"KL720: Loading KDP2 firmware to RAM: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("KL720: Firmware loaded to RAM, waiting for reboot...")
                time.sleep(5)
                # Reconnect — device should now be running KDP2 in RAM
                descs = kp.core.scan_devices()
                reconnected = False
                for i in range(descs.device_descriptor_number):
                    dev = descs.device_descriptor_list[i]
                    if dev.product_id in (0x0200, 0x0720):
                        target_dev = dev
                        reconnected = True
                        break
                if not reconnected:
                    return {"error": "KL720 not found after firmware load. Unplug and re-plug."}
                # Try normal connect first, fallback to without_check
                try:
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                except Exception as conn_err:
                    _log(f"KL720: Normal reconnect failed ({conn_err}), using without_check...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                kp.core.set_timeout(device_group=_device_group, milliseconds=10000)
                fw_str = str(target_dev.firmware)
                _log(f"KL720: Reconnected after firmware load, pid=0x{target_dev.product_id:04X}, fw={fw_str}")
            else:
                _log("WARNING: KL720 firmware files not found. Cannot operate with KDP legacy device.")
                _clear_device_group()
                return {"error": "KL720 has legacy KDP firmware but KDP2 firmware files not found. "
                                 "Run update_kl720_firmware.py to flash KDP2 permanently."}
            return {
                "status": "connected",
                "firmware": fw_str,
                "kn_number": f"0x{target_dev.kn_number:08X}",
                "chip": _device_chip,
                "kdp_legacy": True,
            }
        # ── Normal connection (KL520 or KL720 KDP2) ──
        # Use connect_devices_without_check when:
        # - KL720 KDP2: connect_devices() often fails with Error 28
        # - KL520 USB Boot: is_connectable=False, connect_devices() rejects it
        # In these cases, connect_devices_without_check() works and we can
        # still load firmware afterwards.
        use_without_check = (_device_chip == "KL720") or (not target_dev.is_connectable)
        max_retries = 3
        last_err = None
        for attempt in range(max_retries):
            try:
                # Clear any stale device group from previous failed attempt.
                _clear_device_group()
                if use_without_check:
                    _log(f"{_device_chip}: connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                else:
                    _log(f"connect_devices(usb_port_id={target_dev.usb_port_id}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                _log(f"connect succeeded on attempt {attempt+1}")
                last_err = None
                break
            except Exception as conn_err:
                _clear_device_group()
                last_err = conn_err
                _log(f"connect attempt {attempt+1} failed: {conn_err}")
                if attempt < max_retries - 1:
                    time.sleep(2)
                    # Re-scan to refresh device handle
                    try:
                        descs = kp.core.scan_devices()
                        for i in range(descs.device_descriptor_number):
                            dev = descs.device_descriptor_list[i]
                            if port and str(dev.usb_port_id) == port:
                                target_dev = dev
                                break
                            elif not port:
                                target_dev = descs.device_descriptor_list[0]
                                break
                    except Exception:
                        pass
        if last_err is not None:
            hint = ""
            if sys.platform == "win32":
                hint = (" On Windows, ensure the WinUSB driver is installed for this device."
                        " Re-run the installer or use Zadig (https://zadig.akeo.ie).")
            raise RuntimeError(f"Failed to connect after {max_retries} attempts: {last_err}.{hint}")
        # KL720 needs longer timeout for large NEF transfers (12MB+ over USB)
        _timeout_ms = 60000 if _device_chip == "KL720" else 10000
        _log(f"Calling set_timeout(milliseconds={_timeout_ms})...")
        kp.core.set_timeout(device_group=_device_group, milliseconds=_timeout_ms)
        _log(f"set_timeout succeeded")
        # Firmware handling — chip-dependent.
        # fresh_firmware_loaded is used by Go driver to decide whether to
        # skip the post-connect reset (freshly loaded firmware is already
        # in a clean state — reset would just waste 30-60s reloading it).
        fresh_firmware_loaded = False
        if "Loader" in fw_str:
            # Device is in USB Boot (Loader) mode and needs firmware
            if _device_chip == "KL720":
                _log(f"WARNING: {_device_chip} is in Loader mode (unusual). Attempting firmware load...")
            scpu_path, ncpu_path = _resolve_firmware_paths(_device_chip)
            if scpu_path and ncpu_path:
                _log(f"{_device_chip}: Loading firmware: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("Firmware loaded, waiting for reboot...")
                time.sleep(5)
                # Reconnect after firmware load (with retry)
                _clear_device_group()
                for retry in range(3):
                    try:
                        descs = kp.core.scan_devices()
                        target_dev = descs.device_descriptor_list[0]
                        try:
                            _device_group = kp.core.connect_devices(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        except Exception:
                            _device_group = kp.core.connect_devices_without_check(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        break
                    except Exception as re_err:
                        _log(f"Reconnect attempt {retry+1} failed: {re_err}")
                        if retry < 2:
                            time.sleep(3)
                if _device_group is None:
                    return {"error": "Device not found after firmware load. Unplug and re-plug the device."}
                kp.core.set_timeout(
                    device_group=_device_group, milliseconds=_timeout_ms
                )
                fw_str = str(target_dev.firmware)
                fresh_firmware_loaded = True
                _log(f"Reconnected after firmware load, firmware: {fw_str}")
            else:
                _log(f"WARNING: {_device_chip} firmware files not found, skipping firmware load")
        else:
            # Not in Loader mode — firmware already present from a previous
            # session. This is the state that triggers Error 15 on inference
            # without reset, per observed bug.
            _log(f"{_device_chip}: firmware already present (normal). fw={fw_str}")
        return {
            "status": "connected",
            "firmware": fw_str,
            "kn_number": f"0x{target_dev.kn_number:08X}",
            "chip": _device_chip,
            "fresh_firmware_loaded": fresh_firmware_loaded,
        }
    except Exception as e:
        _clear_device_group()
        return {"error": str(e)}
def handle_disconnect(params):
    """Disconnect from the current device and forget all session state."""
    global _device_group, _model_id, _model_nef, _firmware_loaded
    global _model_type, _model_input_size, _device_chip
    _clear_device_group()
    # Restore module defaults so the next connect starts from a clean slate.
    _firmware_loaded = False
    _device_chip = "KL520"
    _model_id = None
    _model_nef = None
    _model_type = "tiny_yolov3"
    _model_input_size = 224
    return {"status": "disconnected"}
def handle_reset(params):
    """Reset the device back to USB Boot (Loader) state.

    This forces the device to drop its firmware and any loaded models.
    After reset the device will re-enumerate on USB, so the caller
    must wait and issue a fresh 'connect' command.

    Returns {"status": "reset"} on success (state is cleared even when
    the SDK call raises), or {"error": ...} when no device is connected.
    """
    global _device_group, _model_id, _model_nef, _firmware_loaded
    global _model_type, _model_input_size, _device_chip
    if _device_group is None:
        return {"error": "device not connected"}
    try:
        _log("Resetting device (kp.core.reset_device KP_RESET_REBOOT)...")
        kp.core.reset_device(
            device_group=_device_group,
            reset_mode=kp.ResetMode.KP_RESET_REBOOT,
        )
        _log("Device reset command sent successfully")
    except Exception as e:
        _log(f"reset_device raised: {e}")
        # Even if it throws, the device usually does reset.
    # Clear all state — the device is gone until it re-enumerates.
    _clear_device_group()
    _model_id = None
    _model_nef = None
    _model_type = "tiny_yolov3"
    _model_input_size = 224
    _firmware_loaded = False
    _device_chip = "KL520"
    return {"status": "reset"}
def handle_load_model(params):
    """Load a model file onto the device.

    KL520 USB Boot mode limitation: only one model can be loaded per
    USB session. If error 40 occurs, the error is returned to the Go
    driver which handles it by restarting the entire Python bridge.

    Params: {"path": str — path to the .nef model file}.
    Returns {"status": "loaded", model_id, model_type, input_size,
    model_path, target_chip} or {"error": ...}.
    """
    global _model_id, _model_nef
    if _device_group is None:
        return {"error": "device not connected"}
    path = params.get("path", "")
    if not path or not os.path.exists(path):
        return {"error": f"model file not found: {path}"}
    try:
        _model_nef = kp.core.load_model_from_file(
            device_group=_device_group,
            file_path=path
        )
    except Exception as e:
        # Raw SDK error string is forwarded to the Go driver (it pattern-
        # matches on error 40 to restart the bridge).
        return {"error": str(e)}
    try:
        # First model in the NEF determines the session's model id.
        model = _model_nef.models[0]
        _model_id = model.id
        # Detect model type and input size
        _detect_model_type(_model_id, path)
        _log(f"Model loaded: id={_model_id}, type={_model_type}, "
             f"input={_model_input_size}, target={_model_nef.target_chip}")
        return {
            "status": "loaded",
            "model_id": _model_id,
            "model_type": _model_type,
            "input_size": _model_input_size,
            "model_path": path,
            "target_chip": str(_model_nef.target_chip),
        }
    except Exception as e:
        return {"error": str(e)}
def handle_inference(params):
    """Run inference on the provided image data.

    params:
        image_base64: base64-encoded image bytes. When OpenCV is
            available this is a compressed image (JPEG/PNG/...) that is
            decoded; without OpenCV the decoded bytes are assumed to
            already be raw RGB565 pixel data — TODO confirm the caller
            guarantees this in the no-cv2 path.

    Returns a dict with taskType / timestamp / latencyMs / detections /
    classifications on success, or {"error": ...} on failure.
    """
    if _device_group is None:
        return {"error": "device not connected"}
    if _model_id is None:
        return {"error": "no model loaded"}
    image_b64 = params.get("image_base64", "")
    try:
        # Latency measurement covers decode + preprocess + NPU round trip.
        t0 = time.time()
        if image_b64:
            # Decode base64 image
            img_bytes = base64.b64decode(image_b64)
            if HAS_CV2:
                # Decode image with OpenCV
                img_array = np.frombuffer(img_bytes, dtype=np.uint8)
                img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
                if img is None:
                    return {"error": "failed to decode image"}
                h, w = img.shape[:2]
                # KL520 NPU requires input image dimensions >= model input size
                # and both width/height must be even numbers.
                min_dim = _model_input_size
                if w < min_dim or h < min_dim or w % 2 != 0 or h % 2 != 0:
                    if w < min_dim or h < min_dim:
                        # Upscale uniformly so the smaller side reaches min_dim.
                        scale = max(min_dim / w, min_dim / h)
                        new_w = int(w * scale)
                        new_h = int(h * scale)
                    else:
                        new_w, new_h = w, h
                    # Round up to the next even number (bit-clear of LSB).
                    new_w = (new_w + 1) & ~1
                    new_h = (new_h + 1) & ~1
                    img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
                    _log(f"Inference image resized: {w}x{h} -> {new_w}x{new_h} (min_dim={min_dim})")
                # Convert BGR to BGR565; the SDK's KP_IMAGE_FORMAT_RGB565
                # presumably expects this byte layout — verified only by use.
                img_bgr565 = cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)
            else:
                # No OpenCV: pass the raw bytes through untouched.
                img_bgr565 = np.frombuffer(img_bytes, dtype=np.uint8)
        else:
            return {"error": "no image data provided"}
        # Create inference config (original: pass numpy ndarray, SDK reads shape)
        inf_config = kp.GenericImageInferenceDescriptor(
            model_id=_model_id,
            inference_number=0,
            input_node_image_list=[
                kp.GenericInputNodeImage(
                    image=img_bgr565,
                    image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                )
            ]
        )
        # Send and receive (blocking round trip to the NPU).
        _log(f"Inference: sending to NPU (model_type={_model_type}, input_size={_model_input_size})")
        kp.inference.generic_image_inference_send(_device_group, inf_config)
        result = kp.inference.generic_image_inference_receive(_device_group)
        _log(f"Inference: receive complete, parsing...")
        elapsed_ms = (time.time() - t0) * 1000
        # Parse output based on model type (set by _detect_model_type at load).
        detections = []
        classifications = []
        task_type = "detection"
        if _model_type == "resnet18":
            task_type = "classification"
            classifications = _parse_classification_output(result)
        elif _model_type == "ssd":
            detections = _parse_ssd_output(result, input_size=_model_input_size)
        elif _model_type == "fcos":
            detections = _parse_fcos_output(result, input_size=_model_input_size)
        elif _model_type == "yolov5s":
            detections = _parse_yolo_output(
                result,
                anchors=ANCHORS_YOLOV5S,
                input_size=_model_input_size,
            )
        else:
            # Default: Tiny YOLOv3
            detections = _parse_yolo_output(
                result,
                anchors=ANCHORS_TINY_YOLOV3,
                input_size=_model_input_size,
            )
        _log(f"Inference: parse done, detections={len(detections)}, classifications={len(classifications)}, elapsed={elapsed_ms:.1f}ms")
        return {
            "taskType": task_type,
            "timestamp": int(time.time() * 1000),
            "latencyMs": round(elapsed_ms, 1),
            "detections": detections,
            "classifications": classifications,
        }
    except Exception as e:
        import traceback
        _log(f"Inference EXCEPTION: {type(e).__name__}: {e}\n{traceback.format_exc()}")
        return {"error": str(e)}
# ── Main loop ────────────────────────────────────────────────────────
def main():
    """Main loop: read JSON commands from stdin, write responses to stdout."""
    # The Kneron C SDK may write ANSI-colored warnings directly to fd 1
    # (stdout), which corrupts our JSON-RPC protocol. To prevent this we
    # dup the real stdout fd, then redirect fd 1 to stderr so any C-level
    # writes go to stderr. Our JSON responses use the duped fd.
    real_out_fd = os.dup(1)  # keep a handle to the true stdout
    os.dup2(2, 1)            # fd 1 now points to stderr
    real_out = os.fdopen(real_out_fd, "w")
    sys.stdout = sys.stderr  # Python-level redirect too

    def _respond(obj):
        """Serialize obj as one JSON line on the preserved real stdout."""
        real_out.write(json.dumps(obj) + "\n")
        real_out.flush()

    # Tell the Go side the bridge is ready before entering the loop.
    _respond({"status": "ready"})
    _log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, pyusb={'yes' if HAS_PYUSB else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})")

    # Dispatch table: command name -> handler. handle_scan is the only
    # handler that takes no arguments, so wrap it.
    handlers = {
        "scan": lambda _cmd: handle_scan(),
        "connect": handle_connect,
        "disconnect": handle_disconnect,
        "reset": handle_reset,
        "load_model": handle_load_model,
        "inference": handle_inference,
    }
    for raw_line in sys.stdin:
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        try:
            cmd = json.loads(raw_line)
            action = cmd.get("cmd", "")
            handler = handlers.get(action)
            if handler is None:
                _respond({"error": f"unknown command: {action}"})
            else:
                _respond(handler(cmd))
        except Exception as e:
            _respond({"error": str(e)})
def _cleanup():
    """Explicitly disconnect and clear _device_group before Python GC runs.

    KneronPLUS SDK's DeviceGroup.__del__ calls kp_disconnect_devices on a
    native handle that may already be freed when the interpreter is shutting
    down, causing 'OSError: access violation reading 0x00...'. By doing a
    clean disconnect + setting the global to None here, __del__ becomes a
    no-op (None has no __del__).
    """
    global _device_group
    if _device_group is None:
        return  # nothing to release
    try:
        kp.core.disconnect_devices(_device_group)
    except Exception:
        # Best-effort: the device may already be gone; ignore.
        pass
    _device_group = None
if __name__ == "__main__":
import atexit
atexit.register(_cleanup)
main()
_cleanup() # also call synchronously in case atexit doesn't fire