package kneron import ( "encoding/json" "fmt" "os" "os/exec" "path/filepath" "strings" "time" "visiona-local/server/internal/driver" ) // ResolvePython finds the best Python interpreter for the given script path. // Search order: script-local venv → parent venv → platform config dir venv → system python3/python. func ResolvePython(scriptPath string) string { scriptDir := filepath.Dir(scriptPath) parentDir := filepath.Dir(scriptDir) // Build candidate list with both Unix and Windows venv layouts var candidates []string for _, base := range []string{scriptDir, parentDir} { candidates = append(candidates, filepath.Join(base, "venv", "bin", "python3"), // Unix filepath.Join(base, "venv", "Scripts", "python.exe"), // Windows ) } if home, err := os.UserHomeDir(); err == nil { candidates = append(candidates, filepath.Join(home, ".edge-ai-platform", "venv", "bin", "python3"), filepath.Join(home, ".edge-ai-platform", "venv", "Scripts", "python.exe"), ) } // On Windows, also check %LOCALAPPDATA%\EdgeAIPlatform\venv if appData := os.Getenv("LOCALAPPDATA"); appData != "" { candidates = append(candidates, filepath.Join(appData, "EdgeAIPlatform", "venv", "Scripts", "python.exe"), ) } for _, p := range candidates { if _, err := os.Stat(p); err == nil { return p } } // Fallback to system python for _, name := range []string{"python3", "python"} { if p, err := exec.LookPath(name); err == nil { return p } } return "python3" } // KneronVendorID is the USB vendor ID for Kneron devices. const KneronVendorID uint16 = 0x3231 // Known Kneron product IDs. const ( ProductIDKL520 = "0x0100" ProductIDKL720 = "0x0200" ProductIDKL720Alt = "0x0720" ) // chipFromProductID returns the chip name and device type from the product_id // reported by the Python bridge scan result. func chipFromProductID(productID string) (chip string, deviceType string) { pid := strings.ToLower(strings.TrimSpace(productID)) switch pid { case "0x0100": return "KL520", "kneron_kl520" case "0x0200", "0x0720": return "KL720", "kneron_kl720" default: // Unknown product — default to KL520 for USB Boot devices, // otherwise use the raw product ID as suffix. return "KL520", "kneron_kl520" } } // DetectDevices attempts to discover all connected Kneron devices (KL520, KL720, etc.) // by invoking the Python bridge script with a scan command. If Python or // the bridge script is not available, it returns an empty list. func DetectDevices(scriptPath string) []driver.DeviceInfo { // Try to run the bridge script with a scan command via a short-lived process. pythonBin := ResolvePython(scriptPath) cmd := exec.Command(pythonBin, scriptPath) cmd.Stdin = nil // Ensure libusb-1.0.dll can be found on Windows by adding the binary's // directory to PATH (the installer places the DLL there). scriptDir := filepath.Dir(scriptPath) installDir := filepath.Dir(scriptDir) cmd.Env = append(os.Environ(), fmt.Sprintf("PATH=%s;%s;%s", installDir, scriptDir, os.Getenv("PATH")), ) stdinPipe, err := cmd.StdinPipe() if err != nil { return nil } stdoutPipe, err := cmd.StdoutPipe() if err != nil { stdinPipe.Close() return nil } if err := cmd.Start(); err != nil { return nil } defer func() { stdinPipe.Close() cmd.Process.Kill() cmd.Wait() }() // Read the ready signal. decoder := json.NewDecoder(stdoutPipe) var readyResp map[string]interface{} done := make(chan error, 1) go func() { done <- decoder.Decode(&readyResp) }() select { case err := <-done: if err != nil { return nil } case <-time.After(5 * time.Second): return nil } if status, ok := readyResp["status"].(string); !ok || status != "ready" { return nil } // Send the scan command. scanCmd, _ := json.Marshal(map[string]interface{}{"cmd": "scan"}) scanCmd = append(scanCmd, '\n') if _, err := stdinPipe.Write(scanCmd); err != nil { return nil } // Read the scan response. var scanResp map[string]interface{} scanDone := make(chan error, 1) go func() { scanDone <- decoder.Decode(&scanResp) }() select { case err := <-scanDone: if err != nil { return nil } case <-time.After(5 * time.Second): return nil } // Parse detected devices from the response. devicesRaw, ok := scanResp["devices"].([]interface{}) if !ok || len(devicesRaw) == 0 { return nil } // Track per-chip counters for naming (e.g. "KL520 #1", "KL720 #1"). chipCount := map[string]int{} var devices []driver.DeviceInfo for _, devRaw := range devicesRaw { dev, ok := devRaw.(map[string]interface{}) if !ok { continue } port := "" if p, ok := dev["port"].(string); ok { port = p } fw := "" if f, ok := dev["firmware"].(string); ok { fw = f } productID := "" if p, ok := dev["product_id"].(string); ok { productID = p } chip, devType := chipFromProductID(productID) chipCount[chip]++ idx := chipCount[chip] info := driver.DeviceInfo{ ID: fmt.Sprintf("%s-%d", strings.ToLower(chip), idx-1), Name: fmt.Sprintf("Kneron %s #%d", chip, idx), Type: devType, Port: port, VendorID: KneronVendorID, Status: driver.StatusDetected, FirmwareVer: fw, } devices = append(devices, info) } return devices } // DetectKL720Devices is a backward-compatible alias for DetectDevices. // Deprecated: Use DetectDevices instead. func DetectKL720Devices(scriptPath string) []driver.DeviceInfo { return DetectDevices(scriptPath) }