visionA/visionA-backend/internal/relay/internal_forward.go
jim800121chen 22f0837ba8 feat(visionA-backend): Phase 0 → 0.7 雲端後端(雙 binary + OIDC BFF + stage 部署)
從 edge-ai-platform POC 轉為正式產品的雲端後端,含以下整合階段:

- Phase 0:雛形骨架 — `cmd/api-server` (REST :3721) + `cmd/remote-proxy`
  (tunnel :3800 / internal :3801) 雙 binary 共用 internal/,沿用 POC 的
  WebSocket+yamux tunnel 協定但解耦 relay 與 API
- Phase 0.6:OIDC BFF 接 Innovedus Member Center
  - internal/oidc package(coreos/go-oidc + PKCE S256 + state + nonce)
  - internal/usersession package(HMAC-SHA256 cookie + RotateSessionID
    防 session fixation, OWASP ASVS V3.2.1)
  - 4 個 OIDC handler(/api/auth/login|callback|me|logout)+ AuthMiddleware
  - 完全拔除 StaticAuthProvider,OIDC 是唯一認證路徑
  - 9 個 ADR(含 ADR-010 BFF / ADR-011 取代 static auth /
    ADR-012 pending session shared cookie / ADR-013 PKCE-only public client)
- Phase 0.7:A1 改造 + security audit 修復
  - OIDC ClientSecret 變選填,支援 stage MC 的 public PKCE-only client
    (AuthStyleInParams 強制 token endpoint 不送 client_secret)
  - 預留 ServiceClient* 欄位給未來 client_credentials grant
  - 移除 13+ 處 resolveUserID(uc, StaticUserID) fallback 改 strict mode
    (Audit C1:multi-tenant 隔離破口)
  - Pairing exchange MarkUsed 失敗 abort + revoke session token(Audit M3)
  - 新增 all_endpoints_require_auth_test 整合測試(51 endpoint × 401)

驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 11:21:20 +08:00

357 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package relay
import (
"bufio"
"bytes"
"context"
"encoding/base64"
"encoding/json"
"io"
"log/slog"
"net/http"
"visiona-backend/internal/session"
)
// InternalServer serves the internal HTTP API that api-server uses to reach
// remote-proxy.
//
// Endpoints registered by Routes (the original notes point to
// `.autoflow/04-architecture/api/api-internal.md` as the reference):
//   - POST /internal/forward/http         — forward a non-WebSocket request to a given session
//   - POST /internal/forward/raw          — raw forward (its handler is not part of this section)
//   - GET  /internal/forward/ws           — forward a WebSocket upgrade (stub in Phase 0)
//   - GET  /internal/session/:token       — report whether a session exists, plus basic info
//   - GET  /internal/sessions             — list all online sessions (debug / metrics)
//   - POST /internal/session/:token/close — operator-forced tunnel teardown
//
// Security note carried over from the original comment: the prototype relies
// on listening on the internal port only (`VISIONA_PROXY_INTERNAL_PORT`,
// default 3801). Production deployments must block external access at the
// network layer (security group / NetworkPolicy). mTLS or a shared secret is
// planned for Phase 1 (see the security section of api-internal.md).
type InternalServer struct {
// store is used by the handlers to look up, list and unregister sessions.
store session.Store
// logger receives handler warnings; NewInternalServer never leaves it nil.
logger *slog.Logger
}
// NewInternalServer builds the handler set for the internal HTTP API on top
// of the given session store. A nil logger is replaced by slog.Default().
func NewInternalServer(store session.Store, logger *slog.Logger) *InternalServer {
	srv := &InternalServer{store: store, logger: logger}
	if srv.logger == nil {
		srv.logger = slog.Default()
	}
	return srv
}
// Routes registers every internal endpoint on mux. Choosing the listen
// address is left to the caller.
//
// Two forward endpoints coexist (per the original notes, added as the fix for
// "B3 Review Major 1"):
//   - `/internal/forward/http` — JSON + base64 envelope, meant for simple
//     JSON request/response calls (e.g. GET /healthz).
//   - `/internal/forward/raw` — served by HandleForwardRaw, which is defined
//     outside this section. The original notes describe it as a hijacked
//     raw-TCP path that supports streaming (MJPEG / SSE), long-lived
//     connections and WebSocket upgrades over HTTP, and as the real transport
//     behind `session.ProxyClient.OpenStream(ctx) net.Conn` (used by B4).
//
// See `.autoflow/04-architecture/api/api-internal.md` for details.
func (s *InternalServer) Routes(mux *http.ServeMux) {
	endpoints := []struct {
		pattern string
		handle  func(http.ResponseWriter, *http.Request)
	}{
		{"/internal/forward/http", s.HandleForwardHTTP},
		{"/internal/forward/raw", s.HandleForwardRaw},
		{"/internal/forward/ws", s.HandleForwardWS},
		// The trailing-slash pattern covers both :token and :token/close.
		{"/internal/session/", s.handleSessionByToken},
		{"/internal/sessions", s.HandleListSessions},
	}
	for _, ep := range endpoints {
		mux.HandleFunc(ep.pattern, ep.handle)
	}
}
// ForwardHTTPRequest is the JSON request body that api-server posts to
// /internal/forward/http.
//
// To keep the prototype simple and easy to test, the request is passed as a
// structured JSON envelope rather than the "raw HTTP bytes" described in
// api-internal.md. The original notes consider the two equivalent and expect
// to be able to switch later without touching the API handlers; the JSON
// format is a Phase 0 convenience (see the task B3 prompt).
type ForwardHTTPRequest struct {
// SessionToken is the token of the local agent's tunnel. Required.
SessionToken string `json:"session_token"`
// Method is the HTTP method, e.g. GET / POST. HandleForwardHTTP defaults an
// empty value to GET.
Method string `json:"method"`
// Path is the request path, e.g. /api/devices, without scheme or host (the
// original notes say the local agent fills those in). HandleForwardHTTP
// defaults an empty value to "/".
Path string `json:"path"`
// Headers are the HTTP headers to send; one value per header name.
Headers map[string]string `json:"headers,omitempty"`
// Body is the base64-encoded request body; an empty string means no body.
Body string `json:"body,omitempty"`
}
// ForwardHTTPResponse is the JSON response body of /internal/forward/http.
type ForwardHTTPResponse struct {
// Status is the HTTP status code of the response read back from the tunnel.
Status int `json:"status"`
// Headers are the response headers read back from the tunnel.
Headers map[string][]string `json:"headers,omitempty"`
// Body is the base64-encoded response body.
Body string `json:"body,omitempty"`
// Error is set when forwarding itself fails (tunnel down, stream failure, etc.).
Error *ForwardHTTPError `json:"error,omitempty"`
}
// ForwardHTTPError describes why a forward attempt failed.
type ForwardHTTPError struct {
// Code is a machine-readable identifier such as "TUNNEL_DISCONNECTED".
Code string `json:"code"`
// Message is a human-readable description of the failure.
Message string `json:"message"`
}
// HandleForwardHTTP implements POST /internal/forward/http.
//
// Request and response are JSON envelopes (ForwardHTTPRequest /
// ForwardHTTPResponse) with base64-encoded bodies, which suits simple one-shot
// request/response calls. The whole tunnelled response is buffered into the
// envelope, so this handler cannot stream (MJPEG / SSE / chunked) and does not
// carry WebSocket traffic; the original notes direct such calls to
// `POST /internal/forward/raw`.
//
// Responses:
//   - 405 METHOD_NOT_ALLOWED for anything other than POST
//   - 400 INVALID_JSON / MISSING_SESSION_TOKEN for a malformed envelope
//   - 502 with the envelope's Error field set when forwarding fails
//   - 200 with the tunnelled status, headers and body otherwise
//
// An empty Method defaults to GET and an empty Path defaults to "/".
func (s *InternalServer) HandleForwardHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "POST required")
		return
	}

	var req ForwardHTTPRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJSONError(w, http.StatusBadRequest, "INVALID_JSON", err.Error())
		return
	}
	if req.SessionToken == "" {
		writeJSONError(w, http.StatusBadRequest, "MISSING_SESSION_TOKEN", "session_token required")
		return
	}
	if req.Method == "" {
		req.Method = http.MethodGet
	}
	if req.Path == "" {
		req.Path = "/"
	}

	resp, ferr := forwardOverTunnel(r.Context(), s.store, req)

	status := http.StatusOK
	if resp.Error != nil {
		// Forwarding failures map to 502 (the original notes tie this to the
		// TUNNEL_DISCONNECTED semantics in api-internal.md).
		status = http.StatusBadGateway

		// Log on the Error payload rather than on ferr alone: a session that
		// is found but already closed yields an Error with a nil error value,
		// and that failure must not go unrecorded.
		cause := resp.Error.Message
		if ferr != nil {
			cause = ferr.Error()
		}
		s.logger.Warn("internal forward failed",
			"error", cause,
			"code", resp.Error.Code,
			"token_prefix", tokenPrefix(req.SessionToken),
			"method", req.Method,
			"path", req.Path)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if err := json.NewEncoder(w).Encode(resp); err != nil {
		// The status line is already sent, so all that is left is to record
		// that the caller did not receive the full envelope.
		s.logger.Warn("internal forward: writing response failed",
			"error", err.Error(),
			"token_prefix", tokenPrefix(req.SessionToken))
	}
}
// HandleForwardWS is the Phase 0 stub for /internal/forward/ws. It always
// answers 501 NOT_IMPLEMENTED.
//
// Per the original notes, a full implementation needs Hijack plus a WebSocket
// relay (similar to HandleProxy.proxyWebSocket) and was deferred to task B5,
// when the frontend is connected, to keep B3 small.
func (s *InternalServer) HandleForwardWS(w http.ResponseWriter, r *http.Request) {
	const (
		code    = "NOT_IMPLEMENTED"
		message = "WS forward stub — will be implemented in B5 when frontend connects"
	)
	writeJSONError(w, http.StatusNotImplemented, code, message)
}
// handleSessionByToken dispatches the two routes that live under
// /internal/session/:
//
//   - GET  /internal/session/:token       → getSession
//   - POST /internal/session/:token/close → closeSession
//
// Responses produced here:
//   - 404 NOT_FOUND when the path is outside the prefix or names an unknown action
//   - 400 MISSING_TOKEN when nothing follows the prefix
//   - 405 METHOD_NOT_ALLOWED when a known route is hit with the wrong method
func (s *InternalServer) handleSessionByToken(w http.ResponseWriter, r *http.Request) {
	const prefix = "/internal/session/"

	// Verify the prefix before slicing: when the handler is invoked outside
	// the mux registration (direct call, different mount point) a shorter path
	// would otherwise panic with a slice-bounds error.
	path := r.URL.Path
	if len(path) < len(prefix) || path[:len(prefix)] != prefix {
		writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "unknown path")
		return
	}
	rest := path[len(prefix):]
	if rest == "" {
		writeJSONError(w, http.StatusBadRequest, "MISSING_TOKEN", "token required in path")
		return
	}

	idx := indexByte(rest, '/')
	if idx == -1 {
		// /internal/session/:token
		if r.Method != http.MethodGet {
			writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "GET required")
			return
		}
		s.getSession(w, r, rest)
		return
	}

	// /internal/session/:token/<action>
	token, action := rest[:idx], rest[idx+1:]
	if action != "close" {
		writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "unknown action: "+action)
		return
	}
	// The action exists, so a wrong method is a 405 rather than a misleading
	// "unknown action: close" 404.
	if r.Method != http.MethodPost {
		writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "POST required")
		return
	}
	s.closeSession(w, r, token)
}
// getSession answers GET /internal/session/:token with a JSON description of
// the session, or 404 NOT_FOUND when the store lookup fails.
func (s *InternalServer) getSession(w http.ResponseWriter, r *http.Request, token string) {
	handle, lookupErr := s.store.Lookup(r.Context(), token)
	if lookupErr != nil {
		writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "session not found")
		return
	}

	info := handle.Summary()
	payload := map[string]any{
		"token":          info.Token,
		"connected":      !handle.IsClosed(),
		"connected_at":   info.ConnectedAt,
		"last_heartbeat": info.LastHeartbeat,
		"remote_addr":    info.RemoteAddr,
		"user_id":        info.UserID,
		"device_id":      info.DeviceID,
	}

	w.Header().Set("Content-Type", "application/json")
	// The encode error is deliberately discarded, as in the rest of this file.
	_ = json.NewEncoder(w).Encode(payload)
}
// closeSession answers POST /internal/session/:token/close: it closes the
// session's handle and removes the session from the store.
//
// Teardown is best-effort. A failure from Close or Unregister is logged but
// does not change the response, which is {"closed": true} whenever the
// session was found. A failed lookup yields 404 NOT_FOUND.
func (s *InternalServer) closeSession(w http.ResponseWriter, r *http.Request, token string) {
	h, err := s.store.Lookup(r.Context(), token)
	if err != nil {
		writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "session not found")
		return
	}

	// Record teardown failures instead of discarding them, so an operator can
	// tell when a forced close did not go through cleanly.
	if err := h.Close(); err != nil {
		s.logger.Warn("internal close session: closing handle failed",
			"error", err,
			"token_prefix", tokenPrefix(token))
	}
	if err := s.store.Unregister(r.Context(), token); err != nil {
		s.logger.Warn("internal close session: unregister failed",
			"error", err,
			"token_prefix", tokenPrefix(token))
	}

	w.Header().Set("Content-Type", "application/json")
	// Nothing useful can be done if the caller has already gone away.
	_ = json.NewEncoder(w).Encode(map[string]any{"closed": true})
}
// HandleListSessions implements GET /internal/sessions.
//
// It responds with {"sessions": [...], "total": N}, where the list is
// whatever the store's List returns. Responses:
//   - 405 METHOD_NOT_ALLOWED for anything other than GET
//   - 500 LIST_FAILED when the store cannot produce the list
func (s *InternalServer) HandleListSessions(w http.ResponseWriter, r *http.Request) {
	// Enforce the documented method, as the other handlers in this file do.
	if r.Method != http.MethodGet {
		writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "GET required")
		return
	}

	summaries, err := s.store.List(r.Context())
	if err != nil {
		writeJSONError(w, http.StatusInternalServerError, "LIST_FAILED", err.Error())
		return
	}

	w.Header().Set("Content-Type", "application/json")
	// Nothing useful can be done if the caller has already gone away.
	_ = json.NewEncoder(w).Encode(map[string]any{
		"sessions": summaries,
		"total":    len(summaries),
	})
}
// forwardOverTunnel is the core of /internal/forward/http:
//  1. look the session handle up in the store
//  2. decode the body and build the HTTP request
//  3. open a stream on the handle, write the request and read the response
//  4. wrap the result in a ForwardHTTPResponse
//
// The request is validated before the stream is opened, so a malformed
// envelope never costs a tunnel stream.
//
// On failure the returned response carries only the Error field, and the
// returned error is the underlying cause. One exception: when the session is
// found but already closed, the response is TUNNEL_DISCONNECTED and the error
// is nil. Callers must therefore check resp.Error, not just the error value.
//
// NOTE(review): the response body is read fully into memory with no size cap;
// that matches the "simple JSON calls only" intent of this endpoint but is
// worth confirming against expected payload sizes.
func forwardOverTunnel(ctx context.Context, store session.Store, req ForwardHTTPRequest) (ForwardHTTPResponse, error) {
	// fail builds the error-only response shared by every failure path.
	fail := func(code, message string, cause error) (ForwardHTTPResponse, error) {
		return ForwardHTTPResponse{
			Error: &ForwardHTTPError{Code: code, Message: message},
		}, cause
	}

	h, err := store.Lookup(ctx, req.SessionToken)
	if err != nil || h.IsClosed() {
		return fail("TUNNEL_DISCONNECTED", "session not connected", err)
	}

	bodyBytes, err := decodeBase64(req.Body)
	if err != nil {
		return fail("INVALID_BODY", "body base64 decode failed", err)
	}

	httpReq, err := http.NewRequestWithContext(ctx, req.Method, req.Path, bytesReader(bodyBytes))
	if err != nil {
		return fail("INVALID_REQUEST", "build request failed: "+err.Error(), err)
	}
	// Per the original notes the local agent overwrites Host itself, so
	// "127.0.0.1" is only a placeholder.
	httpReq.URL.Scheme = "http"
	httpReq.URL.Host = "127.0.0.1"
	httpReq.RequestURI = ""
	httpReq.Host = "127.0.0.1"
	if len(bodyBytes) > 0 {
		httpReq.ContentLength = int64(len(bodyBytes))
	}
	for k, v := range req.Headers {
		httpReq.Header.Set(k, v)
	}
	// Close stays false so no "Connection: close" header is written; per the
	// original notes, connection lifetime is decided by yamux / the local agent.
	httpReq.Close = false

	stream, err := h.OpenStream(ctx)
	if err != nil {
		return fail("TUNNEL_ERROR", "open stream failed: "+err.Error(), err)
	}
	defer stream.Close()

	// Request.Write and http.ReadResponse do not observe ctx, so close the
	// stream when the caller cancels; otherwise an agent that never answers
	// would block this call indefinitely.
	// NOTE(review): assumes Close on the tunnel stream is safe to call
	// concurrently and unblocks pending reads/writes, as net.Conn requires —
	// confirm for the stream type OpenStream returns.
	stop := context.AfterFunc(ctx, func() { _ = stream.Close() })
	defer stop()

	if err := httpReq.Write(stream); err != nil {
		return fail("TUNNEL_WRITE_ERROR", "write request to tunnel failed: "+err.Error(), err)
	}

	httpResp, err := http.ReadResponse(bufio.NewReader(stream), httpReq)
	if err != nil {
		return fail("TUNNEL_READ_ERROR", "read response from tunnel failed: "+err.Error(), err)
	}
	defer httpResp.Body.Close()

	respBody, err := io.ReadAll(httpResp.Body)
	if err != nil {
		return fail("TUNNEL_READ_ERROR", "read response body failed: "+err.Error(), err)
	}

	return ForwardHTTPResponse{
		Status:  httpResp.StatusCode,
		Headers: httpResp.Header,
		Body:    encodeBase64(respBody),
	}, nil
}
// indexByte returns the index of the first occurrence of byte b in s, or -1
// when b does not occur.
// It is hand-written because the original author chose not to import the
// strings package just for this lookup.
func indexByte(s string, b byte) int {
	pos := 0
	for pos < len(s) && s[pos] != b {
		pos++
	}
	if pos == len(s) {
		return -1
	}
	return pos
}
// bytesReader wraps body in an io.Reader. A nil or empty body yields a nil
// reader, which http.NewRequest accepts as "no body".
func bytesReader(body []byte) io.Reader {
	if len(body) > 0 {
		return bytes.NewReader(body)
	}
	// Return the literal nil so the interface value itself is nil.
	return nil
}
// decodeBase64 decodes a forward-request body from standard base64.
// An empty string decodes to a nil slice with no error.
func decodeBase64(s string) ([]byte, error) {
	if len(s) > 0 {
		return base64.StdEncoding.DecodeString(s)
	}
	return nil, nil
}
// encodeBase64 encodes a forward-response body as standard base64.
// A nil or empty body encodes to the empty string.
func encodeBase64(b []byte) string {
	if len(b) > 0 {
		return base64.StdEncoding.EncodeToString(b)
	}
	return ""
}