Add video inference viewer for object detection

- Implemented a new script `video_inference_viewer.py` for processing video files and performing inference using a specified model.
- Added functionality to encode frames in various formats and send them to a specified inference server.
- Included methods for decoding model outputs, applying non-maximum suppression (NMS), and drawing bounding boxes on the video frames.
- Integrated command-line arguments for configuration, including model ID, input dimensions, and output options.
- Added a graphical file dialog for selecting video files.
This commit is contained in:
warrenchen 2026-03-04 11:53:46 +09:00
parent 11e779bb40
commit 8a6a1e40b4
7 changed files with 1867 additions and 83 deletions

View File

@ -3,19 +3,26 @@ from __future__ import annotations
import base64
import json
import os
import tempfile
import subprocess
import sys
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi import FastAPI, File, Form, HTTPException, Request, UploadFile
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
import kp
try:
import cv2 # type: ignore
except Exception:
cv2 = None
SERVICE_VERSION = "0.1.0"
PROJECT_ROOT = Path(__file__).resolve().parent.parent
@ -24,6 +31,7 @@ DFUT_BIN = DFUT_ROOT / "bin"
DFUT_EXE = DFUT_BIN / "KneronDFUT.exe"
KP121_DIST = PROJECT_ROOT / "third_party" / "kneron_plus_1_2_1" / "dist"
KP121_RUNNER = Path(__file__).resolve().parent / "legacy_plus121_runner.py"
VIDEO_VIEWER_HTML = PROJECT_ROOT / "TestRes" / "Images" / "VideoInferenceWeb.html"
@dataclass
@ -501,11 +509,181 @@ def _query_windows_driver_status() -> List[Dict[str, Any]]:
return results
def _open_camera_capture(camera_id: int) -> Any:
    """Open a local camera and return the cv2 capture handle.

    Tries the DirectShow backend first (Windows), then the default backend.
    Raises HTTPException 500 when OpenCV is missing, 404 when no backend can
    open the requested camera id.
    """
    if cv2 is None:
        raise HTTPException(
            status_code=500,
            detail=_err("OPENCV_NOT_AVAILABLE", "opencv-python is not installed"),
        )
    capture = cv2.VideoCapture(camera_id, cv2.CAP_DSHOW)
    if capture.isOpened():
        return capture
    capture.release()
    # Fall back to the default backend before giving up.
    capture = cv2.VideoCapture(camera_id)
    if capture.isOpened():
        return capture
    capture.release()
    raise HTTPException(
        status_code=404,
        detail=_err("CAMERA_NOT_FOUND", f"Cannot open camera id={camera_id}"),
    )
def _mjpeg_stream_generator(cap: Any, jpeg_quality: int, frame_interval_sec: float):
    """Yield an endless multipart MJPEG byte stream from an opened capture.

    Each yielded chunk is one `--frame` part (headers + JPEG payload).
    The capture is always released when the generator is closed or abandoned.
    """
    try:
        while True:
            grabbed, frame = cap.read()
            if not grabbed:
                # Transient read failure: back off briefly and retry.
                time.sleep(0.03)
                continue
            success, buffer = cv2.imencode(
                ".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality]
            )
            if not success:
                continue
            payload = buffer.tobytes()
            yield b"".join(
                [
                    b"--frame\r\n",
                    b"Content-Type: image/jpeg\r\n",
                    b"Content-Length: ",
                    str(len(payload)).encode("ascii"),
                    b"\r\n\r\n",
                    payload,
                    b"\r\n",
                ]
            )
            if frame_interval_sec > 0:
                time.sleep(frame_interval_sec)
    finally:
        cap.release()
def _frame_to_input_bytes(frame_bgr: Any, image_format: str) -> bytes:
    """Convert a BGR frame into raw pixel bytes for the requested format.

    Raises HTTPException 400 for formats outside RGB565 / RGBA8888 / RAW8.
    """
    fmt = image_format.upper()
    # NOTE(review): cv2.COLOR_BGR2BGR565 produces BGR565 byte order; assumed to
    # match the device's RGB565 expectation — confirm against firmware docs.
    if fmt == "RGB565":
        conversion = cv2.COLOR_BGR2BGR565
    elif fmt == "RGBA8888":
        conversion = cv2.COLOR_BGR2RGBA
    elif fmt == "RAW8":
        conversion = cv2.COLOR_BGR2GRAY
    else:
        raise HTTPException(
            status_code=400,
            detail=_err(
                "UNSUPPORTED_STREAM_IMAGE_FORMAT",
                "For /inference/run_video, supported image_format: RGB565, RGBA8888, RAW8",
            ),
        )
    return cv2.cvtColor(frame_bgr, conversion).tobytes()
def _run_inference_from_image_bytes(
    image_bytes: bytes,
    width: int,
    height: int,
    model_id: int,
    image_format_text: str,
    channels_ordering_text: str,
    output_dtype_text: str,
) -> List[Dict[str, Any]]:
    """Run one generic image inference on the connected Kneron device.

    Args:
        image_bytes: Raw pixel bytes matching ``image_format_text`` — not an
            encoded BMP/JPEG/PNG file.
        width: Frame width in pixels.
        height: Frame height in pixels.
        model_id: Model to run on the device.
        image_format_text: Textual format resolved via ``_image_format_from_str``.
        channels_ordering_text: Resolved via ``_channels_ordering_from_str``.
        output_dtype_text: Must be ``"float32"`` (only dtype supported in PoC).

    Returns:
        One dict per output node, with the float32 data base64-encoded for
        JSON transport.

    Raises:
        HTTPException: 400 for invalid dtype / payload size, 500 for device
            or SDK errors.
    """
    device_group = _require_device()
    image_format = _image_format_from_str(image_format_text)
    channels_ordering = _channels_ordering_from_str(channels_ordering_text)
    if output_dtype_text.lower() != "float32":
        raise HTTPException(
            status_code=400,
            detail=_err("INVALID_OUTPUT_DTYPE", "Only float32 output is supported in PoC"),
        )
    # Probe the device so a missing model surfaces as a clear error up front.
    try:
        if STATE.port_id is not None:
            kp.core.get_model_info(device_group, STATE.port_id)
    except kp.ApiKPException as exc:
        if exc.api_return_code == kp.ApiReturnCode.KP_ERROR_MODEL_NOT_LOADED_35:
            raise HTTPException(
                status_code=500,
                detail=_err(
                    "KP_ERROR_MODEL_NOT_LOADED_35",
                    str(kp.ApiReturnCode.KP_ERROR_MODEL_NOT_LOADED_35),
                ),
            )
        raise HTTPException(
            status_code=500,
            detail=_err(str(exc.api_return_code), str(exc)),
        )
    # A None expected size means the helper cannot derive one for this format;
    # the size check is skipped in that case.
    expected_size = _expected_image_size_bytes(image_format_text, width, height)
    if expected_size is not None and len(image_bytes) != expected_size:
        raise HTTPException(
            status_code=400,
            detail=_err(
                "INVALID_IMAGE_SIZE",
                (
                    f"image bytes size mismatch: expected={expected_size}, actual={len(image_bytes)}. "
                    "Send raw pixel bytes for selected image_format (not BMP/JPEG/PNG file bytes)."
                ),
            ),
        )
    input_image = kp.GenericInputNodeImage(
        image=image_bytes,
        width=width,
        height=height,
        image_format=image_format,
    )
    input_desc = kp.GenericImageInferenceDescriptor(
        model_id=model_id,
        input_node_image_list=[input_image],
    )
    # Send / receive as a blocking pair on the device group.
    try:
        kp.inference.generic_image_inference_send(device_group, input_desc)
        result = kp.inference.generic_image_inference_receive(device_group)
    except kp.ApiKPException as exc:
        raise HTTPException(
            status_code=500,
            detail=_err(str(exc.api_return_code), str(exc)),
        )
    # Retrieve every output node as float32 and base64-encode it.
    outputs = []
    for node_idx in range(result.header.num_output_node):
        try:
            node_output = kp.inference.generic_inference_retrieve_float_node(
                node_idx, result, channels_ordering
            )
        except kp.ApiKPException as exc:
            raise HTTPException(
                status_code=500,
                detail=_err(str(exc.api_return_code), str(exc)),
            )
        data_bytes = node_output.ndarray.astype("float32").tobytes()
        outputs.append(
            {
                "node_idx": node_idx,
                "name": node_output.name,
                "dtype": "float32",
                "shape": node_output.shape,
                "data_base64": base64.b64encode(data_bytes).decode("ascii"),
                "channels_ordering": channels_ordering.name,
            }
        )
    return outputs
@app.get("/health")
def health() -> Dict[str, Any]:
return _ok({"status": "up"})
@app.get("/tools/video-inference")
def tools_video_inference() -> FileResponse:
if not VIDEO_VIEWER_HTML.is_file():
raise HTTPException(
status_code=404,
detail=_err("TOOL_PAGE_NOT_FOUND", f"Tool page not found: {VIDEO_VIEWER_HTML}"),
)
return FileResponse(str(VIDEO_VIEWER_HTML), media_type="text/html; charset=utf-8")
@app.get("/version")
def version() -> Dict[str, Any]:
return _ok(
@ -516,6 +694,90 @@ def version() -> Dict[str, Any]:
)
@app.get("/camera/list")
def camera_list(max_probe: int = 5) -> Dict[str, Any]:
if max_probe < 1 or max_probe > 20:
raise HTTPException(
status_code=400,
detail=_err("INVALID_MAX_PROBE", "max_probe must be between 1 and 20"),
)
if cv2 is None:
raise HTTPException(
status_code=500,
detail=_err("OPENCV_NOT_AVAILABLE", "opencv-python is not installed"),
)
cameras: List[Dict[str, Any]] = []
for camera_id in range(max_probe):
cap = cv2.VideoCapture(camera_id, cv2.CAP_DSHOW)
opened = cap.isOpened()
if not opened:
cap.release()
cap = cv2.VideoCapture(camera_id)
opened = cap.isOpened()
if opened:
cameras.append(
{
"camera_id": camera_id,
"width": int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0),
"height": int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0),
"fps": float(cap.get(cv2.CAP_PROP_FPS) or 0.0),
}
)
cap.release()
return _ok({"cameras": cameras})
@app.get("/camera/stream")
def camera_stream(
camera_id: int = 0,
width: Optional[int] = None,
height: Optional[int] = None,
fps: Optional[float] = None,
jpeg_quality: int = 80,
) -> StreamingResponse:
if camera_id < 0:
raise HTTPException(
status_code=400,
detail=_err("INVALID_CAMERA_ID", "camera_id must be >= 0"),
)
if width is not None and width <= 0:
raise HTTPException(status_code=400, detail=_err("INVALID_WIDTH", "width must be > 0"))
if height is not None and height <= 0:
raise HTTPException(status_code=400, detail=_err("INVALID_HEIGHT", "height must be > 0"))
if fps is not None and (fps <= 0 or fps > 60):
raise HTTPException(status_code=400, detail=_err("INVALID_FPS", "fps must be in range (0, 60]"))
if jpeg_quality < 1 or jpeg_quality > 100:
raise HTTPException(
status_code=400,
detail=_err("INVALID_JPEG_QUALITY", "jpeg_quality must be in range [1, 100]"),
)
cap = _open_camera_capture(camera_id)
if width is not None:
cap.set(cv2.CAP_PROP_FRAME_WIDTH, float(width))
if height is not None:
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, float(height))
if fps is not None:
cap.set(cv2.CAP_PROP_FPS, float(fps))
frame_interval_sec = (1.0 / float(fps)) if fps else 0.0
stream = _mjpeg_stream_generator(cap, jpeg_quality=jpeg_quality, frame_interval_sec=frame_interval_sec)
headers = {
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
return StreamingResponse(
stream,
media_type="multipart/x-mixed-replace; boundary=frame",
headers=headers,
)
@app.get("/devices")
def devices() -> Dict[str, Any]:
device_list = kp.core.scan_devices()
@ -888,32 +1150,6 @@ def models_reset() -> Dict[str, Any]:
@app.post("/inference/run")
def inference_run(req: InferenceRunRequest) -> Dict[str, Any]:
device_group = _require_device()
image_format = _image_format_from_str(req.image_format)
channels_ordering = _channels_ordering_from_str(req.channels_ordering)
if req.output_dtype.lower() != "float32":
raise HTTPException(
status_code=400,
detail=_err("INVALID_OUTPUT_DTYPE", "Only float32 output is supported in PoC"),
)
try:
if STATE.port_id is not None:
kp.core.get_model_info(device_group, STATE.port_id)
except kp.ApiKPException as exc:
if exc.api_return_code == kp.ApiReturnCode.KP_ERROR_MODEL_NOT_LOADED_35:
raise HTTPException(
status_code=500,
detail=_err(
"KP_ERROR_MODEL_NOT_LOADED_35",
str(kp.ApiReturnCode.KP_ERROR_MODEL_NOT_LOADED_35),
),
)
raise HTTPException(
status_code=500,
detail=_err(str(exc.api_return_code), str(exc)),
)
b64_text = req.image_base64.strip()
if b64_text.startswith("data:") and "," in b64_text:
b64_text = b64_text.split(",", 1)[1]
@ -925,68 +1161,121 @@ def inference_run(req: InferenceRunRequest) -> Dict[str, Any]:
status_code=400,
detail=_err("INVALID_BASE64", "image_base64 is not valid base64 data"),
)
expected_size = _expected_image_size_bytes(req.image_format, req.width, req.height)
if expected_size is not None and len(image_bytes) != expected_size:
raise HTTPException(
status_code=400,
detail=_err(
"INVALID_IMAGE_SIZE",
(
f"image bytes size mismatch: expected={expected_size}, actual={len(image_bytes)}. "
"Send raw pixel bytes for selected image_format (not BMP/JPEG/PNG file bytes)."
),
),
)
input_image = kp.GenericInputNodeImage(
image=image_bytes,
outputs = _run_inference_from_image_bytes(
image_bytes=image_bytes,
width=req.width,
height=req.height,
image_format=image_format,
)
input_desc = kp.GenericImageInferenceDescriptor(
model_id=req.model_id,
input_node_image_list=[input_image],
image_format_text=req.image_format,
channels_ordering_text=req.channels_ordering,
output_dtype_text=req.output_dtype,
)
try:
kp.inference.generic_image_inference_send(device_group, input_desc)
result = kp.inference.generic_image_inference_receive(device_group)
except kp.ApiKPException as exc:
raise HTTPException(
status_code=500,
detail=_err(str(exc.api_return_code), str(exc)),
)
outputs = []
for node_idx in range(result.header.num_output_node):
try:
node_output = kp.inference.generic_inference_retrieve_float_node(
node_idx, result, channels_ordering
)
except kp.ApiKPException as exc:
raise HTTPException(
status_code=500,
detail=_err(str(exc.api_return_code), str(exc)),
)
data_bytes = node_output.ndarray.astype("float32").tobytes()
outputs.append(
{
"node_idx": node_idx,
"name": node_output.name,
"dtype": "float32",
"shape": node_output.shape,
"data_base64": base64.b64encode(data_bytes).decode("ascii"),
"channels_ordering": channels_ordering.name,
}
)
return _ok({"outputs": outputs})
@app.post("/inference/run_video")
async def inference_run_video(
file: UploadFile = File(...),
model_id: int = Form(...),
image_format: str = Form(...),
channels_ordering: str = Form("DEFAULT"),
output_dtype: str = Form("float32"),
sample_every_n: int = Form(1),
max_frames: Optional[int] = Form(default=None),
) -> StreamingResponse:
if cv2 is None:
raise HTTPException(
status_code=500,
detail=_err("OPENCV_NOT_AVAILABLE", "opencv-python is not installed"),
)
if sample_every_n <= 0:
raise HTTPException(
status_code=400,
detail=_err("INVALID_SAMPLE_EVERY_N", "sample_every_n must be >= 1"),
)
if max_frames is not None and max_frames <= 0:
raise HTTPException(
status_code=400,
detail=_err("INVALID_MAX_FRAMES", "max_frames must be >= 1 when provided"),
)
suffix = Path(file.filename or "upload.mp4").suffix or ".mp4"
tmp_path = Path(tempfile.gettempdir()) / f"inference_upload_{int(time.time() * 1000)}{suffix}"
with tmp_path.open("wb") as f:
while True:
chunk = await file.read(1024 * 1024)
if not chunk:
break
f.write(chunk)
await file.close()
def _iter_results():
cap = cv2.VideoCapture(str(tmp_path))
if not cap.isOpened():
cap.release()
if tmp_path.exists():
tmp_path.unlink()
error_line = json.dumps(
_err("VIDEO_OPEN_FAILED", f"Cannot open uploaded video: {tmp_path.name}"),
ensure_ascii=False,
)
yield (error_line + "\n").encode("utf-8")
return
sent_count = 0
frame_index = -1
try:
while True:
ok, frame = cap.read()
if not ok:
break
frame_index += 1
if frame_index % sample_every_n != 0:
continue
height, width = int(frame.shape[0]), int(frame.shape[1])
image_bytes = _frame_to_input_bytes(frame, image_format)
outputs = _run_inference_from_image_bytes(
image_bytes=image_bytes,
width=width,
height=height,
model_id=model_id,
image_format_text=image_format,
channels_ordering_text=channels_ordering,
output_dtype_text=output_dtype,
)
payload = _ok(
{
"frame_index": frame_index,
"width": width,
"height": height,
"outputs": outputs,
}
)
yield (json.dumps(payload, ensure_ascii=False) + "\n").encode("utf-8")
sent_count += 1
if max_frames is not None and sent_count >= max_frames:
break
finally:
cap.release()
if tmp_path.exists():
tmp_path.unlink()
headers = {
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
"Connection": "keep-alive",
}
return StreamingResponse(
_iter_results(),
media_type="application/x-ndjson",
headers=headers,
)
if __name__ == "__main__":
import uvicorn

View File

@ -0,0 +1,293 @@
from __future__ import annotations
import base64
import math
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Tuple
import numpy as np
YOLO_DEFAULT_ANCHORS: List[List[Tuple[float, float]]] = [
[(10.0, 14.0), (23.0, 27.0), (37.0, 58.0)],
[(81.0, 82.0), (135.0, 169.0), (344.0, 319.0)],
]
@dataclass
class Box:
    """One detection box in model-input pixel coordinates."""

    cls: int  # index of the best-scoring class
    score: float  # confidence; exact meaning depends on the decoder's score_mode
    x1: float  # left edge
    y1: float  # top edge
    x2: float  # right edge
    y2: float  # bottom edge
def _sigmoid(v: np.ndarray | float) -> np.ndarray | float:
    """Element-wise logistic sigmoid, 1 / (1 + e^-v)."""
    neg_exp = np.exp(-v)
    return 1.0 / (1.0 + neg_exp)
def decode_outputs(raw_outputs: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Decode base64 float32 node payloads from the inference API.

    Each input dict carries ``shape``, ``data_base64`` and optionally
    ``node_idx``. Returns dicts with the payload as a little-endian float32
    numpy array under ``data``. Raises RuntimeError when a non-empty shape
    disagrees with the payload length.
    """
    decoded: List[Dict[str, Any]] = []
    for idx, node in enumerate(raw_outputs):
        shape = list(node.get("shape") or [])
        payload = base64.b64decode(str(node.get("data_base64") or ""))
        values = np.frombuffer(payload, dtype="<f4")
        # An empty shape means "trust the payload length".
        expected = int(np.prod(shape)) if shape else values.size
        if expected != values.size:
            raise RuntimeError(
                f"Output node {idx} size mismatch: expected={expected}, got={values.size}"
            )
        decoded.append(
            {
                "idx": idx,
                "node_idx": int(node.get("node_idx", idx)),
                "shape": shape,
                "data": values,
            }
        )
    return decoded
def _pick_yolo_nodes(all_nodes: Sequence[Dict[str, Any]], num_classes: int) -> List[Dict[str, Any]]:
    """Select [1, C, H, W] nodes whose channel count fits a YOLO head.

    A node qualifies when C is a multiple of (5 + num_classes). The result is
    ordered with the largest feature map (H) first.
    """
    attrs = 5 + num_classes
    candidates = [
        node
        for node in all_nodes
        if len(node["shape"]) == 4
        and node["shape"][0] == 1
        and int(node["shape"][1]) % attrs == 0
    ]
    return sorted(candidates, key=lambda node: int(node["shape"][2]), reverse=True)
def decode_yolo_common(
    all_nodes: Sequence[Dict[str, Any]],
    mode: str,
    num_classes: int,
    input_w: int,
    input_h: int,
    conf_th: float,
    use_sigmoid: bool = True,
    use_xy_sigmoid: bool = True,
    score_mode: str = "obj_cls",
    anchors_by_level: Optional[List[List[Tuple[float, float]]]] = None,
) -> List[Box]:
    """Decode anchor-based YOLO head outputs into candidate boxes (pre-NMS).

    Args:
        all_nodes: Decoded output nodes (see ``decode_outputs``).
        mode: ``"yolov5"`` selects v5-style box math; any other value uses the
            classic exp()-based anchor decode.
        num_classes: Class count; each anchor carries 5 + num_classes values.
        input_w: Model input width used to scale grid coordinates.
        input_h: Model input height used to scale grid coordinates.
        conf_th: Candidates scoring below this threshold are dropped.
        use_sigmoid: Apply sigmoid to objectness and class logits.
        use_xy_sigmoid: Apply sigmoid to the x/y offset logits.
        score_mode: ``"obj"``, ``"cls"``, or (default) objectness * best class.
        anchors_by_level: Per-level (w, h) anchor pairs; defaults to
            ``YOLO_DEFAULT_ANCHORS``. Extra levels reuse the last entry.

    Returns:
        Unclamped candidate boxes; run ``nms`` on the result.

    Raises:
        RuntimeError: When no [1, C, H, W] node matches a YOLO layout.
    """
    nodes = _pick_yolo_nodes(all_nodes, num_classes)
    if not nodes:
        raise RuntimeError("No YOLO-like [1,C,H,W] output nodes found")
    anchors_levels = anchors_by_level or YOLO_DEFAULT_ANCHORS
    boxes: List[Box] = []
    attrs = 5 + num_classes  # x, y, w, h, objectness + per-class scores
    for lv, o in enumerate(nodes):
        _, ch, gh, gw = o["shape"]
        na = int(ch // attrs)  # anchors per cell at this level
        data: np.ndarray = o["data"]
        # Levels beyond the anchor table reuse its last entry.
        anchors = anchors_levels[min(lv, len(anchors_levels) - 1)]

        def at(channel_idx: int, y: int, x: int) -> float:
            # Flat CHW indexing into the 1-D node buffer.
            return float(data[channel_idx * gh * gw + y * gw + x])

        for a in range(na):
            aw, ah = anchors[min(a, len(anchors) - 1)]
            base = a * attrs
            for y in range(gh):
                for x in range(gw):
                    tx = at(base + 0, y, x)
                    ty = at(base + 1, y, x)
                    tw = at(base + 2, y, x)
                    th = at(base + 3, y, x)
                    to = at(base + 4, y, x)
                    obj = float(_sigmoid(to) if use_sigmoid else to)
                    # Argmax over class logits (optionally sigmoid-activated).
                    best_cls = -1
                    best_prob = -1e9
                    for k in range(num_classes):
                        p = at(base + 5 + k, y, x)
                        p = float(_sigmoid(p) if use_sigmoid else p)
                        if p > best_prob:
                            best_prob = p
                            best_cls = k
                    if score_mode == "obj":
                        score = obj
                    elif score_mode == "cls":
                        score = best_prob
                    else:
                        score = obj * best_prob
                    if score < conf_th:
                        continue
                    if mode == "yolov5":
                        # YOLOv5 box math: xy spans [-0.5, 1.5] around the cell,
                        # wh = (2 * sigmoid)^2 scaled by the anchor.
                        sx = input_w / gw
                        sy = input_h / gh
                        txv = float(_sigmoid(tx) if use_xy_sigmoid else tx)
                        tyv = float(_sigmoid(ty) if use_xy_sigmoid else ty)
                        bx = (txv * 2.0 - 0.5 + x) * sx
                        by = (tyv * 2.0 - 0.5 + y) * sy
                        bw = (float(_sigmoid(tw)) * 2.0) ** 2 * aw
                        bh = (float(_sigmoid(th)) * 2.0) ** 2 * ah
                    else:
                        # Classic YOLO (v2/v3/tiny) decode: wh = anchor * exp(t).
                        txv = float(_sigmoid(tx) if use_xy_sigmoid else tx)
                        tyv = float(_sigmoid(ty) if use_xy_sigmoid else ty)
                        bx = (txv + x) / gw * input_w
                        by = (tyv + y) / gh * input_h
                        bw = aw * math.exp(tw)
                        bh = ah * math.exp(th)
                    boxes.append(
                        Box(
                            cls=best_cls,
                            score=score,
                            x1=bx - bw / 2.0,
                            y1=by - bh / 2.0,
                            x2=bx + bw / 2.0,
                            y2=by + bh / 2.0,
                        )
                    )
    return boxes
def _auto_fcos_indices(all_nodes: Sequence[Dict[str, Any]], num_classes: int) -> List[Tuple[int, int, int, int]]:
    """Auto-pair FCOS head nodes by spatial size.

    Matches classification (C == num_classes), regression (C == 4) and
    centerness (C == 1) nodes that share the same H x W, orders levels from
    the largest map down, and assigns strides 8/16/32/64/128 in that order
    (extra levels reuse the last stride). Returns
    (cls_idx, reg_idx, ctr_idx, stride) tuples.
    """
    valid = [n for n in all_nodes if len(n["shape"]) == 4 and n["shape"][0] == 1]
    grouped: Dict[Tuple[int, int], Dict[str, Dict[str, Any]]] = {}
    # Role order (cls, reg, ctr) mirrors the original assignment order so that
    # overlapping channel counts (num_classes == 4 or 1) resolve the same way.
    for role, wanted_ch in (("cls", num_classes), ("reg", 4), ("ctr", 1)):
        for node in valid:
            if int(node["shape"][1]) == wanted_ch:
                key = (int(node["shape"][2]), int(node["shape"][3]))
                grouped.setdefault(key, {})[role] = node
    levels = [
        (
            h,
            int(parts["cls"]["node_idx"]),
            int(parts["reg"]["node_idx"]),
            int(parts["ctr"]["node_idx"]),
        )
        for (h, _w), parts in grouped.items()
        if {"cls", "reg", "ctr"} <= parts.keys()
    ]
    # Largest feature map first -> smallest stride.
    levels.sort(key=lambda lv: lv[0], reverse=True)
    strides = [8, 16, 32, 64, 128]
    return [
        (cls_i, reg_i, ctr_i, strides[min(i, len(strides) - 1)])
        for i, (_h, cls_i, reg_i, ctr_i) in enumerate(levels)
    ]
def decode_fcos(
    all_nodes: Sequence[Dict[str, Any]],
    num_classes: int,
    input_w: int,
    input_h: int,
    conf_th: float,
    use_sigmoid: bool = True,
    score_mode: str = "obj_cls",
) -> List[Box]:
    """Decode FCOS-style (cls / reg / centerness) outputs into boxes (pre-NMS).

    Args:
        all_nodes: Decoded output nodes (see ``decode_outputs``).
        num_classes: Class count used to identify classification nodes.
        input_w: Model input width; boxes are clamped to this range.
        input_h: Model input height; boxes are clamped to this range.
        conf_th: Candidates scoring below this threshold are dropped.
        use_sigmoid: Apply sigmoid to class and centerness logits.
        score_mode: ``"obj"`` (centerness), ``"cls"``, or (default) the
            geometric mean sqrt(cls * centerness).

    Returns:
        Clamped, non-degenerate boxes; run ``nms`` on the result.

    Raises:
        RuntimeError: When cls/reg/ctr nodes cannot be auto-matched.
    """
    levels = _auto_fcos_indices(all_nodes, num_classes)
    if not levels:
        raise RuntimeError("Cannot auto match FCOS cls/reg/ctr nodes")
    boxes: List[Box] = []
    by_idx = {int(n["node_idx"]): n for n in all_nodes}
    for cls_idx, reg_idx, ctr_idx, stride in levels:
        cls_node = by_idx.get(cls_idx)
        reg_node = by_idx.get(reg_idx)
        ctr_node = by_idx.get(ctr_idx)
        if not cls_node or not reg_node or not ctr_node:
            continue
        gh = int(cls_node["shape"][2])
        gw = int(cls_node["shape"][3])
        cls_data: np.ndarray = cls_node["data"]
        reg_data: np.ndarray = reg_node["data"]
        ctr_data: np.ndarray = ctr_node["data"]

        def at(node_data: np.ndarray, channel_idx: int, y: int, x: int) -> float:
            # Flat CHW indexing into a 1-D node buffer.
            return float(node_data[channel_idx * gh * gw + y * gw + x])

        cls_channels = int(cls_node["shape"][1])
        for y in range(gh):
            for x in range(gw):
                ctr = at(ctr_data, 0, y, x)
                ctr = float(_sigmoid(ctr) if use_sigmoid else ctr)
                # Argmax over class logits (optionally sigmoid-activated).
                best_cls = -1
                best_prob = -1e9
                for k in range(min(num_classes, cls_channels)):
                    p = at(cls_data, k, y, x)
                    p = float(_sigmoid(p) if use_sigmoid else p)
                    if p > best_prob:
                        best_prob = p
                        best_cls = k
                if score_mode == "obj":
                    score = ctr
                elif score_mode == "cls":
                    score = best_prob
                else:
                    score = math.sqrt(max(0.0, best_prob * ctr))
                if score < conf_th:
                    continue
                # (l, t, r, b) distances from the cell center; assumed already
                # scaled to input pixels — TODO confirm against the model export.
                l = max(0.0, at(reg_data, 0, y, x))
                t = max(0.0, at(reg_data, 1, y, x))
                r = max(0.0, at(reg_data, 2, y, x))
                b = max(0.0, at(reg_data, 3, y, x))
                cx = (x + 0.5) * stride
                cy = (y + 0.5) * stride
                # Clamp to the input frame and drop degenerate boxes.
                x1 = max(0.0, min(input_w, cx - l))
                y1 = max(0.0, min(input_h, cy - t))
                x2 = max(0.0, min(input_w, cx + r))
                y2 = max(0.0, min(input_h, cy + b))
                if x2 <= x1 or y2 <= y1:
                    continue
                boxes.append(Box(cls=best_cls, score=score, x1=x1, y1=y1, x2=x2, y2=y2))
    return boxes
def _iou(a: Box, b: Box) -> float:
    """Intersection-over-union of two boxes; 0.0 when they do not overlap."""
    overlap_w = max(0.0, min(a.x2, b.x2) - max(a.x1, b.x1))
    overlap_h = max(0.0, min(a.y2, b.y2) - max(a.y1, b.y1))
    inter = overlap_w * overlap_h
    if inter <= 0:
        return 0.0
    area_a = max(0.0, a.x2 - a.x1) * max(0.0, a.y2 - a.y1)
    area_b = max(0.0, b.x2 - b.x1) * max(0.0, b.y2 - b.y1)
    union = area_a + area_b - inter
    # Epsilon guard against division by zero for zero-area inputs.
    return inter / max(1e-9, union)
def nms(boxes: Sequence[Box], iou_th: float, max_out: int) -> List[Box]:
    """Greedy per-class non-maximum suppression.

    Within each class, the highest-scoring box is kept and any remaining box
    overlapping it with IoU > iou_th is discarded. Survivors from all classes
    are then sorted by score and truncated to max_out.
    """
    grouped: Dict[int, List[Box]] = {}
    for box in boxes:
        grouped.setdefault(box.cls, []).append(box)
    survivors: List[Box] = []
    for group in grouped.values():
        remaining = sorted(group, key=lambda b: b.score, reverse=True)
        while remaining:
            best = remaining.pop(0)
            survivors.append(best)
            remaining = [b for b in remaining if _iou(best, b) <= iou_th]
    survivors.sort(key=lambda b: b.score, reverse=True)
    return survivors[:max_out]

View File

@ -347,6 +347,46 @@ Response
}
```
### `POST /inference/run_video`
Notes
- Video file upload endpoint for continuous inference in PoC.
- Response is NDJSON stream (`application/x-ndjson`), one JSON object per processed frame.
- ByteTrack-specific tracking output is out of scope for current PoC; this endpoint returns raw model outputs per frame.
Request (`multipart/form-data`)
- `file`: video file (`.mp4/.avi/...`)
- `model_id`: integer
- `image_format`: `RGB565` | `RGBA8888` | `RAW8`
- `channels_ordering`: optional, default `DEFAULT`
- `output_dtype`: optional, default `float32`
- `sample_every_n`: optional, default `1`
- `max_frames`: optional
Response line example (NDJSON)
```json
{
"ok": true,
"data": {
"frame_index": 0,
"width": 640,
"height": 640,
"outputs": [
{ "node_idx": 0, "dtype": "float32", "shape": [1, 255, 80, 80], "data_base64": "..." }
]
},
"error": null
}
```
### `GET /tools/video-inference`
Notes
- Serves a single-page visual test tool from LocalAPI.
- Supports two input sources:
- Video file
- Webcam (browser `getUserMedia`)
- Frontend calls `POST /inference/run` frame-by-frame and draws decoded boxes on canvas.
- Purpose: PoC visual validation for YOLOv5/FCOS/TinyYOLO style models.
- ByteTrack visualization/tracking is intentionally excluded in current phase.
### `WS /ws` (streaming inference)
Notes
- For camera/video stream, use WebSocket for low-latency send/receive.
@ -396,7 +436,7 @@ Message (server -> client)
- MEMO: define production approach for privilege handling (installer-time elevation, helper process with UAC prompt, or enterprise pre-install policy) so end-user flow does not get blocked.
## API Test Progress (Windows PoC)
Updated: 2026-03-03
Updated: 2026-03-04
### Completed
- `GET /health`
@ -417,6 +457,8 @@ Updated: 2026-03-03
- `POST /models/clear`
- `POST /models/reset`
- `POST /inference/run`
- `POST /inference/run_video`
- `GET /tools/video-inference`
### Pending
- None (for currently implemented HTTP endpoints).
@ -429,6 +471,24 @@ Updated: 2026-03-03
- `POST /models/load` and `POST /inference/run` must be tested as a pair in the same flow.
- Test pairs are defined in `local_service_win/TestRes/TEST_PAIRS.md`.
### Video/Webcam PoC Test Flow
1. Start LocalAPI service.
2. Connect device and load model:
- `POST /devices/connect`
- `POST /models/load`
3. Visual tool path:
- Open `http://127.0.0.1:4398/tools/video-inference`
- Select source (`Video File` or `Webcam`)
- Use default model presets (YOLOv5=20005, FCOS=20004, TinyYOLO=19), then click `Start`
4. API-only path:
- Use `POST /inference/run_video` with `multipart/form-data`
- Start with small values: `sample_every_n=3`, `max_frames=30`
5. Expected:
- Continuous frame-wise inference results are returned.
- Visual page overlays detection boxes on displayed frames.
6. Current scope note:
- ByteTrack tracking output (`track_id` continuity) is not covered in this PoC phase.
### Model/Inference Test Pairs
#### KL520
1. YOLOv5 (model zoo)

Binary file not shown.

View File

@ -0,0 +1,627 @@
<!doctype html>
<html lang="zh-Hant">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Video Inference Viewer</title>
<style>
:root {
--bg: #0f172a;
--panel: #111827;
--panel-2: #1f2937;
--text: #e5e7eb;
--muted: #9ca3af;
--danger: #ef4444;
}
* { box-sizing: border-box; }
body {
margin: 0;
background: radial-gradient(circle at 10% 10%, #1e293b, var(--bg));
color: var(--text);
font-family: "Segoe UI", "Noto Sans TC", sans-serif;
}
.wrap {
max-width: 1400px;
margin: 16px auto;
padding: 0 16px;
display: grid;
grid-template-columns: 420px 1fr;
gap: 16px;
}
.panel {
background: linear-gradient(180deg, var(--panel), var(--panel-2));
border: 1px solid #334155;
border-radius: 12px;
padding: 14px;
}
.row {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 8px;
margin-bottom: 8px;
}
.row-1 { display: grid; grid-template-columns: 1fr; gap: 8px; margin-bottom: 8px; }
label { font-size: 12px; color: var(--muted); display: block; margin-bottom: 4px; }
input, select, button {
width: 100%;
padding: 8px;
border-radius: 8px;
border: 1px solid #475569;
background: #0b1220;
color: var(--text);
}
button { cursor: pointer; font-weight: 600; }
button.primary { background: #14532d; border-color: #15803d; }
button.warn { background: #7f1d1d; border-color: #b91c1c; }
.status {
margin-top: 10px;
padding: 8px;
background: #0b1220;
border: 1px solid #334155;
border-radius: 8px;
font-size: 12px;
line-height: 1.5;
white-space: pre-wrap;
}
.canvas-wrap {
position: relative;
width: 100%;
background: #000;
border-radius: 12px;
overflow: hidden;
border: 1px solid #334155;
}
canvas { width: 100%; height: auto; display: block; }
.hint { font-size: 12px; color: var(--muted); margin-top: 8px; }
.error { color: var(--danger); }
@media (max-width: 1100px) { .wrap { grid-template-columns: 1fr; } }
</style>
</head>
<body>
<div class="wrap">
<section class="panel">
<h3 style="margin-top:0;">Video Inference (API)</h3>
<div class="row-1">
<div>
<label>Source</label>
<select id="sourceType">
<option value="file" selected>Video File</option>
<option value="webcam">Webcam</option>
</select>
</div>
<div id="videoFileWrap">
<label>Video File</label>
<input id="videoFile" type="file" accept="video/*" />
</div>
<div id="webcamControls" style="display:none;">
<label>Webcam Device</label>
<div class="row" style="margin-bottom:0;">
<select id="webcamDevice"></select>
<button id="refreshCamBtn" type="button">Refresh</button>
</div>
</div>
<div>
<label>API Base URL</label>
<input id="baseUrl" type="text" value="http://127.0.0.1:4398" />
</div>
</div>
<div class="row">
<div>
<label>Model Type</label>
<select id="modelType">
<option value="yolov5">YOLOv5</option>
<option value="fcos">FCOS</option>
<option value="tinyyolo">TinyYOLO</option>
</select>
</div>
<div>
<label>Model ID</label>
<input id="modelId" type="number" value="20005" />
</div>
</div>
<div class="row">
<div>
<label>Input Width</label>
<input id="inW" type="number" value="640" />
</div>
<div>
<label>Input Height</label>
<input id="inH" type="number" value="640" />
</div>
</div>
<div class="row">
<div>
<label>Image Format</label>
<select id="imageFormat">
<option value="RGBA8888" selected>RGBA8888</option>
<option value="RAW8">RAW8</option>
</select>
</div>
<div>
<label>Infer Every N Frames</label>
<input id="sampleEveryN" type="number" value="3" min="1" />
</div>
</div>
<div class="row">
<div>
<label>Num Classes</label>
<input id="numClasses" type="number" value="80" />
</div>
<div>
<label>Score Threshold</label>
<input id="scoreTh" type="number" step="0.01" value="0.25" />
</div>
</div>
<div class="row">
<div>
<label>NMS IoU</label>
<input id="nmsTh" type="number" step="0.01" value="0.45" />
</div>
<div>
<label>Max Boxes</label>
<input id="maxBoxes" type="number" value="200" />
</div>
</div>
<div class="row">
<button id="startBtn" class="primary">Start</button>
<button id="stopBtn" class="warn">Stop</button>
</div>
<div id="status" class="status">Ready.</div>
<div class="hint">預設值可直接測 YOLOv5。先確認 LocalAPI 已啟動,並完成 connect + load model。</div>
</section>
<section class="panel">
<div class="canvas-wrap">
<canvas id="displayCanvas" width="960" height="540"></canvas>
</div>
<video id="video" style="display:none;"></video>
<canvas id="inferCanvas" width="640" height="640" style="display:none;"></canvas>
</section>
</div>
<script>
const videoEl = document.getElementById("video");
const displayCanvas = document.getElementById("displayCanvas");
const inferCanvas = document.getElementById("inferCanvas");
const dctx = displayCanvas.getContext("2d");
const ictx = inferCanvas.getContext("2d");
const sourceType = document.getElementById("sourceType");
const videoFileWrap = document.getElementById("videoFileWrap");
const videoFile = document.getElementById("videoFile");
const webcamControls = document.getElementById("webcamControls");
const webcamDevice = document.getElementById("webcamDevice");
const refreshCamBtn = document.getElementById("refreshCamBtn");
const baseUrl = document.getElementById("baseUrl");
const modelType = document.getElementById("modelType");
const modelId = document.getElementById("modelId");
const inW = document.getElementById("inW");
const inH = document.getElementById("inH");
const imageFormat = document.getElementById("imageFormat");
const sampleEveryN = document.getElementById("sampleEveryN");
const numClasses = document.getElementById("numClasses");
const scoreTh = document.getElementById("scoreTh");
const nmsTh = document.getElementById("nmsTh");
const maxBoxes = document.getElementById("maxBoxes");
const startBtn = document.getElementById("startBtn");
const stopBtn = document.getElementById("stopBtn");
const statusEl = document.getElementById("status");
const YOLO_ANCHORS = [
[[10,14],[23,27],[37,58]],
[[81,82],[135,169],[344,319]]
];
const DEFAULT_MODEL_ID = { yolov5: 20005, fcos: 20004, tinyyolo: 19 };
let running = false;
let inFlight = false;
let frameIndex = -1;
let inferCount = 0;
let lastBoxes = [];
let startTs = 0;
let webcamStream = null;
let currentBlobUrl = "";
// Sync the input-size fields and the model id preset whenever the model
// type selection changes.
modelType.addEventListener("change", () => {
  // Preset input sizes: FCOS 512x512, TinyYOLO 224x224, YOLOv5 (default) 640x640.
  if (modelType.value === "fcos") { inW.value = 512; inH.value = 512; }
  else if (modelType.value === "tinyyolo") { inW.value = 224; inH.value = 224; }
  else { inW.value = 640; inH.value = 640; }
  // Fall back to 1 for model types without a preset id.
  modelId.value = DEFAULT_MODEL_ID[modelType.value] || 1;
});
// Show a status message; isError switches the panel to the error style.
function setStatus(text, isError = false) {
  const classes = ["status"];
  if (isError) classes.push("error");
  statusEl.textContent = text;
  statusEl.className = classes.join(" ");
}
// Toggle the file-picker vs webcam controls to match the selected source.
function updateSourceUI() {
  if (sourceType.value === "webcam") {
    videoFileWrap.style.display = "none";
    webcamControls.style.display = "block";
  } else {
    videoFileWrap.style.display = "block";
    webcamControls.style.display = "none";
  }
}
// Populate the webcam <select> with the available video input devices.
// NOTE(review): device labels are typically empty until the user grants
// camera permission — the "Camera N" fallback covers that case.
async function listWebcams() {
  webcamDevice.innerHTML = "";
  try {
    const devices = await navigator.mediaDevices.enumerateDevices();
    const cams = devices.filter(d => d.kind === "videoinput");
    if (!cams.length) {
      // Keep a disabled-looking placeholder so the select is never empty.
      const opt = document.createElement("option");
      opt.value = "";
      opt.textContent = "No webcam found";
      webcamDevice.appendChild(opt);
      return;
    }
    cams.forEach((d, idx) => {
      const opt = document.createElement("option");
      opt.value = d.deviceId;
      opt.textContent = d.label || `Camera ${idx + 1}`;
      webcamDevice.appendChild(opt);
    });
  } catch (e) {
    // Surface enumeration failures (e.g. insecure context) in the status panel.
    setStatus(`List webcam failed:\n${String(e)}`, true);
  }
}
// Logistic function: maps any real value into (0, 1).
function sigmoid(v) {
  const expNeg = Math.exp(-v);
  return 1 / (1 + expNeg);
}
// Base64-encode a Uint8Array. Conversion is chunked so String.fromCharCode
// is never applied to an argument list longer than 32768 entries.
function bytesToBase64(bytes) {
  const CHUNK = 0x8000;
  const pieces = [];
  for (let off = 0; off < bytes.length; off += CHUNK) {
    pieces.push(String.fromCharCode.apply(null, bytes.subarray(off, off + CHUNK)));
  }
  return btoa(pieces.join(""));
}
// Decode a base64 string into little-endian float32 values.
// Trailing bytes that do not fill a whole float are ignored.
function decodeBase64Float32(base64String) {
  const binary = atob(String(base64String || "").trim());
  const byteCount = binary.length;
  const buffer = new ArrayBuffer(byteCount);
  const raw = new Uint8Array(buffer);
  for (let i = 0; i < byteCount; i++) raw[i] = binary.charCodeAt(i);
  const floatCount = Math.floor(byteCount / 4);
  const view = new DataView(buffer);
  const floats = new Float32Array(floatCount);
  for (let i = 0; i < floatCount; i++) floats[i] = view.getFloat32(i * 4, true);
  return floats;
}
// Normalize raw server output nodes: numeric node_idx, array shape,
// and base64 payload decoded to Float32Array.
function parseOutputs(rawOutputs) {
  const nodes = [];
  (rawOutputs || []).forEach((node, i) => {
    nodes.push({
      node_idx: Number(node.node_idx ?? i),
      shape: Array.isArray(node.shape) ? node.shape : [],
      data: decodeBase64Float32(node.data_base64)
    });
  });
  return nodes;
}
// Select output nodes shaped [1, C, H, W] whose channel count is a multiple
// of (5 + clsCount), returned largest grid (H) first.
function pickYoloNodes(outputs, clsCount) {
  const attrs = 5 + clsCount;
  const matches = outputs.filter(o =>
    o.shape.length === 4 && o.shape[0] === 1 && o.shape[1] % attrs === 0
  );
  matches.sort((a, b) => b.shape[2] - a.shape[2]);
  return matches;
}
// Decode raw YOLO head tensors into candidate boxes in model-input pixel space.
// mode "yolov5" uses the v5 box transform (sigmoid*2 - 0.5 xy offset, squared wh);
// any other mode uses the classic transform (sigmoid xy, exp wh with anchors).
// Returned boxes are unfiltered candidates; run nms() afterwards.
function decodeYolo(outputs, mode, clsCount, iw, ih, confTh) {
  const nodes = pickYoloNodes(outputs, clsCount);
  if (!nodes.length) return [];
  const boxes = [];
  const attrs = 5 + clsCount; // x, y, w, h, objectness + per-class scores
  for (let lv = 0; lv < nodes.length; lv++) {
    const o = nodes[lv];
    const [, ch, gh, gw] = o.shape; // [1, C, H, W]
    const na = Math.floor(ch / attrs); // anchors per level
    const anchors = YOLO_ANCHORS[Math.min(lv, YOLO_ANCHORS.length - 1)];
    const data = o.data;
    // Flat CHW indexing into the decoded float buffer.
    const at = (ci, y, x) => data[ci * gh * gw + y * gw + x];
    for (let a = 0; a < na; a++) {
      const [aw, ah] = anchors[Math.min(a, anchors.length - 1)];
      const base = a * attrs;
      for (let y = 0; y < gh; y++) {
        for (let x = 0; x < gw; x++) {
          const tx = at(base + 0, y, x);
          const ty = at(base + 1, y, x);
          const tw = at(base + 2, y, x);
          const th = at(base + 3, y, x);
          const obj = sigmoid(at(base + 4, y, x));
          // Pick the best class by sigmoid probability.
          let bestCls = -1;
          let bestProb = -Infinity;
          for (let k = 0; k < clsCount; k++) {
            const p = sigmoid(at(base + 5 + k, y, x));
            if (p > bestProb) { bestProb = p; bestCls = k; }
          }
          const score = obj * bestProb;
          if (score < confTh) continue;
          let bx, by, bw, bh;
          if (mode === "yolov5") {
            // YOLOv5-style decode: offsets scaled to pixels via grid stride.
            const sx = iw / gw, sy = ih / gh;
            bx = (sigmoid(tx) * 2 - 0.5 + x) * sx;
            by = (sigmoid(ty) * 2 - 0.5 + y) * sy;
            bw = Math.pow(sigmoid(tw) * 2, 2) * aw;
            bh = Math.pow(sigmoid(th) * 2, 2) * ah;
          } else {
            // Classic decode: xy relative to cell, wh = anchor * exp(t).
            bx = (sigmoid(tx) + x) / gw * iw;
            by = (sigmoid(ty) + y) / gh * ih;
            bw = aw * Math.exp(tw);
            bh = ah * Math.exp(th);
          }
          // Center + size -> corner coordinates.
          boxes.push({ cls: bestCls, score, x1: bx - bw / 2, y1: by - bh / 2, x2: bx + bw / 2, y2: by + bh / 2 });
        }
      }
    }
  }
  return boxes;
}
// Decode FCOS-style heads. cls/reg/ctr nodes are matched by grid size
// (HxW key); levels are walked largest grid first with strides [8,16,32,64,128].
// Score is sqrt(classProb * centerness); boxes are clamped to the input size.
function decodeFcos(outputs, clsCount, iw, ih, confTh) {
  const valid = outputs.filter(o => o.shape.length === 4 && o.shape[0] === 1);
  const clsNodes = valid.filter(o => o.shape[1] === clsCount);
  const regNodes = valid.filter(o => o.shape[1] === 4);
  const ctrNodes = valid.filter(o => o.shape[1] === 1);
  // Group the three heads of each pyramid level by their "HxW" key.
  const map = new Map();
  for (const n of clsNodes) map.set(`${n.shape[2]}x${n.shape[3]}`, { ...(map.get(`${n.shape[2]}x${n.shape[3]}`)||{}), cls:n });
  for (const n of regNodes) map.set(`${n.shape[2]}x${n.shape[3]}`, { ...(map.get(`${n.shape[2]}x${n.shape[3]}`)||{}), reg:n });
  for (const n of ctrNodes) map.set(`${n.shape[2]}x${n.shape[3]}`, { ...(map.get(`${n.shape[2]}x${n.shape[3]}`)||{}), ctr:n });
  // Only keep complete levels, ordered by grid height descending.
  const keys = [...map.keys()].filter(k => { const v = map.get(k); return v.cls && v.reg && v.ctr; })
    .sort((a,b) => Number(b.split("x")[0]) - Number(a.split("x")[0]));
  const strides = [8,16,32,64,128];
  const boxes = [];
  for (let lv = 0; lv < keys.length; lv++) {
    const v = map.get(keys[lv]);
    const clsNode = v.cls, regNode = v.reg, ctrNode = v.ctr;
    const gh = clsNode.shape[2], gw = clsNode.shape[3], stride = strides[Math.min(lv, strides.length-1)];
    // Flat CHW indexing into a node's float buffer.
    const at = (node, ci, y, x) => node.data[ci * gh * gw + y * gw + x];
    for (let y = 0; y < gh; y++) {
      for (let x = 0; x < gw; x++) {
        const ctr = sigmoid(at(ctrNode, 0, y, x));
        let bestCls = -1, bestProb = -Infinity;
        for (let k = 0; k < Math.min(clsCount, clsNode.shape[1]); k++) {
          const p = sigmoid(at(clsNode, k, y, x));
          if (p > bestProb) { bestProb = p; bestCls = k; }
        }
        const score = Math.sqrt(Math.max(0, bestProb * ctr));
        if (score < confTh) continue;
        // l/t/r/b distances from the cell center, clamped non-negative.
        const l = Math.max(0, at(regNode, 0, y, x));
        const t = Math.max(0, at(regNode, 1, y, x));
        const r = Math.max(0, at(regNode, 2, y, x));
        const b = Math.max(0, at(regNode, 3, y, x));
        const cx = (x + 0.5) * stride, cy = (y + 0.5) * stride;
        const x1 = Math.max(0, Math.min(iw, cx - l));
        const y1 = Math.max(0, Math.min(ih, cy - t));
        const x2 = Math.max(0, Math.min(iw, cx + r));
        const y2 = Math.max(0, Math.min(ih, cy + b));
        if (x2 <= x1 || y2 <= y1) continue; // skip degenerate boxes
        boxes.push({ cls: bestCls, score, x1, y1, x2, y2 });
      }
    }
  }
  return boxes;
}
// Intersection-over-union of two axis-aligned boxes; 0 when they do not overlap.
function iou(a, b) {
  const overlapW = Math.min(a.x2, b.x2) - Math.max(a.x1, b.x1);
  const overlapH = Math.min(a.y2, b.y2) - Math.max(a.y1, b.y1);
  if (overlapW <= 0 || overlapH <= 0) return 0;
  const inter = overlapW * overlapH;
  const areaA = Math.max(0, a.x2 - a.x1) * Math.max(0, a.y2 - a.y1);
  const areaB = Math.max(0, b.x2 - b.x1) * Math.max(0, b.y2 - b.y1);
  return inter / Math.max(1e-9, areaA + areaB - inter);
}
// Greedy per-class non-maximum suppression; returns at most maxOutCount boxes,
// sorted by descending score.
function nms(boxes, iouTh, maxOutCount) {
  const grouped = new Map();
  for (const box of boxes) {
    const bucket = grouped.get(box.cls);
    if (bucket) bucket.push(box);
    else grouped.set(box.cls, [box]);
  }
  const survivors = [];
  for (const bucket of grouped.values()) {
    const queue = [...bucket].sort((a, b) => b.score - a.score);
    while (queue.length) {
      const best = queue.shift();
      survivors.push(best);
      // Drop every remaining candidate that overlaps the kept box too much.
      for (let i = queue.length - 1; i >= 0; i--) {
        if (iou(best, queue[i]) > iouTh) queue.splice(i, 1);
      }
    }
  }
  survivors.sort((a, b) => b.score - a.score);
  return survivors.slice(0, maxOutCount);
}
// Draw the current video frame onto the display canvas, then overlay boxes.
// Boxes arrive in model-input coordinates and are scaled up to the video size.
function drawFrameWithBoxes(boxes) {
  const vw = videoEl.videoWidth || 960;
  const vh = videoEl.videoHeight || 540;
  // Resize the canvas only when the video dimensions change.
  if (displayCanvas.width !== vw || displayCanvas.height !== vh) {
    displayCanvas.width = vw;
    displayCanvas.height = vh;
  }
  dctx.drawImage(videoEl, 0, 0, vw, vh);
  const iw = Number(inW.value), ih = Number(inH.value);
  const sx = vw / iw, sy = vh / ih;
  for (const b of boxes) {
    const x1 = b.x1 * sx, y1 = b.y1 * sy, x2 = b.x2 * sx, y2 = b.y2 * sy;
    const w = Math.max(1, x2 - x1), h = Math.max(1, y2 - y1);
    // Deterministic per-class color via hue rotation.
    const hue = (b.cls * 47) % 360;
    const color = `hsl(${hue} 90% 50%)`;
    dctx.strokeStyle = color;
    dctx.lineWidth = 2;
    dctx.strokeRect(x1, y1, w, h);
    // Label: "<class>:<score>" on a filled tag above the box.
    const txt = `${b.cls}:${b.score.toFixed(3)}`;
    dctx.font = "12px sans-serif";
    dctx.fillStyle = color;
    dctx.fillRect(x1, Math.max(0, y1 - 14), dctx.measureText(txt).width + 8, 14);
    dctx.fillStyle = "#fff";
    dctx.fillText(txt, x1 + 4, Math.max(10, y1 - 3));
  }
}
// Capture the current video frame, resize it to the model input size, encode
// the pixels (RAW8 = BT.601 grayscale, otherwise raw RGBA bytes), POST the
// frame to the server's /inference/run endpoint, and decode + NMS the result.
// Returns the filtered boxes in model-input pixel coordinates.
async function inferCurrentFrame() {
  const iw = Number(inW.value), ih = Number(inH.value);
  inferCanvas.width = iw;
  inferCanvas.height = ih;
  ictx.drawImage(videoEl, 0, 0, iw, ih);
  const raw = ictx.getImageData(0, 0, iw, ih).data;
  let bytes;
  if (imageFormat.value === "RAW8") {
    // One luma byte per pixel using BT.601 weights.
    bytes = new Uint8Array(iw * ih);
    for (let i = 0, j = 0; i < raw.length; i += 4, j++) {
      const r = raw[i], g = raw[i + 1], b = raw[i + 2];
      bytes[j] = Math.max(0, Math.min(255, Math.round(0.299 * r + 0.587 * g + 0.114 * b)));
    }
  } else {
    // Send the canvas RGBA bytes as-is.
    bytes = new Uint8Array(raw.buffer.slice(raw.byteOffset, raw.byteOffset + raw.byteLength));
  }
  const payload = {
    model_id: Number(modelId.value),
    image_format: imageFormat.value,
    width: iw,
    height: ih,
    image_base64: bytesToBase64(bytes),
    channels_ordering: "DEFAULT",
    output_dtype: "float32"
  };
  const res = await fetch(`${baseUrl.value.replace(/\/$/, "")}/inference/run`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload)
  });
  const parsed = await res.json();
  if (!parsed.ok) throw new Error(JSON.stringify(parsed.error));
  const outputs = parseOutputs(parsed.data.outputs || []);
  const clsCount = Number(numClasses.value);
  const confTh = Number(scoreTh.value);
  let rawBoxes = [];
  if (modelType.value === "fcos") rawBoxes = decodeFcos(outputs, clsCount, iw, ih, confTh);
  else rawBoxes = decodeYolo(outputs, modelType.value === "yolov5" ? "yolov5" : "tinyyolo", clsCount, iw, ih, confTh);
  return nms(rawBoxes, Number(nmsTh.value), Number(maxBoxes.value));
}
// Per-frame render loop driven by requestAnimationFrame. Every N-th frame is
// sent for inference (skipped while a request is already in flight); all other
// frames just redraw the most recent boxes so playback stays smooth.
async function loop() {
  if (!running) return;
  if (videoEl.paused || videoEl.ended) {
    // Keep the last annotated frame on screen while paused/ended.
    drawFrameWithBoxes(lastBoxes);
    requestAnimationFrame(loop);
    return;
  }
  frameIndex++;
  if (frameIndex % Math.max(1, Number(sampleEveryN.value || 1)) === 0 && !inFlight) {
    inFlight = true;
    inferCount++;
    try { lastBoxes = await inferCurrentFrame(); }
    catch (e) { setStatus(`Inference failed:\n${String(e)}`, true); }
    finally { inFlight = false; }
  }
  drawFrameWithBoxes(lastBoxes);
  // api_fps counts completed inference requests per wall-clock second.
  const sec = (performance.now() - startTs) / 1000;
  const apiFps = inferCount / Math.max(sec, 0.001);
  setStatus(
    `source=${sourceType.value}\n` +
    `frame=${frameIndex}\n` +
    `infer_count=${inferCount}\n` +
    `api_fps=${apiFps.toFixed(2)}\n` +
    `boxes=${lastBoxes.length}\n` +
    `video_time=${videoEl.currentTime.toFixed(2)}s`
  );
  requestAnimationFrame(loop);
}
// Release whichever media source is active: stop webcam tracks, revoke the
// file object URL, pause the video element, and detach both source kinds.
async function stopMediaSource() {
  const stream = webcamStream;
  if (stream) {
    for (const track of stream.getTracks()) track.stop();
    webcamStream = null;
  }
  if (currentBlobUrl) {
    URL.revokeObjectURL(currentBlobUrl);
    currentBlobUrl = "";
  }
  try { videoEl.pause(); } catch {}
  videoEl.srcObject = null;
  videoEl.removeAttribute("src");
}
// Halt the inference loop and release the media source (fire-and-forget).
function stop() {
  running = false;
  inFlight = false;
  stopMediaSource();
}
// ---- UI event wiring ----
sourceType.addEventListener("change", updateSourceUI);
refreshCamBtn.addEventListener("click", async () => {
  await listWebcams();
  setStatus("Webcam list refreshed.");
});
stopBtn.addEventListener("click", stop);
// Start: release any previous source, open the selected one (webcam or file),
// reset the loop counters, and kick off the render/inference loop.
startBtn.addEventListener("click", async () => {
  try {
    await stopMediaSource();
    const isWebcam = sourceType.value === "webcam";
    if (isWebcam) {
      // Prefer the chosen device; request a resolution near the model input.
      const constraints = {
        video: webcamDevice.value
          ? {
              deviceId: { exact: webcamDevice.value },
              width: { ideal: Number(inW.value) || 640 },
              height: { ideal: Number(inH.value) || 640 }
            }
          : true,
        audio: false
      };
      webcamStream = await navigator.mediaDevices.getUserMedia(constraints);
      videoEl.srcObject = webcamStream;
    } else {
      if (!videoFile.files || !videoFile.files[0]) {
        setStatus("Please choose a video file first.", true);
        return;
      }
      // Play the local file through an object URL (revoked in stopMediaSource).
      currentBlobUrl = URL.createObjectURL(videoFile.files[0]);
      videoEl.src = currentBlobUrl;
    }
    videoEl.muted = true;
    videoEl.playsInline = true;
    try { videoEl.currentTime = 0; } catch {}
    await videoEl.play();
    // Reset loop state for a fresh run.
    running = true;
    frameIndex = -1;
    inferCount = 0;
    lastBoxes = [];
    startTs = performance.now();
    setStatus("Running inference...");
    requestAnimationFrame(loop);
  } catch (e) {
    setStatus(`Start failed:\n${String(e)}`, true);
  }
});
// Release the camera / object URL when the page is closed.
window.addEventListener("beforeunload", () => { stop(); });
// Initial UI state.
updateSourceUI();
listWebcams();
setStatus("Ready. Choose source and click Start.");
// Apply the default model type's dimensions and model id.
modelType.dispatchEvent(new Event("change"));
</script>
</body>
</html>

View File

@ -0,0 +1,514 @@
from __future__ import annotations
import argparse
import base64
import json
import math
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import cv2
import numpy as np
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from LocalAPI import postprocess_core as core
# Anchor (w, h) pairs per output level, finest grid first.
# NOTE(review): two-level anchors — presumably tiny-YOLO defaults; confirm
# against the deployed model before relying on them for other networks.
YOLO_DEFAULT_ANCHORS: List[List[Tuple[float, float]]] = [
    [(10.0, 14.0), (23.0, 27.0), (37.0, 58.0)],
    [(81.0, 82.0), (135.0, 169.0), (344.0, 319.0)],
]
def _sigmoid(v: np.ndarray | float) -> np.ndarray | float:
return 1.0 / (1.0 + np.exp(-v))
def _encode_frame(frame_bgr: np.ndarray, image_format: str) -> bytes:
    """Convert a BGR frame into the raw byte layout sent to the inference server.

    Supported formats: RGBA8888, RAW8 (single-channel grayscale), RGB565.
    Raises ValueError for anything else.
    """
    fmt = image_format.upper()
    if fmt == "RGBA8888":
        rgba = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGBA)
        return rgba.tobytes()
    if fmt == "RAW8":
        gray = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2GRAY)
        return gray.tobytes()
    if fmt == "RGB565":
        # NOTE(review): this emits BGR565-ordered 16-bit pixels although the
        # format name says RGB565 — confirm the device expects this layout.
        bgr565 = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2BGR565)
        return bgr565.tobytes()
    raise ValueError(f"Unsupported image_format: {image_format}")
def _call_inference_run(
    base_url: str,
    model_id: int,
    image_format: str,
    width: int,
    height: int,
    image_bytes: bytes,
    channels_ordering: str = "DEFAULT",
    output_dtype: str = "float32",
    timeout_sec: float = 20.0,
) -> Dict[str, Any]:
    """POST one frame to ``{base_url}/inference/run`` and return its 'data' payload.

    Raises RuntimeError on HTTP/network errors or when the server replies
    with ``ok == False``.
    """
    payload = json.dumps(
        {
            "model_id": model_id,
            "image_format": image_format,
            "width": width,
            "height": height,
            "image_base64": base64.b64encode(image_bytes).decode("ascii"),
            "channels_ordering": channels_ordering,
            "output_dtype": output_dtype,
        }
    ).encode("utf-8")
    request = urllib.request.Request(
        url=f"{base_url.rstrip('/')}/inference/run",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout_sec) as resp:
            text = resp.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"HTTP {exc.code}: {detail}") from exc
    except urllib.error.URLError as exc:
        raise RuntimeError(f"Request failed: {exc}") from exc
    reply = json.loads(text)
    if not reply.get("ok"):
        raise RuntimeError(json.dumps(reply.get("error"), ensure_ascii=False))
    return reply["data"]
def _decode_outputs(raw_outputs: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
decoded: List[Dict[str, Any]] = []
for idx, o in enumerate(raw_outputs):
shape = list(o.get("shape") or [])
data_b64 = str(o.get("data_base64") or "")
raw = base64.b64decode(data_b64)
arr = np.frombuffer(raw, dtype="<f4")
expected = int(np.prod(shape)) if shape else arr.size
if expected != arr.size:
raise RuntimeError(f"Output node {idx} size mismatch: expected={expected}, got={arr.size}")
decoded.append(
{
"idx": idx,
"node_idx": int(o.get("node_idx", idx)),
"shape": shape,
"data": arr,
}
)
return decoded
def _pick_yolo_nodes(all_nodes: Sequence[Dict[str, Any]], num_classes: int) -> List[Dict[str, Any]]:
picked: List[Dict[str, Any]] = []
for o in all_nodes:
shape = o["shape"]
if len(shape) != 4 or shape[0] != 1:
continue
ch = int(shape[1])
if ch % (5 + num_classes) != 0:
continue
picked.append(o)
picked.sort(key=lambda n: int(n["shape"][2]), reverse=True)
return picked
def _decode_yolo_common(
    all_nodes: Sequence[Dict[str, Any]],
    mode: str,
    num_classes: int,
    input_w: int,
    input_h: int,
    conf_th: float,
    use_sigmoid: bool = True,
    use_xy_sigmoid: bool = True,
    score_mode: str = "obj_cls",
    anchors_by_level: Optional[List[List[Tuple[float, float]]]] = None,
) -> List[core.Box]:
    """Decode YOLO-style [1,C,H,W] head tensors into boxes in model-input pixels.

    Args:
        all_nodes: decoded output nodes (see _decode_outputs).
        mode: "yolov5" uses the v5 box transform (sigmoid*2 - 0.5 xy, squared
            wh); any other value uses the classic transform (sigmoid xy, exp wh).
        num_classes: number of class channels per anchor.
        input_w / input_h: model input size, the coordinate space of the result.
        conf_th: candidates below this score are dropped.
        use_sigmoid / use_xy_sigmoid: toggle activations on raw head values.
        score_mode: "obj" -> objectness only, "cls" -> class prob only,
            otherwise objectness * class prob.
        anchors_by_level: optional anchor override; defaults to
            YOLO_DEFAULT_ANCHORS.

    Returns:
        Unfiltered candidate boxes (run _nms afterwards).

    Raises:
        RuntimeError: when no YOLO-shaped output node is found.
    """
    nodes = _pick_yolo_nodes(all_nodes, num_classes)
    if not nodes:
        raise RuntimeError("No YOLO-like [1,C,H,W] output nodes found")
    anchors_levels = anchors_by_level or YOLO_DEFAULT_ANCHORS
    boxes: List[core.Box] = []
    attrs = 5 + num_classes
    for lv, o in enumerate(nodes):
        _, ch, gh, gw = o["shape"]
        na = int(ch // attrs)  # anchors per level
        data: np.ndarray = o["data"]
        anchors = anchors_levels[min(lv, len(anchors_levels) - 1)]

        def at(channel_idx: int, y: int, x: int) -> float:
            # Flat CHW indexing into this node's float buffer.
            return float(data[channel_idx * gh * gw + y * gw + x])

        for a in range(na):
            aw, ah = anchors[min(a, len(anchors) - 1)]
            base = a * attrs
            for y in range(gh):
                for x in range(gw):
                    tx = at(base + 0, y, x)
                    ty = at(base + 1, y, x)
                    tw = at(base + 2, y, x)
                    th = at(base + 3, y, x)
                    to = at(base + 4, y, x)
                    obj = float(_sigmoid(to) if use_sigmoid else to)
                    # Best class by (optionally sigmoid-activated) probability.
                    best_cls = -1
                    best_prob = -1e9
                    for k in range(num_classes):
                        p = at(base + 5 + k, y, x)
                        p = float(_sigmoid(p) if use_sigmoid else p)
                        if p > best_prob:
                            best_prob = p
                            best_cls = k
                    if score_mode == "obj":
                        score = obj
                    elif score_mode == "cls":
                        score = best_prob
                    else:
                        score = obj * best_prob
                    if score < conf_th:
                        continue
                    if mode == "yolov5":
                        sx = input_w / gw
                        sy = input_h / gh
                        txv = float(_sigmoid(tx) if use_xy_sigmoid else tx)
                        tyv = float(_sigmoid(ty) if use_xy_sigmoid else ty)
                        bx = (txv * 2.0 - 0.5 + x) * sx
                        by = (tyv * 2.0 - 0.5 + y) * sy
                        bw = (float(_sigmoid(tw)) * 2.0) ** 2 * aw
                        bh = (float(_sigmoid(th)) * 2.0) ** 2 * ah
                    else:
                        txv = float(_sigmoid(tx) if use_xy_sigmoid else tx)
                        tyv = float(_sigmoid(ty) if use_xy_sigmoid else ty)
                        bx = (txv + x) / gw * input_w
                        by = (tyv + y) / gh * input_h
                        bw = aw * math.exp(tw)
                        bh = ah * math.exp(th)
                    # BUG FIX: 'Box' was an unresolved name here (NameError at
                    # runtime); use the Box type from postprocess_core, matching
                    # the _draw_boxes signature.
                    boxes.append(
                        core.Box(
                            cls=best_cls,
                            score=score,
                            x1=bx - bw / 2.0,
                            y1=by - bh / 2.0,
                            x2=bx + bw / 2.0,
                            y2=by + bh / 2.0,
                        )
                    )
    return boxes
def _auto_fcos_indices(all_nodes: Sequence[Dict[str, Any]], num_classes: int) -> List[Tuple[int, int, int, int]]:
valid = [o for o in all_nodes if len(o["shape"]) == 4 and o["shape"][0] == 1]
cls_nodes = [o for o in valid if int(o["shape"][1]) == num_classes]
reg_nodes = [o for o in valid if int(o["shape"][1]) == 4]
ctr_nodes = [o for o in valid if int(o["shape"][1]) == 1]
by_hw: Dict[Tuple[int, int], Dict[str, Dict[str, Any]]] = {}
for n in cls_nodes:
by_hw.setdefault((int(n["shape"][2]), int(n["shape"][3])), {})["cls"] = n
for n in reg_nodes:
by_hw.setdefault((int(n["shape"][2]), int(n["shape"][3])), {})["reg"] = n
for n in ctr_nodes:
by_hw.setdefault((int(n["shape"][2]), int(n["shape"][3])), {})["ctr"] = n
levels: List[Tuple[int, int, int, int]] = []
for (h, w), items in by_hw.items():
if not {"cls", "reg", "ctr"}.issubset(items.keys()):
continue
levels.append(
(
h,
int(items["cls"]["node_idx"]),
int(items["reg"]["node_idx"]),
int(items["ctr"]["node_idx"]),
)
)
levels.sort(key=lambda x: x[0], reverse=True)
strides = [8, 16, 32, 64, 128]
return [(cls_i, reg_i, ctr_i, strides[min(i, len(strides) - 1)]) for i, (_, cls_i, reg_i, ctr_i) in enumerate(levels)]
def _decode_fcos(
    all_nodes: Sequence[Dict[str, Any]],
    num_classes: int,
    input_w: int,
    input_h: int,
    conf_th: float,
    use_sigmoid: bool = True,
    score_mode: str = "obj_cls",
) -> List[core.Box]:
    """Decode FCOS cls/reg/ctr heads into boxes in model-input pixel space.

    Levels are auto-matched by grid size (_auto_fcos_indices). score_mode:
    "obj" -> centerness only, "cls" -> class prob only, otherwise
    sqrt(class prob * centerness). Boxes are clamped to the input size and
    returned unfiltered (run _nms afterwards).

    Raises:
        RuntimeError: when the cls/reg/ctr heads cannot be matched.
    """
    levels = _auto_fcos_indices(all_nodes, num_classes)
    if not levels:
        raise RuntimeError("Cannot auto match FCOS cls/reg/ctr nodes")
    boxes: List[core.Box] = []
    by_idx = {int(n["node_idx"]): n for n in all_nodes}
    for cls_idx, reg_idx, ctr_idx, stride in levels:
        cls_node = by_idx.get(cls_idx)
        reg_node = by_idx.get(reg_idx)
        ctr_node = by_idx.get(ctr_idx)
        if not cls_node or not reg_node or not ctr_node:
            continue
        gh = int(cls_node["shape"][2])
        gw = int(cls_node["shape"][3])
        cls_data: np.ndarray = cls_node["data"]
        reg_data: np.ndarray = reg_node["data"]
        ctr_data: np.ndarray = ctr_node["data"]

        def at(node_data: np.ndarray, channel_idx: int, y: int, x: int) -> float:
            # Flat CHW indexing into a node's float buffer.
            return float(node_data[channel_idx * gh * gw + y * gw + x])

        cls_channels = int(cls_node["shape"][1])
        for y in range(gh):
            for x in range(gw):
                ctr = at(ctr_data, 0, y, x)
                ctr = float(_sigmoid(ctr) if use_sigmoid else ctr)
                best_cls = -1
                best_prob = -1e9
                for k in range(min(num_classes, cls_channels)):
                    p = at(cls_data, k, y, x)
                    p = float(_sigmoid(p) if use_sigmoid else p)
                    if p > best_prob:
                        best_prob = p
                        best_cls = k
                if score_mode == "obj":
                    score = ctr
                elif score_mode == "cls":
                    score = best_prob
                else:
                    score = math.sqrt(max(0.0, best_prob * ctr))
                if score < conf_th:
                    continue
                # l/t/r/b distances from the cell centre, clamped non-negative.
                l = max(0.0, at(reg_data, 0, y, x))
                t = max(0.0, at(reg_data, 1, y, x))
                r = max(0.0, at(reg_data, 2, y, x))
                b = max(0.0, at(reg_data, 3, y, x))
                cx = (x + 0.5) * stride
                cy = (y + 0.5) * stride
                x1 = max(0.0, min(input_w, cx - l))
                y1 = max(0.0, min(input_h, cy - t))
                x2 = max(0.0, min(input_w, cx + r))
                y2 = max(0.0, min(input_h, cy + b))
                if x2 <= x1 or y2 <= y1:
                    continue
                # BUG FIX: 'Box' was an unresolved name here (NameError at
                # runtime); use the Box type from postprocess_core.
                boxes.append(core.Box(cls=best_cls, score=score, x1=x1, y1=y1, x2=x2, y2=y2))
    return boxes
def _iou(a: Box, b: Box) -> float:
xx1 = max(a.x1, b.x1)
yy1 = max(a.y1, b.y1)
xx2 = min(a.x2, b.x2)
yy2 = min(a.y2, b.y2)
w = max(0.0, xx2 - xx1)
h = max(0.0, yy2 - yy1)
inter = w * h
if inter <= 0:
return 0.0
area_a = max(0.0, a.x2 - a.x1) * max(0.0, a.y2 - a.y1)
area_b = max(0.0, b.x2 - b.x1) * max(0.0, b.y2 - b.y1)
return inter / max(1e-9, area_a + area_b - inter)
def _nms(boxes: Sequence[Box], iou_th: float, max_out: int) -> List[Box]:
    """Greedy per-class non-maximum suppression.

    Returns at most max_out boxes, sorted by descending score.
    """
    grouped: Dict[int, List[Box]] = {}
    for box in boxes:
        grouped.setdefault(box.cls, []).append(box)
    survivors: List[Box] = []
    for group in grouped.values():
        candidates = sorted(group, key=lambda bb: bb.score, reverse=True)
        while candidates:
            best, *rest = candidates
            survivors.append(best)
            # Drop remaining candidates overlapping the kept box too much.
            candidates = [bb for bb in rest if _iou(best, bb) <= iou_th]
    survivors.sort(key=lambda bb: bb.score, reverse=True)
    return survivors[:max_out]
def _draw_boxes(frame: np.ndarray, boxes: Sequence[core.Box], input_w: int, input_h: int) -> np.ndarray:
    """Scale boxes from model-input space to the frame size and draw on a copy.

    Returns the annotated copy; the input frame is not modified.
    """
    out = frame.copy()
    h, w = out.shape[:2]
    sx = w / float(input_w)
    sy = h / float(input_h)
    for b in boxes:
        # Clamp coordinates to the frame; skip degenerate boxes.
        x1 = int(max(0, min(w - 1, round(b.x1 * sx))))
        y1 = int(max(0, min(h - 1, round(b.y1 * sy))))
        x2 = int(max(0, min(w - 1, round(b.x2 * sx))))
        y2 = int(max(0, min(h - 1, round(b.y2 * sy))))
        if x2 <= x1 or y2 <= y1:
            continue
        # Deterministic per-class color: hue = cls*47 mod 180 in HSV -> BGR.
        color = tuple(int(c) for c in cv2.cvtColor(np.uint8([[[b.cls * 47 % 180, 255, 220]]]), cv2.COLOR_HSV2BGR)[0][0])
        cv2.rectangle(out, (x1, y1), (x2, y2), color, 2)
        # Label: "<class>:<score>" just above the top-left corner.
        text = f"{b.cls}:{b.score:.3f}"
        cv2.putText(out, text, (x1, max(14, y1 - 4)), cv2.FONT_HERSHEY_SIMPLEX, 0.45, color, 2, cv2.LINE_AA)
    return out
def _pick_video_via_dialog() -> Optional[str]:
    """Open a Tk file-open dialog for a video file.

    Returns the selected path, or None when Tk is unavailable (headless
    environment) or the user cancels.
    """
    try:
        import tkinter as tk
        from tkinter import filedialog
    except Exception:
        return None
    dialog_root = tk.Tk()
    dialog_root.withdraw()
    try:
        selected = filedialog.askopenfilename(
            title="Select video file",
            filetypes=[("Video files", "*.mp4 *.avi *.mov *.mkv *.wmv"), ("All files", "*.*")],
        )
    finally:
        dialog_root.destroy()
    return selected or None
def _defaults_for_model(model_type: str) -> Tuple[int, int]:
mt = model_type.lower()
if mt == "fcos":
return 512, 512
if mt == "tinyyolo":
return 224, 224
return 640, 640
def main() -> None:
    """CLI entry point: play a video, run sampled frames through /inference/run,
    and display (optionally save) the annotated stream.

    Press 'q' in the preview window to quit.
    """
    parser = argparse.ArgumentParser(description="Video -> /inference/run -> draw detection boxes")
    parser.add_argument("--base-url", default="http://127.0.0.1:4398")
    parser.add_argument("--video", default="")
    parser.add_argument("--model-id", type=int, required=True)
    parser.add_argument("--model-type", choices=["yolov5", "fcos", "tinyyolo"], default="yolov5")
    parser.add_argument("--input-width", type=int, default=0)   # 0 -> model-type default
    parser.add_argument("--input-height", type=int, default=0)  # 0 -> model-type default
    parser.add_argument("--image-format", default="RGBA8888")
    parser.add_argument("--num-classes", type=int, default=80)
    parser.add_argument("--score-th", type=float, default=0.25)
    parser.add_argument("--iou-th", type=float, default=0.45)
    parser.add_argument("--max-boxes", type=int, default=200)
    parser.add_argument("--sample-every-n", type=int, default=3)
    parser.add_argument("--save-output", default="")
    args = parser.parse_args()
    # Fall back to a GUI file picker when --video is not given.
    video_path = args.video.strip() or _pick_video_via_dialog()
    if not video_path:
        raise SystemExit("No video selected")
    if not Path(video_path).is_file():
        raise SystemExit(f"Video not found: {video_path}")
    default_w, default_h = _defaults_for_model(args.model_type)
    in_w = int(args.input_width or default_w)
    in_h = int(args.input_height or default_h)
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise SystemExit(f"Cannot open video: {video_path}")
    writer: Optional[cv2.VideoWriter] = None
    if args.save_output:
        # Mirror the source video's fps/size in the annotated output file.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        fps = float(cap.get(cv2.CAP_PROP_FPS) or 20.0)
        frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or in_w)
        frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or in_h)
        writer = cv2.VideoWriter(args.save_output, fourcc, fps, (frame_w, frame_h))
    print("Press 'q' to quit.")
    frame_idx = -1    # frames read from the video
    infer_count = 0   # inference requests actually sent
    last_boxes: List[core.Box] = []  # reused between sampled frames
    t0 = time.time()
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            frame_idx += 1
            # Only every N-th frame is sent for inference; the others reuse
            # the previous detections so playback stays smooth.
            if frame_idx % max(1, args.sample_every_n) == 0:
                infer_count += 1
                resized = cv2.resize(frame, (in_w, in_h), interpolation=cv2.INTER_AREA)
                image_bytes = _encode_frame(resized, args.image_format)
                try:
                    result = _call_inference_run(
                        base_url=args.base_url,
                        model_id=args.model_id,
                        image_format=args.image_format,
                        width=in_w,
                        height=in_h,
                        image_bytes=image_bytes,
                    )
                    raw_outputs = result.get("outputs") or []
                    outputs = core.decode_outputs(raw_outputs)
                    if args.model_type == "fcos":
                        raw_boxes = core.decode_fcos(
                            outputs,
                            num_classes=args.num_classes,
                            input_w=in_w,
                            input_h=in_h,
                            conf_th=args.score_th,
                        )
                    else:
                        raw_boxes = core.decode_yolo_common(
                            outputs,
                            mode="yolov5" if args.model_type == "yolov5" else "tinyyolo",
                            num_classes=args.num_classes,
                            input_w=in_w,
                            input_h=in_h,
                            conf_th=args.score_th,
                        )
                    last_boxes = core.nms(raw_boxes, iou_th=args.iou_th, max_out=args.max_boxes)
                except Exception as exc:
                    # Best effort: keep playing with stale boxes if one request fails.
                    print(f"[frame {frame_idx}] inference failed: {exc}")
            vis = _draw_boxes(frame, last_boxes, in_w, in_h)
            # api_fps counts inference requests per wall-clock second.
            elapsed = max(1e-6, time.time() - t0)
            api_fps = infer_count / elapsed
            cv2.putText(
                vis,
                f"frame={frame_idx} infer={infer_count} api_fps={api_fps:.2f} boxes={len(last_boxes)}",
                (10, 24),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.65,
                (0, 255, 0),
                2,
                cv2.LINE_AA,
            )
            cv2.imshow("Kneron Video Inference Viewer", vis)
            if writer is not None:
                writer.write(vis)
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):
                break
    finally:
        # Always release capture/writer and close windows, even on Ctrl-C.
        cap.release()
        if writer is not None:
            writer.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()

View File

@ -4,6 +4,7 @@
# HTTP service
fastapi
uvicorn
python-multipart
# Reference packages from C:\Users\user\Documents\KNEOX\README.md
PyQt5