jim800121chen 22f0837ba8 feat(visionA-backend): Phase 0 → 0.7 雲端後端(雙 binary + OIDC BFF + stage 部署)
從 edge-ai-platform POC 轉為正式產品的雲端後端,含以下整合階段:

- Phase 0:雛形骨架 — `cmd/api-server` (REST :3721) + `cmd/remote-proxy`
  (tunnel :3800 / internal :3801) 雙 binary 共用 internal/,沿用 POC 的
  WebSocket+yamux tunnel 協定但解耦 relay 與 API
- Phase 0.6:OIDC BFF 接 Innovedus Member Center
  - internal/oidc package(coreos/go-oidc + PKCE S256 + state + nonce)
  - internal/usersession package(HMAC-SHA256 cookie + RotateSessionID
    防 session fixation, OWASP ASVS V3.2.1)
  - 4 個 OIDC handler(/api/auth/login|callback|me|logout)+ AuthMiddleware
  - 完全拔除 StaticAuthProvider,OIDC 是唯一認證路徑
  - 9 個 ADR(含 ADR-010 BFF / ADR-011 取代 static auth /
    ADR-012 pending session shared cookie / ADR-013 PKCE-only public client)
- Phase 0.7:A1 改造 + security audit 修復
  - OIDC ClientSecret 變選填,支援 stage MC 的 public PKCE-only client
    (AuthStyleInParams 強制 token endpoint 不送 client_secret)
  - 預留 ServiceClient* 欄位給未來 client_credentials grant
  - 移除 13+ 處 resolveUserID(uc, StaticUserID) fallback 改 strict mode
    (Audit C1:multi-tenant 隔離破口)
  - Pairing exchange MarkUsed 失敗 abort + revoke session token(Audit M3)
  - 新增 all_endpoints_require_auth_test 整合測試(51 endpoint × 401)

驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 11:21:20 +08:00

463 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

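// Package relay implements the tunnel server and the generic proxy forwarding
// used by remote-proxy.
//
// Core responsibilities:
//   - accept the local agent's `/tunnel/connect` WebSocket upgrade and
//     establish a yamux session on top of it
//   - register that session in session.Store (held exclusively by remote-proxy)
//   - provide the generic proxy `HandleProxy`: look up the session by token →
//     open a stream → forward HTTP/WS
//   - provide `/relay/status` for simple connection-status queries
//
// Copied from the POC `edge-ai-platform/server/internal/relay/server.go` and
// then adapted:
//  1. Session map → session.Store interface (injected from outside)
//  2. yamux KeepAliveInterval: the POC's 30s → 10s (aligned with tunnel.md §4.2 M-5)
//  3. Token format validation (vAs_ + 64 hex, or vAc_ + 32 hex; the prototype
//     accepts either)
//  4. Structured JSON logger (log/slog) instead of `log.Printf`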
package relay
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/hashicorp/yamux"
"visiona-backend/internal/auth"
"visiona-backend/internal/session"
"visiona-backend/internal/wsconn"
)
// Default yamux timing parameters, used by NewServer for any Options field
// that is left unset.
const (
	// DefaultKeepAliveInterval is the yamux keep-alive (heartbeat) interval.
	// According to tunnel.md §4.2 M-5, three consecutive missed pongs
	// (about 30s in total) are treated as a dropped tunnel.
	DefaultKeepAliveInterval = 10 * time.Second

	// DefaultConnectionWriteTimeout is the longest a single yamux write is
	// allowed to wait.
	DefaultConnectionWriteTimeout = 10 * time.Second
)
// Options carries the optional settings accepted by NewServer.
type Options struct {
// KeepAliveInterval is the yamux keep-alive heartbeat interval.
// A value <= 0 makes NewServer fall back to DefaultKeepAliveInterval.
KeepAliveInterval time.Duration
// ConnectionWriteTimeout is the yamux write timeout.
// A value <= 0 makes NewServer fall back to DefaultConnectionWriteTimeout.
ConnectionWriteTimeout time.Duration
// AllowedOrigins is the Origin whitelist applied to the WebSocket upgrade.
// An empty slice accepts any Origin; the original note justifies this with
// tunnel.md §4.1 (the local agent is not a browser, so there is no
// Origin-based risk).
AllowedOrigins []string
}
// defaultOptions returns an Options value whose two yamux timing fields are
// set to the package defaults. AllowedOrigins is left nil.
func defaultOptions() Options {
	var o Options
	o.KeepAliveInterval = DefaultKeepAliveInterval
	o.ConnectionWriteTimeout = DefaultConnectionWriteTimeout
	return o
}
// Server is the tunnel relay server that runs on the remote-proxy side.
//
// Differences from the POC, per the original notes:
//   - it keeps no session map of its own; everything is delegated to session.Store
//   - a logger can be injected, so behaviour matches the production environment
type Server struct {
// store is the injected session registry used to register, look up, list
// and unregister tunnels.
store session.Store
// logger receives all structured log output of the server.
logger *slog.Logger
// opts holds the effective options (defaults merged with caller overrides).
opts Options
// upgrader performs the WebSocket upgrade for /tunnel/connect.
upgrader websocket.Upgrader
// mu guards shutdown.
mu sync.Mutex
// shutdown is set by Shutdown and checked by HandleTunnelConnect to refuse
// new tunnel upgrades.
shutdown bool
}
// NewServer builds a relay Server.
//
// store is the session registry the server reads and writes (the original
// notes say the remote-proxy side normally passes *session.InMemoryStore).
// logger receives structured log output; when nil, slog.Default() is used.
//
// Only the first element of opts is consulted. Its KeepAliveInterval and
// ConnectionWriteTimeout replace the defaults when they are > 0.
// AllowedOrigins is always taken from it and is copied, so that a later
// mutation of the caller's slice can neither change nor race with the Origin
// check performed on every upgrade.
func NewServer(store session.Store, logger *slog.Logger, opts ...Options) *Server {
	if logger == nil {
		logger = slog.Default()
	}

	o := defaultOptions()
	if len(opts) > 0 {
		override := opts[0]
		// Only non-zero (positive) timing values override the defaults.
		if override.KeepAliveInterval > 0 {
			o.KeepAliveInterval = override.KeepAliveInterval
		}
		if override.ConnectionWriteTimeout > 0 {
			o.ConnectionWriteTimeout = override.ConnectionWriteTimeout
		}
		// Defensive copy: the whitelist is read from HTTP handler goroutines
		// for the lifetime of the server and must not alias caller memory.
		o.AllowedOrigins = append([]string(nil), override.AllowedOrigins...)
	}

	allowed := o.AllowedOrigins
	return &Server{
		store:  store,
		logger: logger,
		opts:   o,
		upgrader: websocket.Upgrader{
			CheckOrigin: func(r *http.Request) bool {
				// The local agent does not run in a browser, so an empty
				// whitelist accepts every Origin.
				if len(allowed) == 0 {
					return true
				}
				// With a whitelist configured the Origin header must match
				// one entry case-insensitively. Note that a request without
				// an Origin header is therefore rejected.
				origin := r.Header.Get("Origin")
				for _, a := range allowed {
					if strings.EqualFold(a, origin) {
						return true
					}
				}
				return false
			},
		},
	}
}
// yamuxConfig returns a yamux configuration based on yamux.DefaultConfig with
// keep-alive enabled and the server's configured keep-alive interval and
// connection write timeout applied. A non-nil logOutput replaces the default
// log destination; nil leaves the default untouched.
func (s *Server) yamuxConfig(logOutput io.Writer) *yamux.Config {
	cfg := yamux.DefaultConfig()
	if logOutput != nil {
		cfg.LogOutput = logOutput
	}
	cfg.ConnectionWriteTimeout = s.opts.ConnectionWriteTimeout
	cfg.KeepAliveInterval = s.opts.KeepAliveInterval
	cfg.EnableKeepAlive = true
	return cfg
}
// HandleTunnelConnect serves the WebSocket upgrade issued by a local agent
// and keeps the resulting tunnel registered for as long as it stays alive.
//
// Route: `GET /tunnel/connect?token=<token>` (the token may also be sent in
// the `X-Relay-Token` header).
//
// Flow:
//  1. Reply 503 if Shutdown has already been called.
//  2. Extract the token and validate its format (401 on either failure).
//  3. Upgrade to WebSocket, wrap the connection with wsconn.New and start a
//     yamux server session on it.
//  4. Wrap the session in a LocalHandle and register it in the store.
//  5. Block until the yamux session closes, then unregister the token, but
//     only if the store still maps it to this very handle.
func (s *Server) HandleTunnelConnect(w http.ResponseWriter, r *http.Request) {
	// Refuse new tunnel upgrades during shutdown so that no new session gets
	// registered while a graceful shutdown is in progress.
	s.mu.Lock()
	isShutdown := s.shutdown
	s.mu.Unlock()
	if isShutdown {
		writeJSONError(w, http.StatusServiceUnavailable, "SHUTTING_DOWN", "server is shutting down")
		return
	}

	tok := getToken(r)
	if tok == "" {
		writeJSONError(w, http.StatusUnauthorized, "NO_TOKEN", "token required")
		return
	}
	if !isAcceptableToken(tok) {
		writeJSONError(w, http.StatusUnauthorized, "INVALID_TOKEN_FORMAT", "token format invalid")
		return
	}

	wsConn, err := s.upgrader.Upgrade(w, r, nil)
	if err != nil {
		// No response is written here: per the original note the upgrader
		// has already replied to the client when the upgrade fails.
		s.logger.Warn("tunnel upgrade failed",
			"error", err,
			"remote_addr", r.RemoteAddr,
			"token_prefix", tokenPrefix(tok))
		return
	}

	netConn := wsconn.New(wsConn)
	ym, err := yamux.Server(netConn, s.yamuxConfig(nil))
	if err != nil {
		s.logger.Error("yamux server creation failed",
			"error", err,
			"token_prefix", tokenPrefix(tok))
		_ = wsConn.Close() // best-effort cleanup; the connection is unusable anyway
		return
	}

	handle := NewLocalHandle(ym, tok, r.RemoteAddr)
	// Per the original design note (decision Q5: last connection wins),
	// Register is expected to close any older connection for the same token.
	if err := s.store.Register(r.Context(), tok, handle); err != nil {
		s.logger.Error("session register failed",
			"error", err,
			"token_prefix", tokenPrefix(tok))
		_ = ym.Close() // best-effort cleanup of the session that was never registered
		return
	}

	s.logger.Info("tunnel connected",
		"token_prefix", tokenPrefix(tok),
		"remote_addr", r.RemoteAddr,
		"keepalive_interval", s.opts.KeepAliveInterval.String())

	// Block until the yamux session is closed.
	<-ym.CloseChan()

	// Remove the registration only when the store still points at this
	// handle; otherwise this (older) flow would delete a newer connection
	// that has replaced it in the meantime.
	cur, lookupErr := s.store.Lookup(r.Context(), tok)
	switch {
	case lookupErr == nil:
		if cur == handle {
			if err := s.store.Unregister(r.Context(), tok); err != nil && !errors.Is(err, session.ErrSessionNotFound) {
				s.logger.Warn("session unregister failed",
					"error", err,
					"token_prefix", tokenPrefix(tok))
			}
		}
	case errors.Is(lookupErr, session.ErrSessionNotFound):
		// Already removed, nothing to do.
	default:
		// Unexpected store failure: the entry may be left behind, so make
		// that visible instead of dropping the error silently.
		s.logger.Warn("session lookup after disconnect failed",
			"error", lookupErr,
			"token_prefix", tokenPrefix(tok))
	}

	s.logger.Info("tunnel disconnected",
		"token_prefix", tokenPrefix(tok),
		"remote_addr", r.RemoteAddr)
}
// HandleRelayStatus reports tunnel connection state (for debugging / health
// checks) as a JSON document.
//
// Route: `GET /relay/status?token=<token>`.
//   - With a token: the state of that tunnel. Any lookup error is reported
//     as `{"online": false}`.
//   - Without a token: whether any tunnel is listed, plus the tunnel count.
//     A failing List yields a 500 LIST_FAILED error.
func (s *Server) HandleRelayStatus(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	ctx := r.Context()
	// reply encodes the payload best-effort; an encode failure means the
	// client has gone away and there is nothing left to report it to.
	reply := func(payload map[string]any) {
		_ = json.NewEncoder(w).Encode(payload)
	}

	tok := getToken(r)
	if tok == "" {
		summaries, err := s.store.List(ctx)
		if err != nil {
			writeJSONError(w, http.StatusInternalServerError, "LIST_FAILED", err.Error())
			return
		}
		count := len(summaries)
		reply(map[string]any{
			"online":       count > 0,
			"tunnel_count": count,
		})
		return
	}

	h, err := s.store.Lookup(ctx, tok)
	if err != nil {
		// Unknown token (or lookup failure) is reported as offline.
		reply(map[string]any{
			"online": false,
		})
		return
	}

	sum := h.Summary()
	reply(map[string]any{
		"online":         !h.IsClosed(),
		"connected_at":   sum.ConnectedAt,
		"last_heartbeat": sum.LastHeartbeat,
		"remote_addr":    sum.RemoteAddr,
	})
}
// HandleProxy is the generic HTTP reverse-proxy handler: it resolves the
// tunnel for the request's token, opens a stream on it, writes the request to
// the stream and relays the response back to the caller.
//
// According to the original notes this path mainly serves debugging and
// legacy compatibility (the POC let browsers hit the proxy directly), while
// regular prototype traffic goes through `/internal/forward/http` (see
// remote-proxy's main.go).
//
// Route: `Any /*` (or wherever the caller mounts it).
//
// Responses produced here: 401 NO_TOKEN when no token is supplied, and 502
// with TUNNEL_DISCONNECTED / TUNNEL_ERROR / TUNNEL_WRITE_ERROR /
// TUNNEL_READ_ERROR for the respective tunnel failures. WebSocket upgrade
// requests are delegated to proxyWebSocket.
func (s *Server) HandleProxy(w http.ResponseWriter, r *http.Request) {
	tok := getToken(r)
	if tok == "" {
		writeJSONError(w, http.StatusUnauthorized, "NO_TOKEN", "X-Relay-Token header or token query param required")
		return
	}

	ctx := r.Context()
	h, err := s.store.Lookup(ctx, tok)
	if err != nil || h.IsClosed() {
		writeJSONError(w, http.StatusBadGateway, "TUNNEL_DISCONNECTED", "edge agent is not connected")
		return
	}

	stream, err := h.OpenStream(ctx)
	if err != nil {
		s.logger.Warn("open stream failed", "error", err, "token_prefix", tokenPrefix(tok))
		writeJSONError(w, http.StatusBadGateway, "TUNNEL_ERROR", "failed to open tunnel stream")
		return
	}
	defer stream.Close()

	// Release the tunnel stream as soon as the request is cancelled (for
	// example when the client disconnects). Without this, a streaming
	// response (SSE / MJPEG) whose upstream is idle keeps this handler
	// blocked in Read, and the stream open, until the upstream writes again.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			_ = stream.Close() // unblocks any pending Read/Write on the stream
		case <-done:
		}
	}()

	// Do not forward the relay token header to the local agent.
	// NOTE(review): a token passed as the `token` query parameter is still
	// part of the forwarded URL; confirm whether it should be stripped too.
	r.Header.Del("X-Relay-Token")

	if isWebSocketUpgrade(r) {
		s.proxyWebSocket(w, r, stream)
		return
	}

	if err := r.Write(stream); err != nil {
		s.logger.Warn("write request to tunnel failed", "error", err, "token_prefix", tokenPrefix(tok))
		writeJSONError(w, http.StatusBadGateway, "TUNNEL_WRITE_ERROR", "failed to write request to tunnel")
		return
	}

	resp, err := http.ReadResponse(bufio.NewReader(stream), r)
	if err != nil {
		s.logger.Warn("read response from tunnel failed", "error", err, "token_prefix", tokenPrefix(tok))
		writeJSONError(w, http.StatusBadGateway, "TUNNEL_READ_ERROR", "failed to read response from tunnel")
		return
	}
	defer resp.Body.Close()

	for key, vals := range resp.Header {
		for _, v := range vals {
			w.Header().Add(key, v)
		}
	}
	w.WriteHeader(resp.StatusCode)

	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		// Headers are already sent, so a copy error cannot be reported.
		_, _ = io.Copy(w, resp.Body)
		return
	}

	// Streaming support (MJPEG / SSE): flush after every chunk so data
	// reaches the client as soon as the agent produces it.
	buf := make([]byte, 32*1024)
	for {
		n, rerr := resp.Body.Read(buf)
		if n > 0 {
			if _, werr := w.Write(buf[:n]); werr != nil {
				return
			}
			flusher.Flush()
		}
		if rerr != nil {
			return
		}
	}
}
// proxyWebSocket forwards a WebSocket upgrade coming from a browser (or from
// api-server internally) to the local agent over the given tunnel stream.
//
// It writes the upgrade request to the stream and reads the agent's reply.
// A reply other than 101 is relayed to the caller as a plain HTTP response.
// On 101 the caller's connection is hijacked, the 101 is written to it and
// bytes are then piped in both directions until either side ends; at that
// point the opposite side is closed as well.
//
// Both pipes read through the buffered readers rather than the raw
// connections: the reader used to parse the 101 and the reader returned by
// Hijack may already hold bytes that arrived right behind the handshake, and
// copying from the raw connections would silently drop them.
func (s *Server) proxyWebSocket(w http.ResponseWriter, r *http.Request, stream net.Conn) {
	if err := r.Write(stream); err != nil {
		s.logger.Warn("ws: write upgrade request failed", "error", err)
		writeJSONError(w, http.StatusBadGateway, "TUNNEL_WRITE_ERROR", "failed to write upgrade request to tunnel")
		return
	}

	// Keep this reader for the agent → caller pipe below.
	agentReader := bufio.NewReader(stream)
	resp, err := http.ReadResponse(agentReader, r)
	if err != nil {
		s.logger.Warn("ws: read upgrade response failed", "error", err)
		writeJSONError(w, http.StatusBadGateway, "TUNNEL_READ_ERROR", "failed to read upgrade response from tunnel")
		return
	}

	if resp.StatusCode != http.StatusSwitchingProtocols {
		// The agent refused the upgrade: relay its answer unchanged.
		for key, vals := range resp.Header {
			for _, v := range vals {
				w.Header().Add(key, v)
			}
		}
		w.WriteHeader(resp.StatusCode)
		_, _ = io.Copy(w, resp.Body) // headers already sent; nothing to report to
		_ = resp.Body.Close()
		return
	}

	hijacker, ok := w.(http.Hijacker)
	if !ok {
		http.Error(w, "hijacking not supported", http.StatusInternalServerError)
		return
	}
	clientConn, clientBuf, err := hijacker.Hijack()
	if err != nil {
		s.logger.Warn("ws: hijack failed", "error", err)
		return
	}
	defer clientConn.Close()

	// Hand the 101 back to the caller. If that fails the connection is
	// unusable, so there is no point in starting the pipes.
	if err := resp.Write(clientBuf); err != nil {
		s.logger.Warn("ws: write upgrade response failed", "error", err)
		return
	}
	if err := clientBuf.Flush(); err != nil {
		s.logger.Warn("ws: flush upgrade response failed", "error", err)
		return
	}

	// Bidirectional pipe. Each direction closes the opposite endpoint when it
	// finishes so that the other goroutine is unblocked too.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		// caller → agent, including bytes already buffered by the HTTP server
		_, _ = io.Copy(stream, clientBuf.Reader)
		_ = stream.Close()
	}()
	go func() {
		defer wg.Done()
		// agent → caller, including bytes buffered while parsing the 101
		_, _ = io.Copy(clientConn, agentReader)
		_ = clientConn.Close()
	}()
	wg.Wait()
}
// Shutdown marks the server as shutting down (usually called when the process
// is about to exit). From then on HandleTunnelConnect refuses new tunnel
// upgrades.
//
// It neither closes the store (which is injected and owned by the caller) nor
// any live session; per the original note, closing sessions is left to
// CleanupExpired / the cmd/remote-proxy main loop.
func (s *Server) Shutdown() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.shutdown = true
}
// ----------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------
// getToken extracts the relay token from the request. The `X-Relay-Token`
// header wins; the `token` query parameter is consulted only when the header
// is absent or empty (same precedence as the POC, per the original note).
// It returns "" when neither is present.
func getToken(r *http.Request) string {
	fromHeader := r.Header.Get("X-Relay-Token")
	if fromHeader == "" {
		return r.URL.Query().Get("token")
	}
	return fromHeader
}
// isAcceptableToken reports whether tok has a valid pairing-token or
// session-token format.
//
// Both kinds are accepted: per the original note, local agents still connect
// with a pairing token in the prototype stage (tunnel.md §2.2.1), while
// session tokens become the primary form after the planned Phase 1
// two-stage token upgrade.
func isAcceptableToken(tok string) bool {
	if auth.IsValidPairingToken(tok) {
		return true
	}
	return auth.IsValidSessionToken(tok)
}
// tokenPrefix returns the first 8 bytes of tok for use in log lines, so that
// complete tokens are not written to the log. Tokens of 8 bytes or fewer are
// returned unchanged.
func tokenPrefix(tok string) string {
	const prefixLen = 8
	if len(tok) > prefixLen {
		return tok[:prefixLen]
	}
	return tok
}
// isWebSocketUpgrade reports whether r is a WebSocket upgrade request.
//
// Both headers are checked, because RFC 6455 §4.1 requires a valid upgrade to
// carry both, and looking at Upgrade alone would misclassify hand-crafted
// requests (e.g. curl sending a single header):
//   - Upgrade must equal "websocket", ignoring case
//   - Connection must contain "upgrade", ignoring case; a substring match is
//     used so that combined values such as "keep-alive, Upgrade" pass
func isWebSocketUpgrade(r *http.Request) bool {
	upgrade := r.Header.Get("Upgrade")
	connection := strings.ToLower(r.Header.Get("Connection"))
	return strings.EqualFold(upgrade, "websocket") && strings.Contains(connection, "upgrade")
}
// writeJSONError sends an error reply in the shared API error shape:
//
//	{"error":{"code":"<code>","message":"<message>"}}
//
// It sets Content-Type to application/json, writes the given status and then
// encodes the body.
func writeJSONError(w http.ResponseWriter, status int, code, message string) {
	type errorDetail struct {
		Code    string `json:"code"`
		Message string `json:"message"`
	}
	type errorEnvelope struct {
		Error errorDetail `json:"error"`
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)

	body := errorEnvelope{Error: errorDetail{Code: code, Message: message}}
	// The status line is already out; an encode failure cannot be reported.
	_ = json.NewEncoder(w).Encode(body)
}
// FormatAddr renders port as ":<port>", the form expected by
// http.Server.Addr.
func FormatAddr(port int) string {
	return ":" + fmt.Sprint(port)
}