// Package relay 實作 remote-proxy 端的 tunnel server 與通用代理轉發。 // // 核心職責: // - 接受 local agent 的 `/tunnel/connect` WebSocket upgrade,建立 yamux session // - 把 session 註冊到 session.Store(由 remote-proxy 唯一持有) // - 提供通用代理 `handleProxy`:依 token 找到 session → open stream → 轉發 HTTP/WS // - 提供 `/relay/status` 簡易連線狀態查詢 // // 從 POC `edge-ai-platform/server/internal/relay/server.go` 複製後改造: // 1. Session map → session.Store interface(由外部注入) // 2. yamux KeepAliveInterval:POC 的 30s → 10s(對齊 tunnel.md §4.2 M-5) // 3. Token 格式驗證(vAs_ + 64 hex 或 vAc_ + 32 hex 雛形可交替) // 4. 使用結構化 JSON logger(log/slog),不再用 `log.Printf` package relay import ( "bufio" "encoding/json" "errors" "fmt" "io" "log/slog" "net" "net/http" "strings" "sync" "time" "github.com/gorilla/websocket" "github.com/hashicorp/yamux" "visiona-backend/internal/auth" "visiona-backend/internal/session" "visiona-backend/internal/wsconn" ) // DefaultKeepAliveInterval 是 yamux 的心跳間隔(對齊 tunnel.md §4.2 M-5)。 // 連續 3 次未收到 pong(= 30s)即判定掉線。 const DefaultKeepAliveInterval = 10 * time.Second // DefaultConnectionWriteTimeout 是單一 yamux 寫入的最大等待時間。 const DefaultConnectionWriteTimeout = 10 * time.Second // Options 提供 NewServer 可選設定。 type Options struct { // KeepAliveInterval:yamux keep-alive 心跳;0 → 採用 DefaultKeepAliveInterval。 KeepAliveInterval time.Duration // ConnectionWriteTimeout:yamux 寫入 timeout;0 → 採用 DefaultConnectionWriteTimeout。 ConnectionWriteTimeout time.Duration // AllowedOrigins:WebSocket upgrade 的 Origin 白名單; // 空 slice → 接受任意 Origin(對齊 tunnel.md §4.1;local agent 非瀏覽器無 origin 風險)。 AllowedOrigins []string } // defaultOptions 建立含預設值的 Options。 func defaultOptions() Options { return Options{ KeepAliveInterval: DefaultKeepAliveInterval, ConnectionWriteTimeout: DefaultConnectionWriteTimeout, } } // Server 是 remote-proxy 端的 tunnel relay server。 // // 與 POC 的差異: // - 不自己維護 session map;全部委託 session.Store // - 允許注入 logger,行為與生產環境一致 type Server struct { store session.Store logger *slog.Logger opts Options upgrader websocket.Upgrader mu sync.Mutex shutdown bool } // NewServer 建立一個 relay.Server。 // // store:session.Store 實作(remote-proxy 端通常為 *session.InMemoryStore)。 // logger:結構化 logger;nil 時使用 slog.Default。 func NewServer(store session.Store, logger *slog.Logger, opts ...Options) *Server { if logger == nil { logger = slog.Default() } o := defaultOptions() if len(opts) > 0 { // 覆寫非零欄位 if opts[0].KeepAliveInterval > 0 { o.KeepAliveInterval = opts[0].KeepAliveInterval } if opts[0].ConnectionWriteTimeout > 0 { o.ConnectionWriteTimeout = opts[0].ConnectionWriteTimeout } o.AllowedOrigins = opts[0].AllowedOrigins } return &Server{ store: store, logger: logger, opts: o, upgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { // local agent 不跑在瀏覽器;預設放行任意 Origin。 // Phase 1 若有需要可對 AllowedOrigins 做比對。 if len(o.AllowedOrigins) == 0 { return true } origin := r.Header.Get("Origin") for _, a := range o.AllowedOrigins { if strings.EqualFold(a, origin) { return true } } return false }, }, } } // yamuxConfig 建構 yamux.Config — 套用我們統一的 10s keepalive。 func (s *Server) yamuxConfig(logOutput io.Writer) *yamux.Config { cfg := yamux.DefaultConfig() cfg.EnableKeepAlive = true cfg.KeepAliveInterval = s.opts.KeepAliveInterval cfg.ConnectionWriteTimeout = s.opts.ConnectionWriteTimeout if logOutput != nil { cfg.LogOutput = logOutput } return cfg } // HandleTunnelConnect 處理 local agent 的 WebSocket upgrade 請求。 // // Route: `GET /tunnel/connect?token=`(亦接受 `X-Relay-Token` header)。 // 流程: // 1. 取出 + 驗證 token 格式(vAs_ / vAc_) // 2. WebSocket upgrade // 3. 包成 net.Conn → yamux.Server // 4. 建 LocalHandle + store.Register(舊 session 會自動被 Close) // 5. 阻塞於 session.CloseChan;斷線後 Unregister func (s *Server) HandleTunnelConnect(w http.ResponseWriter, r *http.Request) { // B3 Review Minor #2 修補:shutdown 期間拒絕新的 tunnel upgrade, // 避免 graceful shutdown 過程中又有新 session 註冊進來。 s.mu.Lock() isShutdown := s.shutdown s.mu.Unlock() if isShutdown { writeJSONError(w, http.StatusServiceUnavailable, "SHUTTING_DOWN", "server is shutting down") return } tok := getToken(r) if tok == "" { writeJSONError(w, http.StatusUnauthorized, "NO_TOKEN", "token required") return } if !isAcceptableToken(tok) { writeJSONError(w, http.StatusUnauthorized, "INVALID_TOKEN_FORMAT", "token format invalid") return } wsConn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { // Upgrader 失敗時已經寫了 HTTP 回應 s.logger.Warn("tunnel upgrade failed", "error", err, "remote_addr", r.RemoteAddr, "token_prefix", tokenPrefix(tok)) return } netConn := wsconn.New(wsConn) ymCfg := s.yamuxConfig(nil) ym, err := yamux.Server(netConn, ymCfg) if err != nil { s.logger.Error("yamux server creation failed", "error", err, "token_prefix", tokenPrefix(tok)) _ = wsConn.Close() return } handle := NewLocalHandle(ym, tok, r.RemoteAddr) // Register 會 Close 同 token 舊連線(Q5 裁決:後連覆蓋前連) if err := s.store.Register(r.Context(), tok, handle); err != nil { s.logger.Error("session register failed", "error", err, "token_prefix", tokenPrefix(tok)) _ = ym.Close() return } s.logger.Info("tunnel connected", "token_prefix", tokenPrefix(tok), "remote_addr", r.RemoteAddr, "keepalive_interval", s.opts.KeepAliveInterval.String()) // 阻塞到 yamux session 關閉 <-ym.CloseChan() // 只有在「當前還是這個 handle」時才移除,避免覆蓋後的舊流程意外刪了新的。 if cur, lookupErr := s.store.Lookup(r.Context(), tok); lookupErr == nil { if cur == handle { _ = s.store.Unregister(r.Context(), tok) } } else if errors.Is(lookupErr, session.ErrSessionNotFound) { // 已被清掉或已被新連線取代,無動作 } s.logger.Info("tunnel disconnected", "token_prefix", tokenPrefix(tok), "remote_addr", r.RemoteAddr) } // HandleRelayStatus 回報指定 token 的連線狀態(debug / health 用)。 // // Route: `GET /relay/status?token=`(或無 token → 全體線上數量)。 func (s *Server) HandleRelayStatus(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") tok := getToken(r) if tok != "" { h, err := s.store.Lookup(r.Context(), tok) if err != nil { // 不存在 → online=false _ = json.NewEncoder(w).Encode(map[string]any{ "online": false, }) return } sum := h.Summary() _ = json.NewEncoder(w).Encode(map[string]any{ "online": !h.IsClosed(), "connected_at": sum.ConnectedAt, "last_heartbeat": sum.LastHeartbeat, "remote_addr": sum.RemoteAddr, }) return } summaries, err := s.store.List(r.Context()) if err != nil { writeJSONError(w, http.StatusInternalServerError, "LIST_FAILED", err.Error()) return } _ = json.NewEncoder(w).Encode(map[string]any{ "online": len(summaries) > 0, "tunnel_count": len(summaries), }) } // HandleProxy 是通用的 HTTP 反向代理 handler。 // // 主要給 debug / 舊相容路徑用(POC 原本讓瀏覽器直接打 proxy); // 雛形正式流量走 `/internal/forward/http`(見 remote-proxy 的 main.go)。 // // Route: `Any /*`(或自行綁在其他 path)。 func (s *Server) HandleProxy(w http.ResponseWriter, r *http.Request) { tok := getToken(r) if tok == "" { writeJSONError(w, http.StatusUnauthorized, "NO_TOKEN", "X-Relay-Token header or token query param required") return } h, err := s.store.Lookup(r.Context(), tok) if err != nil || h.IsClosed() { writeJSONError(w, http.StatusBadGateway, "TUNNEL_DISCONNECTED", "edge agent is not connected") return } stream, err := h.OpenStream(r.Context()) if err != nil { s.logger.Warn("open stream failed", "error", err, "token_prefix", tokenPrefix(tok)) writeJSONError(w, http.StatusBadGateway, "TUNNEL_ERROR", "failed to open tunnel stream") return } defer stream.Close() // 不把 token / internal header 轉給 local agent r.Header.Del("X-Relay-Token") if isWebSocketUpgrade(r) { s.proxyWebSocket(w, r, stream) return } if err := r.Write(stream); err != nil { s.logger.Warn("write request to tunnel failed", "error", err) writeJSONError(w, http.StatusBadGateway, "TUNNEL_WRITE_ERROR", "failed to write request to tunnel") return } resp, err := http.ReadResponse(bufio.NewReader(stream), r) if err != nil { s.logger.Warn("read response from tunnel failed", "error", err) writeJSONError(w, http.StatusBadGateway, "TUNNEL_READ_ERROR", "failed to read response from tunnel") return } defer resp.Body.Close() for key, vals := range resp.Header { for _, v := range vals { w.Header().Add(key, v) } } w.WriteHeader(resp.StatusCode) // 串流支援(MJPEG / SSE):有 Flusher 就每塊 flush 一次 if flusher, ok := w.(http.Flusher); ok { buf := make([]byte, 32*1024) for { n, rerr := resp.Body.Read(buf) if n > 0 { if _, werr := w.Write(buf[:n]); werr != nil { return } flusher.Flush() } if rerr != nil { break } } } else { _, _ = io.Copy(w, resp.Body) } } // proxyWebSocket 處理瀏覽器(或 api-server 內部)的 WebSocket upgrade: // 把 upgrade request 透過 yamux stream 送到 local agent, // 再 Hijack 當前連線做雙向 pipe(POC 原始邏輯)。 func (s *Server) proxyWebSocket(w http.ResponseWriter, r *http.Request, stream net.Conn) { if err := r.Write(stream); err != nil { s.logger.Warn("ws: write upgrade request failed", "error", err) writeJSONError(w, http.StatusBadGateway, "TUNNEL_WRITE_ERROR", "failed to write upgrade request to tunnel") return } resp, err := http.ReadResponse(bufio.NewReader(stream), r) if err != nil { s.logger.Warn("ws: read upgrade response failed", "error", err) writeJSONError(w, http.StatusBadGateway, "TUNNEL_READ_ERROR", "failed to read upgrade response from tunnel") return } if resp.StatusCode != http.StatusSwitchingProtocols { for key, vals := range resp.Header { for _, v := range vals { w.Header().Add(key, v) } } w.WriteHeader(resp.StatusCode) _, _ = io.Copy(w, resp.Body) _ = resp.Body.Close() return } hijacker, ok := w.(http.Hijacker) if !ok { http.Error(w, "hijacking not supported", http.StatusInternalServerError) return } clientConn, clientBuf, err := hijacker.Hijack() if err != nil { s.logger.Warn("ws: hijack failed", "error", err) return } defer clientConn.Close() // 把 101 回傳給 caller _ = resp.Write(clientBuf) _ = clientBuf.Flush() // 雙向 pipe var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() _, _ = io.Copy(stream, clientConn) _ = stream.Close() }() go func() { defer wg.Done() _, _ = io.Copy(clientConn, stream) _ = clientConn.Close() }() wg.Wait() } // Shutdown 關閉 Server 所管理的所有 session(通常在程序結束時呼叫)。 // // Store 不需要在此被關閉(Store 由 caller 注入);只通知:不再接受新 tunnel。 func (s *Server) Shutdown() { s.mu.Lock() s.shutdown = true s.mu.Unlock() // 實際的 session close 會由 CleanupExpired / cmd/remote-proxy 主迴圈處理。 } // ---------------------------------------------------------------------- // Helpers // ---------------------------------------------------------------------- // getToken 從 `X-Relay-Token` header 或 `token` query 參數取出 token。 // Header 優先於 query,行為與 POC 一致。 func getToken(r *http.Request) string { if tok := r.Header.Get("X-Relay-Token"); tok != "" { return tok } return r.URL.Query().Get("token") } // isAcceptableToken 檢查 token 是否為 pairing 或 session 任一合法格式。 // // 雛形階段 local agent 仍用 pairing token 連線(見 tunnel.md §2.2.1); // Phase 1 升級兩階段 token 後仍然是 session token 為主。此處兩者皆接受。 func isAcceptableToken(tok string) bool { return auth.IsValidPairingToken(tok) || auth.IsValidSessionToken(tok) } // tokenPrefix 回傳 token 的前 8 字元(log 用,避免 log 完整 token)。 func tokenPrefix(tok string) string { if len(tok) <= 8 { return tok } return tok[:8] } // isWebSocketUpgrade 判斷 request 是否為 WebSocket upgrade。 // // B3 Review Minor #4 修補:同時檢查 Upgrade 與 Connection header, // 避免只看 Upgrade 在極端 case(curl 手工送單一 header)時誤判。 // RFC 6455 §4.1 規定合法的 WS upgrade 要同時包含兩個 header。 func isWebSocketUpgrade(r *http.Request) bool { if !strings.EqualFold(r.Header.Get("Upgrade"), "websocket") { return false } // Connection header 可能是 "upgrade" 或 "keep-alive, Upgrade" 等組合, // 用 Contains 不區分大小寫即可。 return strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade") } // writeJSONError 寫回統一格式的 JSON error(對齊 API error schema)。 func writeJSONError(w http.ResponseWriter, status int, code, message string) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(map[string]any{ "error": map[string]any{ "code": code, "message": message, }, }) } // FormatAddr 把 port 格式化為 ":{port}",供 http.Server.Addr 使用。 func FormatAddr(port int) string { return fmt.Sprintf(":%d", port) }