visionA/local-tool/server/scripts/kneron_bridge.py
jim800121chen a6a121ae86 fix(local-tool): suppress KneronPLUS DeviceGroup.__del__ access violation
Windows 上 bridge script 結束時 Python GC 呼叫 DeviceGroup.__del__ →
kp_disconnect_devices 對已釋放的 native handle 操作 → OSError: access
violation reading 0x00...0C。這是 KneronPLUS SDK 的 destructor 沒做
null check 的已知問題,不影響功能但會印嚇人的 stack trace 到 stderr。

修法:
- 新增 _cleanup() 函式:明確 kp.core.disconnect_devices + 把
  _device_group 設 None(讓 __del__ 成 no-op)
- atexit.register(_cleanup) 確保 interpreter 關閉前 cleanup
- main() return 後也同步呼叫一次(belt-and-suspenders)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 15:40:30 +08:00

1138 lines
43 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Kneron Bridge - JSON-RPC over stdin/stdout
This script acts as a bridge between the Go backend and the Kneron PLUS
Python SDK. It reads JSON commands from stdin and writes JSON responses
to stdout.
Supports:
- KL520 (USB Boot mode - firmware must be loaded each session)
- KL720 (flash-based - firmware pre-installed, models freely reloadable)
"""
import sys
import json
import base64
import time
import os
import io
import numpy as np
try:
import kp
HAS_KP = True
except (ImportError, AttributeError, Exception):
HAS_KP = False
try:
import usb.core
HAS_PYUSB = True
except ImportError:
HAS_PYUSB = False
try:
import cv2
HAS_CV2 = True
except ImportError:
HAS_CV2 = False
# ── Global state ──────────────────────────────────────────────────────
# Module-level session state shared by the command handlers below.
# handle_disconnect() and handle_reset() restore these to the defaults
# shown here.
_device_group = None  # value returned by kp.core.connect_devices*(); None while disconnected
_model_id = None  # id of the first model in the loaded NEF; None until load_model succeeds
_model_nef = None  # value returned by kp.core.load_model_from_file()
_model_input_size = 224 # updated on model load
_model_type = "tiny_yolov3" # updated on model load based on model_id / nef name
_firmware_loaded = False  # set to True by handle_connect() after it uploads firmware
_device_chip = "KL520" # updated on connect from product_id / device_type
# COCO 80-class labels
# The list position is the class index: the detection parsers map an argmax
# class id to a display label by indexing into this list.
COCO_CLASSES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
    "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra",
    "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove",
    "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup",
    "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange",
    "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
    "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier",
    "toothbrush"
]
# Anchor boxes per model type (each list entry = one output head)
# Each anchor is a (width, height) pair; _parse_yolo_output() divides the
# scaled anchor by the model input size, so the values are in model-input
# pixels. The outer list is indexed by output-node (head) index.
ANCHORS_TINY_YOLOV3 = [
    [(81, 82), (135, 169), (344, 319)], # 7×7 head (large objects)
    [(10, 14), (23, 27), (37, 58)], # 14×14 head (small objects)
]
# YOLOv5s anchors (Kneron model 20005, no-upsample variant for KL520)
ANCHORS_YOLOV5S = [
    [(116, 90), (156, 198), (373, 326)], # P5/32 (large)
    [(30, 61), (62, 45), (59, 119)], # P4/16 (medium)
    [(10, 13), (16, 30), (33, 23)], # P3/8 (small)
]
# Minimum score for a candidate to be kept by the output parsers.
CONF_THRESHOLD = 0.25
# Default IoU above which _nms() suppresses the lower-confidence box of a
# same-class pair.
NMS_IOU_THRESHOLD = 0.45
# Known Kneron model IDs → (model_type, input_size)
# Consulted first by _detect_model_type(); model ids not listed here fall
# back to filename-based detection.
KNOWN_MODELS = {
    # Tiny YOLO v3 (default KL520 model)
    0: ("tiny_yolov3", 224),
    # ResNet18 classification (model 20001)
    20001: ("resnet18", 224),
    # FCOS DarkNet53s detection (model 20004)
    20004: ("fcos", 512),
    # YOLOv5s no-upsample (model 20005)
    20005: ("yolov5s", 640),
}
def _log(msg):
    """Emit a tagged diagnostic line on stderr.

    stdout carries the JSON-RPC responses, so all human-readable logging
    goes to stderr and is flushed immediately.
    """
    sys.stderr.write(f"[kneron_bridge] {msg}" + "\n")
    sys.stderr.flush()
def _resolve_firmware_paths(chip="KL520"):
    """Locate the SCPU/NCPU firmware images for *chip*.

    Two directories are tried in order: ``firmware/<chip>`` next to this
    script, then the directory named by the ``KNERON_FW_DIR`` environment
    variable (if set and non-empty). A directory qualifies only when both
    ``fw_scpu.bin`` and ``fw_ncpu.bin`` exist in it.

    Returns:
        ``(scpu_path, ncpu_path)`` for the first qualifying directory, or
        ``(None, None)`` when neither qualifies.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    search_dirs = [os.path.join(script_dir, "firmware", chip)]

    env_dir = os.environ.get("KNERON_FW_DIR", "")
    if env_dir:
        search_dirs.append(env_dir)

    for directory in search_dirs:
        scpu_path = os.path.join(directory, "fw_scpu.bin")
        ncpu_path = os.path.join(directory, "fw_ncpu.bin")
        if os.path.exists(scpu_path) and os.path.exists(ncpu_path):
            return scpu_path, ncpu_path
    return None, None
def _detect_model_type(model_id, nef_path):
    """Set the global model type and input size for a freshly loaded model.

    A model id listed in KNOWN_MODELS wins outright. Otherwise the lowercase
    basename of *nef_path* is matched against a list of keywords; the first
    hit picks the model type, and the input size comes from a ``w<N>h<N>``
    token in the name (or a per-type default). With no hit the model is
    assumed to be Tiny YOLOv3 at 224.

    Side effects: writes ``_model_type`` and ``_model_input_size`` and logs
    the decision. Returns None.
    """
    global _model_type, _model_input_size

    known = KNOWN_MODELS.get(model_id)
    if known is not None:
        _model_type, _model_input_size = known
        _log(f"Model type detected by ID {model_id}: {_model_type} ({_model_input_size}x{_model_input_size})")
        return

    basename = "" if not nef_path else os.path.basename(nef_path).lower()

    # (keywords, model type, default input size) — checked in this order.
    filename_rules = (
        (("yolov5",), "yolov5s", 640),
        (("fcos",), "fcos", 512),
        (("ssd",), "ssd", 320),
        (("resnet", "classification"), "resnet18", 224),
        (("tiny_yolo", "tinyyolo"), "tiny_yolov3", 224),
    )
    for keywords, model_type, default_size in filename_rules:
        if any(word in basename for word in keywords):
            _model_type = model_type
            _model_input_size = _parse_size_from_name(basename, default=default_size)
            break
    else:
        # No keyword matched: assume YOLO-like detection at the default size
        # (the filename is not parsed for a size in this case).
        _model_type = "tiny_yolov3"
        _model_input_size = 224

    _log(f"Model type detected by filename '{basename}': {_model_type} ({_model_input_size}x{_model_input_size})")
def _parse_size_from_name(name, default=224):
    """Return the width from a ``w<width>h<height>`` token in *name*.

    Only the width is returned; the height digits are matched but ignored.
    *default* is returned when no such token is present.
    """
    import re

    match = re.search(r'w(\d+)h(\d+)', name)
    return default if match is None else int(match.group(1))
# ── Post-processing ──────────────────────────────────────────────────
def _sigmoid(x):
    """Logistic function 1 / (1 + e^-x) for a scalar or NumPy array.

    The input is clipped to [-500, 500] before exponentiation.
    """
    clipped = np.clip(x, -500, 500)
    return 1.0 / (1.0 + np.exp(-clipped))
def _nms(detections, iou_threshold=NMS_IOU_THRESHOLD):
    """Greedy per-class non-maximum suppression.

    Args:
        detections: list of dicts with ``confidence``, ``class_id`` and a
            ``bbox`` dict (``x``, ``y``, ``width``, ``height``). The list is
            sorted in place by descending confidence.
        iou_threshold: a candidate is dropped when its IoU with an already
            kept box of the same class exceeds this value.

    Returns:
        A new list holding the surviving detection dicts, highest
        confidence first.
    """
    def _overlaps_too_much(candidate, kept):
        a, b = candidate["bbox"], kept["bbox"]
        left = max(a["x"], b["x"])
        top = max(a["y"], b["y"])
        right = min(a["x"] + a["width"], b["x"] + b["width"])
        bottom = min(a["y"] + a["height"], b["y"] + b["height"])
        intersection = max(0, right - left) * max(0, bottom - top)
        union = a["width"] * a["height"] + b["width"] * b["height"] - intersection
        # The small epsilon avoids a division by zero for degenerate boxes.
        return intersection / (union + 1e-6) > iou_threshold

    detections.sort(key=lambda det: det["confidence"], reverse=True)

    survivors = []
    for candidate in detections:
        suppressed = any(
            candidate["class_id"] == kept["class_id"]
            and _overlaps_too_much(candidate, kept)
            for kept in survivors
        )
        if not suppressed:
            survivors.append(candidate)
    return survivors
def _get_preproc_info(result):
    """Read the hardware pre-processing record of an inference result.

    The first entry of ``result.header.hw_pre_proc_info_list`` is copied
    into a plain dict so the bbox correction can undo the resize/padding
    applied before inference.

    Returns:
        A dict with keys ``pad_left``, ``pad_top``, ``resized_w``,
        ``resized_h``, ``model_w``, ``model_h``, ``img_w`` and ``img_h``
        (any attribute missing on the record becomes 0), or None when the
        record cannot be read at all.
    """
    # (output key, attribute name on the pre-processing record)
    fields = (
        ("pad_left", "pad_left"),
        ("pad_top", "pad_top"),
        ("resized_w", "resized_img_width"),
        ("resized_h", "resized_img_height"),
        ("model_w", "model_input_width"),
        ("model_h", "model_input_height"),
        ("img_w", "img_width"),
        ("img_h", "img_height"),
    )
    try:
        record = result.header.hw_pre_proc_info_list[0]
        return {key: getattr(record, attr, 0) for key, attr in fields}
    except Exception:
        return None
def _correct_bbox_for_letterbox(x, y, w, h, preproc, model_size):
    """Map a bbox from padded model-input space back to the un-padded image.

    Args:
        x, y, w, h: top-left corner and size, normalized to 0-1 of the
            (padded) model input.
        preproc: dict produced by ``_get_preproc_info`` (keys ``pad_left``,
            ``pad_top``, ``resized_w``, ``resized_h``, ``model_w``,
            ``model_h``), or None to skip the correction.
        model_size: fallback model input edge length, used when
            ``model_w`` / ``model_h`` are 0.

    Returns:
        ``(x, y, w, h)`` normalized to 0-1 of the resized (un-padded) image.
        The input is returned unchanged when *preproc* is None or describes
        no padding and no resize.

    A box that starts inside the left/top padding is trimmed: its origin is
    moved to 0 and its width/height shortened by the same amount, so the
    far edge stays where it was instead of being pushed outward.
    """
    if preproc is None:
        return x, y, w, h

    model_w = preproc["model_w"] or model_size
    model_h = preproc["model_h"] or model_size
    pad_left = preproc["pad_left"]
    pad_top = preproc["pad_top"]
    resized_w = preproc["resized_w"] or model_w
    resized_h = preproc["resized_h"] or model_h

    # If no padding was applied, skip correction
    if pad_left == 0 and pad_top == 0 and resized_w == model_w and resized_h == model_h:
        return x, y, w, h

    # Model-space pixel coordinates, shifted so (0, 0) is the image corner.
    px = x * model_w - pad_left
    py = y * model_h - pad_top
    pw = w * model_w
    ph = h * model_h

    # Trim the part of the box that lies inside the left/top padding.
    if px < 0:
        pw = max(0.0, pw + px)
        px = 0.0
    if py < 0:
        ph = max(0.0, ph + py)
        py = 0.0

    # Re-normalize to the resized (un-padded) image and clip to 0-1.
    nx = max(0.0, min(1.0, px / resized_w))
    ny = max(0.0, min(1.0, py / resized_h))
    nw = min(1.0 - nx, pw / resized_w)
    nh = min(1.0 - ny, ph / resized_h)
    return nx, ny, nw, nh
def _parse_yolo_output(result, anchors, input_size, num_classes=80):
    """Decode raw YOLO (v3 / v5 style) output heads into detections.

    Every output node is read as a (C, H, W) tensor in which each anchor
    occupies ``5 + num_classes`` consecutive channels: x, y, w, h,
    objectness, then the class scores. Tiny YOLOv3 and YOLOv5 share this
    layout; they differ only in the anchors, the input size and the number
    of heads.

    Args:
        result: raw result from the generic-inference receive call.
        anchors: one list of (width, height) anchors per output head.
        input_size: model input edge length used to normalize anchors.
        num_classes: number of class channels per anchor.

    Returns:
        List of ``{"label", "confidence", "bbox"}`` dicts after NMS, with
        bbox coordinates normalized to 0-1 and corrected for letterbox
        padding.
    """
    per_anchor = 5 + num_classes  # 85 for COCO 80 classes

    letterbox = _get_preproc_info(result)
    if letterbox:
        _log(f"Preproc info: pad=({letterbox['pad_left']},{letterbox['pad_top']}), "
             f"resized=({letterbox['resized_w']}x{letterbox['resized_h']}), "
             f"model=({letterbox['model_w']}x{letterbox['model_h']}), "
             f"img=({letterbox['img_w']}x{letterbox['img_h']})")

    candidates = []
    for head_idx in range(result.header.num_output_node):
        node = kp.inference.generic_inference_retrieve_float_node(
            node_idx=head_idx,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW,
        )
        tensor = node.ndarray[0]  # (C, H, W)
        channels, rows, cols = tensor.shape

        anchors_in_tensor = channels // per_anchor
        if anchors_in_tensor < 1:
            _log(f"Head {head_idx}: unexpected shape {tensor.shape}, skipping")
            continue
        if head_idx >= len(anchors):
            _log(f"Head {head_idx}: no anchors defined, skipping")
            continue
        head_anchors = anchors[head_idx]

        for a_idx in range(min(anchors_in_tensor, len(head_anchors))):
            base = a_idx * per_anchor
            for cy in range(rows):
                for cx in range(cols):
                    objectness = _sigmoid(tensor[base + 4, cy, cx])
                    if objectness < CONF_THRESHOLD:
                        continue

                    class_probs = _sigmoid(tensor[base + 5:base + per_anchor, cy, cx])
                    best_class = int(np.argmax(class_probs))
                    best_prob = float(class_probs[best_class])
                    score = float(objectness * best_prob)
                    if score < CONF_THRESHOLD:
                        continue

                    # Box centre relative to the grid, size relative to the anchor.
                    center_x = (_sigmoid(tensor[base, cy, cx]) + cx) / cols
                    center_y = (_sigmoid(tensor[base + 1, cy, cx]) + cy) / rows
                    anchor_w, anchor_h = head_anchors[a_idx]
                    # The exponent is capped at 10 to keep exp() bounded.
                    box_w = (np.exp(min(float(tensor[base + 2, cy, cx]), 10)) * anchor_w) / input_size
                    box_h = (np.exp(min(float(tensor[base + 3, cy, cx]), 10)) * anchor_h) / input_size

                    # Centre/size -> top-left/size, clamped to the model input.
                    left = max(0.0, center_x - box_w / 2)
                    top = max(0.0, center_y - box_h / 2)
                    width = min(1.0, center_x + box_w / 2) - left
                    height = min(1.0, center_y + box_h / 2) - top

                    left, top, width, height = _correct_bbox_for_letterbox(
                        left, top, width, height, letterbox, input_size)

                    if best_class < len(COCO_CLASSES):
                        label = COCO_CLASSES[best_class]
                    else:
                        label = f"class_{best_class}"
                    candidates.append({
                        "label": label,
                        "class_id": best_class,
                        "confidence": score,
                        "bbox": {"x": left, "y": top, "width": width, "height": height},
                    })

    kept = _nms(candidates)
    # class_id is only needed by NMS; strip it from the response.
    for det in kept:
        det.pop("class_id")
    return kept
def _parse_ssd_output(result, input_size=320, num_classes=2):
    """Decode SSD face-detection output into detections.

    Output node 0 is treated as box locations and node 1 as confidences.
    Tensors with more than two dimensions are flattened to
    ``(num_boxes, 4)`` and ``(num_boxes, last_dim)`` respectively. Boxes are
    read as ``[x_min, y_min, x_max, y_max]`` and clipped to 0-1.

    Args:
        result: raw result from the generic-inference receive call.
        input_size: model input edge length, forwarded to the letterbox
            correction.
        num_classes: accepted for signature compatibility; not used.

    Returns:
        List of ``{"label": "face", "confidence", "bbox"}`` dicts after NMS.
        Parsing errors are logged and whatever was collected so far is
        returned.
    """
    detections = []
    letterbox = _get_preproc_info(result)
    try:
        node_count = result.header.num_output_node
        tensors = [
            kp.inference.generic_inference_retrieve_float_node(
                node_idx=node_idx,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW,
            ).ndarray[0]
            for node_idx in range(node_count)
        ]
        if node_count < 2:
            _log(f"SSD: expected >=2 output nodes, got {node_count}")
            return detections

        # First output = locations, second = confidences.
        boxes, scores = tensors[0], tensors[1]
        if boxes.ndim > 2:
            boxes = boxes.reshape(-1, 4)
        if scores.ndim > 2:
            scores = scores.reshape(-1, scores.shape[-1])

        for row in range(min(boxes.shape[0], scores.shape[0])):
            # With several score columns, column 1 is the face class
            # (column 0 = background); a single column is a raw logit.
            if scores.shape[-1] > 1:
                face_conf = float(scores[row, 1])
            else:
                face_conf = float(_sigmoid(scores[row, 0]))
            if face_conf < CONF_THRESHOLD:
                continue

            x_min, y_min, x_max, y_max = (
                float(np.clip(boxes[row, col], 0.0, 1.0)) for col in range(4)
            )
            width = x_max - x_min
            height = y_max - y_min
            if width <= 0 or height <= 0:
                continue

            x_min, y_min, width, height = _correct_bbox_for_letterbox(
                x_min, y_min, width, height, letterbox, input_size)
            detections.append({
                "label": "face",
                "class_id": 0,
                "confidence": face_conf,
                "bbox": {"x": x_min, "y": y_min, "width": width, "height": height},
            })

        detections = _nms(detections)
        # class_id is only needed by NMS; strip it from the response.
        for det in detections:
            det.pop("class_id")
    except Exception as e:
        _log(f"SSD parse error: {e}")
    return detections
def _parse_fcos_output(result, input_size=512, num_classes=80):
    """Decode FCOS (Fully Convolutional One-Stage) output into detections.

    Output nodes are consumed in groups of three per feature level:
    classification ``(num_classes, H, W)``, centerness ``(1, H, W)`` and
    regression ``(4, H, W)`` holding log-distances from each cell centre to
    the left/top/right/bottom box edges. Levels use strides 8, 16, 32, 64,
    128; any further level uses ``8 * 2**level``.

    Args:
        result: raw result from the generic-inference receive call.
        input_size: model input edge length used to normalize coordinates.
        num_classes: accepted for signature compatibility; not used.

    Returns:
        List of ``{"label", "confidence", "bbox"}`` dicts after NMS.
        Parsing errors are logged and whatever was collected so far is
        returned.
    """
    detections = []
    letterbox = _get_preproc_info(result)
    try:
        node_count = result.header.num_output_node
        tensors = [
            kp.inference.generic_inference_retrieve_float_node(
                node_idx=node_idx,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW,
            ).ndarray[0]
            for node_idx in range(node_count)
        ]

        level_strides = [8, 16, 32, 64, 128]
        for level in range(node_count // 3):
            first = level * 3
            cls_map = tensors[first]      # (num_classes, H, W)
            ctr_map = tensors[first + 1]  # (1, H, W)
            reg_map = tensors[first + 2]  # (4, H, W)

            if level < len(level_strides):
                stride = level_strides[level]
            else:
                stride = 8 * (2 ** level)

            rows, cols = cls_map.shape[1], cls_map.shape[2]
            for cy in range(rows):
                for cx in range(cols):
                    class_probs = _sigmoid(cls_map[:, cy, cx])
                    best_class = int(np.argmax(class_probs))
                    best_prob = float(class_probs[best_class])
                    centerness = float(_sigmoid(ctr_map[0, cy, cx]))
                    score = best_prob * centerness
                    if score < CONF_THRESHOLD:
                        continue

                    # Cell centre in model-input pixels.
                    center_x = (cx + 0.5) * stride
                    center_y = (cy + 0.5) * stride
                    # Edge distances; the exponent is capped at 10.
                    dist_l, dist_t, dist_r, dist_b = (
                        float(np.exp(min(reg_map[edge, cy, cx], 10))) * stride
                        for edge in range(4)
                    )

                    x_min = max(0.0, (center_x - dist_l) / input_size)
                    y_min = max(0.0, (center_y - dist_t) / input_size)
                    x_max = min(1.0, (center_x + dist_r) / input_size)
                    y_max = min(1.0, (center_y + dist_b) / input_size)
                    width = x_max - x_min
                    height = y_max - y_min
                    if width <= 0 or height <= 0:
                        continue

                    x_min, y_min, width, height = _correct_bbox_for_letterbox(
                        x_min, y_min, width, height, letterbox, input_size)

                    if best_class < len(COCO_CLASSES):
                        label = COCO_CLASSES[best_class]
                    else:
                        label = f"class_{best_class}"
                    detections.append({
                        "label": label,
                        "class_id": best_class,
                        "confidence": score,
                        "bbox": {"x": x_min, "y": y_min, "width": width, "height": height},
                    })

        detections = _nms(detections)
        # class_id is only needed by NMS; strip it from the response.
        for det in detections:
            det.pop("class_id")
    except Exception as e:
        _log(f"FCOS parse error: {e}")
    return detections
def _parse_classification_output(result, num_classes=1000):
    """Return the top-5 classes from a classification model's output.

    Output node 0 is flattened, turned into probabilities with a
    numerically stable softmax, and the five most probable classes are
    reported in descending order.

    Labels: COCO_CLASSES holds *detection* class names, so it is only used
    when the score vector has exactly ``len(COCO_CLASSES)`` entries. For any
    other class count (for example a 1000-class model) the generic label
    ``class_<index>`` is emitted instead of borrowing an unrelated COCO
    name for the low indices.

    Args:
        result: raw result from the generic-inference receive call.
        num_classes: accepted for signature compatibility; the class count
            is taken from the output tensor itself.

    Returns:
        List of ``{"label", "confidence"}`` dicts (at most five), or an
        empty list when parsing fails (the error is logged).
    """
    try:
        output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=0,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        scores = output.ndarray.flatten()

        # Softmax; subtracting the max keeps exp() from overflowing.
        exp_scores = np.exp(scores - np.max(scores))
        probs = exp_scores / exp_scores.sum()

        # Top-5, most probable first.
        top_indices = np.argsort(probs)[::-1][:5]

        use_coco_names = probs.size == len(COCO_CLASSES)
        classifications = []
        for idx in map(int, top_indices):
            label = COCO_CLASSES[idx] if use_coco_names else f"class_{idx}"
            classifications.append({
                "label": label,
                "confidence": float(probs[idx]),
            })
        return classifications
    except Exception as e:
        _log(f"Classification parse error: {e}")
        return []
# ── Command handlers ─────────────────────────────────────────────────
def handle_scan():
    """List the Kneron devices currently attached.

    The Kneron PLUS SDK is tried first because it reports firmware and
    kn_number. If the SDK is missing, or its scan raises, pyusb is used
    instead.

    Returns:
        ``{"devices": [...]}`` where each entry has ``port``, ``firmware``,
        ``kn_number``, ``product_id`` and ``connectable``. When neither
        backend is available the list is empty and ``error_detail`` says so.
    """
    def _describe(dev):
        return {
            "port": str(dev.usb_port_id),
            "firmware": str(dev.firmware),
            "kn_number": f"0x{dev.kn_number:08X}",
            "product_id": f"0x{dev.product_id:04X}",
            "connectable": dev.is_connectable,
        }

    if HAS_KP:
        try:
            found = kp.core.scan_devices()
            return {
                "devices": [
                    _describe(found.device_descriptor_list[index])
                    for index in range(found.device_descriptor_number)
                ]
            }
        except Exception as e:
            _log(f"kp.core.scan_devices failed: {e}, trying pyusb fallback")

    # Fallback: use pyusb (same approach as kneron_detect.py)
    if not HAS_PYUSB:
        return {"devices": [], "error_detail": "neither kp nor pyusb available"}
    return _scan_with_pyusb()
# Known Kneron product IDs (same as kneron_detect.py)
# USB vendor id passed to usb.core.find() when scanning with pyusb.
_KNERON_VENDOR_ID = 0x3231
# USB product id -> chip name. Both 0x0200 and 0x0720 map to KL720;
# handle_connect() treats 0x0200 as the legacy-KDP-firmware variant.
_KNOWN_PRODUCTS = {
    0x0100: "KL520",
    0x0200: "KL720",
    0x0720: "KL720",
    0x0530: "KL530",
    0x0630: "KL630",
    0x0730: "KL730",
}
def _scan_with_pyusb():
    """Enumerate Kneron devices through pyusb instead of the Kneron SDK.

    Every USB device with the Kneron vendor id is reported. pyusb cannot
    supply SDK-level details, so ``kn_number`` is a zero placeholder,
    ``connectable`` is always True and ``firmware`` carries the USB product
    string (or "unknown" when it is empty or unreadable).

    Returns:
        ``{"devices": [...]}`` on success, or an empty device list plus
        ``error_detail`` when the scan itself raises.
    """
    try:
        matches = list(usb.core.find(find_all=True, idVendor=_KNERON_VENDOR_ID))
        found = []
        for usb_dev in matches:
            product_id = f"0x{usb_dev.idProduct:04X}"
            # NOTE(review): the chip name is resolved here but is not part
            # of the response below.
            chip = _KNOWN_PRODUCTS.get(usb_dev.idProduct, f"Unknown-{product_id}")
            try:
                firmware = usb_dev.product or "unknown"
            except Exception:
                # Reading the product string can fail; fall back silently.
                firmware = "unknown"
            found.append({
                # pyusb port_id: bus-address
                "port": f"{usb_dev.bus}-{usb_dev.address}",
                "firmware": firmware,
                "kn_number": "0x00000000",
                "product_id": product_id,
                "connectable": True,
            })
        return {"devices": found}
    except Exception as e:
        return {"devices": [], "error_detail": f"pyusb scan failed: {e}"}
def handle_connect(params):
    """Connect to a Kneron device and load firmware if needed.

    KL520: USB Boot mode — firmware MUST be uploaded every session.
    KL720 (KDP2, pid=0x0720): Flash-based — firmware pre-installed.
    KL720 (KDP legacy, pid=0x0200): Old firmware — needs connect_without_check
    + firmware load to RAM before normal operation.

    Args:
        params: command dict; reads the optional keys ``port`` (USB port id
            compared as a string) and ``device_type`` (matched
            case-insensitively for "kl720" / "kl520").

    Returns:
        On success ``{"status": "connected", "firmware", "kn_number",
        "chip"}`` (plus ``"kdp_legacy": True`` on the legacy KL720 path).
        On failure ``{"error": <message>}``; any exception raised inside the
        handler is converted to that form and ``_device_group`` is cleared.

    Side effects: updates the globals ``_device_group``, ``_firmware_loaded``
    and ``_device_chip``; may sleep several seconds while the device reboots
    after a firmware upload.
    """
    global _device_group, _firmware_loaded, _device_chip
    if not HAS_KP:
        return {"error": "kp module not available"}
    try:
        port = params.get("port", "")
        device_type = params.get("device_type", "")
        # Scan to find device
        descs = kp.core.scan_devices()
        if descs.device_descriptor_number == 0:
            return {"error": "no Kneron device found"}
        # Find device by port or use first one
        # NOTE(review): a non-empty `port` that matches nothing silently
        # falls back to the first device — confirm this is intended.
        target_dev = None
        for i in range(descs.device_descriptor_number):
            dev = descs.device_descriptor_list[i]
            if port and str(dev.usb_port_id) == port:
                target_dev = dev
                break
        if target_dev is None:
            target_dev = descs.device_descriptor_list[0]
        # Note: KL520 in USB Boot mode has is_connectable=False, which is
        # normal — it becomes connectable after firmware is loaded. KL720 KDP
        # legacy (pid=0x0200) is also not connectable until firmware load.
        # So we do NOT reject is_connectable=False here; instead we attempt
        # connection and firmware load as appropriate.
        # Determine chip type from device_type param or product_id
        # (an explicit device_type wins over the USB product id).
        pid = target_dev.product_id
        if "kl720" in device_type.lower():
            _device_chip = "KL720"
        elif "kl520" in device_type.lower():
            _device_chip = "KL520"
        elif pid in (0x0200, 0x0720):
            _device_chip = "KL720"
        else:
            _device_chip = "KL520"
        fw_str = str(target_dev.firmware)
        is_kdp_legacy = (_device_chip == "KL720" and pid == 0x0200)
        _log(f"Chip type: {_device_chip} (product_id=0x{pid:04X}, device_type={device_type}, fw={fw_str})")
        # ── KL720 KDP Legacy (pid=0x0200): old firmware, incompatible with SDK ──
        if is_kdp_legacy:
            _log(f"KL720 has legacy KDP firmware (pid=0x0200). Using connect_devices_without_check...")
            _device_group = kp.core.connect_devices_without_check(
                usb_port_ids=[target_dev.usb_port_id]
            )
            # Long timeout for the firmware upload that follows.
            kp.core.set_timeout(device_group=_device_group, milliseconds=60000)
            # Load KDP2 firmware to RAM so the device can operate with this SDK
            scpu_path, ncpu_path = _resolve_firmware_paths("KL720")
            if scpu_path and ncpu_path:
                _log(f"KL720: Loading KDP2 firmware to RAM: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("KL720: Firmware loaded to RAM, waiting for reboot...")
                time.sleep(5)
                # Reconnect — device should now be running KDP2 in RAM
                # The first KL720 product id found is taken; `port` is not
                # consulted here.
                descs = kp.core.scan_devices()
                reconnected = False
                for i in range(descs.device_descriptor_number):
                    dev = descs.device_descriptor_list[i]
                    if dev.product_id in (0x0200, 0x0720):
                        target_dev = dev
                        reconnected = True
                        break
                if not reconnected:
                    return {"error": "KL720 not found after firmware load. Unplug and re-plug."}
                # Try normal connect first, fallback to without_check
                try:
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                except Exception as conn_err:
                    _log(f"KL720: Normal reconnect failed ({conn_err}), using without_check...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                kp.core.set_timeout(device_group=_device_group, milliseconds=10000)
                fw_str = str(target_dev.firmware)
                _log(f"KL720: Reconnected after firmware load, pid=0x{target_dev.product_id:04X}, fw={fw_str}")
            else:
                _log("WARNING: KL720 firmware files not found. Cannot operate with KDP legacy device.")
                _device_group = None
                return {"error": "KL720 has legacy KDP firmware but KDP2 firmware files not found. "
                        "Run update_kl720_firmware.py to flash KDP2 permanently."}
            return {
                "status": "connected",
                "firmware": fw_str,
                "kn_number": f"0x{target_dev.kn_number:08X}",
                "chip": _device_chip,
                "kdp_legacy": True,
            }
        # ── Normal connection (KL520 or KL720 KDP2) ──
        # Use connect_devices_without_check when:
        # - KL720 KDP2: connect_devices() often fails with Error 28
        # - KL520 USB Boot: is_connectable=False, connect_devices() rejects it
        # In these cases, connect_devices_without_check() works and we can
        # still load firmware afterwards.
        use_without_check = (_device_chip == "KL720") or (not target_dev.is_connectable)
        max_retries = 3
        last_err = None  # stays None only if some attempt succeeded
        for attempt in range(max_retries):
            try:
                # Clear any stale device group from previous failed attempt
                # to prevent DeviceGroup.__del__ access violation during GC.
                _device_group = None
                if use_without_check:
                    _log(f"{_device_chip}: connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                else:
                    _log(f"connect_devices(usb_port_id={target_dev.usb_port_id}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                _log(f"connect succeeded on attempt {attempt+1}")
                last_err = None
                break
            except Exception as conn_err:
                _device_group = None # prevent __del__ crash on stale handle
                last_err = conn_err
                _log(f"connect attempt {attempt+1} failed: {conn_err}")
                if attempt < max_retries - 1:
                    time.sleep(2)
                    # Re-scan to refresh device handle
                    # A failed re-scan is ignored; the next attempt then
                    # reuses the previous descriptor.
                    try:
                        descs = kp.core.scan_devices()
                        for i in range(descs.device_descriptor_number):
                            dev = descs.device_descriptor_list[i]
                            if port and str(dev.usb_port_id) == port:
                                target_dev = dev
                                break
                            elif not port:
                                target_dev = descs.device_descriptor_list[0]
                                break
                    except Exception:
                        pass
        if last_err is not None:
            hint = ""
            if sys.platform == "win32":
                hint = (" On Windows, ensure the WinUSB driver is installed for this device."
                        " Re-run the installer or use Zadig (https://zadig.akeo.ie).")
            # Caught by the outer handler and returned as {"error": ...}.
            raise RuntimeError(f"Failed to connect after {max_retries} attempts: {last_err}.{hint}")
        # KL720 needs longer timeout for large NEF transfers (12MB+ over USB)
        _timeout_ms = 60000 if _device_chip == "KL720" else 10000
        _log(f"Calling set_timeout(milliseconds={_timeout_ms})...")
        kp.core.set_timeout(device_group=_device_group, milliseconds=_timeout_ms)
        _log(f"set_timeout succeeded")
        # Firmware handling — chip-dependent
        # fw_str still holds the firmware string captured before connecting.
        if "Loader" in fw_str:
            # Device is in USB Boot (Loader) mode and needs firmware
            if _device_chip == "KL720":
                _log(f"WARNING: {_device_chip} is in Loader mode (unusual). Attempting firmware load...")
            scpu_path, ncpu_path = _resolve_firmware_paths(_device_chip)
            if scpu_path and ncpu_path:
                _log(f"{_device_chip}: Loading firmware: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("Firmware loaded, waiting for reboot...")
                time.sleep(5)
                # Reconnect after firmware load (with retry)
                # NOTE(review): this always reconnects to the first scanned
                # device, even when a specific `port` was requested — verify
                # for multi-device setups.
                _device_group = None
                for retry in range(3):
                    try:
                        descs = kp.core.scan_devices()
                        target_dev = descs.device_descriptor_list[0]
                        try:
                            _device_group = kp.core.connect_devices(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        except Exception:
                            _device_group = kp.core.connect_devices_without_check(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        break
                    except Exception as re_err:
                        _log(f"Reconnect attempt {retry+1} failed: {re_err}")
                        if retry < 2:
                            time.sleep(3)
                if _device_group is None:
                    return {"error": "Device not found after firmware load. Unplug and re-plug the device."}
                kp.core.set_timeout(
                    device_group=_device_group, milliseconds=_timeout_ms
                )
                fw_str = str(target_dev.firmware)
                _log(f"Reconnected after firmware load, firmware: {fw_str}")
            else:
                # The connection is kept and "connected" is still reported
                # below, with the device left in Loader mode.
                _log(f"WARNING: {_device_chip} firmware files not found, skipping firmware load")
        else:
            # Not in Loader mode — firmware already present
            _log(f"{_device_chip}: firmware already present (normal). fw={fw_str}")
        return {
            "status": "connected",
            "firmware": fw_str,
            "kn_number": f"0x{target_dev.kn_number:08X}",
            "chip": _device_chip,
        }
    except Exception as e:
        # Drop any half-initialized handle before reporting the failure.
        _device_group = None
        return {"error": str(e)}
def handle_disconnect(params):
    """Forget the current device and model, restoring default state.

    No SDK call is made here; the device handle and model globals are
    simply reset to their initial values. *params* is accepted for handler
    uniformity and is not read.

    Returns:
        ``{"status": "disconnected"}``.
    """
    global _device_group, _model_id, _model_nef, _firmware_loaded
    global _model_type, _model_input_size, _device_chip

    _device_group = _model_id = _model_nef = None
    _model_type, _model_input_size = "tiny_yolov3", 224
    _firmware_loaded = False
    _device_chip = "KL520"
    return {"status": "disconnected"}
def handle_reset(params):
    """Reboot the device and clear all session state.

    A ``KP_RESET_REBOOT`` reset is sent to the connected device. Per the
    original notes the device then drops its firmware and models and
    re-enumerates on USB, so the caller must wait and issue a fresh
    ``connect``. An exception from the reset call is only logged, and the
    state is cleared regardless. *params* is not read.

    Returns:
        ``{"status": "reset"}``, or ``{"error": "device not connected"}``
        when there is no active connection.
    """
    global _device_group, _model_id, _model_nef, _firmware_loaded
    global _model_type, _model_input_size, _device_chip

    if _device_group is None:
        return {"error": "device not connected"}

    try:
        _log("Resetting device (kp.core.reset_device KP_RESET_REBOOT)...")
        kp.core.reset_device(
            device_group=_device_group,
            reset_mode=kp.ResetMode.KP_RESET_REBOOT,
        )
        _log("Device reset command sent successfully")
    except Exception as e:
        # Even if it throws, the device usually does reset.
        _log(f"reset_device raised: {e}")

    # Clear all state — the device is gone until it re-enumerates.
    _device_group = _model_id = _model_nef = None
    _model_type, _model_input_size = "tiny_yolov3", 224
    _firmware_loaded = False
    _device_chip = "KL520"
    return {"status": "reset"}
def handle_load_model(params):
    """Upload a NEF model file to the connected device.

    KL520 USB Boot mode limitation: only one model can be loaded per USB
    session. If the SDK reports error 40, the error is returned as-is and
    the Go driver handles it by restarting the whole Python bridge.

    Args:
        params: command dict; reads the key ``path`` (model file path).

    Returns:
        ``{"status": "loaded", "model_id", "model_type", "input_size",
        "model_path", "target_chip"}`` on success, otherwise
        ``{"error": <message>}``.

    Side effects: sets ``_model_nef`` and ``_model_id``, and (through
    ``_detect_model_type``) the model type and input size globals.
    """
    global _model_id, _model_nef

    if _device_group is None:
        return {"error": "device not connected"}

    path = params.get("path", "")
    if not (path and os.path.exists(path)):
        return {"error": f"model file not found: {path}"}

    try:
        _model_nef = kp.core.load_model_from_file(
            device_group=_device_group,
            file_path=path,
        )
    except Exception as load_err:
        return {"error": str(load_err)}

    try:
        # Only the first model in the NEF is used.
        _model_id = _model_nef.models[0].id
        # Detect model type and input size
        _detect_model_type(_model_id, path)
        _log(f"Model loaded: id={_model_id}, type={_model_type}, "
             f"input={_model_input_size}, target={_model_nef.target_chip}")
        response = {
            "status": "loaded",
            "model_id": _model_id,
            "model_type": _model_type,
            "input_size": _model_input_size,
            "model_path": path,
            "target_chip": str(_model_nef.target_chip),
        }
    except Exception as meta_err:
        return {"error": str(meta_err)}
    return response
def handle_inference(params):
    """Run one inference on a base64-encoded image and parse the output.

    Args:
        params: command dict; reads the key ``image_base64``.

    Returns:
        ``{"taskType", "timestamp", "latencyMs", "detections",
        "classifications"}`` on success, otherwise ``{"error": <message>}``.
        ``taskType`` is "classification" for resnet18 models and
        "detection" for everything else. ``latencyMs`` covers decoding,
        the device round trip and nothing after it.
    """
    if _device_group is None:
        return {"error": "device not connected"}
    if _model_id is None:
        return {"error": "no model loaded"}

    encoded = params.get("image_base64", "")
    try:
        started = time.time()
        if not encoded:
            return {"error": "no image data provided"}

        raw = base64.b64decode(encoded)
        if not HAS_CV2:
            # Fallback: try to use raw bytes (assume RGB565 format)
            frame = np.frombuffer(raw, dtype=np.uint8)
        else:
            decoded = cv2.imdecode(np.frombuffer(raw, dtype=np.uint8), cv2.IMREAD_COLOR)
            if decoded is None:
                return {"error": "failed to decode image"}

            src_h, src_w = decoded.shape[:2]
            # KL520 NPU requires input image dimensions >= model input size
            # and both width/height must be even numbers.
            min_dim = _model_input_size
            too_small = src_w < min_dim or src_h < min_dim
            has_odd_side = src_w % 2 != 0 or src_h % 2 != 0
            if too_small or has_odd_side:
                if too_small:
                    # Upscale so that both sides reach min_dim.
                    scale = max(min_dim / src_w, min_dim / src_h)
                    dst_w, dst_h = int(src_w * scale), int(src_h * scale)
                else:
                    dst_w, dst_h = src_w, src_h
                # Round odd sizes up to the next even number.
                dst_w = (dst_w + 1) & ~1
                dst_h = (dst_h + 1) & ~1
                decoded = cv2.resize(decoded, (dst_w, dst_h), interpolation=cv2.INTER_LINEAR)
                _log(f"Inference image resized: {src_w}x{src_h} -> {dst_w}x{dst_h} (min_dim={min_dim})")
            # Convert BGR to BGR565
            frame = cv2.cvtColor(src=decoded, code=cv2.COLOR_BGR2BGR565)

        descriptor = kp.GenericImageInferenceDescriptor(
            model_id=_model_id,
            inference_number=0,
            input_node_image_list=[
                kp.GenericInputNodeImage(
                    image=frame,
                    image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                )
            ],
        )
        kp.inference.generic_image_inference_send(_device_group, descriptor)
        result = kp.inference.generic_image_inference_receive(_device_group)
        elapsed_ms = (time.time() - started) * 1000

        # Pick the output parser matching the loaded model type.
        task_type = "detection"
        detections = []
        classifications = []
        if _model_type == "resnet18":
            task_type = "classification"
            classifications = _parse_classification_output(result)
        elif _model_type == "ssd":
            detections = _parse_ssd_output(result, input_size=_model_input_size)
        elif _model_type == "fcos":
            detections = _parse_fcos_output(result, input_size=_model_input_size)
        else:
            # YOLOv5s uses its own anchors; anything else is Tiny YOLOv3.
            yolo_anchors = ANCHORS_YOLOV5S if _model_type == "yolov5s" else ANCHORS_TINY_YOLOV3
            detections = _parse_yolo_output(
                result,
                anchors=yolo_anchors,
                input_size=_model_input_size,
            )

        return {
            "taskType": task_type,
            "timestamp": int(time.time() * 1000),
            "latencyMs": round(elapsed_ms, 1),
            "detections": detections,
            "classifications": classifications,
        }
    except Exception as e:
        return {"error": str(e)}
# ── Main loop ────────────────────────────────────────────────────────
def main():
    """Serve JSON commands from stdin until EOF, one response per line.

    The Kneron C SDK may write ANSI-colored warnings straight to fd 1,
    which would corrupt the JSON-RPC stream. The real stdout is therefore
    duplicated to a private descriptor and fd 1 (and ``sys.stdout``) are
    pointed at stderr; responses are written only through the private
    descriptor.

    Each non-blank input line is parsed as JSON and dispatched on its
    ``cmd`` field. Any exception while handling a line is reported as
    ``{"error": <message>}`` and the loop continues.
    """
    private_fd = os.dup(1)      # keep a handle on the real stdout
    os.dup2(2, 1)               # fd 1 now points to stderr
    rpc_out = os.fdopen(private_fd, "w")
    sys.stdout = sys.stderr     # Python-level redirect too

    def _respond(obj):
        """Write one JSON response line to the real stdout and flush."""
        rpc_out.write(json.dumps(obj) + "\n")
        rpc_out.flush()

    # Command name -> callable taking the parsed command dict.
    handlers = {
        "scan": lambda cmd: handle_scan(),
        "connect": lambda cmd: handle_connect(cmd),
        "disconnect": lambda cmd: handle_disconnect(cmd),
        "reset": lambda cmd: handle_reset(cmd),
        "load_model": lambda cmd: handle_load_model(cmd),
        "inference": lambda cmd: handle_inference(cmd),
    }

    # Signal readiness
    _respond({"status": "ready"})
    _log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, pyusb={'yes' if HAS_PYUSB else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})")

    for raw_line in sys.stdin:
        text = raw_line.strip()
        if not text:
            continue
        try:
            cmd = json.loads(text)
            action = cmd.get("cmd", "")
            # Only string commands can name a handler; anything else is
            # reported as unknown rather than looked up.
            handler = handlers.get(action) if isinstance(action, str) else None
            if handler is None:
                result = {"error": f"unknown command: {action}"}
            else:
                result = handler(cmd)
            _respond(result)
        except Exception as e:
            _respond({"error": str(e)})
def _cleanup():
    """Disconnect the device and drop the global handle before shutdown.

    KneronPLUS SDK's DeviceGroup.__del__ calls kp_disconnect_devices on a
    native handle that may already be freed while the interpreter is
    shutting down, which surfaces as 'OSError: access violation reading
    0x00...'. Disconnecting explicitly here and then clearing
    ``_device_group`` is intended to leave nothing for that destructor to
    do at shutdown.

    Best effort: any exception from the disconnect call is swallowed.
    Safe to call repeatedly; it does nothing once the handle is None.
    """
    global _device_group

    if _device_group is None:
        return
    try:
        kp.core.disconnect_devices(_device_group)
    except Exception:
        # Deliberately silent: this runs during shutdown.
        pass
    _device_group = None
# Script entry point: run the command loop, making sure the device handle
# is released before the interpreter shuts down.
if __name__ == "__main__":
    import atexit
    # Registered first so cleanup also runs if main() exits via an exception.
    atexit.register(_cleanup)
    main()
    _cleanup() # also call synchronously in case atexit doesn't fire