jim800121chen c03eb6fd0e feat(local-tool): M9-2 — Go driver UpgradeFirmware + firmware service module
Second milestone of Phase A: builds on the M9-1 bridge.py and exposes a service layer for the M9-3 API/WebSocket work.

New module `server/internal/firmware/`:
- types.go: 123 lines (FirmwareVersion / FirmwareProgress / ActiveTaskInfo / UpgradeDriver interface / 8 reason constants)
- progress.go: 147 lines (Tracker modeled on the flash pattern; the Task.cancel godoc reserves room for a SIGTERM force-cancel)
- service.go: 373 lines (core service: UpgradeFirmware / HasActiveTask / GetActiveTaskInfo / RequestShutdown / WaitForActiveTasks / ListBundledVersions / GetCurrentVersion)
- service_test.go: 676 lines, 13 tests including MultiDeviceParallel

Driver layer:
- kl720_driver.go: 697 → 1054 lines (+357; new UpgradeFirmware method + tryRouteFirmwareEvent + sendCommandForUpgrade snapshot pattern)
- kl720_driver_test.go: 360 lines, 11 tests (including InfoNotBlockedDuringUpgrade / CtxCancelReleasesBridge / StderrEventAfterCtxCancel 100-round stress)

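Key design decisions:
- The flash and firmware modules are separate (firmware does not import flash)
- The UpgradeDriver interface isolates driver details; the DeviceLookup interface isolates the device manager
- An intermediate channel pattern (service ↔ driver) lets the service fill in fields (DeviceID / Direction / BeforeVersion)
- Two-layer timeout: chip timeout + 30s margin
- The 8-value reason enum matches bridge.py; stage names follow the Design naming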

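Concurrency race fixes (M9-2 Reviewer round 1 → round 2):
- Major 1 (mutex deadlock): a new dedicated fwUpgradeMu lock plus the sendCommandForUpgrade stdin/stdout snapshot pattern avoids the field-level race on the d.mu-guarded fields, keeps Info/Disconnect from blocking during an upgrade, and leaves the timeout path deadlock-free
- Major 2 (close-channel race): tryRouteFirmwareEvent holds fwMu for the whole check-and-send; together with defer setFirmwareProgressCh(nil) this provides a happens-before edge, so no send can hit a closed channel and panic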
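
The Major 2 fix can be sketched in isolation. This is a simplified illustration, assuming an `int` payload and a hypothetical `router` type in place of the driver; it only shows the "check and send under one lock, unregister before close" ordering.

```go
package main

import (
	"fmt"
	"sync"
)

// router is a hypothetical stand-in for the driver's progress routing state.
type router struct {
	mu sync.Mutex
	ch chan<- int // nil means "no consumer registered"
}

// set registers a channel, or clears it when ch is nil.
func (r *router) set(ch chan<- int) {
	r.mu.Lock()
	r.ch = ch
	r.mu.Unlock()
}

// tryRoute checks and sends under one lock hold, so set(nil) cannot slip in
// between the nil check and the send. It reports whether a consumer was
// registered, not whether the value was delivered.
func (r *router) tryRoute(v int) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.ch == nil {
		return false
	}
	select {
	case r.ch <- v:
	default: // channel full: drop rather than block while holding the lock
	}
	return true
}

func main() {
	r := &router{}
	ch := make(chan int, 1)
	r.set(ch)
	fmt.Println(r.tryRoute(1)) // true: delivered
	fmt.Println(r.tryRoute(2)) // true: consumer registered, value dropped (buffer full)

	r.set(nil) // unregister first ...
	close(ch)  // ... then closing is safe
	fmt.Println(r.tryRoute(3)) // false: nothing registered, no send attempted
	fmt.Println(<-ch)          // 1
}
```

Because `set(nil)` takes the same mutex as `tryRoute`, any send that started before the unregister has finished by the time `set(nil)` returns, which is what makes the later `close` safe.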

Two rounds of review:
- Round 1: 0 Critical / 2 Major / 5 Minor / 5 Suggestion
- Round 2: 0 Critical / 0 Major / 2 Minor / 2 Suggestion (11 of 12 issues fixed; Suggestion 4 deferred to a follow-up)

M9-1 follow-ups cleaned up along the way:
- m5 (two leftover lines of dead test code, _firmware_upgrade_start_ts): removed
- s5 (test comment explaining the idempotent shape): added

Tests:
- go test ./... -race -count=1: all green (28s, no regressions)
- Python: 36 tests + 22 subtests, all green (0.31s)
- go vet / build: no output

Next: M9-3 API handler + WebSocket progress (for CI, `go test -race -count=3` is suggested to strengthen race detection)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-25 11:27:36 +08:00

1055 lines
34 KiB
Go

package kneron
import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"visiona-local/server/internal/driver"
"visiona-local/server/internal/firmware"
"visiona-local/server/pkg/logger"
)
// LogFunc is a function that writes a log line to both stderr and
// the WebSocket broadcaster. When nil, logs go only to stderr.
type LogFunc func(level, msg string)
// KneronDriver implements driver.DeviceDriver for Kneron NPU devices
// (KL520, KL720, etc.). It delegates hardware operations to a Python
// subprocess (kneron_bridge.py) that communicates via JSON-RPC over
// stdin/stdout.
type KneronDriver struct {
info driver.DeviceInfo
connected bool
inferring bool
modelLoaded string
chipType string // "KL520" or "KL720" — derived from info.Type
mu sync.Mutex
scriptPath string
pythonCmd *exec.Cmd
stdin io.WriteCloser
stdout *bufio.Scanner
pythonReady bool
logBroadcaster *logger.Broadcaster
needsReset bool // true on first connect after server start to clear stale models
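// fwUpgradeMu is a fine-grained mutex used only by sendCommand during a
// firmware upgrade.
//
// Major 1 fix: during an upgrade the sendCommand goroutine may block for
// 60-200s waiting on bridge stdout. Holding d.mu for that whole time would
// stall Info / IsConnected / Disconnect and deadlock the ctx.Done path
// (service ctx.Done → driver wants d.mu.Lock to kill the bridge → the
// sendCommand goroutine still holds d.mu).
//
// Fix: during an upgrade sendCommand holds fwUpgradeMu instead of d.mu, so
// other methods keep using d.mu and the two do not block each other. The
// ctx.Done path also takes d.mu and can run stopPython without trouble.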
fwUpgradeMu sync.Mutex
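// Progress channel used only while a firmware upgrade or downgrade is in
// flight. When the stderr goroutine sees a
// `{"event":"firmware_progress",...}` JSON line, it routes it to this channel.
// When nil, the stderr goroutine takes the default broadcaster path.
//
// Major 2 fix: tryRouteFirmwareEvent holds fwMu for the whole check + send
// (the two must not be split), which makes it mutually exclusive with
// setFirmwareProgressCh(nil). Before the service closes the channel, the
// driver's deferred setFirmwareProgressCh(nil) plus the fwMu happens-before
// edge guarantee that no in-flight send remains after close(intermediate).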
fwMu sync.Mutex
fwProgressCh chan<- firmware.FirmwareProgress
}
// NewKneronDriver creates a new KneronDriver with the given device info and
// path to the kneron_bridge.py script. Works for any Kneron chip variant.
func NewKneronDriver(info driver.DeviceInfo, scriptPath string) *KneronDriver {
chip := "KL520"
if strings.Contains(strings.ToLower(info.Type), "kl720") {
chip = "KL720"
}
return &KneronDriver{
info: info,
scriptPath: scriptPath,
chipType: chip,
needsReset: true,
}
}
// SetLogBroadcaster attaches a log broadcaster so that bridge stderr
// and driver messages are forwarded to the frontend.
func (d *KneronDriver) SetLogBroadcaster(b *logger.Broadcaster) {
d.logBroadcaster = b
}
// driverLog writes a log message to stderr and the broadcaster.
func (d *KneronDriver) driverLog(level, format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
fmt.Fprintf(os.Stderr, "%s\n", msg)
if d.logBroadcaster != nil {
d.logBroadcaster.Push(level, msg)
}
}
// NewKL720Driver is a backward-compatible alias for NewKneronDriver.
// Deprecated: Use NewKneronDriver instead.
func NewKL720Driver(info driver.DeviceInfo, scriptPath string) *KneronDriver {
return NewKneronDriver(info, scriptPath)
}
// KL720Driver is a backward-compatible type alias for KneronDriver.
// Deprecated: Use KneronDriver instead.
type KL720Driver = KneronDriver
// resolvePython finds the best Python interpreter using the package-level resolver.
func (d *KneronDriver) resolvePython() string {
return ResolvePython(d.scriptPath)
}
// startPython launches the Python bridge subprocess and waits for the
// "ready" signal on stdout.
func (d *KneronDriver) startPython() error {
pythonBin := d.resolvePython()
scriptDir := filepath.Dir(d.scriptPath)
cmd := exec.Command(pythonBin, d.scriptPath)
// On macOS with Apple Silicon, Kneron SDK requires x86_64 (Rosetta 2).
// The venv should already contain the correct architecture Python.
// Set DYLD_LIBRARY_PATH so libkplus.dylib can be found.
cmd.Env = append(os.Environ(),
"PYTHONUNBUFFERED=1",
)
// Add library path for native kp module if lib directory exists.
libDir := filepath.Join(scriptDir, "lib")
if _, err := os.Stat(libDir); err == nil {
if runtime.GOOS == "darwin" {
cmd.Env = append(cmd.Env, "DYLD_LIBRARY_PATH="+libDir)
} else {
cmd.Env = append(cmd.Env, "LD_LIBRARY_PATH="+libDir)
}
}
// On Windows, ensure libusb-1.0.dll can be found by adding the install
// directory to PATH (installer places the DLL there).
if runtime.GOOS == "windows" {
installDir := filepath.Dir(scriptDir)
cmd.Env = append(cmd.Env,
fmt.Sprintf("PATH=%s;%s;%s", installDir, scriptDir, os.Getenv("PATH")))
}
stdinPipe, err := cmd.StdinPipe()
if err != nil {
return fmt.Errorf("failed to create stdin pipe: %w", err)
}
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
stdinPipe.Close()
return fmt.Errorf("failed to create stdout pipe: %w", err)
}
// Capture stderr from the Python bridge: forward each line to both
// os.Stderr and the WebSocket broadcaster so it shows in the frontend.
stderrPipe, err := cmd.StderrPipe()
if err != nil {
stdinPipe.Close()
stdoutPipe.Close()
return fmt.Errorf("failed to create stderr pipe: %w", err)
}
if err := cmd.Start(); err != nil {
stdinPipe.Close()
return fmt.Errorf("failed to start python bridge (%s): %w", pythonBin, err)
}
// Forward bridge stderr line-by-line to os.Stderr + broadcaster.
// Also intercept firmware_progress JSON events for the current firmware
// upgrade goroutine, if there is one.
go func() {
scanner := bufio.NewScanner(stderrPipe)
// stderr lines can be long (tracebacks, JSON events), so allow up to 1MB to be safe.
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Text()
// Try to parse the line as a firmware_progress event. Cost: the cheap
// string checks short-circuit, so json.Unmarshal runs only when the
// marker matches.
if strings.HasPrefix(line, `{"event":"firmware_progress"`) ||
strings.Contains(line, `"event": "firmware_progress"`) {
if d.tryRouteFirmwareEvent(line) {
// Routed to the firmware channel. Still write a copy to os.Stderr so
// terminal users can see progress, but keep it out of the broadcaster
// to avoid noise.
fmt.Fprintln(os.Stderr, line)
continue
}
}
fmt.Fprintln(os.Stderr, line)
if d.logBroadcaster != nil {
d.logBroadcaster.Push("DEBUG", line)
}
}
}()
d.pythonCmd = cmd
d.stdin = stdinPipe
d.stdout = bufio.NewScanner(stdoutPipe)
// Increase scanner buffer for large inference responses.
d.stdout.Buffer(make([]byte, 0, 64*1024), 1024*1024)
// Wait for the ready signal from the Python process.
if d.stdout.Scan() {
var resp map[string]interface{}
if err := json.Unmarshal([]byte(d.stdout.Text()), &resp); err == nil {
if status, ok := resp["status"].(string); ok && status == "ready" {
d.pythonReady = true
return nil
}
}
}
// If we didn't get a ready signal, clean up and report failure.
d.stopPython()
return fmt.Errorf("python bridge did not send ready signal")
}
// sendCommand sends a JSON command to the Python subprocess and returns
// the parsed JSON response.
//
// The caller must hold d.mu (keeps d.stdin / d.stdout / d.pythonReady consistent).
// The firmware upgrade path must not hold d.mu for long, so it uses
// sendCommandForUpgrade instead.
func (d *KneronDriver) sendCommand(cmd map[string]interface{}) (map[string]interface{}, error) {
if !d.pythonReady {
return nil, fmt.Errorf("python bridge is not running")
}
data, err := json.Marshal(cmd)
if err != nil {
return nil, fmt.Errorf("failed to marshal command: %w", err)
}
// Write the JSON command followed by a newline.
if _, err := fmt.Fprintf(d.stdin, "%s\n", data); err != nil {
return nil, fmt.Errorf("failed to write to python bridge: %w", err)
}
// Read the response line.
if !d.stdout.Scan() {
if err := d.stdout.Err(); err != nil {
return nil, fmt.Errorf("failed to read from python bridge: %w", err)
}
return nil, fmt.Errorf("python bridge closed unexpectedly")
}
var resp map[string]interface{}
if err := json.Unmarshal([]byte(d.stdout.Text()), &resp); err != nil {
return nil, fmt.Errorf("failed to parse python response: %w", err)
}
// Check for error responses from the bridge.
if errMsg, ok := resp["error"].(string); ok {
return nil, fmt.Errorf("python bridge error: %s", errMsg)
}
return resp, nil
}
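// sendCommandForUpgrade is a variant of sendCommand used only for firmware upgrades.
//
// Companion to the Major 1 fix: d.mu must not be held for long during an
// upgrade (it would stall Info / IsConnected), yet having sendCommand read
// the d.stdin / d.stdout / d.pythonReady fields directly would race with
// stopPython, which writes those fields under d.mu on the ctx.Done path.
//
// Fix: this function snapshots stdin / stdout / pythonReady into local
// variables while holding d.mu, releases d.mu, then does I/O on the
// snapshotted references. If stopPython later sets d.stdin = nil, that only
// affects future calls; the references taken here are still safe to use.
// Once the process is killed, the stdout pipe hits EOF, Scan fails, and an
// error is returned.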
func (d *KneronDriver) sendCommandForUpgrade(cmd map[string]interface{}) (map[string]interface{}, error) {
// Snapshot phase: hold d.mu so the three fields are read consistently; the
// lock is held only briefly.
d.mu.Lock()
if !d.pythonReady {
d.mu.Unlock()
return nil, fmt.Errorf("python bridge is not running")
}
stdin := d.stdin
stdout := d.stdout
d.mu.Unlock()
if stdin == nil || stdout == nil {
return nil, fmt.Errorf("python bridge stdin/stdout not initialized")
}
data, err := json.Marshal(cmd)
if err != nil {
return nil, fmt.Errorf("failed to marshal command: %w", err)
}
if _, err := fmt.Fprintf(stdin, "%s\n", data); err != nil {
return nil, fmt.Errorf("failed to write to python bridge: %w", err)
}
if !stdout.Scan() {
if err := stdout.Err(); err != nil {
return nil, fmt.Errorf("failed to read from python bridge: %w", err)
}
return nil, fmt.Errorf("python bridge closed unexpectedly")
}
var resp map[string]interface{}
if err := json.Unmarshal([]byte(stdout.Text()), &resp); err != nil {
return nil, fmt.Errorf("failed to parse python response: %w", err)
}
if errMsg, ok := resp["error"].(string); ok {
return nil, fmt.Errorf("python bridge error: %s", errMsg)
}
return resp, nil
}
// stopPython kills the Python subprocess and cleans up resources.
func (d *KneronDriver) stopPython() {
d.pythonReady = false
if d.stdin != nil {
d.stdin.Close()
d.stdin = nil
}
if d.pythonCmd != nil && d.pythonCmd.Process != nil {
d.pythonCmd.Process.Kill()
d.pythonCmd.Wait()
d.pythonCmd = nil
}
d.stdout = nil
}
// Info returns the current device information.
func (d *KneronDriver) Info() driver.DeviceInfo {
d.mu.Lock()
defer d.mu.Unlock()
return d.info
}
// Connect starts the Python bridge subprocess and connects to the Kneron device.
// On the first connect after server start, the device is reset to clear any
// stale model from a previous session.
func (d *KneronDriver) Connect() error {
d.mu.Lock()
if d.connected {
d.mu.Unlock()
return nil
}
needsReset := d.needsReset
d.info.Status = driver.StatusConnecting
// Start the Python bridge process.
if err := d.startPython(); err != nil {
d.info.Status = driver.StatusError
d.mu.Unlock()
return fmt.Errorf("failed to start hardware bridge: %w", err)
}
// Send connect command to the bridge.
resp, err := d.sendCommand(map[string]interface{}{
"cmd": "connect",
"port": d.info.Port,
"index": 0,
"device_type": d.info.Type,
})
if err != nil {
d.stopPython()
d.info.Status = driver.StatusError
d.mu.Unlock()
return fmt.Errorf("failed to connect to device: %w", err)
}
d.connected = true
d.needsReset = false
d.info.Status = driver.StatusConnected
if fw, ok := resp["firmware"].(string); ok {
d.info.FirmwareVer = fw
}
// Bridge reports whether firmware was freshly loaded during this connect.
// Freshly loaded firmware = clean state → no reset needed.
// Firmware already present (left over from a previous session) → must reset to
// avoid Error 15 SEND_DATA_TOO_LARGE on first inference.
freshFirmware, _ := resp["fresh_firmware_loaded"].(bool)
d.mu.Unlock()
// First connect after server start: reset device to clear stale session.
//
// Why reset is needed:
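// - KL720: flash-based. Firmware and model persist in flash, so the reset
// is only useful for clearing a stale model.
// - KL520: USB Boot / RAM-based. If firmware is left over between sessions
// (i.e. Comp/U was not freshly loaded), load_model + inference fails with
// Error 15 every time. The device must go through
// reset → Loader → reload firmware → Comp/U to get a clean session.
//
// Why we skip reset when freshFirmware=true:
// - This connect just performed a full firmware load, so Comp/U is fresh
// and clean. Another reset would tear it down and reload it again,
// wasting 30-60s for nothing.
// - This is most common on a Windows cold boot (first connect after the
// device lost power); skipping saves the ~65s cost of restartBridge.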
skipReset := freshFirmware
if needsReset && !skipReset {
d.driverLog("INFO", "[kneron] first connect — resetting %s to clear stale session (firmware was already present)...", d.chipType)
if err := d.restartBridge(); err != nil {
d.driverLog("WARN", "[kneron] reset on connect failed (non-fatal): %v", err)
} else {
d.driverLog("INFO", "[kneron] device reset complete — clean state ready")
}
} else if needsReset && skipReset {
d.driverLog("INFO", "[kneron] %s: skipping reset — firmware just loaded, session already clean", d.chipType)
}
return nil
}
// Disconnect stops the Python bridge and disconnects from the device.
func (d *KneronDriver) Disconnect() error {
d.mu.Lock()
defer d.mu.Unlock()
if !d.connected {
return nil
}
// Try to send disconnect command if Python is running.
if d.pythonReady {
d.sendCommand(map[string]interface{}{"cmd": "disconnect"})
}
d.stopPython()
d.connected = false
d.inferring = false
d.info.Status = driver.StatusDisconnected
return nil
}
// IsConnected returns whether the driver is currently connected.
func (d *KneronDriver) IsConnected() bool {
d.mu.Lock()
defer d.mu.Unlock()
return d.connected
}
// restartBridge resets the Kneron device and restarts the Python bridge.
//
// The KL520 USB Boot mode only allows loading one model per firmware
// session. To load a different model we must:
// 1. Send a "reset" command via the current bridge — this calls
// kp.core.reset_device() which forces the device back to Loader
// (USB Boot) state, wiping firmware + model from RAM.
// 2. Kill the Python bridge process.
// 3. Wait for the device to re-enumerate on USB (~8 s).
// 4. Start a fresh Python bridge.
// 5. Send "connect" which reloads firmware from scratch.
//
// After this the device is in a clean state ready for load_model.
//
// Caller must NOT hold d.mu.
func (d *KneronDriver) restartBridge() error {
d.mu.Lock()
port := d.info.Port
d.modelLoaded = ""
// Step 1: Ask the running bridge to reset the device.
if d.pythonReady {
d.driverLog("INFO", "[kneron] sending reset command to device...")
d.sendCommand(map[string]interface{}{"cmd": "reset"})
// Ignore errors — the device may have already disconnected.
}
// Step 2: Kill the bridge process.
d.stopPython()
d.mu.Unlock()
// Step 3: Wait for USB device to re-enumerate after hardware reset.
// The reset causes the device to drop off USB and reappear as a
// Loader-mode device. This typically takes 5-8 seconds.
d.driverLog("INFO", "[kneron] bridge stopped, waiting for USB re-enumerate after reset...")
time.Sleep(8 * time.Second)
d.mu.Lock()
defer d.mu.Unlock()
// Step 4: Start a fresh Python bridge.
d.driverLog("INFO", "[kneron] starting new bridge process...")
if err := d.startPython(); err != nil {
return fmt.Errorf("failed to restart bridge: %w", err)
}
// Step 5: Reconnect — firmware will be loaded fresh.
d.driverLog("INFO", "[kneron] bridge started, reconnecting to device (port=%s)...", port)
_, err := d.sendCommand(map[string]interface{}{
"cmd": "connect",
"port": port,
"index": 0,
"device_type": d.info.Type,
})
if err != nil {
d.stopPython()
return fmt.Errorf("failed to reconnect after bridge restart: %w", err)
}
d.driverLog("INFO", "[kneron] device reconnected after reset + bridge restart")
return nil
}
// Flash loads a model onto the Kneron device. Progress is reported through
// the provided channel.
//
// Behavior differs by chip:
// - KL520 (USB Boot): only one model per session. Error 40 triggers
// a full device reset + bridge restart + firmware reload.
// - KL720 (flash-based): models can be freely reloaded. Error 40
// should not occur; if it does, a simple retry is attempted first.
func (d *KneronDriver) Flash(modelPath string, progressCh chan<- driver.FlashProgress) error {
d.mu.Lock()
d.info.Status = driver.StatusFlashing
pythonReady := d.pythonReady
currentModel := d.modelLoaded
chip := d.chipType
d.mu.Unlock()
if !pythonReady {
d.mu.Lock()
d.info.Status = driver.StatusConnected
d.mu.Unlock()
return fmt.Errorf("hardware bridge is not running — cannot flash model")
}
// Same model already loaded — skip, report success
if currentModel != "" && currentModel == modelPath {
d.driverLog("INFO", "[kneron] model already loaded (%s), skipping reload", modelPath)
progressCh <- driver.FlashProgress{
Percent: 50,
Stage: "transferring",
Message: "model already loaded on device",
}
d.mu.Lock()
d.info.Status = driver.StatusConnected
d.mu.Unlock()
progressCh <- driver.FlashProgress{Percent: 100, Stage: "done", Message: "Flash complete (model already loaded)"}
return nil
}
// Try loading the model
progressCh <- driver.FlashProgress{
Percent: 5,
Stage: "preparing",
Message: "preparing... loading model to device",
}
d.mu.Lock()
_, err := d.sendCommand(map[string]interface{}{
"cmd": "load_model",
"path": modelPath,
})
d.mu.Unlock()
// Handle retryable errors (error 40, broken pipe).
if err != nil {
errMsg := err.Error()
d.driverLog("WARN", "[kneron] load_model failed: %s", errMsg)
isRetryable := strings.Contains(errMsg, "Error code: 40") ||
strings.Contains(errMsg, "SECOND_MODEL") ||
strings.Contains(errMsg, "broken pipe") ||
strings.Contains(errMsg, "USB_TIMEOUT")
if isRetryable {
if chip == "KL720" {
// KL720: error 40 should not occur. Try a simple retry
// without full bridge restart first.
d.driverLog("WARN", "[kneron] KL720 unexpected retryable error, retrying without restart...")
progressCh <- driver.FlashProgress{
Percent: 5,
Stage: "preparing",
Message: "preparing... retrying model load",
}
d.mu.Lock()
_, err = d.sendCommand(map[string]interface{}{
"cmd": "load_model",
"path": modelPath,
})
d.mu.Unlock()
// If still failing, fall back to bridge restart as last resort.
if err != nil {
d.driverLog("WARN", "[kneron] KL720 retry failed: %v, falling back to bridge restart...", err)
if restartErr := d.restartBridge(); restartErr != nil {
d.mu.Lock()
d.info.Status = driver.StatusConnected
d.mu.Unlock()
return fmt.Errorf("failed to reset device: %w", restartErr)
}
d.mu.Lock()
d.info.Status = driver.StatusFlashing
_, err = d.sendCommand(map[string]interface{}{
"cmd": "load_model",
"path": modelPath,
})
d.mu.Unlock()
}
} else {
// KL520: error 40 means a model is already loaded in this
// USB Boot session. Must reset device + reload firmware.
d.driverLog("WARN", "[kneron] KL520 retryable error, restarting bridge...")
progressCh <- driver.FlashProgress{
Percent: 5,
Stage: "preparing",
Message: "preparing... resetting device for new model",
}
if restartErr := d.restartBridge(); restartErr != nil {
d.driverLog("ERROR", "[kneron] restartBridge failed: %v", restartErr)
d.mu.Lock()
d.info.Status = driver.StatusConnected
d.mu.Unlock()
return fmt.Errorf("failed to reset device: %w", restartErr)
}
d.driverLog("INFO", "[kneron] bridge restarted, retrying load_model...")
d.mu.Lock()
d.info.Status = driver.StatusFlashing
_, err = d.sendCommand(map[string]interface{}{
"cmd": "load_model",
"path": modelPath,
})
d.mu.Unlock()
}
}
}
if err != nil {
d.driverLog("ERROR", "[kneron] load_model ultimately failed: %v", err)
d.mu.Lock()
d.info.Status = driver.StatusConnected
d.mu.Unlock()
return fmt.Errorf("failed to load model: %w", err)
}
d.driverLog("INFO", "[kneron] load_model succeeded: %s", modelPath)
// Simulate remaining flash progress stages (the Kneron SDK does not
// provide granular progress, so we approximate it after the model
// has been loaded successfully).
type stage struct {
name string
duration time.Duration
startPct int
endPct int
}
stages := []stage{
{"transferring", 2 * time.Second, 10, 80},
{"verifying", 1 * time.Second, 80, 95},
{"finalizing", 500 * time.Millisecond, 95, 99},
}
// KL720 is faster (USB 3.0, no firmware reload needed)
if chip == "KL720" {
stages = []stage{
{"transferring", 1 * time.Second, 10, 80},
{"verifying", 500 * time.Millisecond, 80, 95},
{"finalizing", 200 * time.Millisecond, 95, 99},
}
}
for _, s := range stages {
steps := (s.endPct - s.startPct) / 5
if steps < 1 {
steps = 1
}
interval := s.duration / time.Duration(steps)
for i := 0; i <= steps; i++ {
pct := s.startPct + (s.endPct-s.startPct)*i/steps
progressCh <- driver.FlashProgress{
Percent: pct,
Stage: s.name,
Message: fmt.Sprintf("%s... %d%%", s.name, pct),
}
time.Sleep(interval)
}
}
d.mu.Lock()
d.modelLoaded = modelPath
d.info.FlashedModel = modelPath
d.info.Status = driver.StatusConnected
d.mu.Unlock()
progressCh <- driver.FlashProgress{Percent: 100, Stage: "done", Message: "Flash complete"}
return nil
}
// StartInference begins continuous inference mode.
func (d *KneronDriver) StartInference() error {
d.mu.Lock()
defer d.mu.Unlock()
if !d.connected {
return fmt.Errorf("device not connected")
}
d.inferring = true
d.info.Status = driver.StatusInferencing
return nil
}
// StopInference stops continuous inference mode.
func (d *KneronDriver) StopInference() error {
d.mu.Lock()
defer d.mu.Unlock()
d.inferring = false
d.info.Status = driver.StatusConnected
return nil
}
// ReadInference reads the latest inference result. This is equivalent to
// calling RunInference with nil image data.
func (d *KneronDriver) ReadInference() (*driver.InferenceResult, error) {
return d.RunInference(nil)
}
// RunInference runs inference on the provided image data and returns
// the result. If imageData is nil, the bridge will run inference on
// a default/empty input.
func (d *KneronDriver) RunInference(imageData []byte) (*driver.InferenceResult, error) {
d.mu.Lock()
pythonReady := d.pythonReady
d.mu.Unlock()
if !pythonReady {
return nil, fmt.Errorf("hardware bridge is not running — device may not be connected")
}
// Encode image data as base64 for transmission to Python.
imageB64 := ""
if imageData != nil {
imageB64 = base64.StdEncoding.EncodeToString(imageData)
}
d.mu.Lock()
resp, err := d.sendCommand(map[string]interface{}{
"cmd": "inference",
"image_base64": imageB64,
})
d.mu.Unlock()
if err != nil {
return nil, fmt.Errorf("inference failed: %w", err)
}
return parseInferenceResult(resp)
}
// parseInferenceResult converts a JSON response map into an InferenceResult.
func parseInferenceResult(resp map[string]interface{}) (*driver.InferenceResult, error) {
// Re-marshal to JSON and unmarshal into the struct for clean conversion.
data, err := json.Marshal(resp)
if err != nil {
return nil, fmt.Errorf("failed to marshal response: %w", err)
}
var result driver.InferenceResult
if err := json.Unmarshal(data, &result); err != nil {
return nil, fmt.Errorf("failed to parse inference result: %w", err)
}
return &result, nil
}
// GetModelInfo returns information about the currently loaded model.
func (d *KneronDriver) GetModelInfo() (*driver.ModelInfo, error) {
d.mu.Lock()
defer d.mu.Unlock()
if d.modelLoaded == "" {
return nil, fmt.Errorf("no model loaded")
}
return &driver.ModelInfo{
ID: d.modelLoaded,
Name: d.modelLoaded,
LoadedAt: time.Now(),
}, nil
}
// ──────────────────────────────────────────────────────────────────────
// Firmware upgrade (M9-2, Phase A: KDP1 → KDP2)
// ──────────────────────────────────────────────────────────────────────
// bridgeFirmwareEvent mirrors the JSON schema that bridge.py's
// `_fw_emit_progress` writes to stderr (kneron_bridge.py:1263 / TDD §4.2).
//
// Field names are snake_case to match bridge.py. The Go side converts them to
// firmware.FirmwareProgress (camelCase) before they go out over WebSocket.
type bridgeFirmwareEvent struct {
Event string `json:"event"`
Percent int `json:"percent"`
Stage string `json:"stage"`
Message string `json:"message"`
ElapsedMs int64 `json:"elapsed_ms"`
EtaMs int64 `json:"eta_ms"`
Error string `json:"error,omitempty"`
Reason string `json:"reason,omitempty"`
RawError string `json:"raw_error,omitempty"`
BeforeVersion string `json:"before_version,omitempty"`
AfterVersion string `json:"after_version,omitempty"`
Method string `json:"method,omitempty"`
}
// setFirmwareProgressCh registers or clears the firmware progress routing channel.
//
// nil clears it (back to the default mode where all stderr goes to the broadcaster).
func (d *KneronDriver) setFirmwareProgressCh(ch chan<- firmware.FirmwareProgress) {
d.fwMu.Lock()
d.fwProgressCh = ch
d.fwMu.Unlock()
}
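// tryRouteFirmwareEvent tries to parse one stderr JSON line as a firmware
// progress event and write it to the currently registered fwProgressCh. On
// failure (no channel / invalid JSON / wrong event) it returns false and the
// caller should take the default broadcaster path.
//
// Major 2 fix (close-channel race): fwMu is held for the entire
// "check ch + send" step, instead of taking the reference, releasing the
// lock, and only then sending. Because setFirmwareProgressCh(nil) takes the
// same fwMu, the following ordering is guaranteed:
//
// 1. driver UpgradeFirmware returns → deferred setFirmwareProgressCh(nil)
// (happens-before via fwMu)
// 2. service goroutine receives driverDone → close(intermediate)
//
// Once step 1 completes, any later tryRouteFirmwareEvent sees a nil ch and
// returns false immediately, so it never sends on a closed channel. An
// in-flight tryRoute call holds the lock, so setFirmwareProgressCh(nil)
// waits until it finishes.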
func (d *KneronDriver) tryRouteFirmwareEvent(line string) bool {
// Parse first, so the CPU-bound JSON parsing happens outside the lock.
var ev bridgeFirmwareEvent
if err := json.Unmarshal([]byte(line), &ev); err != nil {
return false
}
if ev.Event != "firmware_progress" {
return false
}
fp := firmware.FirmwareProgress{
Stage: ev.Stage,
Percent: ev.Percent,
Message: ev.Message,
ElapsedMs: ev.ElapsedMs,
EtaMs: ev.EtaMs,
Error: ev.Error,
Reason: ev.Reason,
RawError: ev.RawError,
BeforeVersion: ev.BeforeVersion,
AfterVersion: ev.AfterVersion,
Method: ev.Method,
}
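// Do the nil check + send while holding fwMu, mutually exclusive with
// setFirmwareProgressCh(nil). The send is non-blocking: when the channel is
// full, dropping a single event is better than stalling the stderr goroutine
// or holding fwMu for a long time.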
d.fwMu.Lock()
defer d.fwMu.Unlock()
ch := d.fwProgressCh
if ch == nil {
return false
}
select {
case ch <- fp:
default:
// Report the dropped event through the broadcaster at WARN level, for debugging.
if d.logBroadcaster != nil {
d.logBroadcaster.Push("WARN", "[kneron] firmware progress channel full, dropping event: "+line)
}
}
return true
}
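// UpgradeFirmware triggers the bridge.py firmware_upgrade handler, forwards
// the firmware_progress events arriving on stderr to progressCh, and returns
// the terminal error.
//
// Implements the firmware.UpgradeDriver interface.
//
// Flow (per TDD §5.1):
// 1. Precondition check: the bridge must be running (pythonReady, i.e.
// already connected) and the chip must be supported
// 2. Set fwProgressCh so the stderr goroutine starts routing firmware events
// 3. Write the firmware_upgrade JSON command to bridge stdin
// 4. Spawn a goroutine that waits for the bridge's stdout response (via
// sendCommandForUpgrade)
// 5. select: ctx.Done() (service timeout) → stopPython force-kills the bridge;
// stdout response → parse success / failure
// 6. Push the terminal event (done / error) to progressCh
// 7. Clear fwProgressCh and return the error
//
// Note: d.mu is not held continuously during the upgrade
// (sendCommandForUpgrade handles its own locking), but d.pythonReady must be
// true (already connected) before calling; otherwise the caller should call
// Connect() first.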
func (d *KneronDriver) UpgradeFirmware(
ctx context.Context,
chip string,
progressCh chan<- firmware.FirmwareProgress,
) error {
d.mu.Lock()
pythonReady := d.pythonReady
port := d.info.Port
d.info.Status = "upgrading"
d.mu.Unlock()
// Minor 3 fix: set needsReset to true only after sendCommand has actually
// been attempted, so the early-return paths (pythonReady=false / unsupported
// chip) do not wrongly mark needsReset.
var attemptedUpgrade bool
defer func() {
d.mu.Lock()
// Upgrade finished (success or failure) → Status returns to Connected; the
// upper layer decides whether to rescan.
if d.connected {
d.info.Status = driver.StatusConnected
}
// Per TDD §8.5 ("reuse needsReset"): if an upgrade was actually attempted,
// set needsReset=true so the next connect does a full reset and avoids Error 15.
if attemptedUpgrade {
d.needsReset = true
}
d.mu.Unlock()
}()
if !pythonReady {
return fmt.Errorf("python bridge not running; device must be connected first")
}
if !firmware.SupportedUpgradeChip(chip) {
return fmt.Errorf("unsupported chip for upgrade: %q (Phase A supports only KL520/KL720)", chip)
}
// Step 1: register progress routing
d.setFirmwareProgressCh(progressCh)
defer d.setFirmwareProgressCh(nil)
// Step 2: spawn sendCommand in goroutine so we can race with ctx.Done()
//
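// Major 1 fix: the sendCommand goroutine is serialized by fwUpgradeMu and
// does not hold d.mu. It may block for 60-200s waiting on bridge stdout;
// holding d.mu that long would stall Info / IsConnected / Disconnect and
// deadlock the ctx.Done path (service timeout → driver wants d.mu.Lock to
// kill the bridge → the sendCommand goroutine still holds d.mu → deadlock
// forever). fwUpgradeMu is fully separate from d.mu, so the two never block
// each other.
//
// Note: fwUpgradeMu must still be acquired before entering sendCommand, to
// keep two upgrades on the same driver from racing (the service-side tracker
// already prevents duplicate tasks on one device; this is a second line of
// defense).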
attemptedUpgrade = true
type result struct {
resp map[string]interface{}
err error
}
resCh := make(chan result, 1)
go func() {
d.fwUpgradeMu.Lock()
// Use sendCommandForUpgrade: it snapshots d.stdin / d.stdout and then
// releases d.mu, which avoids holding d.mu for long (stalling Info /
// IsConnected) and avoids a field-level race on d.stdin / d.stdout with
// stopPython on the ctx.Done path.
resp, err := d.sendCommandForUpgrade(map[string]interface{}{
"cmd": "firmware_upgrade",
"port": port,
"chip": chip,
})
d.fwUpgradeMu.Unlock()
resCh <- result{resp, err}
}()
// Step 3: race ctx vs sendCommand
select {
case <-ctx.Done():
// The service timed out or was cancelled: force-kill the bridge so the
// sendCommand goroutine can finish. Because that goroutine holds only
// fwUpgradeMu and not d.mu, taking d.mu here cannot deadlock. stopPython
// kills the bridge process, the stdout scanner hits EOF, sendCommand
// returns an error from Scan, and the goroutine exits.
d.driverLog("WARN", "[kneron] firmware_upgrade context done (%v), killing bridge to release sendCommand", ctx.Err())
d.mu.Lock()
d.stopPython()
d.connected = false
d.mu.Unlock()
// Push a timeout event so the upper layer sees a terminal state.
ev := firmware.FirmwareProgress{
Stage: firmware.StageError,
Percent: -1,
Error: fmt.Sprintf("upgrade context done: %v", ctx.Err()),
Reason: firmware.ReasonTimeout,
RawError: ctx.Err().Error(),
}
// Non-blocking send, in case the service has already stopped receiving.
select {
case progressCh <- ev:
default:
}
// Drain the sendCommand goroutine's result to avoid a leak.
go func() { <-resCh }()
return fmt.Errorf("firmware_upgrade timeout / cancel: %w", ctx.Err())
case r := <-resCh:
if r.err != nil {
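// The bridge returned an error JSON or stdout was interrupted; push an
// error event to the caller. Note: the bridge's own stage="error" event
// has usually already arrived via stderr, so this is a fallback safety net
// that keeps the terminal event from being lost.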
d.driverLog("ERROR", "[kneron] firmware_upgrade bridge error: %v", r.err)
select {
case progressCh <- firmware.FirmwareProgress{
Stage: firmware.StageError,
Percent: -1,
Error: r.err.Error(),
Reason: firmware.ReasonUpgradeMidFailed,
RawError: r.err.Error(),
}:
default:
}
return fmt.Errorf("firmware_upgrade failed: %w", r.err)
}
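// Success path: the bridge replied `{"status":"upgraded", "before_firmware":..., ...}`.
// The stderr goroutine should already have pushed a done event. This is a
// supplement / fallback: if status is upgraded but no stage=done event was
// seen, the done event pushed here lets the caller observe the terminal state.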
afterFw, _ := r.resp["after_firmware"].(string)
beforeFw, _ := r.resp["before_firmware"].(string)
method, _ := r.resp["method"].(string)
duration, _ := r.resp["duration_ms"].(float64) // JSON number
// Make sure the caller always sees a done event (even if stderr never
// delivered one or the event was dropped). A duplicate done is harmless to
// the service, whose forwarding is idempotent.
select {
case progressCh <- firmware.FirmwareProgress{
Stage: firmware.StageDone,
Percent: 100,
BeforeVersion: beforeFw,
AfterVersion: afterFw,
Method: method,
ElapsedMs: int64(duration),
}:
default:
}
// Update the firmware string in the driver's Info.
if afterFw != "" {
d.mu.Lock()
d.info.FirmwareVer = afterFw
d.mu.Unlock()
}
return nil
}
}