從 local-tool 複製出獨立的「visionA Agent」桌面應用(A3 純橋樑: tunnel client + 配對 UI + 設定,不開 HTTP port、不做本機裝置/推論 UI)。 Bundle ID 與 local-tool 不同(com.innovedus.visiona-agent vs visiona-local), 雙 app 可共存。fork 後不主動 sync,需要時手動 cherry-pick。 Backend / Wails Go(AB1-AB13): - internal/tunnel:6 狀態機(Idle/Connecting/Connected/Reconnecting/Failed/Stopped) + Pair/Unpair/Reconnect/Disconnect binding + ClientHooks event - internal/auth:encrypted file token store(AES-GCM + scrypt + machineID fallback salt + 13 tests) - internal/config:YAML validation + atomic write + 11 tests - internal/log:ring buffer + ExportLog 升級 zip - visionA-backend /api/pairing/exchange:SessionTokenStore + 17 new tests - 三平台 build 驗證(macOS DMG 160 MB / Windows EXE / Linux AppImage) - end-to-end 5 milestone 全綠(pairing → tunnel → forward → reuse 防護 → tunnel drop failover) Frontend / Next.js(AF1-AF7,沿用 visionA-frontend 基礎): - AppShell + Header + TabNav(StatusView / PairView / SettingsView 三 tab) - ConnectionStatusBadge 5 種狀態 - TokenInput regex 驗證 + 7 種錯誤 + 0.5s auto-switch 到狀態頁 - 設定頁 4 區塊(含重新配對 AlertDialog) - agent-api.ts 封裝 Wails bindings(mock/real 雙實作)+ 90 tests Phase 0.7 review-driven fix(Round 2): - A1 Session fixation 防護(RotateSessionID) - A3 mock pairing 預設改 false(必須明確 opt-in)+ startup log - A4 Pair 失敗後 state 清理矩陣(exchange/Save/Start fail 各自終態) - A5 Pair/Unpair/Reconnect lifecycleMu + 50 goroutine race test - F1 重新配對次按鈕 / F2 PairView Esc cancel / F3 Wails BrowserOpenURL / F4 Settings draft 持久 + 未儲存 badge 驗證:agent backend go test -race -count=3 ./... 4 packages 全綠 / agent frontend pnpm test 119 tests 全綠 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
378 lines
11 KiB
Go
378 lines
11 KiB
Go
// Package tunnel 實作 visionA Agent 的 tunnel client。
|
||
//
|
||
// tunnel client 主動撥 WSS 到雲端 relay,accept 反向 yamux stream
|
||
// 並轉發到本機 Gin server(由 Wails shell 啟動的 server 子行程)。
|
||
//
|
||
// 此 package 於 2026-04-22 從 POC (edge-ai-platform) 複製:
|
||
//
|
||
// Source: edge-ai-platform/server/internal/tunnel/client.go
|
||
// Baseline commit: c9d56a62e23bc45554391123152ca90a07a60bdc
|
||
//
|
||
// 與 POC 的差異:
|
||
//
|
||
// 1. Module path 改為 visiona-agent/internal/tunnel(不沿用 server module,
|
||
// 因 Wails app shell 與 server binary 是兩個獨立 Go module;詳見 wsconn/wsconn.go 檔頂註解)
|
||
// 2. KeepAliveInterval 預設 10s(對齊 tunnel.md §4.2 M-5 決議;POC 預設 30s)
|
||
// 3. 修復 backoff bug(POC 單位 mix 導致 attempt>=1 永遠回 30s;見 backoff() 註解)
|
||
// 4. localAddr 不再寫死,由 NewClient 參數注入(呼叫者從 server controller 取 random port)
|
||
// 5. 加入 logger 注入點(nil 時用預設 log.Default)
|
||
//
|
||
// 參考:
|
||
//
|
||
// .autoflow/04-architecture/tunnel.md
|
||
// .autoflow/04-architecture/adr/adr-002-tunnel-protocol.md
|
||
// .autoflow/04-architecture/adr/adr-008-tunnel-client-reuse.md (v2)
|
||
package tunnel
|
||
|
||
import (
	"bufio"
	"io"
	"log"
	"net"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"visiona-agent/internal/wsconn"

	"github.com/gorilla/websocket"
	"github.com/hashicorp/yamux"
)
|
||
|
||
// Keep-alive and reconnect back-off tuning.
//
// KeepAliveInterval follows decision M-5 in tunnel.md §4.2: a yamux ping is
// sent every 10 seconds. The design note describes three consecutive missed
// pongs (= 30 seconds) as the point where the tunnel counts as down; the POC
// used a 30-second interval, which made that detection too slow (up to 90
// seconds), hence the shorter value here.
// NOTE(review): the "three missed pongs" rule comes from the design doc, not
// from this file — confirm it against yamux's actual keep-alive behaviour.
const (
	// KeepAliveInterval is the period between yamux keep-alive pings.
	KeepAliveInterval = 10 * time.Second
	// ConnectionWriteTimeout is copied into yamux's ConnectionWriteTimeout.
	ConnectionWriteTimeout = 10 * time.Second

	// backoffBase and backoffCap drive the exponential back-off used between
	// failed dial attempts: base × 2^(attempt-1), clamped to cap. Example:
	//
	//	attempt=1 → 1s, attempt=2 → 2s, attempt=3 → 4s, ..., attempt>=6 → 30s
	backoffBase = time.Second
	backoffCap  = 30 * time.Second
)
|
||
|
||
// Logger is the injectable logging interface used by the tunnel client.
//
// It is deliberately minimal (a single Printf); callers that want structured
// logging wrap their own logger behind it. Passing a nil Logger to NewClient
// selects the standard library's log.Default().
type Logger interface {
	// Printf writes one formatted log entry, with fmt.Printf-style arguments.
	Printf(format string, args ...interface{})
}
|
||
|
||
// Client 維持與 relay server 的長連線 tunnel。
|
||
//
|
||
// 生命週期:Start() → 背景 goroutine 持續重連並 accept stream。
|
||
// Stop() 會關閉當前 session 並等 run goroutine 結束。
|
||
type Client struct {
|
||
relayURL string // ws(s)://host:port/tunnel/connect
|
||
token string
|
||
localAddr string // 本機 server 位址,例:127.0.0.1:3721
|
||
|
||
logger Logger
|
||
|
||
// Hooks 讓外層 Manager 觀察 session lifecycle(AB5)。
|
||
// 全部 optional;nil 時不呼叫。所有 callback 在 client 的 run goroutine 內
|
||
// 執行,呼叫者必須自己做 non-blocking(或用 goroutine 發 event)。
|
||
hooks ClientHooks
|
||
|
||
stopCh chan struct{}
|
||
stoppedCh chan struct{}
|
||
}
|
||
|
||
// ClientHooks lets a Manager observe the Client's connection events (added
// in AB5).
//
// Design trade-off: state decisions stay in the Manager so the Client logic
// remains simple. The Client only does "dial → connect → retry"; the Manager
// subscribes to these hooks and projects them onto its externally visible
// ConnectionState.
type ClientHooks struct {
	// OnDialAttempt is called before each connect() starts; attempt counts
	// from 1. The Manager uses it to move to "connecting" / "reconnecting".
	OnDialAttempt func(attempt int)

	// OnSessionUp is called once the yamux client session has been created,
	// i.e. the tunnel can forward traffic. The Manager uses it to move to
	// "online" and to reset its attempt counter.
	OnSessionUp func()

	// OnSessionDown is called after the session's accept loop has exited and
	// the session is over. err is intended to be nil for a deliberate Stop
	// and non-nil when the link was lost.
	OnSessionDown func(err error)

	// OnDialFailed is called when connect() could not establish a session
	// (dial or yamux setup failure). The Manager uses it to record the last
	// error and the attempt number.
	OnDialFailed func(attempt int, err error)

	// OnRetryScheduled is called after a failure, just before waiting for
	// the next retry, so the Manager learns the back-off in effect. delay is
	// the value returned by backoff().
	OnRetryScheduled func(attempt int, delay time.Duration)
}
|
||
|
||
// NewClient 建立 tunnel client,連到 relayURL 並把進入的 stream 轉發到 localAddr。
|
||
// logger 為 nil 時走標準 log.Default()。
|
||
func NewClient(relayURL, token, localAddr string, logger Logger) *Client {
|
||
if logger == nil {
|
||
logger = log.Default()
|
||
}
|
||
return &Client{
|
||
relayURL: relayURL,
|
||
token: token,
|
||
localAddr: localAddr,
|
||
logger: logger,
|
||
stopCh: make(chan struct{}),
|
||
stoppedCh: make(chan struct{}),
|
||
}
|
||
}
|
||
|
||
// SetHooks 注入 session lifecycle callbacks。必須在 Start() 前呼叫才保證觀測第一次連線。
|
||
// AB5 Manager 用。
|
||
func (c *Client) SetHooks(h ClientHooks) {
|
||
c.hooks = h
|
||
}
|
||
|
||
// Start 在背景啟動 tunnel 連線迴圈,失敗時以指數退避重連。
|
||
// 只能呼叫一次;重複呼叫行為未定義。
|
||
func (c *Client) Start() {
|
||
go c.run()
|
||
}
|
||
|
||
// Stop 關閉 tunnel 連線並停止重連。會阻塞直到 run goroutine 結束。
|
||
// 可安全呼叫多次(第二次會 panic 於 close(stopCh),由呼叫者保證單次)。
|
||
func (c *Client) Stop() {
|
||
close(c.stopCh)
|
||
<-c.stoppedCh
|
||
}
|
||
|
||
func (c *Client) run() {
|
||
defer close(c.stoppedCh)
|
||
|
||
attempt := 0
|
||
for {
|
||
select {
|
||
case <-c.stopCh:
|
||
return
|
||
default:
|
||
}
|
||
|
||
attempt++
|
||
if c.hooks.OnDialAttempt != nil {
|
||
c.hooks.OnDialAttempt(attempt)
|
||
}
|
||
|
||
err := c.connect()
|
||
if err != nil {
|
||
if c.hooks.OnDialFailed != nil {
|
||
c.hooks.OnDialFailed(attempt, err)
|
||
}
|
||
delay := backoff(attempt)
|
||
c.logger.Printf("[tunnel] connection failed (attempt %d): %v — retrying in %v", attempt, err, delay)
|
||
if c.hooks.OnRetryScheduled != nil {
|
||
c.hooks.OnRetryScheduled(attempt, delay)
|
||
}
|
||
|
||
select {
|
||
case <-c.stopCh:
|
||
return
|
||
case <-time.After(delay):
|
||
}
|
||
continue
|
||
}
|
||
|
||
// connect() 正常 return 表示 session 曾經建立後又關閉(非 dial error);
|
||
// attempt 重置後由下一圈重連(不等 backoff,避免 UX 上「重新連線」卡住)。
|
||
attempt = 0
|
||
}
|
||
}
|
||
|
||
// connect 建立一次 tunnel session 並阻塞直到 session 關閉。
|
||
func (c *Client) connect() error {
|
||
u, err := url.Parse(c.relayURL)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
q := u.Query()
|
||
if c.token != "" {
|
||
q.Set("token", c.token)
|
||
}
|
||
u.RawQuery = q.Encode()
|
||
|
||
c.logger.Printf("[tunnel] connecting to %s", u.Host)
|
||
|
||
dialer := websocket.DefaultDialer
|
||
conn, _, err := dialer.Dial(u.String(), nil)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
netConn := wsconn.New(conn)
|
||
|
||
// yamux 設定:改用 10s keepalive(對齊 tunnel.md §4.2 M-5)
|
||
cfg := yamux.DefaultConfig()
|
||
cfg.EnableKeepAlive = true
|
||
cfg.KeepAliveInterval = KeepAliveInterval
|
||
cfg.ConnectionWriteTimeout = ConnectionWriteTimeout
|
||
|
||
session, err := yamux.Client(netConn, cfg)
|
||
if err != nil {
|
||
conn.Close()
|
||
return err
|
||
}
|
||
|
||
c.logger.Printf("[tunnel] connected to relay at %s", u.Host)
|
||
if c.hooks.OnSessionUp != nil {
|
||
c.hooks.OnSessionUp()
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// stopCh 觸發時主動關 session,讓 Accept() 返回
|
||
go func() {
|
||
<-c.stopCh
|
||
session.Close()
|
||
}()
|
||
|
||
var lastErr error
|
||
for {
|
||
stream, err := session.Accept()
|
||
if err != nil {
|
||
if session.IsClosed() {
|
||
break
|
||
}
|
||
c.logger.Printf("[tunnel] accept error: %v", err)
|
||
lastErr = err
|
||
break
|
||
}
|
||
|
||
wg.Add(1)
|
||
go func(s net.Conn) {
|
||
defer wg.Done()
|
||
c.handleStream(s)
|
||
}(stream)
|
||
}
|
||
|
||
wg.Wait()
|
||
c.logger.Printf("[tunnel] disconnected from relay")
|
||
if c.hooks.OnSessionDown != nil {
|
||
c.hooks.OnSessionDown(lastErr)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// handleStream 從 yamux stream 讀一個 HTTP request,轉給本機 server,再把 response 寫回 stream。
|
||
func (c *Client) handleStream(stream net.Conn) {
|
||
defer stream.Close()
|
||
|
||
req, err := http.ReadRequest(bufio.NewReader(stream))
|
||
if err != nil {
|
||
c.logger.Printf("[tunnel] failed to read request: %v", err)
|
||
return
|
||
}
|
||
|
||
req.URL.Scheme = "http"
|
||
req.URL.Host = c.localAddr
|
||
req.RequestURI = "" // http.Client 要求清空
|
||
|
||
if isWebSocketUpgrade(req) {
|
||
c.handleWebSocket(stream, req)
|
||
return
|
||
}
|
||
|
||
resp, err := http.DefaultTransport.RoundTrip(req)
|
||
if err != nil {
|
||
c.logger.Printf("[tunnel] local request failed: %v", err)
|
||
errResp := &http.Response{
|
||
StatusCode: http.StatusBadGateway,
|
||
ProtoMajor: 1,
|
||
ProtoMinor: 1,
|
||
Header: make(http.Header),
|
||
Body: http.NoBody,
|
||
}
|
||
_ = errResp.Write(stream)
|
||
return
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
_ = resp.Write(stream)
|
||
}
|
||
|
||
// handleWebSocket 用 raw TCP 把 WebSocket upgrade 請求轉發到本機 server,之後做雙向 pipe。
|
||
func (c *Client) handleWebSocket(stream net.Conn, req *http.Request) {
|
||
localConn, err := net.DialTimeout("tcp", c.localAddr, 10*time.Second)
|
||
if err != nil {
|
||
c.logger.Printf("[tunnel] ws: failed to connect to local: %v", err)
|
||
return
|
||
}
|
||
defer localConn.Close()
|
||
|
||
req.RequestURI = req.URL.RequestURI() // raw write 需要還原
|
||
_ = req.Write(localConn)
|
||
|
||
var wg sync.WaitGroup
|
||
wg.Add(2)
|
||
|
||
go func() {
|
||
defer wg.Done()
|
||
_, _ = io.Copy(localConn, stream)
|
||
localConn.Close()
|
||
}()
|
||
go func() {
|
||
defer wg.Done()
|
||
_, _ = io.Copy(stream, localConn)
|
||
stream.Close()
|
||
}()
|
||
|
||
wg.Wait()
|
||
}
|
||
|
||
// isWebSocketUpgrade reports whether r asks for a protocol upgrade to
// WebSocket.
//
// The Upgrade header is a comma-separated list of protocol tokens and the
// "websocket" token is matched case-insensitively, so values such as
// "WEBSOCKET" or "h2c, websocket" are recognised too. A request without an
// Upgrade header (or with a nil Header map) yields false.
func isWebSocketUpgrade(r *http.Request) bool {
	for _, value := range r.Header["Upgrade"] {
		for _, token := range strings.Split(value, ",") {
			if strings.EqualFold(strings.TrimSpace(token), "websocket") {
				return true
			}
		}
	}
	return false
}
|
||
|
||
// backoff 回傳指數退避時長,上限 backoffCap。
|
||
//
|
||
// 演算法:base × 2^(attempt-1),clamp 至 [base, cap]。
|
||
//
|
||
// POC bug 修復紀錄:
|
||
//
|
||
// POC 原實作:
|
||
// d := time.Duration(math.Min(float64(time.Second)*math.Pow(2, float64(attempt-1)), 30)) * time.Second
|
||
// 問題:math.Min 的第二參數「30」是 float64 純數字,但第一參數是「time.Second
|
||
// (= 1e9 ns) × 2^(attempt-1)」,單位是 ns。attempt=1 時 1e9 vs 30 取 min = 30
|
||
// (ns),再乘以 time.Second → 30s;attempt=2 時 2e9 vs 30 → 30 → 30s;任何
|
||
// attempt>=1 都永遠回 30s(而非預期的 1s、2s、4s...)。下方 clamp `if d < 1s`
|
||
// 把 1ns 拉回 1s,掩蓋了這個 bug 的第一次現身,但從 attempt=1 起就是 30s。
|
||
//
|
||
// 修復後:用純 time.Duration 做位移/比較,不再 mix float64 與 Duration。
|
||
// attempt=0(不應發生)回 base;attempt 非常大時 overflow 前先 cap。
|
||
//
|
||
// 預期輸出:
|
||
//
|
||
// attempt=1 → 1s
|
||
// attempt=2 → 2s
|
||
// attempt=3 → 4s
|
||
// attempt=4 → 8s
|
||
// attempt=5 → 16s
|
||
// attempt=6 → 30s(cap)
|
||
// attempt>=6 → 30s
|
||
func backoff(attempt int) time.Duration {
|
||
if attempt <= 0 {
|
||
return backoffBase
|
||
}
|
||
// shift 太大會 overflow,所以 attempt >= 某個值就直接回 cap
|
||
// backoffBase = 1s = 1e9 ns,2^30 × 1e9 ≈ 10^18 已接近 int64 上限,
|
||
// attempt >= 31 之後一律 cap 即可。
|
||
if attempt >= 31 {
|
||
return backoffCap
|
||
}
|
||
d := backoffBase << (attempt - 1)
|
||
if d > backoffCap || d < backoffBase {
|
||
return backoffCap
|
||
}
|
||
return d
|
||
}
|