package kneron

import (
	"bufio"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"visiona-local/server/internal/driver"
	"visiona-local/server/pkg/logger"
)

// LogFunc is a function that writes a log line to both stderr and
// the WebSocket broadcaster. When nil, logs go only to stderr.
type LogFunc func(level, msg string)

// KneronDriver implements driver.DeviceDriver for Kneron NPU devices
// (KL520, KL720, etc.). It delegates hardware operations to a Python
// subprocess (kneron_bridge.py) that communicates via JSON-RPC over
// stdin/stdout.
//
// All mutable fields are guarded by mu unless noted otherwise.
type KneronDriver struct {
	info          driver.DeviceInfo
	connected     bool
	inferring     bool
	modelLoaded   string    // path of the model loaded in the current bridge session, "" if none
	modelLoadedAt time.Time // when modelLoaded was successfully loaded
	chipType      string    // "KL520" or "KL720" — derived from info.Type
	mu            sync.Mutex

	scriptPath     string
	pythonCmd      *exec.Cmd
	stdin          io.WriteCloser
	stdout         *bufio.Scanner
	pythonReady    bool
	logBroadcaster *logger.Broadcaster // not guarded by mu; set once via SetLogBroadcaster
	needsReset     bool                // true on first connect after server start to clear stale models
}

// NewKneronDriver creates a new KneronDriver with the given device info and
// path to the kneron_bridge.py script. Works for any Kneron chip variant.
// The chip type defaults to "KL520" unless info.Type contains "kl720"
// (case-insensitive).
func NewKneronDriver(info driver.DeviceInfo, scriptPath string) *KneronDriver {
	chip := "KL520"
	if strings.Contains(strings.ToLower(info.Type), "kl720") {
		chip = "KL720"
	}
	return &KneronDriver{
		info:       info,
		scriptPath: scriptPath,
		chipType:   chip,
		needsReset: true,
	}
}

// SetLogBroadcaster attaches a log broadcaster so that bridge stderr
// and driver messages are forwarded to the frontend.
func (d *KneronDriver) SetLogBroadcaster(b *logger.Broadcaster) {
	d.logBroadcaster = b
}

// driverLog writes a log message to stderr and, when one is attached, to
// the broadcaster. It does not take d.mu, so it may be called with or
// without the lock held.
func (d *KneronDriver) driverLog(level, format string, args ...interface{}) {
	msg := fmt.Sprintf(format, args...)
	fmt.Fprintf(os.Stderr, "%s\n", msg)
	if d.logBroadcaster != nil {
		d.logBroadcaster.Push(level, msg)
	}
}

// NewKL720Driver is a backward-compatible alias for NewKneronDriver.
//
// Deprecated: Use NewKneronDriver instead.
func NewKL720Driver(info driver.DeviceInfo, scriptPath string) *KneronDriver {
	return NewKneronDriver(info, scriptPath)
}

// KL720Driver is a backward-compatible type alias for KneronDriver.
//
// Deprecated: Use KneronDriver instead.
type KL720Driver = KneronDriver

// resolvePython finds the best Python interpreter using the package-level resolver.
func (d *KneronDriver) resolvePython() string {
	return ResolvePython(d.scriptPath)
}

// startPython launches the Python bridge subprocess and waits for the
// "ready" signal on stdout.
//
// On success d.pythonCmd, d.stdin, d.stdout are set and d.pythonReady is
// true. On failure the subprocess (if started) is stopped and an error is
// returned. The wait for the ready line has no timeout.
//
// Caller must hold d.mu.
func (d *KneronDriver) startPython() error {
	pythonBin := d.resolvePython()
	scriptDir := filepath.Dir(d.scriptPath)

	cmd := exec.Command(pythonBin, d.scriptPath)

	// On macOS with Apple Silicon, Kneron SDK requires x86_64 (Rosetta 2).
	// The venv should already contain the correct architecture Python.
	cmd.Env = append(os.Environ(),
		"PYTHONUNBUFFERED=1",
	)

	// Add library path for native kp module if lib directory exists.
	// The directory is prepended to any inherited value rather than
	// replacing it, so libraries the parent environment relies on stay
	// resolvable.
	libDir := filepath.Join(scriptDir, "lib")
	if _, err := os.Stat(libDir); err == nil {
		libVar := "LD_LIBRARY_PATH"
		if runtime.GOOS == "darwin" {
			libVar = "DYLD_LIBRARY_PATH"
		}
		libPath := libDir
		if inherited := os.Getenv(libVar); inherited != "" {
			libPath += string(os.PathListSeparator) + inherited
		}
		cmd.Env = append(cmd.Env, libVar+"="+libPath)
	}

	// On Windows, ensure libusb-1.0.dll can be found by adding the install
	// directory to PATH (installer places the DLL there).
	if runtime.GOOS == "windows" {
		installDir := filepath.Dir(scriptDir)
		cmd.Env = append(cmd.Env, fmt.Sprintf("PATH=%s;%s;%s", installDir, scriptDir, os.Getenv("PATH")))
	}

	stdinPipe, err := cmd.StdinPipe()
	if err != nil {
		return fmt.Errorf("failed to create stdin pipe: %w", err)
	}

	stdoutPipe, err := cmd.StdoutPipe()
	if err != nil {
		stdinPipe.Close()
		return fmt.Errorf("failed to create stdout pipe: %w", err)
	}

	// Capture stderr from the Python bridge: forward each line to both
	// os.Stderr and the WebSocket broadcaster so it shows in the frontend.
	stderrPipe, err := cmd.StderrPipe()
	if err != nil {
		stdinPipe.Close()
		stdoutPipe.Close()
		return fmt.Errorf("failed to create stderr pipe: %w", err)
	}

	if err := cmd.Start(); err != nil {
		stdinPipe.Close()
		return fmt.Errorf("failed to start python bridge (%s): %w", pythonBin, err)
	}

	// Forward bridge stderr line-by-line to os.Stderr + broadcaster.
	// The goroutine ends when the pipe is closed (process exit / Wait).
	go func() {
		scanner := bufio.NewScanner(stderrPipe)
		scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
		for scanner.Scan() {
			line := scanner.Text()
			fmt.Fprintln(os.Stderr, line)
			if d.logBroadcaster != nil {
				d.logBroadcaster.Push("DEBUG", line)
			}
		}
		// The scanner gives up on an over-long line. Keep draining so the
		// bridge can never block on a full stderr pipe.
		_, _ = io.Copy(io.Discard, stderrPipe)
	}()

	d.pythonCmd = cmd
	d.stdin = stdinPipe
	d.stdout = bufio.NewScanner(stdoutPipe)
	// Increase scanner buffer for large inference responses.
	d.stdout.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	// Wait for the ready signal from the Python process.
	if d.stdout.Scan() {
		var resp map[string]interface{}
		if err := json.Unmarshal([]byte(d.stdout.Text()), &resp); err == nil {
			if status, ok := resp["status"].(string); ok && status == "ready" {
				d.pythonReady = true
				return nil
			}
		}
	}

	// If we didn't get a ready signal, clean up and report failure.
	// Capture the scanner error first: stopPython discards the scanner.
	scanErr := d.stdout.Err()
	d.stopPython()
	if scanErr != nil {
		return fmt.Errorf("python bridge did not send ready signal: %w", scanErr)
	}
	return fmt.Errorf("python bridge did not send ready signal")
}

// sendCommand sends a JSON command to the Python subprocess and returns
// the parsed JSON response. One command is written as a single line and
// exactly one response line is read back. A response containing a string
// "error" field is turned into a Go error.
//
// Caller must hold d.mu.
func (d *KneronDriver) sendCommand(cmd map[string]interface{}) (map[string]interface{}, error) {
	if !d.pythonReady {
		return nil, fmt.Errorf("python bridge is not running")
	}

	data, err := json.Marshal(cmd)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal command: %w", err)
	}

	// Write the JSON command followed by a newline.
	if _, err := fmt.Fprintf(d.stdin, "%s\n", data); err != nil {
		return nil, fmt.Errorf("failed to write to python bridge: %w", err)
	}

	// Read the response line.
	if !d.stdout.Scan() {
		if err := d.stdout.Err(); err != nil {
			return nil, fmt.Errorf("failed to read from python bridge: %w", err)
		}
		return nil, fmt.Errorf("python bridge closed unexpectedly")
	}

	var resp map[string]interface{}
	if err := json.Unmarshal([]byte(d.stdout.Text()), &resp); err != nil {
		return nil, fmt.Errorf("failed to parse python response: %w", err)
	}

	// Check for error responses from the bridge.
	if errMsg, ok := resp["error"].(string); ok {
		return nil, fmt.Errorf("python bridge error: %s", errMsg)
	}

	return resp, nil
}

// sendConnect sends the "connect" command for this driver's device to the
// running bridge and returns the bridge's response.
//
// Caller must hold d.mu.
func (d *KneronDriver) sendConnect() (map[string]interface{}, error) {
	return d.sendCommand(map[string]interface{}{
		"cmd":         "connect",
		"port":        d.info.Port,
		"index":       0,
		"device_type": d.info.Type,
	})
}

// sendLoadModel marks the device as flashing and sends a "load_model"
// command for modelPath to the bridge.
//
// Caller must NOT hold d.mu.
func (d *KneronDriver) sendLoadModel(modelPath string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.info.Status = driver.StatusFlashing
	_, err := d.sendCommand(map[string]interface{}{
		"cmd":  "load_model",
		"path": modelPath,
	})
	return err
}

// markBridgeDown records that the bridge could not be brought back: the
// driver is no longer connected and is put into the error status so that a
// later Connect call starts from scratch.
//
// Caller must hold d.mu.
func (d *KneronDriver) markBridgeDown() {
	d.connected = false
	d.inferring = false
	d.info.Status = driver.StatusError
}

// stopPython kills the Python subprocess and cleans up resources.
//
// Caller must hold d.mu.
func (d *KneronDriver) stopPython() {
	d.pythonReady = false

	if d.stdin != nil {
		d.stdin.Close()
		d.stdin = nil
	}

	if d.pythonCmd != nil && d.pythonCmd.Process != nil {
		// Errors are ignored on purpose: the process may already have
		// exited, and Wait reports the kill signal as an error.
		_ = d.pythonCmd.Process.Kill()
		_ = d.pythonCmd.Wait()
		d.pythonCmd = nil
	}

	d.stdout = nil
}

// Info returns the current device information.
func (d *KneronDriver) Info() driver.DeviceInfo {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.info
}

// Connect starts the Python bridge subprocess and connects to the Kneron device.
// On the first connect after server start, the device is reset to clear any
// stale model from a previous session.
//
// It returns nil immediately if the driver is already connected. If the
// first-connect reset fails, the bridge is no longer running, so the driver
// is left disconnected and the error is returned; a later Connect call
// connects without repeating the reset.
func (d *KneronDriver) Connect() error {
	d.mu.Lock()

	if d.connected {
		d.mu.Unlock()
		return nil
	}

	needsReset := d.needsReset
	d.info.Status = driver.StatusConnecting

	// Start the Python bridge process.
	if err := d.startPython(); err != nil {
		d.info.Status = driver.StatusError
		d.mu.Unlock()
		return fmt.Errorf("failed to start hardware bridge: %w", err)
	}

	// Send connect command to the bridge.
	resp, err := d.sendConnect()
	if err != nil {
		d.stopPython()
		d.info.Status = driver.StatusError
		d.mu.Unlock()
		return fmt.Errorf("failed to connect to device: %w", err)
	}

	d.connected = true
	d.needsReset = false
	d.info.Status = driver.StatusConnected
	if fw, ok := resp["firmware"].(string); ok {
		d.info.FirmwareVer = fw
	}
	d.mu.Unlock()

	if !needsReset {
		return nil
	}

	// First connect after server start: reset device to clear stale models.
	d.driverLog("INFO", "[kneron] first connect after server start — resetting device to clear stale model...")
	if err := d.restartBridge(); err != nil {
		// restartBridge has already torn the bridge down and marked the
		// driver as disconnected, so success must not be reported here.
		d.driverLog("WARN", "[kneron] reset on connect failed: %v", err)
		return fmt.Errorf("failed to reset device on first connect: %w", err)
	}
	d.driverLog("INFO", "[kneron] device reset complete — clean state ready")
	return nil
}

// Disconnect stops the Python bridge and disconnects from the device.
// It is a no-op when the driver is not connected.
func (d *KneronDriver) Disconnect() error {
	d.mu.Lock()
	defer d.mu.Unlock()

	if !d.connected {
		return nil
	}

	// Try to send disconnect command if Python is running. Best-effort:
	// the bridge is killed right after, so a failure here changes nothing.
	if d.pythonReady {
		_, _ = d.sendCommand(map[string]interface{}{"cmd": "disconnect"})
	}

	d.stopPython()
	d.connected = false
	d.inferring = false
	// The model was tracked per bridge session; a future bridge process
	// starts without it, so Flash must not skip loading it again.
	d.modelLoaded = ""
	d.modelLoadedAt = time.Time{}
	d.info.Status = driver.StatusDisconnected

	return nil
}

// IsConnected returns whether the driver is currently connected.
func (d *KneronDriver) IsConnected() bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.connected
}

// restartBridge resets the Kneron device and restarts the Python bridge.
//
// The KL520 USB Boot mode only allows loading one model per firmware
// session. To load a different model we must:
//  1. Send a "reset" command via the current bridge — this calls
//     kp.core.reset_device() which forces the device back to Loader
//     (USB Boot) state, wiping firmware + model from RAM.
//  2. Kill the Python bridge process.
//  3. Wait for the device to re-enumerate on USB (~8 s).
//  4. Start a fresh Python bridge.
//  5. Send "connect" which reloads firmware from scratch.
//
// After this the device is in a clean state ready for load_model.
//
// If the new bridge cannot be started or cannot reconnect, no bridge is
// left running; the driver is then marked as disconnected with the error
// status and the error is returned.
//
// Caller must NOT hold d.mu.
func (d *KneronDriver) restartBridge() error {
	d.mu.Lock()
	d.modelLoaded = ""
	d.modelLoadedAt = time.Time{}

	// Step 1: Ask the running bridge to reset the device.
	if d.pythonReady {
		d.driverLog("INFO", "[kneron] sending reset command to device...")
		// Ignore errors — the device may have already disconnected.
		_, _ = d.sendCommand(map[string]interface{}{"cmd": "reset"})
	}

	// Step 2: Kill the bridge process.
	d.stopPython()
	d.mu.Unlock()

	// Step 3: Wait for USB device to re-enumerate after hardware reset.
	// The reset causes the device to drop off USB and reappear as a
	// Loader-mode device. This typically takes 5-8 seconds.
	d.driverLog("INFO", "[kneron] bridge stopped, waiting for USB re-enumerate after reset...")
	time.Sleep(8 * time.Second)

	d.mu.Lock()
	defer d.mu.Unlock()

	// The lock was released during the wait. If the driver was disconnected
	// in the meantime, starting a bridge now would orphan the process.
	if !d.connected {
		return fmt.Errorf("device was disconnected during reset")
	}

	// Step 4: Start a fresh Python bridge.
	d.driverLog("INFO", "[kneron] starting new bridge process...")
	if err := d.startPython(); err != nil {
		d.markBridgeDown()
		return fmt.Errorf("failed to restart bridge: %w", err)
	}

	// Step 5: Reconnect — firmware will be loaded fresh.
	d.driverLog("INFO", "[kneron] bridge started, reconnecting to device (port=%s)...", d.info.Port)
	if _, err := d.sendConnect(); err != nil {
		d.stopPython()
		d.markBridgeDown()
		return fmt.Errorf("failed to reconnect after bridge restart: %w", err)
	}

	d.driverLog("INFO", "[kneron] device reconnected after reset + bridge restart")
	return nil
}

// Flash loads a model onto the Kneron device. Progress is reported through
// the provided channel; sends on progressCh block until received.
//
// Behavior differs by chip:
//   - KL520 (USB Boot): only one model per session. Error 40 triggers
//     a full device reset + bridge restart + firmware reload.
//   - KL720 (flash-based): models can be freely reloaded. Error 40
//     should not occur; if it does, a simple retry is attempted first.
//
// If modelPath is already the loaded model, no load is performed and
// success is reported. An error is returned when the bridge is not
// running, when a required device reset fails, or when the model cannot be
// loaded after the retries.
func (d *KneronDriver) Flash(modelPath string, progressCh chan<- driver.FlashProgress) error {
	d.mu.Lock()
	// Check the bridge before touching the status, so that calling Flash
	// on a disconnected driver does not change what Info reports.
	if !d.pythonReady {
		d.mu.Unlock()
		return fmt.Errorf("hardware bridge is not running — cannot flash model")
	}
	d.info.Status = driver.StatusFlashing
	currentModel := d.modelLoaded
	chip := d.chipType
	d.mu.Unlock()

	// Same model already loaded — skip, report success
	if currentModel != "" && currentModel == modelPath {
		d.driverLog("INFO", "[kneron] model already loaded (%s), skipping reload", modelPath)
		progressCh <- driver.FlashProgress{
			Percent: 50,
			Stage:   "transferring",
			Message: "model already loaded on device",
		}
		d.mu.Lock()
		d.info.Status = driver.StatusConnected
		d.mu.Unlock()
		progressCh <- driver.FlashProgress{Percent: 100, Stage: "done", Message: "Flash complete (model already loaded)"}
		return nil
	}

	// Try loading the model
	progressCh <- driver.FlashProgress{
		Percent: 5,
		Stage:   "preparing",
		Message: "preparing... loading model to device",
	}

	err := d.sendLoadModel(modelPath)

	// Handle retryable errors (error 40, broken pipe).
	if err != nil {
		errMsg := err.Error()
		d.driverLog("WARN", "[kneron] load_model failed: %s", errMsg)

		isRetryable := strings.Contains(errMsg, "Error code: 40") ||
			strings.Contains(errMsg, "SECOND_MODEL") ||
			strings.Contains(errMsg, "broken pipe") ||
			strings.Contains(errMsg, "USB_TIMEOUT")

		if isRetryable {
			if chip == "KL720" {
				// KL720: error 40 should not occur. Try a simple retry
				// without full bridge restart first.
				d.driverLog("WARN", "[kneron] KL720 unexpected retryable error, retrying without restart...")
				progressCh <- driver.FlashProgress{
					Percent: 5,
					Stage:   "preparing",
					Message: "preparing... retrying model load",
				}
				err = d.sendLoadModel(modelPath)

				// If still failing, fall back to bridge restart as last resort.
				if err != nil {
					d.driverLog("WARN", "[kneron] KL720 retry failed: %v, falling back to bridge restart...", err)
					if restartErr := d.restartBridge(); restartErr != nil {
						// restartBridge already recorded the failed state.
						return fmt.Errorf("failed to reset device: %w", restartErr)
					}
					err = d.sendLoadModel(modelPath)
				}
			} else {
				// KL520: error 40 means a model is already loaded in this
				// USB Boot session. Must reset device + reload firmware.
				d.driverLog("WARN", "[kneron] KL520 retryable error, restarting bridge...")
				progressCh <- driver.FlashProgress{
					Percent: 5,
					Stage:   "preparing",
					Message: "preparing... resetting device for new model",
				}

				if restartErr := d.restartBridge(); restartErr != nil {
					d.driverLog("ERROR", "[kneron] restartBridge failed: %v", restartErr)
					// restartBridge already recorded the failed state.
					return fmt.Errorf("failed to reset device: %w", restartErr)
				}

				d.driverLog("INFO", "[kneron] bridge restarted, retrying load_model...")
				err = d.sendLoadModel(modelPath)
			}
		}
	}

	if err != nil {
		d.driverLog("ERROR", "[kneron] load_model ultimately failed: %v", err)
		d.mu.Lock()
		d.info.Status = driver.StatusConnected
		d.mu.Unlock()
		return fmt.Errorf("failed to load model: %w", err)
	}

	d.driverLog("INFO", "[kneron] load_model succeeded: %s", modelPath)

	// Simulate remaining flash progress stages (the Kneron SDK does not
	// provide granular progress, so we approximate it after the model
	// has been loaded successfully).
	type stage struct {
		name     string
		duration time.Duration
		startPct int
		endPct   int
	}

	stages := []stage{
		{"transferring", 2 * time.Second, 10, 80},
		{"verifying", 1 * time.Second, 80, 95},
		{"finalizing", 500 * time.Millisecond, 95, 99},
	}

	// KL720 is faster (USB 3.0, no firmware reload needed)
	if chip == "KL720" {
		stages = []stage{
			{"transferring", 1 * time.Second, 10, 80},
			{"verifying", 500 * time.Millisecond, 80, 95},
			{"finalizing", 200 * time.Millisecond, 95, 99},
		}
	}

	for _, s := range stages {
		// One progress update per ~5 percentage points, at least one step.
		steps := (s.endPct - s.startPct) / 5
		if steps < 1 {
			steps = 1
		}
		interval := s.duration / time.Duration(steps)
		for i := 0; i <= steps; i++ {
			pct := s.startPct + (s.endPct-s.startPct)*i/steps
			progressCh <- driver.FlashProgress{
				Percent: pct,
				Stage:   s.name,
				Message: fmt.Sprintf("%s... %d%%", s.name, pct),
			}
			time.Sleep(interval)
		}
	}

	d.mu.Lock()
	d.modelLoaded = modelPath
	d.modelLoadedAt = time.Now()
	d.info.FlashedModel = modelPath
	d.info.Status = driver.StatusConnected
	d.mu.Unlock()

	progressCh <- driver.FlashProgress{Percent: 100, Stage: "done", Message: "Flash complete"}

	return nil
}

// StartInference begins continuous inference mode. It returns an error if
// the device is not connected.
func (d *KneronDriver) StartInference() error {
	d.mu.Lock()
	defer d.mu.Unlock()

	if !d.connected {
		return fmt.Errorf("device not connected")
	}

	d.inferring = true
	d.info.Status = driver.StatusInferencing
	return nil
}

// StopInference stops continuous inference mode. The status returns to
// connected only when the device is actually connected; otherwise the
// current status is left as it is.
func (d *KneronDriver) StopInference() error {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.inferring = false
	if d.connected {
		d.info.Status = driver.StatusConnected
	}
	return nil
}

// ReadInference reads the latest inference result. This is equivalent to
// calling RunInference with nil image data.
func (d *KneronDriver) ReadInference() (*driver.InferenceResult, error) {
	return d.RunInference(nil)
}

// RunInference runs inference on the provided image data and returns
// the result. If imageData is nil, an empty image string is sent and the
// bridge decides what to run inference on.
func (d *KneronDriver) RunInference(imageData []byte) (*driver.InferenceResult, error) {
	// Encode image data as base64 for transmission to Python. Done before
	// taking the lock so other callers are not blocked by the encoding.
	imageB64 := ""
	if imageData != nil {
		imageB64 = base64.StdEncoding.EncodeToString(imageData)
	}

	// Check readiness and send within one critical section so the bridge
	// cannot be stopped between the check and the command.
	d.mu.Lock()
	if !d.pythonReady {
		d.mu.Unlock()
		return nil, fmt.Errorf("hardware bridge is not running — device may not be connected")
	}
	resp, err := d.sendCommand(map[string]interface{}{
		"cmd":          "inference",
		"image_base64": imageB64,
	})
	d.mu.Unlock()
	if err != nil {
		return nil, fmt.Errorf("inference failed: %w", err)
	}

	return parseInferenceResult(resp)
}

// parseInferenceResult converts a JSON response map into an InferenceResult.
func parseInferenceResult(resp map[string]interface{}) (*driver.InferenceResult, error) {
	// Re-marshal to JSON and unmarshal into the struct for clean conversion.
	data, err := json.Marshal(resp)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal response: %w", err)
	}

	var result driver.InferenceResult
	if err := json.Unmarshal(data, &result); err != nil {
		return nil, fmt.Errorf("failed to parse inference result: %w", err)
	}

	return &result, nil
}

// GetModelInfo returns information about the currently loaded model. The
// model path is used as both ID and name; LoadedAt is the time the model
// was loaded by Flash. It returns an error when no model is loaded.
func (d *KneronDriver) GetModelInfo() (*driver.ModelInfo, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if d.modelLoaded == "" {
		return nil, fmt.Errorf("no model loaded")
	}

	return &driver.ModelInfo{
		ID:       d.modelLoaded,
		Name:     d.modelLoaded,
		LoadedAt: d.modelLoadedAt,
	}, nil
}