// manager_test.go — Tunnel Manager 狀態機 / Pair / Unpair / listener 測試(AB5 範圍)。 // // 測試目標(對齊 TDD §4 + §6.4): // // - 狀態機 6 種狀態的 transition:notPaired → connecting → online → reconnecting → online // - Pair(mock mode)成功後自動進入 connecting,且 token 寫入 TokenStore // - Unpair 後回到 notPaired 且 TokenStore 被清空 // - Subscribe 會收到 initial snapshot + 後續變更事件 // - Reconnect 會重置 attempt 計數 // - Stop 之後 state 進入 offline(若有 token)或 notPaired(若無) // - 預設 Auto 模式下不會因為 MaxRetry 停止 // // 注意:本檔不測 yamux 的真實重連 backoff timing(會很脆弱),而是用 fakeRelay // 模擬連線成功 + 主動關閉,觀察 Manager state 是否正確反映。 package tunnel import ( "context" "errors" "net" "net/http" "net/http/httptest" "net/url" "strings" "sync" "sync/atomic" "testing" "time" "visiona-agent/internal/wsconn" "github.com/gorilla/websocket" "github.com/hashicorp/yamux" ) // --------------------------------------------------------------------- // Test fixtures // --------------------------------------------------------------------- // fakeRelay 開一個 WebSocket server 模擬 remote-proxy 的 tunnel endpoint。 // Agent 側連上時 server 端跑 yamux.Server() 並保持 session 打開。 type fakeRelay struct { server *httptest.Server upgrader websocket.Upgrader tokenSeen chan string sessionsMu sync.Mutex sessions []*yamux.Session } func newFakeRelay(t *testing.T) *fakeRelay { t.Helper() fr := &fakeRelay{ upgrader: websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}, tokenSeen: make(chan string, 16), } fr.server = httptest.NewServer(http.HandlerFunc(fr.handle)) return fr } func (fr *fakeRelay) close() { fr.sessionsMu.Lock() for _, s := range fr.sessions { _ = s.Close() } fr.sessionsMu.Unlock() fr.server.Close() } func (fr *fakeRelay) wsURL() string { u, _ := url.Parse(fr.server.URL) u.Scheme = "ws" u.Path = "/tunnel/connect" return u.String() } // closeAllSessions 主動關閉目前所有 yamux session,模擬 relay 斷線。 func (fr *fakeRelay) closeAllSessions() { fr.sessionsMu.Lock() defer fr.sessionsMu.Unlock() for _, s := range fr.sessions { _ = s.Close() } fr.sessions = fr.sessions[:0] } // waitForSession 輪詢直到 agent 連上來、sessions 中至少有一個 yamux.Session, // 或 timeout。integration_test.go 用來從 relay 端開 stream 測試 forward。 func (fr *fakeRelay) waitForSession(t *testing.T, timeout time.Duration) *yamux.Session { t.Helper() deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { fr.sessionsMu.Lock() if len(fr.sessions) > 0 { s := fr.sessions[len(fr.sessions)-1] fr.sessionsMu.Unlock() return s } fr.sessionsMu.Unlock() time.Sleep(20 * time.Millisecond) } t.Fatalf("waitForSession: no session after %v", timeout) return nil } func (fr *fakeRelay) handle(w http.ResponseWriter, r *http.Request) { token := r.URL.Query().Get("token") select { case fr.tokenSeen <- token: default: } ws, err := fr.upgrader.Upgrade(w, r, nil) if err != nil { return } nc := wsconn.New(ws) sess, err := yamux.Server(nc, yamux.DefaultConfig()) if err != nil { _ = ws.Close() return } fr.sessionsMu.Lock() fr.sessions = append(fr.sessions, sess) fr.sessionsMu.Unlock() <-sess.CloseChan() } // waitForState 輪詢 Manager 直到 state 符合 want,逾時 fail。 func waitForState(t *testing.T, m *Manager, want ConnectionState, timeout time.Duration) { t.Helper() deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { if got := m.Status().State; got == want { return } time.Sleep(20 * time.Millisecond) } t.Fatalf("waitForState: want %v, got %v after %v", want, m.Status().State, timeout) } // fakeExchanger 實作 PairingExchanger,測試 Pair 流程不打真 HTTP。 type fakeExchanger struct { result ExchangeResult err error calls int32 } func (f *fakeExchanger) Exchange(pairingToken string) (ExchangeResult, error) { atomic.AddInt32(&f.calls, 1) if f.err != nil { return ExchangeResult{}, f.err } return f.result, nil } // --------------------------------------------------------------------- // Basic lifecycle (前置 AB4 測試的 modernized 版本) // --------------------------------------------------------------------- // TestManagerMissingConfig:缺必要設定時 Start 應回錯誤。 func TestManagerMissingConfig(t *testing.T) { cases := []struct { name string cfg Config wantErr error }{ {"no relay url", Config{SessionToken: "t", LocalAddr: "127.0.0.1:1"}, ErrMissingConfig}, {"no token no store", Config{RelayURL: "ws://x", LocalAddr: "127.0.0.1:1"}, ErrNotPaired}, {"no local addr", Config{RelayURL: "ws://x", SessionToken: "t"}, ErrMissingConfig}, } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { m := NewManager(tc.cfg) defer m.Close() err := m.Start(context.Background()) if !errors.Is(err, tc.wantErr) { t.Errorf("err = %v, want %v", err, tc.wantErr) } }) } } // TestManagerStartStop:Manager 能 Start → 收到 token → Stop。 func TestManagerStartStop(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("listen: %v", err) } defer ln.Close() m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_test", LocalAddr: ln.Addr().String(), Account: "demo@visionA.local", }) defer m.Close() // 初始狀態:有 SessionToken → StateOffline(尚未 Start) if got := m.Status().State; got != StateOffline { t.Errorf("initial state = %v, want offline", got) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := m.Start(ctx); err != nil { t.Fatalf("Start: %v", err) } // 等待 relay 收到 token select { case got := <-fr.tokenSeen: if got != "vAs_test" { t.Errorf("relay saw token %q, want vAs_test", got) } case <-time.After(3 * time.Second): t.Fatal("timeout waiting for relay to see token") } // 應進入 online waitForState(t, m, StateOnline, 3*time.Second) // Stop if err := m.Stop(); err != nil { t.Fatalf("Stop: %v", err) } // 有 token 所以進 offline,不是 notPaired waitForState(t, m, StateOffline, 2*time.Second) // 再 Stop 一次應該 idempotent if err := m.Stop(); err != nil { t.Errorf("second Stop: %v", err) } } // TestManagerDoubleStart:連 Start 兩次應回 ErrAlreadyStarted。 func TestManagerDoubleStart(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_test", LocalAddr: ln.Addr().String(), }) defer m.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := m.Start(ctx); err != nil { t.Fatalf("first Start: %v", err) } defer m.Stop() if err := m.Start(ctx); !errors.Is(err, ErrAlreadyStarted) { t.Errorf("second Start err = %v, want ErrAlreadyStarted", err) } } // --------------------------------------------------------------------- // State machine transitions // --------------------------------------------------------------------- // TestStateTransitionConnectingToOnline:Start → connecting → online 的完整流轉。 func TestStateTransitionConnectingToOnline(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() // listener 收集所有狀態變更 var ( mu sync.Mutex states []ConnectionState ) m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_abc", LocalAddr: ln.Addr().String(), }) defer m.Close() unsub := m.Subscribe(func(s ConnectionStatus) { mu.Lock() states = append(states, s.State) mu.Unlock() }) defer unsub() if err := m.Start(context.Background()); err != nil { t.Fatalf("Start: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) // 驗證觀察到的序列包含 connecting 和 online time.Sleep(100 * time.Millisecond) // 讓 fanout goroutine 有時間 flush mu.Lock() seenConnecting := false seenOnline := false for _, s := range states { if s == StateConnecting { seenConnecting = true } if s == StateOnline { seenOnline = true } } mu.Unlock() if !seenConnecting { t.Error("did not observe StateConnecting transition") } if !seenOnline { t.Error("did not observe StateOnline transition") } _ = m.Stop() } // TestStateTransitionOnlineToReconnecting:連上後 relay 關 session,應進入 reconnecting。 // 透過 Subscribe 觀察所有 state transition(poll 會錯過快速切換的中間狀態)。 func TestStateTransitionOnlineToReconnecting(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_test", LocalAddr: ln.Addr().String(), }) defer m.Close() var ( mu sync.Mutex states []ConnectionState ) unsub := m.Subscribe(func(s ConnectionStatus) { mu.Lock() states = append(states, s.State) mu.Unlock() }) defer unsub() if err := m.Start(context.Background()); err != nil { t.Fatalf("Start: %v", err) } defer m.Stop() waitForState(t, m, StateOnline, 3*time.Second) // Relay 主動關 session → Manager 應先進 reconnecting,再(若 relay 仍開著)回 online fr.closeAllSessions() // 等到我們在 listener 看到 reconnecting 或 3 秒超時 deadline := time.Now().Add(3 * time.Second) sawReconnecting := false for time.Now().Before(deadline) { mu.Lock() for _, s := range states { if s == StateReconnecting { sawReconnecting = true break } } mu.Unlock() if sawReconnecting { break } time.Sleep(20 * time.Millisecond) } if !sawReconnecting { mu.Lock() t.Errorf("did not observe reconnecting after relay session close; observed states = %v", states) mu.Unlock() } } // --------------------------------------------------------------------- // Pair / Unpair // --------------------------------------------------------------------- // TestPairSuccess:mock mode Pair 成功後 token 存入 TokenStore 並啟動 tunnel。 func TestPairSuccess(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() store := NewMemoryTokenStore() fakeToken := "vAs_" + "deadbeef" + "0000111122223333444455556666777788889999aaaabbbbccccdddd" fakeExch := &fakeExchanger{ result: ExchangeResult{ SessionToken: fakeToken, Account: "pairtest@visionA.local", RelayURL: fr.wsURL(), }, } m := NewManager(Config{ LocalAddr: ln.Addr().String(), TokenStore: store, Exchanger: fakeExch, }) defer m.Close() // 初始:未配對 if m.Status().State != StateNotPaired { t.Fatalf("initial state = %v, want notPaired", m.Status().State) } pairToken := "vAc_0123456789abcdef0123456789abcdef" if err := m.Pair(context.Background(), pairToken); err != nil { t.Fatalf("Pair: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) // TokenStore 應寫入 got, _ := store.Load() if got != fakeToken { t.Errorf("TokenStore token = %q, want %q", got, fakeToken) } // Account 應更新 if m.Status().Account != "pairtest@visionA.local" { t.Errorf("Status.Account = %q, want pairtest@visionA.local", m.Status().Account) } // Session preview 應遮蔽 if preview := m.Status().SessionTokenPreview; preview == "" || preview == fakeToken { t.Errorf("SessionTokenPreview should be masked, got %q", preview) } _ = m.Stop() } // TestPairInvalidTokenFormat:格式不合應直接 reject,不呼叫 Exchanger。 func TestPairInvalidTokenFormat(t *testing.T) { fakeExch := &fakeExchanger{} m := NewManager(Config{ LocalAddr: "127.0.0.1:1", TokenStore: NewMemoryTokenStore(), Exchanger: fakeExch, }) defer m.Close() err := m.Pair(context.Background(), "bogus-token") if !errors.Is(err, ErrInvalidTokenFormat) { t.Errorf("err = %v, want ErrInvalidTokenFormat", err) } if atomic.LoadInt32(&fakeExch.calls) != 0 { t.Errorf("Exchanger.Exchange should not be called for invalid format; calls = %d", fakeExch.calls) } } // TestPairNoExchangerConfigured:沒設 Exchanger 應回錯誤。 func TestPairNoExchangerConfigured(t *testing.T) { m := NewManager(Config{ LocalAddr: "127.0.0.1:1", TokenStore: NewMemoryTokenStore(), }) defer m.Close() err := m.Pair(context.Background(), "vAc_0123456789abcdef0123456789abcdef") if err == nil { t.Error("expected error when Exchanger is nil") } } // TestUnpair:Unpair 後 token 清空且 state = notPaired。 func TestUnpair(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() store := NewMemoryTokenStore() _ = store.Save("vAs_already_paired_token") m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_already_paired_token", LocalAddr: ln.Addr().String(), TokenStore: store, }) defer m.Close() _ = m.Start(context.Background()) waitForState(t, m, StateOnline, 3*time.Second) if err := m.Unpair(); err != nil { t.Fatalf("Unpair: %v", err) } waitForState(t, m, StateNotPaired, 2*time.Second) got, _ := store.Load() if got != "" { t.Errorf("TokenStore should be empty after Unpair, got %q", got) } if m.Status().Account != "" { t.Errorf("Account should be cleared, got %q", m.Status().Account) } } // --------------------------------------------------------------------- // Subscribe / fanout // --------------------------------------------------------------------- // TestSubscribeReceivesInitialSnapshot:Subscribe 後立刻收到當前狀態。 func TestSubscribeReceivesInitialSnapshot(t *testing.T) { m := NewManager(Config{ RelayURL: "ws://example/relay", LocalAddr: "127.0.0.1:1", TokenStore: NewMemoryTokenStore(), }) defer m.Close() done := make(chan ConnectionStatus, 1) unsub := m.Subscribe(func(s ConnectionStatus) { select { case done <- s: default: } }) defer unsub() select { case s := <-done: if s.State != StateNotPaired { t.Errorf("initial snapshot state = %v, want notPaired", s.State) } if s.RelayURL != "ws://example/relay" { t.Errorf("RelayURL = %q, want ws://example/relay", s.RelayURL) } case <-time.After(1 * time.Second): t.Fatal("timeout waiting for initial snapshot") } } // TestSubscribeUnsubscribe:unsubscribe 後應不再收到事件。 func TestSubscribeUnsubscribe(t *testing.T) { m := NewManager(Config{ RelayURL: "ws://example/relay", LocalAddr: "127.0.0.1:1", TokenStore: NewMemoryTokenStore(), }) defer m.Close() var count int32 unsub := m.Subscribe(func(s ConnectionStatus) { atomic.AddInt32(&count, 1) }) // 等 initial snapshot 送達 time.Sleep(50 * time.Millisecond) atomic.StoreInt32(&count, 0) unsub() unsub() // idempotent // 觸發一次 transition,unsubscribed listener 不該收到 m.transition(StateError, "test", 0) time.Sleep(50 * time.Millisecond) if atomic.LoadInt32(&count) != 0 { t.Errorf("unsubscribed listener still received %d events", count) } } // TestListenerPanicDoesNotCrashManager:listener panic 不該拖垮 Manager。 func TestListenerPanicDoesNotCrashManager(t *testing.T) { m := NewManager(Config{ RelayURL: "ws://x", LocalAddr: "127.0.0.1:1", TokenStore: NewMemoryTokenStore(), }) defer m.Close() unsub := m.Subscribe(func(s ConnectionStatus) { panic("test panic in listener") }) defer unsub() // 觸發幾次 transition m.transition(StateConnecting, "", 1) m.transition(StateError, "boom", 0) time.Sleep(100 * time.Millisecond) // 沒有 panic 拖垮 goroutine 就算通過 } // --------------------------------------------------------------------- // Reconnect // --------------------------------------------------------------------- // TestReconnectResetsAttempt:Reconnect 應把 attemptNo 歸零。 func TestReconnectResetsAttempt(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_test", LocalAddr: ln.Addr().String(), }) defer m.Close() _ = m.Start(context.Background()) waitForState(t, m, StateOnline, 3*time.Second) // 人工塞 attempt 數 atomic.StoreInt32(&m.attemptNo, 42) if err := m.Reconnect(context.Background()); err != nil { t.Fatalf("Reconnect: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) if got := atomic.LoadInt32(&m.attemptNo); got != 0 { t.Errorf("attemptNo after Reconnect = %d, want 0", got) } } // --------------------------------------------------------------------- // Backoff / client.go 已有測試覆蓋;Manager 這層只確認 Auto 模式不會因 MaxRetry 停止。 // --------------------------------------------------------------------- // TestAutoModeNeverStopsForMaxRetry:Auto 模式下 attempt 超過 MaxRetry 仍不呼叫 Stop。 // 因為真實重試很慢(backoff),用 hook 直接測 shouldStopForMaxRetry 的邏輯。 func TestAutoModeNeverStopsForMaxRetry(t *testing.T) { m := NewManager(Config{ RelayURL: "ws://x", LocalAddr: "127.0.0.1:1", TokenStore: NewMemoryTokenStore(), MaxRetry: 3, ReconnectMode: ReconnectAuto, }) defer m.Close() for i := 1; i <= 10; i++ { if m.shouldStopForMaxRetry(i) { t.Errorf("Auto mode should never stop (attempt=%d)", i) } } } // TestManualModeStopsAfterMaxRetry:Manual 模式 attempt > MaxRetry 後 shouldStopForMaxRetry 回 true。 func TestManualModeStopsAfterMaxRetry(t *testing.T) { m := NewManager(Config{ RelayURL: "ws://x", LocalAddr: "127.0.0.1:1", TokenStore: NewMemoryTokenStore(), MaxRetry: 3, ReconnectMode: ReconnectManual, }) defer m.Close() cases := []struct { attempt int want bool }{ {1, false}, {2, false}, {3, false}, // 剛好 = MaxRetry,還不算超過 {4, true}, {5, true}, {100, true}, } for _, tc := range cases { if got := m.shouldStopForMaxRetry(tc.attempt); got != tc.want { t.Errorf("shouldStopForMaxRetry(%d) = %v, want %v", tc.attempt, got, tc.want) } } } // --------------------------------------------------------------------- // TokenStore integration // --------------------------------------------------------------------- // TestStartLoadsTokenFromStore:Config.SessionToken 為空時應從 TokenStore 載入。 func TestStartLoadsTokenFromStore(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() store := NewMemoryTokenStore() _ = store.Save("vAs_from_store") m := NewManager(Config{ RelayURL: fr.wsURL(), LocalAddr: ln.Addr().String(), TokenStore: store, // SessionToken 刻意留空 }) defer m.Close() if err := m.Start(context.Background()); err != nil { t.Fatalf("Start: %v", err) } defer m.Stop() // Relay 看到的 token 應該是 store 裡的 select { case got := <-fr.tokenSeen: if got != "vAs_from_store" { t.Errorf("relay saw token %q, want vAs_from_store", got) } case <-time.After(3 * time.Second): t.Fatal("timeout") } } // --------------------------------------------------------------------- // Fix-A4:Pair 失敗 / 部分失敗 state 清理測試 // --------------------------------------------------------------------- // failingExchanger 模擬 exchange 失敗。 type failingExchanger struct { err error } func (f *failingExchanger) Exchange(_ string) (ExchangeResult, error) { return ExchangeResult{}, f.err } // failingTokenStore 模擬 Save 失敗(其他方法正常)。 type failingTokenStore struct { saveErr error deleteErr error mu sync.RWMutex tok string } func (s *failingTokenStore) Save(tok string) error { if s.saveErr != nil { return s.saveErr } s.mu.Lock() defer s.mu.Unlock() s.tok = tok return nil } func (s *failingTokenStore) Load() (string, error) { s.mu.RLock() defer s.mu.RUnlock() return s.tok, nil } func (s *failingTokenStore) Delete() error { if s.deleteErr != nil { return s.deleteErr } s.mu.Lock() defer s.mu.Unlock() s.tok = "" return nil } // TestPairExchangeFailure_NoStateChange:exchange 失敗 → state 維持 notPaired,TokenStore 不變。 func TestPairExchangeFailure_NoStateChange(t *testing.T) { store := NewMemoryTokenStore() m := NewManager(Config{ LocalAddr: "127.0.0.1:1", TokenStore: store, Exchanger: &failingExchanger{err: ErrTokenExpired}, }) defer m.Close() preState := m.Status().State err := m.Pair(context.Background(), "vAc_0123456789abcdef0123456789abcdef") if !errors.Is(err, ErrTokenExpired) { t.Fatalf("err = %v, want ErrTokenExpired", err) } if got := m.Status().State; got != preState { t.Errorf("state changed unexpectedly: %v → %v", preState, got) } if tok, _ := store.Load(); tok != "" { t.Errorf("TokenStore should be empty, got %q", tok) } } // TestPairSaveFailure_NoStateChange_NoToken:exchange OK + Save 失敗 → state 不變,token 沒存。 func TestPairSaveFailure_NoStateChange_NoToken(t *testing.T) { store := &failingTokenStore{saveErr: errors.New("disk full")} m := NewManager(Config{ LocalAddr: "127.0.0.1:1", TokenStore: store, Exchanger: &fakeExchanger{ result: ExchangeResult{ SessionToken: "vAs_token_xyz", Account: "x@y", }, }, }) defer m.Close() preState := m.Status().State err := m.Pair(context.Background(), "vAc_0123456789abcdef0123456789abcdef") if err == nil { t.Fatalf("expected error from Save failure") } if got := m.Status().State; got != preState { t.Errorf("state changed: %v → %v (Pair Save failure should not transition)", preState, got) } if tok, _ := store.Load(); tok != "" { t.Errorf("token should not be persisted on Save fail, got %q", tok) } // cfg.SessionToken 不該被更新(因為 Save 失敗在更新前 return) if got := m.sessionToken(); got != "" { t.Errorf("cfg.SessionToken should remain empty on Save fail, got %q", got) } } // TestPairStartFailure_TokenStoredStateError:exchange + Save OK 但 Start 失敗 // → token 已存(可下次 Reconnect 試),state = error。 func TestPairStartFailure_TokenStoredStateError(t *testing.T) { store := NewMemoryTokenStore() m := NewManager(Config{ // 故意不給 LocalAddr → Start 會回 ErrMissingConfig TokenStore: store, Exchanger: &fakeExchanger{ result: ExchangeResult{ SessionToken: "vAs_paired_but_cant_start", RelayURL: "ws://example/relay", Account: "x@y", }, }, }) defer m.Close() err := m.Pair(context.Background(), "vAc_0123456789abcdef0123456789abcdef") if !errors.Is(err, ErrMissingConfig) { t.Fatalf("err = %v, want ErrMissingConfig", err) } // Token 已持久化(Reconnect 可重試) if tok, _ := store.Load(); tok != "vAs_paired_but_cant_start" { t.Errorf("token should be persisted even when Start fails, got %q", tok) } // state = error,UI 可顯示「配對成功但連線失敗」 if got := m.Status().State; got != StateError { t.Errorf("state = %v, want StateError after Start fail", got) } if errStr := m.Status().LastError; errStr == "" { t.Error("LastError should be set on Start failure") } } // --------------------------------------------------------------------- // Fix-A5:Lifecycle method 並發保護測試 // --------------------------------------------------------------------- // TestLifecycleConcurrentRaceFree:50 個 goroutine 同時呼叫 Pair / Reconnect / Unpair / Start / Stop // 應該 race-free(go test -race 不報錯)+ 不 panic。 // // 不驗證最終 state 是哪個(取決於哪個 lifecycle method 最後贏),只驗證: // - 沒 race detector 警告 // - 沒 panic(emit / state machine / store 不會 corrupt) // - Manager 仍處於可用狀態(最後 Status() 可取) func TestLifecycleConcurrentRaceFree(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() store := NewMemoryTokenStore() _ = store.Save("vAs_initial_token") m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_initial_token", LocalAddr: ln.Addr().String(), TokenStore: store, Exchanger: &fakeExchanger{ result: ExchangeResult{ SessionToken: "vAs_paired_" + strings.Repeat("a", 56), Account: "concurrent@test", RelayURL: fr.wsURL(), }, }, MaxRetry: 1, ReconnectMode: ReconnectAuto, }) defer m.Close() const numGoroutines = 50 var wg sync.WaitGroup wg.Add(numGoroutines) pairToken := "vAc_0123456789abcdef0123456789abcdef" ctx := context.Background() for i := 0; i < numGoroutines; i++ { go func(idx int) { defer wg.Done() defer func() { // 任何 panic 都讓測試 fail(透過 recover 確保 wg.Done 仍跑) if r := recover(); r != nil { t.Errorf("goroutine %d panicked: %v", idx, r) } }() switch idx % 5 { case 0: _ = m.Pair(ctx, pairToken) case 1: _ = m.Reconnect(ctx) case 2: _ = m.Unpair() case 3: _ = m.Start(ctx) case 4: _ = m.Stop() } }(i) } wg.Wait() // Manager 仍可用:Status() 不 panic、可取到合法 ConnectionStatus snap := m.Status() switch snap.State { case StateNotPaired, StateConnecting, StateOnline, StateReconnecting, StateOffline, StateError: // OK,任一合法狀態都接受 default: t.Errorf("Status() returned invalid state: %q", snap.State) } // attemptNo 不該爆出負值或極端值 if snap.AttemptNo < 0 { t.Errorf("AttemptNo negative: %d", snap.AttemptNo) } } // TestPairReconnectSerialization:Pair 和 Reconnect 並發呼叫應該序列化執行 // (透過驗證 attemptNo 不會混亂)。 // // 場景:使用者點 Pair 按鈕的同時又連點 Reconnect → 沒有 lifecycleMu 時 attempt 計數 // 可能在兩個 goroutine 間交錯增減;有 lifecycleMu 後兩個操作序列化。 func TestPairReconnectSerialization(t *testing.T) { fr := newFakeRelay(t) defer fr.close() ln, _ := net.Listen("tcp", "127.0.0.1:0") defer ln.Close() m := NewManager(Config{ LocalAddr: ln.Addr().String(), TokenStore: NewMemoryTokenStore(), Exchanger: &fakeExchanger{ result: ExchangeResult{ SessionToken: "vAs_" + strings.Repeat("b", 64), Account: "serialize@test", RelayURL: fr.wsURL(), }, }, }) defer m.Close() // 先 Pair 一次讓 manager 進入 running if err := m.Pair(context.Background(), "vAc_0123456789abcdef0123456789abcdef"); err != nil { t.Fatalf("initial Pair: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) // 並發呼叫多次 Reconnect — 每次都應該序列化執行(不會 race) var wg sync.WaitGroup for i := 0; i < 20; i++ { wg.Add(1) go func() { defer wg.Done() _ = m.Reconnect(context.Background()) }() } wg.Wait() // 等待 manager 穩定回 online waitForState(t, m, StateOnline, 5*time.Second) // Manager 仍可用,attemptNo 應為 0(最後一次 Reconnect 後 onSessionUp 重置) if got := atomic.LoadInt32(&m.attemptNo); got < 0 { t.Errorf("attemptNo invalid: %d", got) } } // TestMaskSessionToken 驗證遮蔽格式。 func TestMaskSessionToken(t *testing.T) { cases := []struct { in string want string }{ {"vAs_a1b2c3d4e5f6789abcdef0000111122223333444455556666777788889999e7f8", "vAs_a1b2c3d4 ··· e7f8"}, {"not-a-token", ""}, {"vAs_short", ""}, } for _, tc := range cases { if got := MaskSessionToken(tc.in); got != tc.want { t.Errorf("MaskSessionToken(%q) = %q, want %q", tc.in, got, tc.want) } } }