從 edge-ai-platform POC 轉為正式產品的雲端後端,含以下整合階段:
- Phase 0:雛形骨架 — `cmd/api-server` (REST :3721) + `cmd/remote-proxy`
(tunnel :3800 / internal :3801) 雙 binary 共用 internal/,沿用 POC 的
WebSocket+yamux tunnel 協定但解耦 relay 與 API
- Phase 0.6:OIDC BFF 接 Innovedus Member Center
- internal/oidc package(coreos/go-oidc + PKCE S256 + state + nonce)
- internal/usersession package(HMAC-SHA256 cookie + RotateSessionID
防 session fixation, OWASP ASVS V3.2.1)
- 4 個 OIDC handler(/api/auth/login|callback|me|logout)+ AuthMiddleware
- 完全拔除 StaticAuthProvider,OIDC 是唯一認證路徑
- 9 個 ADR(含 ADR-010 BFF / ADR-011 取代 static auth /
ADR-012 pending session shared cookie / ADR-013 PKCE-only public client)
- Phase 0.7:A1 改造 + security audit 修復
- OIDC ClientSecret 變選填,支援 stage MC 的 public PKCE-only client
(AuthStyleInParams 強制 token endpoint 不送 client_secret)
- 預留 ServiceClient* 欄位給未來 client_credentials grant
- 移除 13+ 處 resolveUserID(uc, StaticUserID) fallback 改 strict mode
(Audit C1:multi-tenant 隔離破口)
- Pairing exchange MarkUsed 失敗 abort + revoke session token(Audit M3)
- 新增 all_endpoints_require_auth_test 整合測試(51 endpoint × 401)
驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
463 lines · 14 KiB · Go
// Package relay 實作 remote-proxy 端的 tunnel server 與通用代理轉發。
//
// 核心職責:
//   - 接受 local agent 的 `/tunnel/connect` WebSocket upgrade,建立 yamux session
//   - 把 session 註冊到 session.Store(由 remote-proxy 唯一持有)
//   - 提供通用代理 `HandleProxy`:依 token 找到 session → open stream → 轉發 HTTP/WS
//   - 提供 `/relay/status` 簡易連線狀態查詢
//
// 從 POC `edge-ai-platform/server/internal/relay/server.go` 複製後改造:
//  1. Session map → session.Store interface(由外部注入)
//  2. yamux KeepAliveInterval:POC 的 30s → 10s(對齊 tunnel.md §4.2 M-5)
//  3. Token 格式驗證(vAs_ + 64 hex 或 vAc_ + 32 hex 雛形可交替)
//  4. 使用結構化 JSON logger(log/slog),不再用 `log.Printf`
|
||
package relay
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"net"
|
||
"net/http"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/gorilla/websocket"
|
||
"github.com/hashicorp/yamux"
|
||
|
||
"visiona-backend/internal/auth"
|
||
"visiona-backend/internal/session"
|
||
"visiona-backend/internal/wsconn"
|
||
)
|
||
|
||
const (
	// DefaultKeepAliveInterval is the yamux keep-alive ping interval (aligned
	// with tunnel.md §4.2 M-5).
	//
	// With hashicorp/yamux a single keep-alive ping that is not answered within
	// ConnectionWriteTimeout terminates the session; there is no "N missed
	// pongs" counter. With both defaults a dead peer is therefore detected after
	// roughly KeepAliveInterval + ConnectionWriteTimeout = 20s.
	DefaultKeepAliveInterval = 10 * time.Second

	// DefaultConnectionWriteTimeout is the maximum time a single yamux write may
	// block. In hashicorp/yamux it also bounds how long a keep-alive ping waits
	// for its reply.
	DefaultConnectionWriteTimeout = 10 * time.Second
)
|
||
|
||
// Options carries the optional settings accepted by NewServer. A duration field
// that is zero or negative means "use the package default".
type Options struct {
	// KeepAliveInterval is the yamux keep-alive ping interval.
	// Values <= 0 fall back to DefaultKeepAliveInterval.
	KeepAliveInterval time.Duration

	// ConnectionWriteTimeout bounds a single yamux write.
	// Values <= 0 fall back to DefaultConnectionWriteTimeout.
	ConnectionWriteTimeout time.Duration

	// AllowedOrigins is the Origin allowlist for WebSocket upgrades. An empty
	// slice accepts any Origin (tunnel.md §4.1: the local agent is not a
	// browser, so an Origin check buys nothing for it).
	AllowedOrigins []string
}
|
||
|
||
// defaultOptions 建立含預設值的 Options。
|
||
func defaultOptions() Options {
|
||
return Options{
|
||
KeepAliveInterval: DefaultKeepAliveInterval,
|
||
ConnectionWriteTimeout: DefaultConnectionWriteTimeout,
|
||
}
|
||
}
|
||
|
||
// Server 是 remote-proxy 端的 tunnel relay server。
|
||
//
|
||
// 與 POC 的差異:
|
||
// - 不自己維護 session map;全部委託 session.Store
|
||
// - 允許注入 logger,行為與生產環境一致
|
||
type Server struct {
|
||
store session.Store
|
||
logger *slog.Logger
|
||
opts Options
|
||
upgrader websocket.Upgrader
|
||
|
||
mu sync.Mutex
|
||
shutdown bool
|
||
}
|
||
|
||
// NewServer 建立一個 relay.Server。
|
||
//
|
||
// store:session.Store 實作(remote-proxy 端通常為 *session.InMemoryStore)。
|
||
// logger:結構化 logger;nil 時使用 slog.Default。
|
||
func NewServer(store session.Store, logger *slog.Logger, opts ...Options) *Server {
|
||
if logger == nil {
|
||
logger = slog.Default()
|
||
}
|
||
o := defaultOptions()
|
||
if len(opts) > 0 {
|
||
// 覆寫非零欄位
|
||
if opts[0].KeepAliveInterval > 0 {
|
||
o.KeepAliveInterval = opts[0].KeepAliveInterval
|
||
}
|
||
if opts[0].ConnectionWriteTimeout > 0 {
|
||
o.ConnectionWriteTimeout = opts[0].ConnectionWriteTimeout
|
||
}
|
||
o.AllowedOrigins = opts[0].AllowedOrigins
|
||
}
|
||
|
||
return &Server{
|
||
store: store,
|
||
logger: logger,
|
||
opts: o,
|
||
upgrader: websocket.Upgrader{
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
// local agent 不跑在瀏覽器;預設放行任意 Origin。
|
||
// Phase 1 若有需要可對 AllowedOrigins 做比對。
|
||
if len(o.AllowedOrigins) == 0 {
|
||
return true
|
||
}
|
||
origin := r.Header.Get("Origin")
|
||
for _, a := range o.AllowedOrigins {
|
||
if strings.EqualFold(a, origin) {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
},
|
||
},
|
||
}
|
||
}
|
||
|
||
// yamuxConfig 建構 yamux.Config — 套用我們統一的 10s keepalive。
|
||
func (s *Server) yamuxConfig(logOutput io.Writer) *yamux.Config {
|
||
cfg := yamux.DefaultConfig()
|
||
cfg.EnableKeepAlive = true
|
||
cfg.KeepAliveInterval = s.opts.KeepAliveInterval
|
||
cfg.ConnectionWriteTimeout = s.opts.ConnectionWriteTimeout
|
||
if logOutput != nil {
|
||
cfg.LogOutput = logOutput
|
||
}
|
||
return cfg
|
||
}
|
||
|
||
// HandleTunnelConnect 處理 local agent 的 WebSocket upgrade 請求。
|
||
//
|
||
// Route: `GET /tunnel/connect?token=<token>`(亦接受 `X-Relay-Token` header)。
|
||
// 流程:
|
||
// 1. 取出 + 驗證 token 格式(vAs_ / vAc_)
|
||
// 2. WebSocket upgrade
|
||
// 3. 包成 net.Conn → yamux.Server
|
||
// 4. 建 LocalHandle + store.Register(舊 session 會自動被 Close)
|
||
// 5. 阻塞於 session.CloseChan;斷線後 Unregister
|
||
func (s *Server) HandleTunnelConnect(w http.ResponseWriter, r *http.Request) {
|
||
// B3 Review Minor #2 修補:shutdown 期間拒絕新的 tunnel upgrade,
|
||
// 避免 graceful shutdown 過程中又有新 session 註冊進來。
|
||
s.mu.Lock()
|
||
isShutdown := s.shutdown
|
||
s.mu.Unlock()
|
||
if isShutdown {
|
||
writeJSONError(w, http.StatusServiceUnavailable, "SHUTTING_DOWN", "server is shutting down")
|
||
return
|
||
}
|
||
|
||
tok := getToken(r)
|
||
if tok == "" {
|
||
writeJSONError(w, http.StatusUnauthorized, "NO_TOKEN", "token required")
|
||
return
|
||
}
|
||
if !isAcceptableToken(tok) {
|
||
writeJSONError(w, http.StatusUnauthorized, "INVALID_TOKEN_FORMAT", "token format invalid")
|
||
return
|
||
}
|
||
|
||
wsConn, err := s.upgrader.Upgrade(w, r, nil)
|
||
if err != nil {
|
||
// Upgrader 失敗時已經寫了 HTTP 回應
|
||
s.logger.Warn("tunnel upgrade failed",
|
||
"error", err,
|
||
"remote_addr", r.RemoteAddr,
|
||
"token_prefix", tokenPrefix(tok))
|
||
return
|
||
}
|
||
|
||
netConn := wsconn.New(wsConn)
|
||
|
||
ymCfg := s.yamuxConfig(nil)
|
||
ym, err := yamux.Server(netConn, ymCfg)
|
||
if err != nil {
|
||
s.logger.Error("yamux server creation failed",
|
||
"error", err,
|
||
"token_prefix", tokenPrefix(tok))
|
||
_ = wsConn.Close()
|
||
return
|
||
}
|
||
|
||
handle := NewLocalHandle(ym, tok, r.RemoteAddr)
|
||
|
||
// Register 會 Close 同 token 舊連線(Q5 裁決:後連覆蓋前連)
|
||
if err := s.store.Register(r.Context(), tok, handle); err != nil {
|
||
s.logger.Error("session register failed",
|
||
"error", err,
|
||
"token_prefix", tokenPrefix(tok))
|
||
_ = ym.Close()
|
||
return
|
||
}
|
||
|
||
s.logger.Info("tunnel connected",
|
||
"token_prefix", tokenPrefix(tok),
|
||
"remote_addr", r.RemoteAddr,
|
||
"keepalive_interval", s.opts.KeepAliveInterval.String())
|
||
|
||
// 阻塞到 yamux session 關閉
|
||
<-ym.CloseChan()
|
||
|
||
// 只有在「當前還是這個 handle」時才移除,避免覆蓋後的舊流程意外刪了新的。
|
||
if cur, lookupErr := s.store.Lookup(r.Context(), tok); lookupErr == nil {
|
||
if cur == handle {
|
||
_ = s.store.Unregister(r.Context(), tok)
|
||
}
|
||
} else if errors.Is(lookupErr, session.ErrSessionNotFound) {
|
||
// 已被清掉或已被新連線取代,無動作
|
||
}
|
||
|
||
s.logger.Info("tunnel disconnected",
|
||
"token_prefix", tokenPrefix(tok),
|
||
"remote_addr", r.RemoteAddr)
|
||
}
|
||
|
||
// HandleRelayStatus 回報指定 token 的連線狀態(debug / health 用)。
|
||
//
|
||
// Route: `GET /relay/status?token=<token>`(或無 token → 全體線上數量)。
|
||
func (s *Server) HandleRelayStatus(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Content-Type", "application/json")
|
||
|
||
tok := getToken(r)
|
||
if tok != "" {
|
||
h, err := s.store.Lookup(r.Context(), tok)
|
||
if err != nil {
|
||
// 不存在 → online=false
|
||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||
"online": false,
|
||
})
|
||
return
|
||
}
|
||
sum := h.Summary()
|
||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||
"online": !h.IsClosed(),
|
||
"connected_at": sum.ConnectedAt,
|
||
"last_heartbeat": sum.LastHeartbeat,
|
||
"remote_addr": sum.RemoteAddr,
|
||
})
|
||
return
|
||
}
|
||
|
||
summaries, err := s.store.List(r.Context())
|
||
if err != nil {
|
||
writeJSONError(w, http.StatusInternalServerError, "LIST_FAILED", err.Error())
|
||
return
|
||
}
|
||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||
"online": len(summaries) > 0,
|
||
"tunnel_count": len(summaries),
|
||
})
|
||
}
|
||
|
||
// HandleProxy 是通用的 HTTP 反向代理 handler。
|
||
//
|
||
// 主要給 debug / 舊相容路徑用(POC 原本讓瀏覽器直接打 proxy);
|
||
// 雛形正式流量走 `/internal/forward/http`(見 remote-proxy 的 main.go)。
|
||
//
|
||
// Route: `Any /*`(或自行綁在其他 path)。
|
||
func (s *Server) HandleProxy(w http.ResponseWriter, r *http.Request) {
|
||
tok := getToken(r)
|
||
if tok == "" {
|
||
writeJSONError(w, http.StatusUnauthorized, "NO_TOKEN", "X-Relay-Token header or token query param required")
|
||
return
|
||
}
|
||
|
||
h, err := s.store.Lookup(r.Context(), tok)
|
||
if err != nil || h.IsClosed() {
|
||
writeJSONError(w, http.StatusBadGateway, "TUNNEL_DISCONNECTED", "edge agent is not connected")
|
||
return
|
||
}
|
||
|
||
stream, err := h.OpenStream(r.Context())
|
||
if err != nil {
|
||
s.logger.Warn("open stream failed", "error", err, "token_prefix", tokenPrefix(tok))
|
||
writeJSONError(w, http.StatusBadGateway, "TUNNEL_ERROR", "failed to open tunnel stream")
|
||
return
|
||
}
|
||
defer stream.Close()
|
||
|
||
// 不把 token / internal header 轉給 local agent
|
||
r.Header.Del("X-Relay-Token")
|
||
|
||
if isWebSocketUpgrade(r) {
|
||
s.proxyWebSocket(w, r, stream)
|
||
return
|
||
}
|
||
|
||
if err := r.Write(stream); err != nil {
|
||
s.logger.Warn("write request to tunnel failed", "error", err)
|
||
writeJSONError(w, http.StatusBadGateway, "TUNNEL_WRITE_ERROR", "failed to write request to tunnel")
|
||
return
|
||
}
|
||
|
||
resp, err := http.ReadResponse(bufio.NewReader(stream), r)
|
||
if err != nil {
|
||
s.logger.Warn("read response from tunnel failed", "error", err)
|
||
writeJSONError(w, http.StatusBadGateway, "TUNNEL_READ_ERROR", "failed to read response from tunnel")
|
||
return
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
for key, vals := range resp.Header {
|
||
for _, v := range vals {
|
||
w.Header().Add(key, v)
|
||
}
|
||
}
|
||
w.WriteHeader(resp.StatusCode)
|
||
|
||
// 串流支援(MJPEG / SSE):有 Flusher 就每塊 flush 一次
|
||
if flusher, ok := w.(http.Flusher); ok {
|
||
buf := make([]byte, 32*1024)
|
||
for {
|
||
n, rerr := resp.Body.Read(buf)
|
||
if n > 0 {
|
||
if _, werr := w.Write(buf[:n]); werr != nil {
|
||
return
|
||
}
|
||
flusher.Flush()
|
||
}
|
||
if rerr != nil {
|
||
break
|
||
}
|
||
}
|
||
} else {
|
||
_, _ = io.Copy(w, resp.Body)
|
||
}
|
||
}
|
||
|
||
// proxyWebSocket 處理瀏覽器(或 api-server 內部)的 WebSocket upgrade:
|
||
// 把 upgrade request 透過 yamux stream 送到 local agent,
|
||
// 再 Hijack 當前連線做雙向 pipe(POC 原始邏輯)。
|
||
func (s *Server) proxyWebSocket(w http.ResponseWriter, r *http.Request, stream net.Conn) {
|
||
if err := r.Write(stream); err != nil {
|
||
s.logger.Warn("ws: write upgrade request failed", "error", err)
|
||
writeJSONError(w, http.StatusBadGateway, "TUNNEL_WRITE_ERROR", "failed to write upgrade request to tunnel")
|
||
return
|
||
}
|
||
|
||
resp, err := http.ReadResponse(bufio.NewReader(stream), r)
|
||
if err != nil {
|
||
s.logger.Warn("ws: read upgrade response failed", "error", err)
|
||
writeJSONError(w, http.StatusBadGateway, "TUNNEL_READ_ERROR", "failed to read upgrade response from tunnel")
|
||
return
|
||
}
|
||
|
||
if resp.StatusCode != http.StatusSwitchingProtocols {
|
||
for key, vals := range resp.Header {
|
||
for _, v := range vals {
|
||
w.Header().Add(key, v)
|
||
}
|
||
}
|
||
w.WriteHeader(resp.StatusCode)
|
||
_, _ = io.Copy(w, resp.Body)
|
||
_ = resp.Body.Close()
|
||
return
|
||
}
|
||
|
||
hijacker, ok := w.(http.Hijacker)
|
||
if !ok {
|
||
http.Error(w, "hijacking not supported", http.StatusInternalServerError)
|
||
return
|
||
}
|
||
clientConn, clientBuf, err := hijacker.Hijack()
|
||
if err != nil {
|
||
s.logger.Warn("ws: hijack failed", "error", err)
|
||
return
|
||
}
|
||
defer clientConn.Close()
|
||
|
||
// 把 101 回傳給 caller
|
||
_ = resp.Write(clientBuf)
|
||
_ = clientBuf.Flush()
|
||
|
||
// 雙向 pipe
|
||
var wg sync.WaitGroup
|
||
wg.Add(2)
|
||
go func() {
|
||
defer wg.Done()
|
||
_, _ = io.Copy(stream, clientConn)
|
||
_ = stream.Close()
|
||
}()
|
||
go func() {
|
||
defer wg.Done()
|
||
_, _ = io.Copy(clientConn, stream)
|
||
_ = clientConn.Close()
|
||
}()
|
||
wg.Wait()
|
||
}
|
||
|
||
// Shutdown 關閉 Server 所管理的所有 session(通常在程序結束時呼叫)。
|
||
//
|
||
// Store 不需要在此被關閉(Store 由 caller 注入);只通知:不再接受新 tunnel。
|
||
func (s *Server) Shutdown() {
|
||
s.mu.Lock()
|
||
s.shutdown = true
|
||
s.mu.Unlock()
|
||
// 實際的 session close 會由 CleanupExpired / cmd/remote-proxy 主迴圈處理。
|
||
}
|
||
|
||
// ----------------------------------------------------------------------
|
||
// Helpers
|
||
// ----------------------------------------------------------------------
|
||
|
||
// getToken returns the relay token carried by r. A non-empty `X-Relay-Token`
// header wins; otherwise the `token` query parameter is used (same precedence
// as the POC). It returns "" when neither is present.
func getToken(r *http.Request) string {
	fromHeader := r.Header.Get("X-Relay-Token")
	if fromHeader == "" {
		return r.URL.Query().Get("token")
	}
	return fromHeader
}
|
||
|
||
// isAcceptableToken 檢查 token 是否為 pairing 或 session 任一合法格式。
|
||
//
|
||
// 雛形階段 local agent 仍用 pairing token 連線(見 tunnel.md §2.2.1);
|
||
// Phase 1 升級兩階段 token 後仍然是 session token 為主。此處兩者皆接受。
|
||
func isAcceptableToken(tok string) bool {
|
||
return auth.IsValidPairingToken(tok) || auth.IsValidSessionToken(tok)
|
||
}
|
||
|
||
// tokenPrefix returns at most the first 8 bytes of tok, so logs can identify a
// token without recording it in full.
func tokenPrefix(tok string) string {
	const keep = 8
	if len(tok) > keep {
		tok = tok[:keep]
	}
	return tok
}
|
||
|
||
// isWebSocketUpgrade reports whether r is a WebSocket upgrade request.
//
// B3 Review Minor #4: both the Upgrade and the Connection header are checked.
// RFC 6455 §4.1 requires a valid handshake to carry both, and looking at only
// one misclassifies hand-crafted requests (e.g. curl sending a single header).
//
// Both headers are comma-separated token lists that may also be split across
// several header lines (e.g. "Connection: keep-alive, Upgrade"). Matching is
// per token, case-insensitive, over every line. A substring test would also
// accept unrelated tokens such as "x-upgrade-hint", and Header.Get would
// ignore all but the first line.
func isWebSocketUpgrade(r *http.Request) bool {
	hasToken := func(name, want string) bool {
		for _, line := range r.Header.Values(name) {
			for _, tok := range strings.Split(line, ",") {
				if strings.EqualFold(strings.TrimSpace(tok), want) {
					return true
				}
			}
		}
		return false
	}
	return hasToken("Upgrade", "websocket") && hasToken("Connection", "upgrade")
}
|
||
|
||
// writeJSONError sends status with a JSON body in the shared API error schema,
// {"error":{"code":<code>,"message":<message>}}, and Content-Type
// application/json. The encode error is ignored because the status line has
// already been written by then.
func writeJSONError(w http.ResponseWriter, status int, code, message string) {
	type errorDetail struct {
		Code    string `json:"code"`
		Message string `json:"message"`
	}
	type errorEnvelope struct {
		Error errorDetail `json:"error"`
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	enc := json.NewEncoder(w)
	_ = enc.Encode(errorEnvelope{Error: errorDetail{Code: code, Message: message}})
}
|
||
|
||
// FormatAddr renders port as the ":<port>" listen-address form expected by
// http.Server.Addr (empty host, i.e. all interfaces).
func FormatAddr(port int) string {
	return fmt.Sprint(":", port)
}
|