從 edge-ai-platform POC 轉為正式產品的雲端後端,含以下整合階段:
- Phase 0:雛形骨架 — `cmd/api-server` (REST :3721) + `cmd/remote-proxy`
(tunnel :3800 / internal :3801) 雙 binary 共用 internal/,沿用 POC 的
WebSocket+yamux tunnel 協定但解耦 relay 與 API
- Phase 0.6:OIDC BFF 接 Innovedus Member Center
- internal/oidc package(coreos/go-oidc + PKCE S256 + state + nonce)
- internal/usersession package(HMAC-SHA256 cookie + RotateSessionID
防 session fixation, OWASP ASVS V3.2.1)
- 4 個 OIDC handler(/api/auth/login|callback|me|logout)+ AuthMiddleware
- 完全拔除 StaticAuthProvider,OIDC 是唯一認證路徑
- 9 個 ADR(含 ADR-010 BFF / ADR-011 取代 static auth /
ADR-012 pending session shared cookie / ADR-013 PKCE-only public client)
- Phase 0.7:A1 改造 + security audit 修復
- OIDC ClientSecret 變選填,支援 stage MC 的 public PKCE-only client
(AuthStyleInParams 強制 token endpoint 不送 client_secret)
- 預留 ServiceClient* 欄位給未來 client_credentials grant
- 移除 13+ 處 resolveUserID(uc, StaticUserID) fallback 改 strict mode
(Audit C1:multi-tenant 隔離破口)
- Pairing exchange MarkUsed 失敗 abort + revoke session token(Audit M3)
- 新增 all_endpoints_require_auth_test 整合測試(51 endpoint × 401)
驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
357 lines
12 KiB
Go
357 lines
12 KiB
Go
package relay
|
||
|
||
import (
	"bufio"
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"io"
	"log/slog"
	"net/http"
	"strings"

	"visiona-backend/internal/session"
)
|
||
|
||
// InternalServer 提供 api-server → remote-proxy 的 internal HTTP API。
|
||
//
|
||
// 對齊 `.autoflow/04-architecture/api/api-internal.md`:
|
||
// - POST /internal/forward/http — api-server 轉發非 WS 請求給指定 session
|
||
// - GET /internal/forward/ws — api-server 轉發 WS upgrade(Phase 0 暫為 stub)
|
||
// - GET /internal/session/:token — 查 session 是否存在與基本資訊
|
||
// - GET /internal/sessions — 列出所有在線 session(debug / metrics 用)
|
||
// - POST /internal/session/:token/close — 後台運維強制斷 tunnel
|
||
//
|
||
// 雛形安全性:只監聽 internal port(`VISIONA_PROXY_INTERNAL_PORT`,預設 3801),
|
||
// 生產環境須由網路層(security group / NetworkPolicy)阻擋外部存取。
|
||
// Phase 1 再加 mTLS / shared secret(見 api-internal.md §安全)。
|
||
type InternalServer struct {
|
||
store session.Store
|
||
logger *slog.Logger
|
||
}
|
||
|
||
// NewInternalServer 建立 internal HTTP handler。
|
||
func NewInternalServer(store session.Store, logger *slog.Logger) *InternalServer {
|
||
if logger == nil {
|
||
logger = slog.Default()
|
||
}
|
||
return &InternalServer{store: store, logger: logger}
|
||
}
|
||
|
||
// Routes 把所有 internal endpoints 註冊到 mux;caller 自行決定 listen 埠號。
|
||
//
|
||
// 兩個 forward endpoint 並存(B3 Review Major 1 修復):
|
||
// - `POST /internal/forward/http` — JSON + base64 封裝,適合簡單 JSON request/response(如 GET /healthz)
|
||
// - `POST /internal/forward/raw` — hijack 成 raw TCP,支援 streaming(MJPEG / SSE)、長連線、任意 HTTP
|
||
// 上 WS upgrade;`session.ProxyClient.OpenStream(ctx) net.Conn` 的真實底層(B4 用)
|
||
//
|
||
// 詳見 `.autoflow/04-architecture/api/api-internal.md`。
|
||
func (s *InternalServer) Routes(mux *http.ServeMux) {
|
||
mux.HandleFunc("/internal/forward/http", s.HandleForwardHTTP)
|
||
mux.HandleFunc("/internal/forward/raw", s.HandleForwardRaw)
|
||
mux.HandleFunc("/internal/forward/ws", s.HandleForwardWS)
|
||
mux.HandleFunc("/internal/session/", s.handleSessionByToken) // 含 :token 與 :token/close
|
||
mux.HandleFunc("/internal/sessions", s.HandleListSessions)
|
||
}
|
||
|
||
// ForwardHTTPRequest is the JSON request body api-server posts to
// /internal/forward/http.
//
// To keep the prototype simple and easy to test, the request is carried as a
// **structured JSON envelope** instead of the "raw HTTP bytes" described in
// api-internal.md. The two are equivalent, and the format can be switched later
// without affecting the API handlers. This JSON format is a Phase 0 prototype
// convenience (see the task B3 prompt).
type ForwardHTTPRequest struct {
	// SessionToken is the token of the local agent's tunnel.
	SessionToken string `json:"session_token"`
	// Method is the HTTP method, e.g. GET / POST.
	Method string `json:"method"`
	// Path is the request path, e.g. /api/devices (no scheme or host; the
	// local agent fills those in itself).
	Path string `json:"path"`
	// Headers are the HTTP headers to send.
	Headers map[string]string `json:"headers,omitempty"`
	// Body is the base64-encoded request body; an empty string means no body.
	Body string `json:"body,omitempty"`
}
|
||
|
||
// ForwardHTTPResponse 是 /internal/forward/http 的回應 body(JSON)。
|
||
type ForwardHTTPResponse struct {
|
||
Status int `json:"status"`
|
||
Headers map[string][]string `json:"headers,omitempty"`
|
||
// Body 是 base64 編碼的 response body。
|
||
Body string `json:"body,omitempty"`
|
||
// Error 在轉發過程失敗時填寫(tunnel 斷、stream 失敗等)。
|
||
Error *ForwardHTTPError `json:"error,omitempty"`
|
||
}
|
||
|
||
// ForwardHTTPError describes why forwarding failed.
type ForwardHTTPError struct {
	// Code is a machine-readable failure code (e.g. "TUNNEL_DISCONNECTED").
	Code string `json:"code"`
	// Message is a human-readable description of the failure.
	Message string `json:"message"`
}
|
||
|
||
// HandleForwardHTTP 實作 POST /internal/forward/http。
|
||
//
|
||
// 雛形採 JSON 格式(ForwardHTTPRequest/Response),適合簡單的一次性 JSON request/response。
|
||
// **不支援 streaming(MJPEG / SSE / chunked)與 WebSocket**;這類呼叫應改走
|
||
// `POST /internal/forward/raw`(B3 Review Major 1 修復後新增的 raw bytes endpoint)。
|
||
//
|
||
// 為何保留兩條路徑?
|
||
// - JSON 版:api-server 對簡單 API(如 GET /healthz、POST /api/devices)好寫好測
|
||
// - Raw 版:`session.ProxyClient.OpenStream(ctx) net.Conn` 的底層,streaming friendly
|
||
//
|
||
// 注意:此 handler 不支援 Flusher 串流回寫(JSON 封裝本質上不能串流)。
|
||
func (s *InternalServer) HandleForwardHTTP(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "POST required")
|
||
return
|
||
}
|
||
|
||
var req ForwardHTTPRequest
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||
writeJSONError(w, http.StatusBadRequest, "INVALID_JSON", err.Error())
|
||
return
|
||
}
|
||
if req.SessionToken == "" {
|
||
writeJSONError(w, http.StatusBadRequest, "MISSING_SESSION_TOKEN", "session_token required")
|
||
return
|
||
}
|
||
if req.Method == "" {
|
||
req.Method = http.MethodGet
|
||
}
|
||
if req.Path == "" {
|
||
req.Path = "/"
|
||
}
|
||
|
||
resp, ferr := forwardOverTunnel(r.Context(), s.store, req)
|
||
if ferr != nil {
|
||
s.logger.Warn("internal forward failed",
|
||
"error", ferr.Error(),
|
||
"token_prefix", tokenPrefix(req.SessionToken),
|
||
"method", req.Method,
|
||
"path", req.Path)
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "application/json")
|
||
if resp.Error != nil {
|
||
// 轉發錯誤 → 回 502(同 api-internal.md TUNNEL_DISCONNECTED 語意)
|
||
w.WriteHeader(http.StatusBadGateway)
|
||
} else {
|
||
w.WriteHeader(http.StatusOK)
|
||
}
|
||
_ = json.NewEncoder(w).Encode(resp)
|
||
}
|
||
|
||
// HandleForwardWS 是 /internal/forward/ws 的 stub(Phase 0 雛形)。
|
||
//
|
||
// 完整實作需要 Hijack + WebSocket relay,與 HandleProxy.proxyWebSocket 類似;
|
||
// 為了不過度複雜化 B3,這裡先回 501;真正的 WS forward 在 B5 接入前端時補齊。
|
||
func (s *InternalServer) HandleForwardWS(w http.ResponseWriter, r *http.Request) {
|
||
writeJSONError(w, http.StatusNotImplemented, "NOT_IMPLEMENTED",
|
||
"WS forward stub — will be implemented in B5 when frontend connects")
|
||
}
|
||
|
||
// handleSessionByToken 分派 /internal/session/:token 與 /internal/session/:token/close。
|
||
func (s *InternalServer) handleSessionByToken(w http.ResponseWriter, r *http.Request) {
|
||
const prefix = "/internal/session/"
|
||
rest := r.URL.Path[len(prefix):]
|
||
if rest == "" {
|
||
writeJSONError(w, http.StatusBadRequest, "MISSING_TOKEN", "token required in path")
|
||
return
|
||
}
|
||
|
||
// /internal/session/:token/close
|
||
if idx := indexByte(rest, '/'); idx != -1 {
|
||
token := rest[:idx]
|
||
action := rest[idx+1:]
|
||
if action == "close" && r.Method == http.MethodPost {
|
||
s.closeSession(w, r, token)
|
||
return
|
||
}
|
||
writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "unknown action: "+action)
|
||
return
|
||
}
|
||
|
||
// /internal/session/:token
|
||
if r.Method != http.MethodGet {
|
||
writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "GET required")
|
||
return
|
||
}
|
||
s.getSession(w, r, rest)
|
||
}
|
||
|
||
func (s *InternalServer) getSession(w http.ResponseWriter, r *http.Request, token string) {
|
||
h, err := s.store.Lookup(r.Context(), token)
|
||
if err != nil {
|
||
writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "session not found")
|
||
return
|
||
}
|
||
sum := h.Summary()
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||
"token": sum.Token,
|
||
"connected": !h.IsClosed(),
|
||
"connected_at": sum.ConnectedAt,
|
||
"last_heartbeat": sum.LastHeartbeat,
|
||
"remote_addr": sum.RemoteAddr,
|
||
"user_id": sum.UserID,
|
||
"device_id": sum.DeviceID,
|
||
})
|
||
}
|
||
|
||
func (s *InternalServer) closeSession(w http.ResponseWriter, r *http.Request, token string) {
|
||
h, err := s.store.Lookup(r.Context(), token)
|
||
if err != nil {
|
||
writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "session not found")
|
||
return
|
||
}
|
||
_ = h.Close()
|
||
_ = s.store.Unregister(r.Context(), token)
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_ = json.NewEncoder(w).Encode(map[string]any{"closed": true})
|
||
}
|
||
|
||
// HandleListSessions 實作 GET /internal/sessions。
|
||
func (s *InternalServer) HandleListSessions(w http.ResponseWriter, r *http.Request) {
|
||
summaries, err := s.store.List(r.Context())
|
||
if err != nil {
|
||
writeJSONError(w, http.StatusInternalServerError, "LIST_FAILED", err.Error())
|
||
return
|
||
}
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||
"sessions": summaries,
|
||
"total": len(summaries),
|
||
})
|
||
}
|
||
|
||
// forwardOverTunnel 是 /internal/forward/http 的核心:
|
||
// 1. store.Lookup 找到 handle
|
||
// 2. OpenStream
|
||
// 3. 在 stream 上組 HTTP request 並讀取 response
|
||
// 4. 封裝回 ForwardHTTPResponse
|
||
func forwardOverTunnel(ctx context.Context, store session.Store, req ForwardHTTPRequest) (ForwardHTTPResponse, error) {
|
||
h, err := store.Lookup(ctx, req.SessionToken)
|
||
if err != nil || h.IsClosed() {
|
||
return ForwardHTTPResponse{
|
||
Error: &ForwardHTTPError{
|
||
Code: "TUNNEL_DISCONNECTED",
|
||
Message: "session not connected",
|
||
},
|
||
}, err
|
||
}
|
||
|
||
stream, err := h.OpenStream(ctx)
|
||
if err != nil {
|
||
return ForwardHTTPResponse{
|
||
Error: &ForwardHTTPError{
|
||
Code: "TUNNEL_ERROR",
|
||
Message: "open stream failed: " + err.Error(),
|
||
},
|
||
}, err
|
||
}
|
||
defer stream.Close()
|
||
|
||
bodyBytes, err := decodeBase64(req.Body)
|
||
if err != nil {
|
||
return ForwardHTTPResponse{
|
||
Error: &ForwardHTTPError{
|
||
Code: "INVALID_BODY",
|
||
Message: "body base64 decode failed",
|
||
},
|
||
}, err
|
||
}
|
||
|
||
httpReq, err := http.NewRequest(req.Method, req.Path, bytesReader(bodyBytes))
|
||
if err != nil {
|
||
return ForwardHTTPResponse{
|
||
Error: &ForwardHTTPError{
|
||
Code: "INVALID_REQUEST",
|
||
Message: "build request failed: " + err.Error(),
|
||
},
|
||
}, err
|
||
}
|
||
// local agent 會自行覆寫 Host;這裡只保留 "127.0.0.1"(placeholder)
|
||
httpReq.URL.Scheme = "http"
|
||
httpReq.URL.Host = "127.0.0.1"
|
||
httpReq.RequestURI = ""
|
||
httpReq.Host = "127.0.0.1"
|
||
if len(bodyBytes) > 0 {
|
||
httpReq.ContentLength = int64(len(bodyBytes))
|
||
}
|
||
for k, v := range req.Headers {
|
||
httpReq.Header.Set(k, v)
|
||
}
|
||
// 設定 Close=false 保留長連線語意由 yamux / local agent 決定
|
||
httpReq.Close = false
|
||
|
||
if err := httpReq.Write(stream); err != nil {
|
||
return ForwardHTTPResponse{
|
||
Error: &ForwardHTTPError{
|
||
Code: "TUNNEL_WRITE_ERROR",
|
||
Message: "write request to tunnel failed: " + err.Error(),
|
||
},
|
||
}, err
|
||
}
|
||
|
||
httpResp, err := http.ReadResponse(bufio.NewReader(stream), httpReq)
|
||
if err != nil {
|
||
return ForwardHTTPResponse{
|
||
Error: &ForwardHTTPError{
|
||
Code: "TUNNEL_READ_ERROR",
|
||
Message: "read response from tunnel failed: " + err.Error(),
|
||
},
|
||
}, err
|
||
}
|
||
defer httpResp.Body.Close()
|
||
|
||
respBody, err := io.ReadAll(httpResp.Body)
|
||
if err != nil {
|
||
return ForwardHTTPResponse{
|
||
Error: &ForwardHTTPError{
|
||
Code: "TUNNEL_READ_ERROR",
|
||
Message: "read response body failed: " + err.Error(),
|
||
},
|
||
}, err
|
||
}
|
||
|
||
return ForwardHTTPResponse{
|
||
Status: httpResp.StatusCode,
|
||
Headers: httpResp.Header,
|
||
Body: encodeBase64(respBody),
|
||
}, nil
|
||
}
|
||
|
||
// indexByte returns the index of the first occurrence of byte b in s, or -1
// if b does not occur. It is a thin wrapper over strings.IndexByte, kept so
// existing callers in this package continue to compile.
func indexByte(s string, b byte) int {
	return strings.IndexByte(s, b)
}
|
||
|
||
// bytesReader wraps body in an io.Reader. For a nil or empty body it returns
// a nil io.Reader, which http.NewRequest accepts as "no body".
func bytesReader(body []byte) io.Reader {
	if len(body) > 0 {
		return bytes.NewReader(body)
	}
	// Return an untyped nil so the interface value itself is nil.
	return nil
}
|
||
|
||
// decodeBase64 decodes a forward-request body encoded with standard (padded)
// base64. An empty string yields (nil, nil).
func decodeBase64(s string) ([]byte, error) {
	if len(s) == 0 {
		return nil, nil
	}
	decoded, err := base64.StdEncoding.DecodeString(s)
	return decoded, err
}
|
||
|
||
// encodeBase64 encodes a forward-response body with standard (padded) base64.
// A nil or empty input yields the empty string.
func encodeBase64(b []byte) string {
	var encoded string
	if len(b) > 0 {
		encoded = base64.StdEncoding.EncodeToString(b)
	}
	return encoded
}
|