local-tool/: visionA-local desktop app
- M1: Wails shell + Go server + Next.js UI + Mock mode (macOS dmg ready)
- M2: i18n (zh-TW/en) + Settings 4-tab refactor
- M3: Embedded Python 3.12 runtime (python-build-standalone) + KneronPLUS wheels
- M4: Windows Inno Setup script (build on Windows runner)
- M5: Linux AppImage script + udev rule (build on Linux runner)
- M6: ffmpeg (GPL, pending legal review) + yt-dlp bundled
- Lifecycle: watchServer health check, fatal native dialog,
Wails IPC raise endpoint, stale process cleanup
.autoflow/: full PRD / Design Spec / Architecture / Testing docs
(4 rounds tri-party discussion + cross review)
.github/workflows/: macOS / Windows / Linux build CI
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1117 lines
42 KiB
Python
1117 lines
42 KiB
Python
#!/usr/bin/env python3
|
||
"""Kneron Bridge - JSON-RPC over stdin/stdout
|
||
|
||
This script acts as a bridge between the Go backend and the Kneron PLUS
|
||
Python SDK. It reads JSON commands from stdin and writes JSON responses
|
||
to stdout.
|
||
|
||
Supports:
|
||
- KL520 (USB Boot mode - firmware must be loaded each session)
|
||
- KL720 (flash-based - firmware pre-installed, models freely reloadable)
|
||
"""
|
||
import sys
|
||
import json
|
||
import base64
|
||
import time
|
||
import os
|
||
import io
|
||
|
||
import numpy as np
|
||
|
||
# Optional dependencies. Each flag records whether the import succeeded so
# the rest of the bridge can degrade gracefully (mock / fallback paths).
#
# The Kneron PLUS SDK ("kp") can fail in ways other than a plain
# ImportError (e.g. its native shared library fails to load and raises
# AttributeError or an SDK-specific error), so the catch is deliberately
# broad — any import-time failure simply means "no SDK available".
try:
    import kp
    HAS_KP = True
except Exception:  # broad on purpose: native-lib load errors included
    HAS_KP = False

# pyusb — fallback device scanner when the SDK is unavailable.
try:
    import usb.core
    HAS_PYUSB = True
except ImportError:
    HAS_PYUSB = False

# OpenCV — optional; availability is recorded in HAS_CV2 for callers.
try:
    import cv2
    HAS_CV2 = True
except ImportError:
    HAS_CV2 = False
|
||
|
||
# ── Global state ──────────────────────────────────────────────────────
_device_group = None          # kp DeviceGroup handle; None while disconnected
_model_id = None              # ID of the currently loaded model, or None
_model_nef = None             # descriptor returned by kp load_model_from_file
_model_input_size = 224       # updated on model load
_model_type = "tiny_yolov3"   # updated on model load based on model_id / nef name
_firmware_loaded = False      # True once firmware was uploaded this session
_device_chip = "KL520"        # updated on connect from product_id / device_type
|
||
|
||
# COCO 80-class labels (list index == detection class id)
COCO_CLASSES = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
    "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench",
    "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra",
    "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee",
    "skis", "snowboard", "sports ball", "kite", "baseball bat", "baseball glove",
    "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup",
    "fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange",
    "broccoli", "carrot", "hot dog", "pizza", "donut", "cake", "chair", "couch",
    "potted plant", "bed", "dining table", "toilet", "tv", "laptop", "mouse",
    "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
    "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier",
    "toothbrush"
]

# Anchor boxes per model type. Each list entry is one output head; each
# tuple is (anchor_width, anchor_height) in model-input pixels.
ANCHORS_TINY_YOLOV3 = [
    [(81, 82), (135, 169), (344, 319)],  # 7×7 head (large objects)
    [(10, 14), (23, 27), (37, 58)],      # 14×14 head (small objects)
]

# YOLOv5s anchors (Kneron model 20005, no-upsample variant for KL520)
ANCHORS_YOLOV5S = [
    [(116, 90), (156, 198), (373, 326)],  # P5/32 (large)
    [(30, 61), (62, 45), (59, 119)],      # P4/16 (medium)
    [(10, 13), (16, 30), (33, 23)],       # P3/8 (small)
]

# Detection filtering thresholds, used by the _parse_*_output functions
# and as the default IoU cutoff in _nms.
CONF_THRESHOLD = 0.25
NMS_IOU_THRESHOLD = 0.45

# Known Kneron model IDs → (model_type, input_size); consulted first by
# _detect_model_type before falling back to filename heuristics.
KNOWN_MODELS = {
    # Tiny YOLO v3 (default KL520 model)
    0: ("tiny_yolov3", 224),
    # ResNet18 classification (model 20001)
    20001: ("resnet18", 224),
    # FCOS DarkNet53s detection (model 20004)
    20004: ("fcos", 512),
    # YOLOv5s no-upsample (model 20005)
    20005: ("yolov5s", 640),
}
|
||
|
||
|
||
def _log(msg):
|
||
"""Write log messages to stderr (stdout is reserved for JSON-RPC)."""
|
||
print(f"[kneron_bridge] {msg}", file=sys.stderr, flush=True)
|
||
|
||
|
||
def _resolve_firmware_paths(chip="KL520"):
|
||
"""Resolve firmware paths relative to this script's directory."""
|
||
base = os.path.dirname(os.path.abspath(__file__))
|
||
fw_dir = os.path.join(base, "firmware", chip)
|
||
scpu = os.path.join(fw_dir, "fw_scpu.bin")
|
||
ncpu = os.path.join(fw_dir, "fw_ncpu.bin")
|
||
if os.path.exists(scpu) and os.path.exists(ncpu):
|
||
return scpu, ncpu
|
||
# Fallback: check KNERON_FW_DIR env var
|
||
fw_dir = os.environ.get("KNERON_FW_DIR", "")
|
||
if fw_dir:
|
||
scpu = os.path.join(fw_dir, "fw_scpu.bin")
|
||
ncpu = os.path.join(fw_dir, "fw_ncpu.bin")
|
||
if os.path.exists(scpu) and os.path.exists(ncpu):
|
||
return scpu, ncpu
|
||
return None, None
|
||
|
||
|
||
def _detect_model_type(model_id, nef_path):
    """Set the module-level _model_type / _model_input_size.

    A known model ID wins outright; otherwise the .nef filename is
    scanned for model-family keywords (first match wins), and the input
    size is parsed from a 'w<W>h<H>' token when present.
    """
    global _model_type, _model_input_size

    # Exact match on a known Kneron model ID takes priority.
    if model_id in KNOWN_MODELS:
        _model_type, _model_input_size = KNOWN_MODELS[model_id]
        _log(f"Model type detected by ID {model_id}: {_model_type} ({_model_input_size}x{_model_input_size})")
        return

    basename = os.path.basename(nef_path).lower() if nef_path else ""

    # Ordered keyword table: (substring, model_type, default input size).
    keyword_table = (
        ("yolov5", "yolov5s", 640),
        ("fcos", "fcos", 512),
        ("ssd", "ssd", 320),
        ("resnet", "resnet18", 224),
        ("classification", "resnet18", 224),
        ("tiny_yolo", "tiny_yolov3", 224),
        ("tinyyolo", "tiny_yolov3", 224),
    )
    for keyword, model_type, default_size in keyword_table:
        if keyword in basename:
            _model_type = model_type
            _model_input_size = _parse_size_from_name(basename, default=default_size)
            break
    else:
        # Unknown filename: assume the stock YOLO-like detection model.
        _model_type = "tiny_yolov3"
        _model_input_size = 224

    _log(f"Model type detected by filename '{basename}': {_model_type} ({_model_input_size}x{_model_input_size})")
|
||
|
||
|
||
def _parse_size_from_name(name, default=224):
|
||
"""Extract input size from filename like 'w640h640' or 'w512h512'."""
|
||
import re
|
||
m = re.search(r'w(\d+)h(\d+)', name)
|
||
if m:
|
||
return int(m.group(1))
|
||
return default
|
||
|
||
|
||
# ── Post-processing ──────────────────────────────────────────────────
|
||
|
||
def _sigmoid(x):
|
||
return 1.0 / (1.0 + np.exp(-np.clip(x, -500, 500)))
|
||
|
||
|
||
def _nms(detections, iou_threshold=NMS_IOU_THRESHOLD):
    """Greedy per-class Non-Maximum Suppression.

    Sorts *detections* by descending confidence (in place) and drops any
    box whose IoU with an already-kept box of the same class exceeds
    *iou_threshold*. Returns the kept detections.
    """
    def _iou(a, b):
        # Intersection-over-union of two {"bbox": {x, y, width, height}} dicts.
        ax, ay = a["bbox"]["x"], a["bbox"]["y"]
        aw, ah = a["bbox"]["width"], a["bbox"]["height"]
        bx, by = b["bbox"]["x"], b["bbox"]["y"]
        bw, bh = b["bbox"]["width"], b["bbox"]["height"]
        overlap_w = max(0, min(ax + aw, bx + bw) - max(ax, bx))
        overlap_h = max(0, min(ay + ah, by + bh) - max(ay, by))
        inter = overlap_w * overlap_h
        # Epsilon guards against zero-area degenerate boxes.
        return inter / (aw * ah + bw * bh - inter + 1e-6)

    detections.sort(key=lambda d: d["confidence"], reverse=True)
    keep = []
    for candidate in detections:
        suppressed = any(
            candidate["class_id"] == kept["class_id"]
            and _iou(candidate, kept) > iou_threshold
            for kept in keep
        )
        if not suppressed:
            keep.append(candidate)
    return keep
|
||
|
||
|
||
def _get_preproc_info(result):
|
||
"""Extract letterbox padding info from the inference result.
|
||
|
||
Kneron SDK applies letterbox resize (aspect-ratio-preserving + zero padding)
|
||
before inference. The hw_pre_proc_info tells us how to reverse it.
|
||
|
||
Returns (pad_left, pad_top, resize_w, resize_h, model_w, model_h) or None.
|
||
"""
|
||
try:
|
||
info = result.header.hw_pre_proc_info_list[0]
|
||
return {
|
||
"pad_left": info.pad_left if hasattr(info, 'pad_left') else 0,
|
||
"pad_top": info.pad_top if hasattr(info, 'pad_top') else 0,
|
||
"resized_w": info.resized_img_width if hasattr(info, 'resized_img_width') else 0,
|
||
"resized_h": info.resized_img_height if hasattr(info, 'resized_img_height') else 0,
|
||
"model_w": info.model_input_width if hasattr(info, 'model_input_width') else 0,
|
||
"model_h": info.model_input_height if hasattr(info, 'model_input_height') else 0,
|
||
"img_w": info.img_width if hasattr(info, 'img_width') else 0,
|
||
"img_h": info.img_height if hasattr(info, 'img_height') else 0,
|
||
}
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _correct_bbox_for_letterbox(x, y, w, h, preproc, model_size):
|
||
"""Remove letterbox padding offset from normalized bbox coordinates.
|
||
|
||
Input (x, y, w, h) is in model-input-space normalized to 0-1.
|
||
Output is re-normalized to the original image aspect ratio (still 0-1).
|
||
|
||
For KP_PADDING_CORNER (default): image is at top-left, padding at bottom/right.
|
||
"""
|
||
if preproc is None:
|
||
return x, y, w, h
|
||
|
||
model_w = preproc["model_w"] or model_size
|
||
model_h = preproc["model_h"] or model_size
|
||
pad_left = preproc["pad_left"]
|
||
pad_top = preproc["pad_top"]
|
||
resized_w = preproc["resized_w"] or model_w
|
||
resized_h = preproc["resized_h"] or model_h
|
||
|
||
# If no padding was applied, skip correction
|
||
if pad_left == 0 and pad_top == 0 and resized_w == model_w and resized_h == model_h:
|
||
return x, y, w, h
|
||
|
||
# Convert from normalized (0-1 of model input) to pixel coords in model space
|
||
px = x * model_w
|
||
py = y * model_h
|
||
pw = w * model_w
|
||
ph = h * model_h
|
||
|
||
# Subtract padding offset
|
||
px -= pad_left
|
||
py -= pad_top
|
||
|
||
# Re-normalize to the resized (un-padded) image dimensions
|
||
nx = px / resized_w
|
||
ny = py / resized_h
|
||
nw = pw / resized_w
|
||
nh = ph / resized_h
|
||
|
||
# Clip to 0-1
|
||
nx = max(0.0, min(1.0, nx))
|
||
ny = max(0.0, min(1.0, ny))
|
||
nw = min(1.0 - nx, nw)
|
||
nh = min(1.0 - ny, nh)
|
||
|
||
return nx, ny, nw, nh
|
||
|
||
|
||
def _parse_yolo_output(result, anchors, input_size, num_classes=80):
    """Parse YOLO (v3/v5) raw output into detection results.

    Works for both Tiny YOLOv3 and YOLOv5 — the tensor layout is the same:
        (num_anchors * (5 + num_classes), grid_h, grid_w)

    The key differences are:
    - anchor values
    - input_size used for anchor normalization
    - number of output heads

    Bounding boxes are corrected for letterbox padding so coordinates
    are relative to the original image (normalized 0-1).

    NOTE(review): the box decode below uses the YOLOv3 equations
    (sigmoid offset + exp(size)*anchor) for every model type, including
    yolov5s — confirm the Kneron yolov5s export expects v3-style decode.
    """
    detections = []
    entry_size = 5 + num_classes  # 85 for COCO 80 classes

    # Get letterbox padding info
    preproc = _get_preproc_info(result)
    if preproc:
        _log(f"Preproc info: pad=({preproc['pad_left']},{preproc['pad_top']}), "
             f"resized=({preproc['resized_w']}x{preproc['resized_h']}), "
             f"model=({preproc['model_w']}x{preproc['model_h']}), "
             f"img=({preproc['img_w']}x{preproc['img_h']})")

    for head_idx in range(result.header.num_output_node):
        # Fetch this head's raw float tensor in channel-first ordering.
        output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=head_idx,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        arr = output.ndarray[0]  # (C, H, W)
        channels, grid_h, grid_w = arr.shape

        # Determine number of anchors for this head
        num_anchors = channels // entry_size
        if num_anchors < 1:
            _log(f"Head {head_idx}: unexpected shape {arr.shape}, skipping")
            continue

        # Use the correct anchor set for this head
        if head_idx < len(anchors):
            head_anchors = anchors[head_idx]
        else:
            _log(f"Head {head_idx}: no anchors defined, skipping")
            continue

        for a_idx in range(min(num_anchors, len(head_anchors))):
            off = a_idx * entry_size  # channel offset of this anchor's slice
            for cy in range(grid_h):
                for cx in range(grid_w):
                    # Cheap objectness gate before scoring all classes.
                    obj_conf = _sigmoid(arr[off + 4, cy, cx])
                    if obj_conf < CONF_THRESHOLD:
                        continue

                    cls_scores = _sigmoid(arr[off + 5:off + entry_size, cy, cx])
                    cls_id = int(np.argmax(cls_scores))
                    cls_conf = float(cls_scores[cls_id])
                    conf = float(obj_conf * cls_conf)

                    if conf < CONF_THRESHOLD:
                        continue

                    # Decode center/size; exp() input is clamped at 10 to
                    # avoid overflow on garbage raw values.
                    bx = (_sigmoid(arr[off, cy, cx]) + cx) / grid_w
                    by = (_sigmoid(arr[off + 1, cy, cx]) + cy) / grid_h
                    aw, ah = head_anchors[a_idx]
                    bw = (np.exp(min(float(arr[off + 2, cy, cx]), 10)) * aw) / input_size
                    bh = (np.exp(min(float(arr[off + 3, cy, cx]), 10)) * ah) / input_size

                    # Convert center x,y,w,h to corner x,y,w,h (normalized to model input)
                    x = max(0.0, bx - bw / 2)
                    y = max(0.0, by - bh / 2)
                    w = min(1.0, bx + bw / 2) - x
                    h = min(1.0, by + bh / 2) - y

                    # Correct for letterbox padding
                    x, y, w, h = _correct_bbox_for_letterbox(x, y, w, h, preproc, input_size)

                    label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}"
                    detections.append({
                        "label": label,
                        "class_id": cls_id,
                        "confidence": conf,
                        "bbox": {"x": x, "y": y, "width": w, "height": h},
                    })

    detections = _nms(detections)

    # class_id is only needed for per-class NMS grouping; strip it
    # before returning the JSON-facing result.
    for d in detections:
        del d["class_id"]

    return detections
|
||
|
||
|
||
def _parse_ssd_output(result, input_size=320, num_classes=2):
    """Parse SSD face detection output.

    SSD typically outputs two tensors:
    - locations: (num_boxes, 4) — bounding box coordinates
    - confidences: (num_boxes, num_classes) — class scores

    For the KL520 SSD face detection model (kl520_ssd_fd_lm.nef),
    the output contains face detections with landmarks.

    Returns a list of {"label", "confidence", "bbox"} dicts with bbox
    fields normalized 0-1. On any parse failure the error is logged and
    whatever was collected so far is returned.

    NOTE(review): `num_classes` is unused; class index 1 is assumed to
    be "face" — confirm against the model's class layout.
    """
    detections = []
    preproc = _get_preproc_info(result)

    try:
        # Retrieve all output nodes as float ndarrays (CHW ordering).
        num_outputs = result.header.num_output_node
        outputs = []
        for i in range(num_outputs):
            output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=i,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            outputs.append(output.ndarray[0])

        if num_outputs < 2:
            _log(f"SSD: expected >=2 output nodes, got {num_outputs}")
            return detections

        # Heuristic: the larger tensor is locations, smaller is confidences
        # Or: first output = locations, second = confidences
        locations = outputs[0]
        confidences = outputs[1]

        # Flatten higher-rank layouts down to (num_boxes, k) if needed.
        if locations.ndim > 2:
            locations = locations.reshape(-1, 4)
        if confidences.ndim > 2:
            confidences = confidences.reshape(-1, confidences.shape[-1])

        num_boxes = min(locations.shape[0], confidences.shape[0])

        for i in range(num_boxes):
            # SSD confidence: class 0 = background, class 1 = face
            if confidences.shape[-1] > 1:
                conf = float(confidences[i, 1])  # face class
            else:
                # Single-logit variant: squash through a sigmoid.
                conf = float(_sigmoid(confidences[i, 0]))

            if conf < CONF_THRESHOLD:
                continue

            # SSD outputs are typically [x_min, y_min, x_max, y_max] normalized
            x_min = float(np.clip(locations[i, 0], 0.0, 1.0))
            y_min = float(np.clip(locations[i, 1], 0.0, 1.0))
            x_max = float(np.clip(locations[i, 2], 0.0, 1.0))
            y_max = float(np.clip(locations[i, 3], 0.0, 1.0))

            w = x_max - x_min
            h = y_max - y_min
            if w <= 0 or h <= 0:
                continue  # degenerate box after clipping

            # Correct for letterbox padding
            x_min, y_min, w, h = _correct_bbox_for_letterbox(
                x_min, y_min, w, h, preproc, input_size)

            detections.append({
                "label": "face",
                "class_id": 0,
                "confidence": conf,
                "bbox": {"x": x_min, "y": y_min, "width": w, "height": h},
            })

        detections = _nms(detections)
        # class_id only exists for NMS grouping; strip before returning.
        for d in detections:
            del d["class_id"]

    except Exception as e:
        _log(f"SSD parse error: {e}")

    return detections
|
||
|
||
|
||
def _parse_fcos_output(result, input_size=512, num_classes=80):
    """Parse FCOS (Fully Convolutional One-Stage) detection output.

    FCOS outputs per feature level:
    - classification: (num_classes, H, W)
    - centerness: (1, H, W)
    - regression: (4, H, W) — distances from each pixel to box edges (l, t, r, b)

    The outputs come in groups of 3 per feature level.

    Returns a list of {"label", "confidence", "bbox"} dicts with bbox
    fields normalized 0-1. On any parse failure the error is logged and
    whatever was collected so far is returned.

    NOTE(review): `num_classes` is unused — class count is taken from
    the tensor shape; the (cls, centerness, reg) node ordering within
    each group is assumed — confirm against the exported model.
    """
    detections = []
    preproc = _get_preproc_info(result)

    try:
        # Retrieve all output nodes as float ndarrays (CHW ordering).
        num_outputs = result.header.num_output_node
        outputs = []
        for i in range(num_outputs):
            output = kp.inference.generic_inference_retrieve_float_node(
                node_idx=i,
                generic_raw_result=result,
                channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
            )
            outputs.append(output.ndarray[0])

        # FCOS typically has 5 feature levels × 3 outputs = 15 output nodes
        # Or fewer for simplified models. Group by 3: (cls, centerness, reg)
        # If we can't determine the grouping, try a simpler approach.
        strides = [8, 16, 32, 64, 128]
        num_levels = num_outputs // 3

        for level in range(num_levels):
            cls_out = outputs[level * 3]      # (num_classes, H, W)
            cnt_out = outputs[level * 3 + 1]  # (1, H, W)
            reg_out = outputs[level * 3 + 2]  # (4, H, W)

            # Fall back to doubling beyond the standard 5 FCOS strides.
            stride = strides[level] if level < len(strides) else (8 * (2 ** level))
            h, w = cls_out.shape[1], cls_out.shape[2]

            for cy in range(h):
                for cx in range(w):
                    cls_scores = _sigmoid(cls_out[:, cy, cx])
                    cls_id = int(np.argmax(cls_scores))
                    cls_conf = float(cls_scores[cls_id])
                    centerness = float(_sigmoid(cnt_out[0, cy, cx]))
                    # Final score = class score weighted by centerness.
                    conf = cls_conf * centerness

                    if conf < CONF_THRESHOLD:
                        continue

                    # Regression: distances from pixel center to box edges.
                    # exp() input clamped at 10 to avoid overflow.
                    px = (cx + 0.5) * stride
                    py = (cy + 0.5) * stride
                    l = float(np.exp(min(reg_out[0, cy, cx], 10))) * stride
                    t = float(np.exp(min(reg_out[1, cy, cx], 10))) * stride
                    r = float(np.exp(min(reg_out[2, cy, cx], 10))) * stride
                    b = float(np.exp(min(reg_out[3, cy, cx], 10))) * stride

                    # Clip the decoded corners into the model input square.
                    x_min = max(0.0, (px - l) / input_size)
                    y_min = max(0.0, (py - t) / input_size)
                    x_max = min(1.0, (px + r) / input_size)
                    y_max = min(1.0, (py + b) / input_size)

                    bw = x_max - x_min
                    bh = y_max - y_min
                    if bw <= 0 or bh <= 0:
                        continue  # degenerate box after clipping

                    # Correct for letterbox padding
                    x_min, y_min, bw, bh = _correct_bbox_for_letterbox(
                        x_min, y_min, bw, bh, preproc, input_size)

                    label = COCO_CLASSES[cls_id] if cls_id < len(COCO_CLASSES) else f"class_{cls_id}"
                    detections.append({
                        "label": label,
                        "class_id": cls_id,
                        "confidence": conf,
                        "bbox": {"x": x_min, "y": y_min, "width": bw, "height": bh},
                    })

        detections = _nms(detections)
        # class_id only exists for NMS grouping; strip before returning.
        for d in detections:
            del d["class_id"]

    except Exception as e:
        _log(f"FCOS parse error: {e}")

    return detections
|
||
|
||
|
||
def _parse_classification_output(result, num_classes=1000):
    """Parse classification model output (e.g., ResNet18 ImageNet).

    Returns the top-5 entries as [{"label", "confidence"}, ...], or []
    on any parse failure (the error is logged).

    NOTE(review): labels come from the 80-entry COCO list, so any class
    index >= 80 (the common case for a 1000-class ImageNet model) falls
    back to "class_<idx>" — confirm whether an ImageNet label table
    should be used here. `num_classes` is currently unused.
    """
    try:
        output = kp.inference.generic_inference_retrieve_float_node(
            node_idx=0,
            generic_raw_result=result,
            channels_ordering=kp.ChannelOrdering.KP_CHANNEL_ORDERING_CHW
        )
        scores = output.ndarray.flatten()

        # Apply softmax (max-shifted for numerical stability)
        exp_scores = np.exp(scores - np.max(scores))
        probs = exp_scores / exp_scores.sum()

        # Top-5 indices, highest probability first
        top_indices = np.argsort(probs)[::-1][:5]
        classifications = []
        for idx in top_indices:
            label = COCO_CLASSES[idx] if idx < len(COCO_CLASSES) else f"class_{idx}"
            classifications.append({
                "label": label,
                "confidence": float(probs[idx]),
            })

        return classifications

    except Exception as e:
        _log(f"Classification parse error: {e}")
        return []
|
||
|
||
|
||
# ── Command handlers ─────────────────────────────────────────────────
|
||
|
||
def handle_scan():
    """Scan for connected Kneron devices.

    Tries Kneron PLUS SDK first (provides firmware info, kn_number, etc.).
    Falls back to pyusb if the SDK is unavailable (e.g. macOS missing .dylib).
    """
    if HAS_KP:
        try:
            descs = kp.core.scan_devices()
            found = []
            for idx in range(descs.device_descriptor_number):
                desc = descs.device_descriptor_list[idx]
                found.append({
                    "port": str(desc.usb_port_id),
                    "firmware": str(desc.firmware),
                    "kn_number": f"0x{desc.kn_number:08X}",
                    "product_id": f"0x{desc.product_id:04X}",
                    "connectable": desc.is_connectable,
                })
            return {"devices": found}
        except Exception as e:
            _log(f"kp.core.scan_devices failed: {e}, trying pyusb fallback")

    # SDK missing or scan failed — fall back to a raw libusb scan.
    if HAS_PYUSB:
        return _scan_with_pyusb()

    return {"devices": [], "error_detail": "neither kp nor pyusb available"}
|
||
|
||
|
||
# Known Kneron USB IDs (same values as kneron_detect.py), used by the
# pyusb fallback scan to name the chip from the USB product id.
_KNERON_VENDOR_ID = 0x3231
_KNOWN_PRODUCTS = {
    0x0100: "KL520",
    0x0200: "KL720",  # legacy KDP firmware (see handle_connect)
    0x0720: "KL720",  # KDP2 firmware
    0x0530: "KL530",
    0x0630: "KL630",
    0x0730: "KL730",
}
|
||
|
||
|
||
def _scan_with_pyusb():
    """Scan for Kneron devices using pyusb (libusb backend).

    Returns the same {"devices": [...]} shape as handle_scan(). Fields
    the SDK would normally provide are filled with placeholders
    (kn_number is always "0x00000000", connectable is always True).
    """
    try:
        usb_devices = list(usb.core.find(find_all=True, idVendor=_KNERON_VENDOR_ID))
        devices = []
        for dev in usb_devices:
            product_id = f"0x{dev.idProduct:04X}"
            # NOTE(review): `chip` is computed but not included in the
            # returned dict — confirm whether it should be reported.
            chip = _KNOWN_PRODUCTS.get(dev.idProduct, f"Unknown-{product_id}")
            # pyusb port_id: bus-address
            port = f"{dev.bus}-{dev.address}"
            firmware = "unknown"
            try:
                # Reading the product string descriptor can fail (e.g.
                # insufficient permissions); keep the placeholder then.
                firmware = dev.product or "unknown"
            except Exception:
                pass
            devices.append({
                "port": port,
                "firmware": firmware,
                "kn_number": "0x00000000",
                "product_id": product_id,
                "connectable": True,
            })
        return {"devices": devices}
    except Exception as e:
        return {"devices": [], "error_detail": f"pyusb scan failed: {e}"}
|
||
|
||
|
||
def handle_connect(params):
    """Connect to a Kneron device and load firmware if needed.

    KL520: USB Boot mode — firmware MUST be uploaded every session.
    KL720 (KDP2, pid=0x0720): Flash-based — firmware pre-installed.
    KL720 (KDP legacy, pid=0x0200): Old firmware — needs connect_without_check
    + firmware load to RAM before normal operation.

    params keys used: "port" (usb_port_id as string, optional) and
    "device_type" (chip name hint, optional). Returns a dict with
    status/firmware/kn_number/chip on success, or {"error": ...}.
    """
    global _device_group, _firmware_loaded, _device_chip

    if not HAS_KP:
        return {"error": "kp module not available"}

    try:
        port = params.get("port", "")
        device_type = params.get("device_type", "")

        # Scan to find device
        descs = kp.core.scan_devices()
        if descs.device_descriptor_number == 0:
            return {"error": "no Kneron device found"}

        # Find device by port or use first one
        target_dev = None
        for i in range(descs.device_descriptor_number):
            dev = descs.device_descriptor_list[i]
            if port and str(dev.usb_port_id) == port:
                target_dev = dev
                break
        if target_dev is None:
            target_dev = descs.device_descriptor_list[0]

        # Note: KL520 in USB Boot mode has is_connectable=False, which is
        # normal — it becomes connectable after firmware is loaded. KL720 KDP
        # legacy (pid=0x0200) is also not connectable until firmware load.
        # So we do NOT reject is_connectable=False here; instead we attempt
        # connection and firmware load as appropriate.

        # Determine chip type from device_type param or product_id
        pid = target_dev.product_id
        if "kl720" in device_type.lower():
            _device_chip = "KL720"
        elif "kl520" in device_type.lower():
            _device_chip = "KL520"
        elif pid in (0x0200, 0x0720):
            _device_chip = "KL720"
        else:
            _device_chip = "KL520"

        fw_str = str(target_dev.firmware)
        is_kdp_legacy = (_device_chip == "KL720" and pid == 0x0200)

        _log(f"Chip type: {_device_chip} (product_id=0x{pid:04X}, device_type={device_type}, fw={fw_str})")

        # ── KL720 KDP Legacy (pid=0x0200): old firmware, incompatible with SDK ──
        if is_kdp_legacy:
            _log(f"KL720 has legacy KDP firmware (pid=0x0200). Using connect_devices_without_check...")
            _device_group = kp.core.connect_devices_without_check(
                usb_port_ids=[target_dev.usb_port_id]
            )
            # Long timeout: the firmware upload below is a large transfer.
            kp.core.set_timeout(device_group=_device_group, milliseconds=60000)

            # Load KDP2 firmware to RAM so the device can operate with this SDK
            scpu_path, ncpu_path = _resolve_firmware_paths("KL720")
            if scpu_path and ncpu_path:
                _log(f"KL720: Loading KDP2 firmware to RAM: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("KL720: Firmware loaded to RAM, waiting for reboot...")
                time.sleep(5)

                # Reconnect — device should now be running KDP2 in RAM
                descs = kp.core.scan_devices()
                reconnected = False
                for i in range(descs.device_descriptor_number):
                    dev = descs.device_descriptor_list[i]
                    if dev.product_id in (0x0200, 0x0720):
                        target_dev = dev
                        reconnected = True
                        break
                if not reconnected:
                    return {"error": "KL720 not found after firmware load. Unplug and re-plug."}

                # Try normal connect first, fallback to without_check
                try:
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                except Exception as conn_err:
                    _log(f"KL720: Normal reconnect failed ({conn_err}), using without_check...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                kp.core.set_timeout(device_group=_device_group, milliseconds=10000)
                fw_str = str(target_dev.firmware)
                _log(f"KL720: Reconnected after firmware load, pid=0x{target_dev.product_id:04X}, fw={fw_str}")
            else:
                _log("WARNING: KL720 firmware files not found. Cannot operate with KDP legacy device.")
                _device_group = None
                return {"error": "KL720 has legacy KDP firmware but KDP2 firmware files not found. "
                        "Run update_kl720_firmware.py to flash KDP2 permanently."}

            return {
                "status": "connected",
                "firmware": fw_str,
                "kn_number": f"0x{target_dev.kn_number:08X}",
                "chip": _device_chip,
                "kdp_legacy": True,
            }

        # ── Normal connection (KL520 or KL720 KDP2) ──
        # Use connect_devices_without_check when:
        # - KL720 KDP2: connect_devices() often fails with Error 28
        # - KL520 USB Boot: is_connectable=False, connect_devices() rejects it
        # In these cases, connect_devices_without_check() works and we can
        # still load firmware afterwards.
        use_without_check = (_device_chip == "KL720") or (not target_dev.is_connectable)

        max_retries = 3
        last_err = None
        for attempt in range(max_retries):
            try:
                # Clear any stale device group from previous failed attempt
                # to prevent DeviceGroup.__del__ access violation during GC.
                _device_group = None

                if use_without_check:
                    _log(f"{_device_chip}: connect_devices_without_check(usb_port_id={target_dev.usb_port_id}, connectable={target_dev.is_connectable}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices_without_check(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                else:
                    _log(f"connect_devices(usb_port_id={target_dev.usb_port_id}) attempt {attempt+1}/{max_retries}...")
                    _device_group = kp.core.connect_devices(
                        usb_port_ids=[target_dev.usb_port_id]
                    )
                _log(f"connect succeeded on attempt {attempt+1}")
                last_err = None
                break
            except Exception as conn_err:
                _device_group = None  # prevent __del__ crash on stale handle
                last_err = conn_err
                _log(f"connect attempt {attempt+1} failed: {conn_err}")
                if attempt < max_retries - 1:
                    time.sleep(2)
                    # Re-scan to refresh device handle
                    try:
                        descs = kp.core.scan_devices()
                        for i in range(descs.device_descriptor_number):
                            dev = descs.device_descriptor_list[i]
                            if port and str(dev.usb_port_id) == port:
                                target_dev = dev
                                break
                            elif not port:
                                target_dev = descs.device_descriptor_list[0]
                                break
                    except Exception:
                        pass

        if last_err is not None:
            hint = ""
            if sys.platform == "win32":
                hint = (" On Windows, ensure the WinUSB driver is installed for this device."
                        " Re-run the installer or use Zadig (https://zadig.akeo.ie).")
            raise RuntimeError(f"Failed to connect after {max_retries} attempts: {last_err}.{hint}")

        # KL720 needs longer timeout for large NEF transfers (12MB+ over USB)
        _timeout_ms = 60000 if _device_chip == "KL720" else 10000
        _log(f"Calling set_timeout(milliseconds={_timeout_ms})...")
        kp.core.set_timeout(device_group=_device_group, milliseconds=_timeout_ms)
        _log(f"set_timeout succeeded")

        # Firmware handling — chip-dependent
        if "Loader" in fw_str:
            # Device is in USB Boot (Loader) mode and needs firmware
            if _device_chip == "KL720":
                _log(f"WARNING: {_device_chip} is in Loader mode (unusual). Attempting firmware load...")
            scpu_path, ncpu_path = _resolve_firmware_paths(_device_chip)
            if scpu_path and ncpu_path:
                _log(f"{_device_chip}: Loading firmware: {scpu_path}")
                kp.core.load_firmware_from_file(
                    _device_group, scpu_path, ncpu_path
                )
                _firmware_loaded = True
                _log("Firmware loaded, waiting for reboot...")
                time.sleep(5)

                # Reconnect after firmware load (with retry)
                _device_group = None
                for retry in range(3):
                    try:
                        descs = kp.core.scan_devices()
                        target_dev = descs.device_descriptor_list[0]
                        try:
                            _device_group = kp.core.connect_devices(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        except Exception:
                            _device_group = kp.core.connect_devices_without_check(
                                usb_port_ids=[target_dev.usb_port_id]
                            )
                        break
                    except Exception as re_err:
                        _log(f"Reconnect attempt {retry+1} failed: {re_err}")
                        if retry < 2:
                            time.sleep(3)
                if _device_group is None:
                    return {"error": "Device not found after firmware load. Unplug and re-plug the device."}
                kp.core.set_timeout(
                    device_group=_device_group, milliseconds=_timeout_ms
                )
                fw_str = str(target_dev.firmware)
                _log(f"Reconnected after firmware load, firmware: {fw_str}")
            else:
                _log(f"WARNING: {_device_chip} firmware files not found, skipping firmware load")
        else:
            # Not in Loader mode — firmware already present
            _log(f"{_device_chip}: firmware already present (normal). fw={fw_str}")

        return {
            "status": "connected",
            "firmware": fw_str,
            "kn_number": f"0x{target_dev.kn_number:08X}",
            "chip": _device_chip,
        }

    except Exception as e:
        # Drop the handle so a failed connect never leaves stale state.
        _device_group = None
        return {"error": str(e)}
|
||
|
||
|
||
def handle_disconnect(params):
    """Drop the device handle and reset all bridge state to its defaults."""
    global _device_group, _model_id, _model_nef, _firmware_loaded
    global _model_type, _model_input_size, _device_chip

    _device_group = _model_id = _model_nef = None
    _model_type, _model_input_size = "tiny_yolov3", 224
    _firmware_loaded = False
    _device_chip = "KL520"

    return {"status": "disconnected"}
|
||
|
||
|
||
def handle_reset(params):
    """Reset the device back to USB Boot (Loader) state.

    This forces the device to drop its firmware and any loaded models.
    After reset the device will re-enumerate on USB, so the caller
    must wait and issue a fresh 'connect' command.
    """
    global _device_group, _model_id, _model_nef, _firmware_loaded
    global _model_type, _model_input_size, _device_chip

    if _device_group is None:
        return {"error": "device not connected"}

    try:
        _log("Resetting device (kp.core.reset_device KP_RESET_REBOOT)...")
        kp.core.reset_device(
            device_group=_device_group,
            reset_mode=kp.ResetMode.KP_RESET_REBOOT,
        )
        _log("Device reset command sent successfully")
    except Exception as err:
        # The device typically reboots even when the SDK call raises,
        # so we deliberately swallow the error here.
        _log(f"reset_device raised: {err}")

    # The device is gone until it re-enumerates — drop every bit of state.
    _device_group = _model_id = _model_nef = None
    _firmware_loaded = False
    _model_type, _model_input_size = "tiny_yolov3", 224
    _device_chip = "KL520"

    return {"status": "reset"}
||
def handle_load_model(params):
    """Upload a NEF model file onto the connected device.

    KL520 USB Boot mode limitation: only one model can be loaded per
    USB session. If error 40 occurs, the error is returned to the Go
    driver which handles it by restarting the entire Python bridge.
    """
    global _model_id, _model_nef

    if _device_group is None:
        return {"error": "device not connected"}

    path = params.get("path", "")
    if not path or not os.path.exists(path):
        return {"error": f"model file not found: {path}"}

    try:
        _model_nef = kp.core.load_model_from_file(
            device_group=_device_group, file_path=path
        )
    except Exception as e:
        return {"error": str(e)}

    try:
        _model_id = _model_nef.models[0].id

        # Infer post-processing type and input resolution from the model.
        _detect_model_type(_model_id, path)

        _log(f"Model loaded: id={_model_id}, type={_model_type}, "
             f"input={_model_input_size}, target={_model_nef.target_chip}")
        return {
            "status": "loaded",
            "model_id": _model_id,
            "model_type": _model_type,
            "input_size": _model_input_size,
            "model_path": path,
            "target_chip": str(_model_nef.target_chip),
        }
    except Exception as e:
        return {"error": str(e)}
||
def handle_inference(params):
    """Run one inference pass on a base64-encoded image.

    Decodes and normalizes the image for the NPU, sends it through the
    generic image inference pipeline, then post-processes the raw output
    according to the currently loaded model type.

    Returns a dict with taskType / timestamp / latencyMs plus either
    detections or classifications, or {"error": ...} on failure.
    """
    if _device_group is None:
        return {"error": "device not connected"}
    if _model_id is None:
        return {"error": "no model loaded"}

    image_b64 = params.get("image_base64", "")
    if not image_b64:
        return {"error": "no image data provided"}

    try:
        t0 = time.time()  # latency window includes decode + transfer + NPU

        img_bgr565 = _prepare_inference_image(base64.b64decode(image_b64))
        if img_bgr565 is None:
            return {"error": "failed to decode image"}

        inf_config = kp.GenericImageInferenceDescriptor(
            model_id=_model_id,
            inference_number=0,
            input_node_image_list=[
                kp.GenericInputNodeImage(
                    image=img_bgr565,
                    image_format=kp.ImageFormat.KP_IMAGE_FORMAT_RGB565,
                )
            ],
        )

        # Send and receive
        kp.inference.generic_image_inference_send(_device_group, inf_config)
        result = kp.inference.generic_image_inference_receive(_device_group)

        elapsed_ms = (time.time() - t0) * 1000

        task_type, detections, classifications = _parse_inference_result(result)

        return {
            "taskType": task_type,
            "timestamp": int(time.time() * 1000),
            "latencyMs": round(elapsed_ms, 1),
            "detections": detections,
            "classifications": classifications,
        }

    except Exception as e:
        return {"error": str(e)}


def _prepare_inference_image(img_bytes):
    """Decode raw image bytes into a BGR565 buffer sized for the NPU.

    Returns None when OpenCV fails to decode the image.  Without OpenCV
    the bytes are passed through untouched (assumed to already be raw
    RGB565 pixels).
    """
    if not HAS_CV2:
        # Fallback: try to use raw bytes (assume RGB565 format)
        return np.frombuffer(img_bytes, dtype=np.uint8)

    img = cv2.imdecode(np.frombuffer(img_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
    if img is None:
        return None

    h, w = img.shape[:2]
    # KL520 NPU requires input image dimensions >= model input size
    # and both width/height must be even numbers.
    min_dim = _model_input_size
    if w < min_dim or h < min_dim or w % 2 != 0 or h % 2 != 0:
        if w < min_dim or h < min_dim:
            scale = max(min_dim / w, min_dim / h)
            new_w, new_h = int(w * scale), int(h * scale)
        else:
            new_w, new_h = w, h
        # Round up to even dimensions (NPU requirement).
        new_w = (new_w + 1) & ~1
        new_h = (new_h + 1) & ~1
        img = cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
        _log(f"Inference image resized: {w}x{h} -> {new_w}x{new_h} (min_dim={min_dim})")

    # Convert BGR to BGR565 (the format the NPU ingests as RGB565).
    return cv2.cvtColor(src=img, code=cv2.COLOR_BGR2BGR565)


def _parse_inference_result(result):
    """Dispatch raw NPU output to the post-processor for the loaded model.

    Returns a (task_type, detections, classifications) triple.
    """
    if _model_type == "resnet18":
        return "classification", [], _parse_classification_output(result)
    if _model_type == "ssd":
        return "detection", _parse_ssd_output(result, input_size=_model_input_size), []
    if _model_type == "fcos":
        return "detection", _parse_fcos_output(result, input_size=_model_input_size), []
    # yolov5s and the default tiny_yolov3 share a decoder, differing only
    # in their anchor sets.
    anchors = ANCHORS_YOLOV5S if _model_type == "yolov5s" else ANCHORS_TINY_YOLOV3
    return "detection", _parse_yolo_output(
        result, anchors=anchors, input_size=_model_input_size
    ), []
||
# ── Main loop ────────────────────────────────────────────────────────

def main():
    """Read JSON commands from stdin and write JSON responses to stdout.

    The Kneron C SDK may write ANSI-colored warnings directly to fd 1
    (stdout), which would corrupt the JSON-RPC protocol.  We therefore
    dup the real stdout fd, point fd 1 at stderr so any C-level writes
    are diverted, and emit our JSON responses on the duped fd only.
    """
    real_fd = os.dup(1)            # private handle to the real stdout
    os.dup2(2, 1)                  # fd 1 now aliases stderr
    real_stdout = os.fdopen(real_fd, "w")
    sys.stdout = sys.stderr        # Python-level redirect too

    def _respond(obj):
        """Write a JSON response to the real stdout (not stderr)."""
        real_stdout.write(json.dumps(obj) + "\n")
        real_stdout.flush()

    # Signal readiness
    _respond({"status": "ready"})
    _log(f"Bridge started (kp={'yes' if HAS_KP else 'no'}, pyusb={'yes' if HAS_PYUSB else 'no'}, cv2={'yes' if HAS_CV2 else 'no'})")

    # Table-driven dispatch for commands that take the decoded payload.
    dispatch = {
        "connect": handle_connect,
        "disconnect": handle_disconnect,
        "reset": handle_reset,
        "load_model": handle_load_model,
        "inference": handle_inference,
    }

    for raw in sys.stdin:
        raw = raw.strip()
        if not raw:
            continue
        try:
            cmd = json.loads(raw)
            action = cmd.get("cmd", "")
            if action == "scan":
                result = handle_scan()          # scan takes no arguments
            else:
                handler = dispatch.get(action)
                if handler is None:
                    result = {"error": f"unknown command: {action}"}
                else:
                    result = handler(cmd)
            _respond(result)
        except Exception as e:
            _respond({"error": str(e)})
||
# Script entry point: run the stdin/stdout JSON-RPC bridge loop.
if __name__ == "__main__":
    main()