//go:build windows package handlers import ( "fmt" "os" "os/exec" "path/filepath" "strings" "syscall" ) // installKneronWinUSBDriverHandler 呼叫 KneronPLUS SDK 的 libwdi wrapper 安裝 WinUSB driver。 // // 實作參考 edge-ai-platform installer/platform_windows.go 的 installKneronDriverViaSDK: // - 組 Python script 呼叫 kp.core.install_driver_for_windows(pid) 對三種 PID // - PowerShell Start-Process -Verb RunAs 提權(libwdi 要求 admin) // - 結果寫到 temp file 讓 Go 讀回來 func installKneronWinUSBDriverHandler(pythonBin string) error { if _, err := os.Stat(pythonBin); err != nil { return fmt.Errorf("python interpreter not found at %s: %w", pythonBin, err) } resultPath := filepath.Join(os.TempDir(), "visiona-local-driver-result.txt") _ = os.Remove(resultPath) // venv 根目錄 venvRoot := filepath.Dir(filepath.Dir(pythonBin)) // Python script: // 1. 對三種 PID 呼叫 kp.core.install_driver_for_windows // 2. 安裝完立刻 scan_devices() 驗證,看 SDK 能不能開 handle // 3. 不 swallow exception — 把 traceback 完整寫到 result 檔 pyScript := fmt.Sprintf(` import sys, os, traceback result_path = r'%s' lines = [] def out(msg): lines.append(str(msg)) try: sys.stdout.write(str(msg) + '\n'); sys.stdout.flush() except Exception: pass try: kp_lib = os.path.join(r'%s', 'Lib', 'site-packages', 'kp', 'lib') if os.path.isdir(kp_lib): os.environ['PATH'] = kp_lib + ';' + os.environ.get('PATH', '') try: os.add_dll_directory(kp_lib) except Exception as dle: out(f"WARN: add_dll_directory failed: {dle}") import kp out(f"kp module imported: {kp.__file__}") for pid in [kp.ProductId.KP_DEVICE_KL520, kp.ProductId.KP_DEVICE_KL720, kp.ProductId.KP_DEVICE_KL720_LEGACY]: try: out(f"Installing driver for {pid.name} (value={pid.value})...") kp.core.install_driver_for_windows(pid) out(f" OK: {pid.name}") except Exception as e: out(f" SKIP: {pid.name}: {type(e).__name__}: {e}") # 驗證:裝完立刻 scan + 印 is_connectable out("Verifying with scan_devices()...") try: descs = kp.core.scan_devices() out(f" found {descs.device_descriptor_number} Kneron device(s)") for i in range(descs.device_descriptor_number): d = descs.device_descriptor_list[i] out(f" [{i}] pid=0x{d.product_id:04X} usb_port={d.usb_port_id} connectable={d.is_connectable} fw={d.firmware}") except Exception as se: out(f" scan_devices failed: {type(se).__name__}: {se}") out("DONE") except ImportError as e: out(f"ERROR: kp module not available: {e}") out(traceback.format_exc()) except Exception as e: out(f"ERROR: {type(e).__name__}: {e}") out(traceback.format_exc()) finally: try: with open(result_path, 'w', encoding='utf-8') as f: f.write('\n'.join(lines)) except Exception: pass `, resultPath, venvRoot) scriptPath := filepath.Join(os.TempDir(), "visiona-local-install-usb-driver.py") if err := os.WriteFile(scriptPath, []byte(pyScript), 0o644); err != nil { return fmt.Errorf("write driver install script: %w", err) } defer os.Remove(scriptPath) elevateCmd := fmt.Sprintf( `Start-Process -FilePath '%s' -ArgumentList '"%s"' -Verb RunAs -Wait -WindowStyle Hidden`, pythonBin, scriptPath, ) cmd := exec.Command("powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-Command", elevateCmd) cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true, CreationFlags: 0x08000000} if out, err := cmd.CombinedOutput(); err != nil { return fmt.Errorf("driver 安裝需要系統管理員權限(UAC 可能被拒絕):%s", strings.TrimSpace(string(out))) } resultData, err := os.ReadFile(resultPath) _ = os.Remove(resultPath) if err != nil { return fmt.Errorf("driver 安裝已執行但無法讀取結果,請在裝置管理員確認或用 Zadig 手動安裝 https://zadig.akeo.ie/") } result := strings.TrimSpace(string(resultData)) // 把完整結果印到 server stderr 方便使用者 / 我們 debug fmt.Fprintln(os.Stderr, "=== Kneron driver install result ===") fmt.Fprintln(os.Stderr, result) fmt.Fprintln(os.Stderr, "=== end ===") if strings.Contains(result, "ERROR:") { return fmt.Errorf("driver 安裝失敗(完整訊息見 server.stderr.log):%s", firstLines(result, 6)) } // 檢查 DONE 標記 + 至少一個 OK if !strings.Contains(result, "DONE") { return fmt.Errorf("driver 安裝未完成(沒有 DONE 標記,見 server.stderr.log):%s", firstLines(result, 6)) } if !strings.Contains(result, "OK:") { return fmt.Errorf("driver 安裝全部失敗,沒有任何 PID 成功(見 server.stderr.log):%s", firstLines(result, 10)) } return nil } // firstLines 取結果的前 n 行,避免 error message 過長 func firstLines(s string, n int) string { lines := strings.Split(s, "\n") if len(lines) > n { lines = lines[:n] lines = append(lines, "...") } return strings.Join(lines, " | ") }