// manager.go — Tunnel lifecycle 完整管理(AB5 範圍)。 // // 本檔實作 TDD §4 + §6.4 定義的 tunnel 狀態機,負責: // // 1. Session Token 配對(Pair / Unpair)— 委派給 PairingExchanger。 // 2. Client 生命週期(Start / Stop / Reconnect)。 // 3. 狀態機六狀態(notPaired / connecting / online / reconnecting / offline / error)。 // 4. 重連策略(指數退避 + max retry + auto/manual mode)。 // 5. 心跳監控(依賴 yamux 的 10s/30s 規則 + Client hooks 的 session down 通知)。 // 6. 狀態事件推送(Subscribe / fanout;上層 Wails app 可 emit "connection:status")。 // // 設計原則: // // - 單一寫入者:所有狀態變更都走 transition() 一個函式,包 mu 保護。 // - 事件 fanout 不持鎖:避免 listener 呼叫 Manager 方法造成 deadlock。 // - Manager 本身 thread-safe,Public API(Pair / Unpair / Start / Stop / Reconnect / // Status / Subscribe)都可從任意 goroutine 呼叫。 // - 錯誤只 log、不 panic(雛形階段容錯優先;崩了整個 Wails app 會倒)。 // // 參考: // - .autoflow/04-architecture/visiona-agent-tdd.md §4.3 / §4.4 / §4.5 / §6.4 // - .autoflow/04-architecture/tunnel.md §4.2 // - .autoflow/03-design/visiona-agent-spec.md §4 / §5 package tunnel import ( "context" "errors" "fmt" "sync" "sync/atomic" "time" ) // ConnectionState 對應 TDD §6.4:六種狀態機狀態。 // ⚠️ JSON 值必須對齊 frontend `src/types/agent.ts ConnectionState`。 type ConnectionState string const ( // StateNotPaired:無 session token,等待使用者配對。 StateNotPaired ConnectionState = "notPaired" // StateConnecting:首次建立 tunnel(尚未成功建立 yamux session)。 StateConnecting ConnectionState = "connecting" // StateOnline:tunnel 連線正常。 StateOnline ConnectionState = "online" // StateReconnecting:曾連上後掉線,正在重試中。 StateReconnecting ConnectionState = "reconnecting" // StateOffline:手動斷線或所有重試用盡後的停等狀態。等待 Reconnect() 或 Unpair()。 StateOffline ConnectionState = "offline" // StateError:致命錯誤(config 無效、token 被撤銷等),需要使用者介入。 StateError ConnectionState = "error" ) // ReconnectMode 控制 max retry 用盡後的行為。 type ReconnectMode string const ( // ReconnectAuto:達到 MaxRetry 後繼續嘗試(退避 cap 住,不會無限退避變大)。 // 對應設定頁「自動重連」。 ReconnectAuto ReconnectMode = "auto" // ReconnectManual:達到 MaxRetry 後停在 StateOffline,等使用者點「重新連線」。 // 對應設定頁「手動重連」。 ReconnectManual ReconnectMode = "manual" ) // Config 是 Manager 的必要輸入。 // // SessionToken / RelayURL / Account 三個欄位會在 Pair() 成功後更新到 Manager 內部 // 的可變欄位(cfg)。呼叫者不應在 Manager 啟動後直接改 Config。 type Config struct { // RelayURL 是 wss://host/tunnel/connect 格式的 URL。 // Pair() 時若雲端回傳 RelayURL 會覆蓋此值。 RelayURL string // SessionToken 是雲端下發的 Session Token(vAs_...)。 // 若啟動時為空但 TokenStore 有值,Start() 會從 TokenStore 載入。 SessionToken string // Account 是雲端帳號 email,雛形由 Pair() exchange 回應或 mock 預設值提供。 // 空字串代表尚未配對或 AB11 未上線(不影響功能,UI 顯示 "—")。 Account string // LocalAddr 是 "host:port" 格式,tunnel 會把進來的 HTTP 轉發到這裡。 // 通常是 "127.0.0.1:"。 LocalAddr string // Logger 可選。nil 時 Client 會用 log.Default()。 Logger Logger // TokenStore 用於持久化 Session Token。nil 時 Manager 會建立一個 in-memory store。 TokenStore TokenStore // Exchanger 用於 Pair() 的 /api/pairing/exchange 呼叫。nil 時 Manager 會建立 // 一個預設 HTTPPairingExchanger(需要 CloudAPIURL 才能用)。 Exchanger PairingExchanger // MaxRetry 為重試上限(0 代表使用預設 5 次;負值代表無上限)。 // 達上限後:Auto 模式 → 退避 cap 住繼續重試;Manual 模式 → 停在 offline。 MaxRetry int // ReconnectMode 控制 max retry 耗盡後的行為(預設 Auto)。 ReconnectMode ReconnectMode } // 預設值。 const ( defaultMaxRetry = 5 ) // ConnectionStatus 是對外暴露的快照(對齊 TDD §6.4 Snapshot + frontend types/agent.ts)。 // // ⚠️ json tag 必須對齊 frontend 的 ConnectionSnapshot 介面。 type ConnectionStatus struct { State ConnectionState `json:"state"` // LastError 在 StateError / reconnecting 時有值。 LastError string `json:"error,omitempty"` // AttemptNo 目前是第幾次重試(連線中 / reconnecting 時有值)。 AttemptNo int `json:"attemptNo,omitempty"` // RelayURL 當前設定的 relay(未配對也會帶,供 InfoCard 顯示)。 RelayURL string `json:"relayUrl"` // Account 配對帳號 email;空字串代表未配對。 Account string `json:"account,omitempty"` // ConnectedSince 是最近一次進入 online 狀態的時間(nil 代表未曾上線)。 ConnectedSince *time.Time `json:"connectedSince,omitempty"` // LastSeenAt 最近一次 session 活著的時間(yamux 正常建立即更新; // 更精細的 pong-level 追蹤留待 Phase 1 改用 yamux 原生 stats)。 LastSeenAt *time.Time `json:"lastSeenAt,omitempty"` // SessionTokenPreview 遮蔽後的 token 顯示("vAs_a1b2c3d4 ··· e7f8")。 SessionTokenPreview string `json:"sessionTokenPreview,omitempty"` } // StatusListener 是狀態變更的觀察者 callback。 // Manager 會在每次 transition 後(含 Pair/Unpair/Start/Stop 觸發的)呼叫所有 listener。 // // ⚠️ listener 不應阻塞太久(Manager 會在獨立 goroutine 呼叫,但同一時序仍是 serialized)。 type StatusListener func(ConnectionStatus) // Manager 管理 tunnel client 的 lifecycle + 狀態機。 type Manager struct { // lifecycleMu 序列化 lifecycle methods(Pair / Unpair / Start / Stop / Reconnect)。 // 不保護 cfg / state — 這些由 mu / atomic / mu 各自負責。 // // 為什麼需要:使用者可能快速連按 UI 按鈕(Pair → Reconnect → Pair)— 沒有此鎖 // 兩個 lifecycle 操作會交錯執行,導致 attemptNo 計數錯亂、cfg race、state 半完成。 // 此鎖確保任一時刻只有一個 lifecycle 操作在進行。 // // 實作注意:lifecycle methods 呼叫對方時要走 *Locked 版本(不再取鎖),避免遞迴 // 死鎖;Public method 進入時取鎖,return 前釋放。 // // 詳見 Fix-A5。 lifecycleMu sync.Mutex mu sync.Mutex // 保護 cfg / client / current / startedAt 等可變欄位 cfg Config // state 用 atomic.Value 以加速 Status() 快速讀取(不搶 mu)。 state atomic.Value // ConnectionState current ConnectionStatus // under mu:最新快照 connectedSince *time.Time // under mu lastSeenAt *time.Time // under mu attemptNo int32 // atomic lastError atomic.Value // string client *Client // under mu // 管理 run 迴圈 cancel ctx context.Context cancel context.CancelFunc running atomic.Bool // 是否已 Start()(Stop 後會 reset) // listeners:fanout events。Subscribe 回傳 unsubscribe func。 listenerMu sync.Mutex listeners map[int]StatusListener listenerNextID int emitCh chan ConnectionStatus // 單工 channel,避免 listener 阻塞觸發 emitOnce sync.Once emitCloseOnce sync.Once emitClosedCh chan struct{} // emitClosed 在 Close() 把 emitCh 關閉前就被 set;所有 emit() 呼叫者先檢查這個 // atomic flag 避免對已關閉 channel 寫入 panic。此 race 的觸發情境:測試中 // defer cancel() + defer m.Close() 順序是 Close 先跑(LIFO),Close 關 emitCh // 後 ctx 才被 cancel,Start 起的 ctx.Done→Stop goroutine 這時呼叫 Stop→ // transition→emit,會對已關 channel 送資料。AB6 測試發現(race detector 100% // 必觸發,單跑非 race 約 1/5 機率)。 emitClosed atomic.Bool } // 狀態錯誤。 var ( ErrAlreadyStarted = errors.New("tunnel manager already started") ErrNotStarted = errors.New("tunnel manager not started") ErrMissingConfig = errors.New("tunnel manager missing required config (RelayURL / SessionToken / LocalAddr)") ErrNotPaired = errors.New("tunnel manager not paired (no session token)") ErrAlreadyPaired = errors.New("tunnel manager already paired; call Unpair() first") ) // NewManager 建立 Manager。不立即連線;需呼叫 Start() 或 Pair()。 // // cfg 可部分為空: // - SessionToken == "" 時 Manager 會在 Start() 嘗試從 TokenStore 載入。 // - TokenStore == nil 時 Manager 會建立 in-memory store(AB7 前行為)。 // - Exchanger == nil 時呼叫 Pair() 會回錯;Start() 不受影響。 func NewManager(cfg Config) *Manager { if cfg.TokenStore == nil { cfg.TokenStore = NewMemoryTokenStore() } if cfg.MaxRetry == 0 { cfg.MaxRetry = defaultMaxRetry } if cfg.ReconnectMode == "" { cfg.ReconnectMode = ReconnectAuto } m := &Manager{ cfg: cfg, listeners: make(map[int]StatusListener), emitCh: make(chan ConnectionStatus, 16), emitClosedCh: make(chan struct{}), } m.state.Store(StateNotPaired) m.lastError.Store("") // 初始 current snapshot:若 config 已帶 SessionToken,進 offline(等 Start 決定);否則 notPaired。 initial := StateNotPaired if cfg.SessionToken != "" { initial = StateOffline } m.current = m.buildSnapshotLocked(initial, "", 0) m.state.Store(initial) // 啟動 fanout goroutine(lazy) m.startEmitterOnce() return m } // Start 啟動 tunnel client。若 SessionToken 為空,嘗試從 TokenStore 載入;仍無則回 ErrNotPaired。 // // 冪等:若已啟動,回 ErrAlreadyStarted(呼叫者可先 Stop 再 Start)。 // ctx == nil 時用 context.Background()。 // // 並發保護(Fix-A5):透過 lifecycleMu 與 Pair / Unpair / Stop / Reconnect 序列化。 func (m *Manager) Start(ctx context.Context) error { m.lifecycleMu.Lock() defer m.lifecycleMu.Unlock() return m.startLocked(ctx) } // startLocked 是 Start 的內部版本,假設 lifecycleMu 已被持有(呼叫者責任)。 // 用於 Pair / Reconnect 等 lifecycle method 內部呼叫,避免遞迴死鎖。 func (m *Manager) startLocked(ctx context.Context) error { if ctx == nil { ctx = context.Background() } m.mu.Lock() if m.running.Load() { m.mu.Unlock() return ErrAlreadyStarted } // 嘗試補 SessionToken(Pair 後重啟 / 重連場景) if m.cfg.SessionToken == "" && m.cfg.TokenStore != nil { if tok, err := m.cfg.TokenStore.Load(); err == nil && tok != "" { m.cfg.SessionToken = tok } } if m.cfg.SessionToken == "" { m.mu.Unlock() m.transition(StateNotPaired, "", 0) return ErrNotPaired } if m.cfg.RelayURL == "" || m.cfg.LocalAddr == "" { m.mu.Unlock() return ErrMissingConfig } childCtx, cancel := context.WithCancel(ctx) m.ctx = childCtx m.cancel = cancel m.running.Store(true) m.client = m.newClientLocked() m.mu.Unlock() m.transition(StateConnecting, "", 0) m.mu.Lock() client := m.client m.mu.Unlock() if client != nil { client.Start() } // ctx cancel 時自動 Stop(避免呼叫者忘了收尾)。 // // 並發 race(Fix-A5 配套修復): // - Reconnect / ApplyRelaySettings 內部會呼叫 stopLocked → cancel() → 觸發此 goroutine // - 之後 Reconnect 立即呼叫 startLocked 建立新 client + 新 ctx // - 若此 goroutine 走 m.Stop() 會誤停剛建好的新 client // // 保護方式:此 goroutine 在 cancel 觸發後,只有當 m.ctx 仍等於自己對應的 childCtx // 時才執行 stopLocked;否則代表 manager 已被 restarted,此 goroutine 屬於上一輪, // 不該動最新的 state。 go func(myCtx context.Context) { <-myCtx.Done() m.lifecycleMu.Lock() defer m.lifecycleMu.Unlock() m.mu.Lock() stale := m.ctx != myCtx m.mu.Unlock() if stale { return } _ = m.stopLocked() }(childCtx) return nil } // newClientLocked 建立 Client 並掛 hooks。必須持有 m.mu。 func (m *Manager) newClientLocked() *Client { c := NewClient(m.cfg.RelayURL, m.cfg.SessionToken, m.cfg.LocalAddr, m.cfg.Logger) c.SetHooks(ClientHooks{ OnDialAttempt: func(attempt int) { m.onDialAttempt(attempt) }, OnSessionUp: func() { m.onSessionUp() }, OnSessionDown: func(err error) { m.onSessionDown(err) }, OnDialFailed: func(attempt int, err error) { m.onDialFailed(attempt, err) }, OnRetryScheduled: func(attempt int, delay time.Duration) { // log-only;Manager 已經透過 OnDialFailed 轉進 reconnecting 狀態。 if m.cfg.Logger != nil { m.cfg.Logger.Printf("[tunnel] retry #%d scheduled in %v", attempt, delay) } }, }) return c } // Stop 停止 tunnel client 並把 state 改為 offline(仍保留 session token 以便 Reconnect)。 // 冪等;已停止時回 nil。 // // 並發保護(Fix-A5):透過 lifecycleMu 與其他 lifecycle method 序列化。 func (m *Manager) Stop() error { m.lifecycleMu.Lock() defer m.lifecycleMu.Unlock() return m.stopLocked() } // stopLocked 是 Stop 的內部版本,假設 lifecycleMu 已被持有。 func (m *Manager) stopLocked() error { m.mu.Lock() if !m.running.Load() { m.mu.Unlock() return nil } m.running.Store(false) client := m.client m.client = nil if m.cancel != nil { m.cancel() m.cancel = nil } m.mu.Unlock() if client != nil { client.Stop() } // Stop 後進入 offline(未配對場景走 notPaired) next := StateOffline if m.sessionToken() == "" { next = StateNotPaired } m.transition(next, "", 0) return nil } // Reconnect 手動觸發重連。若 Manager 已在 running 則先 Stop 再 Start(避免重覆 client)。 // 對應 Design spec §4.5「立即重試」按鈕。 // // 並發保護(Fix-A5):透過 lifecycleMu 與其他 lifecycle method 序列化。 func (m *Manager) Reconnect(ctx context.Context) error { m.lifecycleMu.Lock() defer m.lifecycleMu.Unlock() if m.running.Load() { if err := m.stopLocked(); err != nil { return err } } atomic.StoreInt32(&m.attemptNo, 0) return m.startLocked(ctx) } // Pair 呼叫 PairingExchanger 換 Session Token,儲存後自動啟動 tunnel。 // // 流程對齊 TDD §4.3 雛形流程: // 1. 驗證 pairing token 格式(vAc_ + 32 hex)— Exchanger 內建 // 2. 呼叫雲端 /api/pairing/exchange(或 mock mode 本地產) // 3. 儲存 Session Token 到 TokenStore // 4. 更新 cfg(RelayURL / Account 若雲端有回傳則覆蓋) // 5. 若 Manager 已在 running 先 Stop,然後 Start // // 失敗 / 部分失敗的 state 處理(Fix-A4): // // ┌──────────────────────────┬─────────────┬──────────┬──────────────────────┐ // │ 失敗點 │ TokenStore │ 終態 │ 理由 │ // ├──────────────────────────┼─────────────┼──────────┼──────────────────────┤ // │ exchange 失敗 │ 不變 │ 不變 │ 沒做任何改變 │ // │ exchange OK / Save 失敗 │ 不變 │ notPaired │ 確保 cfg/state 一致 │ // │ exchange / Save OK / Start 失敗 │ token 已存 │ error │ token 持久化成功,可 Reconnect 重試 │ // └──────────────────────────┴─────────────┴──────────┴──────────────────────┘ // // 並發保護(Fix-A5):lifecycleMu 確保 Pair / Unpair / Start / Stop / Reconnect // 序列化執行,避免使用者快速連按按鈕造成 attempt 計數錯亂或 cfg race。 func (m *Manager) Pair(ctx context.Context, pairingToken string) error { m.lifecycleMu.Lock() defer m.lifecycleMu.Unlock() if m.cfg.Exchanger == nil { return errors.New("tunnel manager: no pairing exchanger configured") } if err := ValidatePairingToken(pairingToken); err != nil { return err } result, err := m.cfg.Exchanger.Exchange(pairingToken) if err != nil { // 不 transition 到 error — Pair 失敗由呼叫端決定 UI 反應;Manager 維持當前狀態。 // TokenStore 沒被動過、cfg 沒被動過,符合「失敗回到呼叫前」原則。 return err } // 儲存 token + 更新 cfg m.mu.Lock() if err := m.cfg.TokenStore.Save(result.SessionToken); err != nil { m.mu.Unlock() // Save 失敗 → cfg 沒被改、TokenStore 也沒被改(Save 失敗即不寫入)。 // 確保 state 是 notPaired(如果原本就是)— 不主動 transition 避免覆蓋 error 狀態。 // 呼叫者拿到 error 後決定 UI 反應。 return fmt.Errorf("save session token: %w", err) } m.cfg.SessionToken = result.SessionToken if result.RelayURL != "" { m.cfg.RelayURL = result.RelayURL } if result.Account != "" { m.cfg.Account = result.Account } wasRunning := m.running.Load() m.mu.Unlock() // 若已在跑(改配對 to 新 token),先 Stop if wasRunning { _ = m.stopLocked() } // 重置 attempt 並啟動 atomic.StoreInt32(&m.attemptNo, 0) if err := m.startLocked(ctx); err != nil { // Start 失敗 → token 已持久化(這個取捨:可下次 Reconnect 試)。 // state 顯式設為 error,讓 UI 顯示「配對成功但連線失敗」可動作的訊息。 m.transition(StateError, fmt.Sprintf("pair: start failed: %v", err), 0) return err } return nil } // Unpair 清除 Session Token + 停 tunnel + 重置狀態到 notPaired。 // 對應 Design spec §4.2 「重新配對」按鈕 + §6.2.4 設定頁「清除本地資料」。 // // 並發保護(Fix-A5):透過 lifecycleMu 與其他 lifecycle method 序列化。 func (m *Manager) Unpair() error { m.lifecycleMu.Lock() defer m.lifecycleMu.Unlock() // 先停 tunnel _ = m.stopLocked() m.mu.Lock() if err := m.cfg.TokenStore.Delete(); err != nil { m.mu.Unlock() return fmt.Errorf("delete session token: %w", err) } m.cfg.SessionToken = "" m.cfg.Account = "" m.connectedSince = nil m.lastSeenAt = nil m.mu.Unlock() atomic.StoreInt32(&m.attemptNo, 0) m.transition(StateNotPaired, "", 0) return nil } // Status 回傳目前狀態快照。可從任意 goroutine 呼叫。 func (m *Manager) Status() ConnectionStatus { m.mu.Lock() defer m.mu.Unlock() // 重新組 snapshot 以帶上最新 attemptNo(atomic) attempt := int(atomic.LoadInt32(&m.attemptNo)) errStr := "" if v := m.lastError.Load(); v != nil { if s, ok := v.(string); ok { errStr = s } } state := m.stateValue() snap := m.buildSnapshotLocked(state, errStr, attempt) // 更新 current 以便 Subscribe 的 initial emit 拿得到一致快照 m.current = snap return snap } // Subscribe 註冊 listener 接收狀態變更。 // 回傳的 unsubscribe func 是冪等的。Subscribe 後會立即以目前快照觸發一次,讓訂閱者拿到 initial state。 func (m *Manager) Subscribe(listener StatusListener) (unsubscribe func()) { if listener == nil { return func() {} } m.listenerMu.Lock() id := m.listenerNextID m.listenerNextID++ m.listeners[id] = listener m.listenerMu.Unlock() // initial emit — 用 Status() 拿最新快照(含 attemptNo) go func(l StatusListener) { defer recoverListener("initial emit") l(m.Status()) }(listener) var once sync.Once return func() { once.Do(func() { m.listenerMu.Lock() delete(m.listeners, id) m.listenerMu.Unlock() }) } } // ApplyRelaySettings 更新 cfg 中的 RelayURL / ReconnectMode,必要時 bounce 連線。 // // 由 Wails binding `SaveAgentSettings` 呼叫,對應 Design spec §6.2.1「Relay URL // 變更不會立即套用;下方顯示未套用 badge」——我們這裡是立即套用版本:若 // RelayURL 改變且 Manager 正在 running,執行 Stop+Start;若只改 ReconnectMode // 則不 bounce。 // // ctx 在需要 Start 時使用;nil 時用 context.Background()。 // // 回傳 bool 表示是否觸發了 reconnect。錯誤只在 Start 失敗時回傳。 // // 並發保護(Fix-A5):透過 lifecycleMu 與其他 lifecycle method 序列化。 // (此方法可能會呼叫 Stop+Start,跟 lifecycle 同類型操作。) func (m *Manager) ApplyRelaySettings(ctx context.Context, relayURL string, reconnectMode ReconnectMode) (bool, error) { m.lifecycleMu.Lock() defer m.lifecycleMu.Unlock() if ctx == nil { ctx = context.Background() } if reconnectMode == "" { reconnectMode = ReconnectAuto } m.mu.Lock() oldRelay := m.cfg.RelayURL m.cfg.RelayURL = relayURL m.cfg.ReconnectMode = reconnectMode wasRunning := m.running.Load() m.mu.Unlock() // 只有 URL 改變 + 目前在跑 才需要 bounce if oldRelay != relayURL && wasRunning { if err := m.stopLocked(); err != nil { return false, err } atomic.StoreInt32(&m.attemptNo, 0) if err := m.startLocked(ctx); err != nil { return true, err } return true, nil } // 未 bounce 也要 emit 一次新 snapshot(RelayURL 會反映到 Status()) m.transition(m.stateValue(), m.errorString(), int(atomic.LoadInt32(&m.attemptNo))) return false, nil } // Close 釋放 Manager 所有資源(停 fanout goroutine)。 // 通常只在 Wails app shutdown 呼叫;測試也會用。 // // Close 內部呼叫 Stop(取 lifecycleMu)。如果 Close 與其他 lifecycle method 並發, // Close 會等對方完成才執行 Stop;可接受(Close 是 shutdown 路徑,等一下無妨)。 func (m *Manager) Close() error { _ = m.Stop() m.emitCloseOnce.Do(func() { // 先 set flag 再 close,讓其他 goroutine 即將呼叫 emit 的人跳過寫入。 m.emitClosed.Store(true) close(m.emitCh) <-m.emitClosedCh }) return nil } // ------------------------------------------------------------------ // Internal: state transition + event fanout // ------------------------------------------------------------------ // transition 是唯一更新狀態的通道。更新後透過 emitCh 非同步廣播給 listeners。 // // nextState:目標狀態 // errStr:若為錯誤相關狀態,帶錯誤字串;其他情況傳 "" // attempt:reconnecting 時的 attempt 數;其他情況 0 func (m *Manager) transition(nextState ConnectionState, errStr string, attempt int) { m.mu.Lock() prevState := m.stateValue() m.state.Store(nextState) m.lastError.Store(errStr) if attempt > 0 { atomic.StoreInt32(&m.attemptNo, int32(attempt)) } else if nextState == StateOnline || nextState == StateNotPaired || nextState == StateOffline { atomic.StoreInt32(&m.attemptNo, 0) } if nextState == StateOnline { now := time.Now() m.connectedSince = &now m.lastSeenAt = &now } if nextState == StateNotPaired { m.connectedSince = nil m.lastSeenAt = nil } snap := m.buildSnapshotLocked(nextState, errStr, int(atomic.LoadInt32(&m.attemptNo))) m.current = snap m.mu.Unlock() if prevState != nextState || errStr != "" { m.emit(snap) } } // buildSnapshotLocked 組裝 ConnectionStatus。呼叫者必須持有 m.mu。 func (m *Manager) buildSnapshotLocked(state ConnectionState, errStr string, attempt int) ConnectionStatus { s := ConnectionStatus{ State: state, LastError: errStr, AttemptNo: attempt, RelayURL: m.cfg.RelayURL, Account: m.cfg.Account, } if m.cfg.SessionToken != "" { s.SessionTokenPreview = MaskSessionToken(m.cfg.SessionToken) } if m.connectedSince != nil { t := *m.connectedSince s.ConnectedSince = &t } if m.lastSeenAt != nil { t := *m.lastSeenAt s.LastSeenAt = &t } return s } // emit 把快照丟進 fanout channel;channel 滿時 drop oldest(避免 listener 慢拖垮 Manager)。 // // Close() 之後呼叫 emit 是允許的(Close 後仍會有 Stop→transition→emit 的尾聲), // emitClosed flag 確保不會對已關 channel 送資料。 func (m *Manager) emit(snap ConnectionStatus) { if m.emitClosed.Load() { return } // 即便上方檢查通過,Close 仍可能在這瞬間被呼叫。defer recover 當最後防線; // channel full fallback 的 drop-oldest 邏輯也放在 recover 內部路徑中,保 // 證 state transition 永遠不 panic。 defer func() { if r := recover(); r != nil { // race window:flag 已設但 close 還沒做完 → 我們不寫也不 drain,直接吃掉。 _ = r } }() select { case m.emitCh <- snap: default: // Channel 滿 → drop 最舊一筆再放入(不會阻塞 state transition)。 select { case <-m.emitCh: default: } select { case m.emitCh <- snap: default: } } } // startEmitterOnce 啟動 fanout goroutine。所有 listener 都在這個 goroutine 內 serialize 呼叫。 func (m *Manager) startEmitterOnce() { m.emitOnce.Do(func() { go func() { defer close(m.emitClosedCh) for snap := range m.emitCh { m.listenerMu.Lock() // 複製 listener slice,釋放鎖再呼叫,避免 listener 又呼叫 Manager 方法造成 deadlock ls := make([]StatusListener, 0, len(m.listeners)) for _, l := range m.listeners { ls = append(ls, l) } m.listenerMu.Unlock() for _, l := range ls { func(l StatusListener) { defer recoverListener("fanout") l(snap) }(l) } } }() }) } func recoverListener(tag string) { if r := recover(); r != nil { // 不讓 listener panic 拖垮 Manager。 // 實務上應有結構化 log,雛形階段忽略。 _ = r _ = tag } } // sessionToken 提供 thread-safe 讀 SessionToken 的捷徑。 func (m *Manager) sessionToken() string { m.mu.Lock() defer m.mu.Unlock() return m.cfg.SessionToken } func (m *Manager) stateValue() ConnectionState { if v := m.state.Load(); v != nil { if s, ok := v.(ConnectionState); ok { return s } } return StateNotPaired } // ------------------------------------------------------------------ // Client hook handlers // ------------------------------------------------------------------ // onDialAttempt:每次 Client.connect() 呼叫前觸發。 // attempt=1 代表首次;>1 代表重試。 func (m *Manager) onDialAttempt(attempt int) { atomic.StoreInt32(&m.attemptNo, int32(attempt)) // 決定要進哪一個狀態:首次且還沒上過線 → connecting;否則 reconnecting。 curr := m.stateValue() var next ConnectionState switch curr { case StateOnline, StateReconnecting: next = StateReconnecting case StateConnecting: next = StateConnecting default: if attempt == 1 { next = StateConnecting } else { next = StateReconnecting } } // Max retry 判斷(只在 Manual 模式生效;Auto 模式 cap 住退避、永遠重試) if m.shouldStopForMaxRetry(attempt) { // 停止 client,state 改 offline go func() { _ = m.Stop() }() return } errStr := "" if v := m.lastError.Load(); v != nil { if s, ok := v.(string); ok { errStr = s } } m.transition(next, errStr, attempt) } // onSessionUp:yamux session 成功建立。 func (m *Manager) onSessionUp() { atomic.StoreInt32(&m.attemptNo, 0) m.lastError.Store("") m.transition(StateOnline, "", 0) } // onSessionDown:session 已結束(非 dial 失敗)。通常是 relay 斷線、heartbeat timeout。 // Manager 不主動 Stop,讓 Client.run() 的重連迴圈接手;Manager 只把 state 改為 reconnecting。 func (m *Manager) onSessionDown(err error) { if !m.running.Load() { // 呼叫者已經 Stop → 不要覆寫 offline/notPaired 狀態。 return } errStr := "" if err != nil { errStr = err.Error() } m.lastError.Store(errStr) // lastSeenAt 更新成掉線時間,讓 UI 能算「已離線多久」 m.mu.Lock() now := time.Now() m.lastSeenAt = &now m.mu.Unlock() // 先進 reconnecting;下一次 OnDialAttempt 會帶 attempt 數。 m.transition(StateReconnecting, errStr, 1) } // onDialFailed:connect() 建立失敗(dial / yamux handshake)。 func (m *Manager) onDialFailed(attempt int, err error) { if err != nil { m.lastError.Store(err.Error()) } // 已經在 OnDialAttempt 進了 connecting / reconnecting;這裡只更新 error 字串。 // 不重新 transition state(避免多餘 event),但 update snapshot 供下次 Status() 帶新 error。 m.mu.Lock() m.current = m.buildSnapshotLocked(m.stateValue(), m.errorString(), attempt) snap := m.current m.mu.Unlock() m.emit(snap) } func (m *Manager) errorString() string { if v := m.lastError.Load(); v != nil { if s, ok := v.(string); ok { return s } } return "" } // shouldStopForMaxRetry 只在 manual 模式生效。Auto 模式永遠 false(由 Client backoff cap 擋住)。 func (m *Manager) shouldStopForMaxRetry(attempt int) bool { m.mu.Lock() mode := m.cfg.ReconnectMode max := m.cfg.MaxRetry m.mu.Unlock() if mode != ReconnectManual { return false } if max <= 0 { return false } return attempt > max }