從 edge-ai-platform POC 轉為正式產品的雲端後端,含以下整合階段:
- Phase 0:雛形骨架 — `cmd/api-server` (REST :3721) + `cmd/remote-proxy`
(tunnel :3800 / internal :3801) 雙 binary 共用 internal/,沿用 POC 的
WebSocket+yamux tunnel 協定但解耦 relay 與 API
- Phase 0.6:OIDC BFF 接 Innovedus Member Center
- internal/oidc package(coreos/go-oidc + PKCE S256 + state + nonce)
- internal/usersession package(HMAC-SHA256 cookie + RotateSessionID
防 session fixation, OWASP ASVS V3.2.1)
- 4 個 OIDC handler(/api/auth/login|callback|me|logout)+ AuthMiddleware
- 完全拔除 StaticAuthProvider,OIDC 是唯一認證路徑
- 9 個 ADR(含 ADR-010 BFF / ADR-011 取代 static auth /
ADR-012 pending session shared cookie / ADR-013 PKCE-only public client)
- Phase 0.7:A1 改造 + security audit 修復
- OIDC ClientSecret 變選填,支援 stage MC 的 public PKCE-only client
(AuthStyleInParams 強制 token endpoint 不送 client_secret)
- 預留 ServiceClient* 欄位給未來 client_credentials grant
- 移除 13+ 處 resolveUserID(uc, StaticUserID) fallback 改 strict mode
(Audit C1:multi-tenant 隔離破口)
- Pairing exchange MarkUsed 失敗 abort + revoke session token(Audit M3)
- 新增 all_endpoints_require_auth_test 整合測試(51 endpoint × 401)
驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
256 lines
9.0 KiB
Go
256 lines
9.0 KiB
Go
// proxy.go — 「把 gin 請求轉發到 local agent」的共用邏輯。
|
||
//
|
||
// 大量 device / camera / media / model load-to-device endpoint 都會走同一條路徑:
|
||
// 1. 從 UserContext 拿到當前使用者
|
||
// 2. 透過 SessionStore / ProxyClient 找到該使用者的 active session token
|
||
// 3. 用 Forwarder.ForwardHTTP 代理請求(body / headers / path 原樣送)
|
||
// 4. 把 response 原樣寫回 gin.ResponseWriter(支援 streaming)
|
||
//
|
||
// 把這段抽成 handler 產生器,讓 devices.go / camera.go 等只需宣告路徑即可。
|
||
|
||
package api
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/gin-gonic/gin"
|
||
|
||
"visiona-backend/internal/session"
|
||
)
|
||
|
||
// defaultProxyRequestTimeout 是「非 streaming 型」proxy 請求的整體 timeout。
|
||
//
|
||
// 對 streaming 端點(MJPEG / SSE)不套用此 timeout — 我們靠 gin 的 ctx 取消機制
|
||
// 在 browser 關閉時順帶關 conn。300s 對 scan / flash(可能很慢)夠寬鬆。
|
||
const defaultProxyRequestTimeout = 300 * time.Second
|
||
|
||
// proxyOptions 控制 proxy handler 的細部行為。
|
||
type proxyOptions struct {
|
||
// streaming 若為 true 代表 response body 可能是長連線(MJPEG / SSE);
|
||
// 這種情況下我們不套 timeout、並對 gin.Writer.Flush 啟用 chunk 推送。
|
||
streaming bool
|
||
|
||
// rewritePath 可選:若非空,就把請求 path 改寫成這個值再送到 local agent。
|
||
// 雛形大多不需要(api-server 的路徑與 local agent 的路徑一致)。
|
||
rewritePath string
|
||
}
|
||
|
||
// newProxyHandler 產生一個 gin.HandlerFunc,會把當前請求透過 Forwarder 轉發到
|
||
// local agent(由 UserContext 對應的 active session 決定)。
|
||
//
|
||
// 用法:
|
||
//
|
||
// g.GET("/devices", newProxyHandler(deps, proxyOptions{}))
|
||
// g.GET("/camera/stream", newProxyHandler(deps, proxyOptions{streaming: true}))
|
||
func newProxyHandler(deps Deps, opts proxyOptions) gin.HandlerFunc {
|
||
return func(c *gin.Context) {
|
||
// 1. 檢查必要依賴
|
||
if deps.Forwarder == nil || deps.SessionStore == nil {
|
||
WriteNotImplemented(c, "forwarder/session store not configured")
|
||
return
|
||
}
|
||
|
||
// 2. 找當前使用者的 active session token
|
||
// Phase 0.7 security fix C1 (見 .autoflow/05-implementation/review/phase-0.7-security-audit.md)
|
||
// 移除 demo-user fallback:apiGroup 下所有 handler 都被 AuthMiddleware 保護,
|
||
// 拿不到 UserContext 代表 middleware 設定錯誤,回 500 比 silent fallback 安全。
|
||
uc, ok := UserContextFrom(c)
|
||
if !ok || uc.UserID == "" {
|
||
WriteError(c, http.StatusInternalServerError, ErrCodeInternalError,
|
||
"missing user context (auth middleware misconfigured?)", nil)
|
||
return
|
||
}
|
||
userID := uc.UserID
|
||
|
||
token, err := pickActiveSessionToken(c.Request.Context(), deps.SessionStore, userID, deps.Logger)
|
||
if err != nil {
|
||
writeTunnelError(c, err)
|
||
return
|
||
}
|
||
|
||
// 3. 決定 rewrite path(可選)
|
||
outPath := c.Request.URL.Path
|
||
if opts.rewritePath != "" {
|
||
outPath = opts.rewritePath
|
||
}
|
||
if c.Request.URL.RawQuery != "" {
|
||
outPath += "?" + c.Request.URL.RawQuery
|
||
}
|
||
|
||
// 4. 組出「打給 local agent」的 http.Request
|
||
ctx := c.Request.Context()
|
||
if !opts.streaming {
|
||
// 對非 streaming 端點加個總 timeout,避免 local agent hang 住
|
||
var cancel context.CancelFunc
|
||
ctx, cancel = context.WithTimeout(ctx, defaultProxyRequestTimeout)
|
||
defer cancel()
|
||
}
|
||
|
||
outReq, err := http.NewRequestWithContext(ctx, c.Request.Method, outPath, c.Request.Body)
|
||
if err != nil {
|
||
WriteError(c, http.StatusInternalServerError, ErrCodeInternalError,
|
||
"proxy: build upstream request: "+err.Error(), nil)
|
||
return
|
||
}
|
||
// 複製 headers;過濾掉 hop-by-hop(Forwarder 不會動,但避免重複)
|
||
copyProxyRequestHeaders(c.Request.Header, outReq.Header)
|
||
// Content-Length 要保留
|
||
if cl := c.Request.ContentLength; cl > 0 {
|
||
outReq.ContentLength = cl
|
||
}
|
||
|
||
// 5. 呼叫 Forwarder
|
||
resp, err := deps.Forwarder.ForwardHTTP(ctx, token, outReq)
|
||
if err != nil {
|
||
writeTunnelError(c, err)
|
||
return
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// 6. 把 response 寫回 gin.Writer
|
||
writeProxyResponse(c, resp, opts.streaming)
|
||
}
|
||
}
|
||
|
||
// pickActiveSessionToken 找出當前使用者在雲端的 active session token。
|
||
//
|
||
// 雛形邏輯(單一 user + 單一 agent):走 Store.List,過濾 userID 對得上的第一筆。
|
||
// OIDC 模式下 userID 是 Member Center 簽出的 OIDC sub(UUID),tunnel session 在
|
||
// pairing exchange 時被綁到同個 sub,因此能對上。
|
||
//
|
||
// 多 user / 多 device 階段(Phase 1)需要 store.ListByUser(userID) 原生介面,
|
||
// 見 session.Store TODO。
|
||
//
|
||
// Phase 0.7 security audit M2 (見 .autoflow/05-implementation/review/phase-0.7-security-audit.md)
|
||
// **保留寬鬆比對待人工介入修復**:
|
||
// - 完整修法是「s.UserID != "" && s.UserID == userID」strict equality
|
||
// - 但 prototype 的 relay.NewLocalHandle (internal/relay/local_handle.go:31)
|
||
// 在 tunnel handshake 時不查 SessionTokenStore,所以 Summary.UserID 永遠為空
|
||
// - 改 strict 會讓所有 e2e proxy 鏈路全斷(TestE2E_FullFlow_PairingToForward 等)
|
||
// - 正解需 relay 端在 HandleTunnelConnect 時拿 token 查 SessionTokenStore
|
||
// 取得 user_id 並寫入 LocalHandle.summary.UserID(屬 Phase 1 follow-up)
|
||
//
|
||
// 暫保留寬鬆比對;C1/M1 handler-side strict UserContext 已優先處理 — 任何 request
|
||
// 進入此函式時 userID 必非空(handler 在前面已 abort 500),所以唯一仍寬鬆的條件是
|
||
// s.UserID == ""(relay-side 尚未 backfill)。
|
||
//
|
||
// logger 參數保留給未來觀測(list 失敗時 log warn),目前尚未使用;測試傳 nil 即可。
|
||
func pickActiveSessionToken(ctx context.Context, store session.Store, userID string, _ any) (string, error) {
|
||
listCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
|
||
defer cancel()
|
||
|
||
summaries, err := store.List(listCtx)
|
||
if err != nil {
|
||
return "", fmt.Errorf("proxy: list sessions: %w", err)
|
||
}
|
||
if len(summaries) == 0 {
|
||
return "", session.ErrSessionNotFound
|
||
}
|
||
|
||
for _, s := range summaries {
|
||
// 寬鬆比對:handler 已確保 userID 非空(C1 strict mode);
|
||
// 暫接受 s.UserID == "" 直到 relay 端 backfill UserID(M2 待人工介入)。
|
||
if s.UserID == "" || s.UserID == userID {
|
||
return s.Token, nil
|
||
}
|
||
}
|
||
return "", session.ErrSessionNotFound
|
||
}
|
||
|
||
// writeTunnelError 把 forwarder / store 的錯誤映射到統一的 API 錯誤格式。
|
||
//
|
||
// - ErrSessionNotFound / ErrSessionClosed → 502 TUNNEL_DISCONNECTED
|
||
// - 其他 → 502 TUNNEL_ERROR(本質上是 local agent 不可達)
|
||
func writeTunnelError(c *gin.Context, err error) {
|
||
if errors.Is(err, session.ErrSessionNotFound) || errors.Is(err, session.ErrSessionClosed) {
|
||
WriteError(c, http.StatusBadGateway, ErrCodeTunnelDisconnect,
|
||
"local agent 未連線或 tunnel 斷開", nil)
|
||
return
|
||
}
|
||
WriteError(c, http.StatusBadGateway, ErrCodeTunnelError,
|
||
"tunnel error: "+err.Error(), nil)
|
||
}
|
||
|
||
// copyProxyRequestHeaders 把 src 的 headers 複製到 dst,但略過 hop-by-hop。
|
||
//
|
||
// 對齊 RFC 7230 §6.1 hop-by-hop headers:
|
||
//
|
||
// Connection, Keep-Alive, Proxy-Authenticate, Proxy-Authorization,
|
||
// TE, Trailers, Transfer-Encoding, Upgrade
|
||
//
|
||
// 這些由 Forwarder / underlying conn 自動處理,不該 blind copy。
|
||
func copyProxyRequestHeaders(src, dst http.Header) {
|
||
for name, values := range src {
|
||
if isHopByHopHeader(name) {
|
||
continue
|
||
}
|
||
// Authorization header 雛形不必送(local agent 沒有對應的 auth 系統);
|
||
// 但保留其他 custom header(X-From-Api 等 test fixture 會用)
|
||
if strings.EqualFold(name, "Authorization") {
|
||
continue
|
||
}
|
||
for _, v := range values {
|
||
dst.Add(name, v)
|
||
}
|
||
}
|
||
}
|
||
|
||
// isHopByHopHeader 回報 header 名稱是否為 hop-by-hop。
|
||
func isHopByHopHeader(name string) bool {
|
||
switch strings.ToLower(name) {
|
||
case "connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
|
||
"te", "trailers", "transfer-encoding", "upgrade":
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
// writeProxyResponse 把 upstream response 原樣寫回 gin。
|
||
//
|
||
// 支援 streaming:若 streaming=true 且 response 有 Flusher,每次 Read 後立即 Flush。
|
||
// 這讓 MJPEG / SSE 的 frame 能即時抵達 browser。
|
||
func writeProxyResponse(c *gin.Context, resp *http.Response, streaming bool) {
|
||
// 複製 headers(略過 hop-by-hop)
|
||
for name, values := range resp.Header {
|
||
if isHopByHopHeader(name) {
|
||
continue
|
||
}
|
||
for _, v := range values {
|
||
c.Writer.Header().Add(name, v)
|
||
}
|
||
}
|
||
c.Writer.WriteHeader(resp.StatusCode)
|
||
|
||
if !streaming {
|
||
// 非 streaming:一口氣 copy 完
|
||
_, _ = io.Copy(c.Writer, resp.Body)
|
||
return
|
||
}
|
||
|
||
// Streaming:邊讀邊 flush。buffer 大小 8KB,平衡延遲與 syscall 次數。
|
||
buf := make([]byte, 8*1024)
|
||
flusher, _ := c.Writer.(http.Flusher)
|
||
for {
|
||
n, rerr := resp.Body.Read(buf)
|
||
if n > 0 {
|
||
if _, werr := c.Writer.Write(buf[:n]); werr != nil {
|
||
// browser 斷線 → 停止(conn 會在 resp.Body.Close 時關掉 upstream)
|
||
return
|
||
}
|
||
if flusher != nil {
|
||
flusher.Flush()
|
||
}
|
||
}
|
||
if rerr != nil {
|
||
// io.EOF 或連線結束都是正常
|
||
return
|
||
}
|
||
}
|
||
}
|