package relay import ( "bufio" "bytes" "context" "encoding/base64" "encoding/json" "io" "log/slog" "net/http" "visiona-backend/internal/session" ) // InternalServer 提供 api-server → remote-proxy 的 internal HTTP API。 // // 對齊 `.autoflow/04-architecture/api/api-internal.md`: // - POST /internal/forward/http — api-server 轉發非 WS 請求給指定 session // - GET /internal/forward/ws — api-server 轉發 WS upgrade(Phase 0 暫為 stub) // - GET /internal/session/:token — 查 session 是否存在與基本資訊 // - GET /internal/sessions — 列出所有在線 session(debug / metrics 用) // - POST /internal/session/:token/close — 後台運維強制斷 tunnel // // 雛形安全性:只監聽 internal port(`VISIONA_PROXY_INTERNAL_PORT`,預設 3801), // 生產環境須由網路層(security group / NetworkPolicy)阻擋外部存取。 // Phase 1 再加 mTLS / shared secret(見 api-internal.md §安全)。 type InternalServer struct { store session.Store logger *slog.Logger } // NewInternalServer 建立 internal HTTP handler。 func NewInternalServer(store session.Store, logger *slog.Logger) *InternalServer { if logger == nil { logger = slog.Default() } return &InternalServer{store: store, logger: logger} } // Routes 把所有 internal endpoints 註冊到 mux;caller 自行決定 listen 埠號。 // // 兩個 forward endpoint 並存(B3 Review Major 1 修復): // - `POST /internal/forward/http` — JSON + base64 封裝,適合簡單 JSON request/response(如 GET /healthz) // - `POST /internal/forward/raw` — hijack 成 raw TCP,支援 streaming(MJPEG / SSE)、長連線、任意 HTTP // 上 WS upgrade;`session.ProxyClient.OpenStream(ctx) net.Conn` 的真實底層(B4 用) // // 詳見 `.autoflow/04-architecture/api/api-internal.md`。 func (s *InternalServer) Routes(mux *http.ServeMux) { mux.HandleFunc("/internal/forward/http", s.HandleForwardHTTP) mux.HandleFunc("/internal/forward/raw", s.HandleForwardRaw) mux.HandleFunc("/internal/forward/ws", s.HandleForwardWS) mux.HandleFunc("/internal/session/", s.handleSessionByToken) // 含 :token 與 :token/close mux.HandleFunc("/internal/sessions", s.HandleListSessions) } // ForwardHTTPRequest 是 api-server 丟給 /internal/forward/http 的請求 body(JSON)。 // // 為了讓雛形簡單易測,我們採用 **JSON 結構化封裝** 的方式傳遞,而非 api-internal.md // 所描述的「raw HTTP bytes」。兩者等價,未來可在不影響 API handler 的情況下切換。 // 這個 JSON 格式是 Phase 0 雛形的便利選擇(見 task B3 prompt)。 type ForwardHTTPRequest struct { // SessionToken 是 local agent tunnel 的 token。 SessionToken string `json:"session_token"` // Method 例:GET / POST。 Method string `json:"method"` // Path 例:/api/devices(不含 scheme + host;local agent 會自己補)。 Path string `json:"path"` // Headers 要帶的 HTTP headers。 Headers map[string]string `json:"headers,omitempty"` // Body 是 base64 編碼的 request body;空字串 → 無 body。 Body string `json:"body,omitempty"` } // ForwardHTTPResponse 是 /internal/forward/http 的回應 body(JSON)。 type ForwardHTTPResponse struct { Status int `json:"status"` Headers map[string][]string `json:"headers,omitempty"` // Body 是 base64 編碼的 response body。 Body string `json:"body,omitempty"` // Error 在轉發過程失敗時填寫(tunnel 斷、stream 失敗等)。 Error *ForwardHTTPError `json:"error,omitempty"` } // ForwardHTTPError 描述轉發失敗的原因。 type ForwardHTTPError struct { Code string `json:"code"` Message string `json:"message"` } // HandleForwardHTTP 實作 POST /internal/forward/http。 // // 雛形採 JSON 格式(ForwardHTTPRequest/Response),適合簡單的一次性 JSON request/response。 // **不支援 streaming(MJPEG / SSE / chunked)與 WebSocket**;這類呼叫應改走 // `POST /internal/forward/raw`(B3 Review Major 1 修復後新增的 raw bytes endpoint)。 // // 為何保留兩條路徑? // - JSON 版:api-server 對簡單 API(如 GET /healthz、POST /api/devices)好寫好測 // - Raw 版:`session.ProxyClient.OpenStream(ctx) net.Conn` 的底層,streaming friendly // // 注意:此 handler 不支援 Flusher 串流回寫(JSON 封裝本質上不能串流)。 func (s *InternalServer) HandleForwardHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "POST required") return } var req ForwardHTTPRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { writeJSONError(w, http.StatusBadRequest, "INVALID_JSON", err.Error()) return } if req.SessionToken == "" { writeJSONError(w, http.StatusBadRequest, "MISSING_SESSION_TOKEN", "session_token required") return } if req.Method == "" { req.Method = http.MethodGet } if req.Path == "" { req.Path = "/" } resp, ferr := forwardOverTunnel(r.Context(), s.store, req) if ferr != nil { s.logger.Warn("internal forward failed", "error", ferr.Error(), "token_prefix", tokenPrefix(req.SessionToken), "method", req.Method, "path", req.Path) } w.Header().Set("Content-Type", "application/json") if resp.Error != nil { // 轉發錯誤 → 回 502(同 api-internal.md TUNNEL_DISCONNECTED 語意) w.WriteHeader(http.StatusBadGateway) } else { w.WriteHeader(http.StatusOK) } _ = json.NewEncoder(w).Encode(resp) } // HandleForwardWS 是 /internal/forward/ws 的 stub(Phase 0 雛形)。 // // 完整實作需要 Hijack + WebSocket relay,與 HandleProxy.proxyWebSocket 類似; // 為了不過度複雜化 B3,這裡先回 501;真正的 WS forward 在 B5 接入前端時補齊。 func (s *InternalServer) HandleForwardWS(w http.ResponseWriter, r *http.Request) { writeJSONError(w, http.StatusNotImplemented, "NOT_IMPLEMENTED", "WS forward stub — will be implemented in B5 when frontend connects") } // handleSessionByToken 分派 /internal/session/:token 與 /internal/session/:token/close。 func (s *InternalServer) handleSessionByToken(w http.ResponseWriter, r *http.Request) { const prefix = "/internal/session/" rest := r.URL.Path[len(prefix):] if rest == "" { writeJSONError(w, http.StatusBadRequest, "MISSING_TOKEN", "token required in path") return } // /internal/session/:token/close if idx := indexByte(rest, '/'); idx != -1 { token := rest[:idx] action := rest[idx+1:] if action == "close" && r.Method == http.MethodPost { s.closeSession(w, r, token) return } writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "unknown action: "+action) return } // /internal/session/:token if r.Method != http.MethodGet { writeJSONError(w, http.StatusMethodNotAllowed, "METHOD_NOT_ALLOWED", "GET required") return } s.getSession(w, r, rest) } func (s *InternalServer) getSession(w http.ResponseWriter, r *http.Request, token string) { h, err := s.store.Lookup(r.Context(), token) if err != nil { writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "session not found") return } sum := h.Summary() w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ "token": sum.Token, "connected": !h.IsClosed(), "connected_at": sum.ConnectedAt, "last_heartbeat": sum.LastHeartbeat, "remote_addr": sum.RemoteAddr, "user_id": sum.UserID, "device_id": sum.DeviceID, }) } func (s *InternalServer) closeSession(w http.ResponseWriter, r *http.Request, token string) { h, err := s.store.Lookup(r.Context(), token) if err != nil { writeJSONError(w, http.StatusNotFound, "NOT_FOUND", "session not found") return } _ = h.Close() _ = s.store.Unregister(r.Context(), token) w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{"closed": true}) } // HandleListSessions 實作 GET /internal/sessions。 func (s *InternalServer) HandleListSessions(w http.ResponseWriter, r *http.Request) { summaries, err := s.store.List(r.Context()) if err != nil { writeJSONError(w, http.StatusInternalServerError, "LIST_FAILED", err.Error()) return } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(map[string]any{ "sessions": summaries, "total": len(summaries), }) } // forwardOverTunnel 是 /internal/forward/http 的核心: // 1. store.Lookup 找到 handle // 2. OpenStream // 3. 在 stream 上組 HTTP request 並讀取 response // 4. 封裝回 ForwardHTTPResponse func forwardOverTunnel(ctx context.Context, store session.Store, req ForwardHTTPRequest) (ForwardHTTPResponse, error) { h, err := store.Lookup(ctx, req.SessionToken) if err != nil || h.IsClosed() { return ForwardHTTPResponse{ Error: &ForwardHTTPError{ Code: "TUNNEL_DISCONNECTED", Message: "session not connected", }, }, err } stream, err := h.OpenStream(ctx) if err != nil { return ForwardHTTPResponse{ Error: &ForwardHTTPError{ Code: "TUNNEL_ERROR", Message: "open stream failed: " + err.Error(), }, }, err } defer stream.Close() bodyBytes, err := decodeBase64(req.Body) if err != nil { return ForwardHTTPResponse{ Error: &ForwardHTTPError{ Code: "INVALID_BODY", Message: "body base64 decode failed", }, }, err } httpReq, err := http.NewRequest(req.Method, req.Path, bytesReader(bodyBytes)) if err != nil { return ForwardHTTPResponse{ Error: &ForwardHTTPError{ Code: "INVALID_REQUEST", Message: "build request failed: " + err.Error(), }, }, err } // local agent 會自行覆寫 Host;這裡只保留 "127.0.0.1"(placeholder) httpReq.URL.Scheme = "http" httpReq.URL.Host = "127.0.0.1" httpReq.RequestURI = "" httpReq.Host = "127.0.0.1" if len(bodyBytes) > 0 { httpReq.ContentLength = int64(len(bodyBytes)) } for k, v := range req.Headers { httpReq.Header.Set(k, v) } // 設定 Close=false 保留長連線語意由 yamux / local agent 決定 httpReq.Close = false if err := httpReq.Write(stream); err != nil { return ForwardHTTPResponse{ Error: &ForwardHTTPError{ Code: "TUNNEL_WRITE_ERROR", Message: "write request to tunnel failed: " + err.Error(), }, }, err } httpResp, err := http.ReadResponse(bufio.NewReader(stream), httpReq) if err != nil { return ForwardHTTPResponse{ Error: &ForwardHTTPError{ Code: "TUNNEL_READ_ERROR", Message: "read response from tunnel failed: " + err.Error(), }, }, err } defer httpResp.Body.Close() respBody, err := io.ReadAll(httpResp.Body) if err != nil { return ForwardHTTPResponse{ Error: &ForwardHTTPError{ Code: "TUNNEL_READ_ERROR", Message: "read response body failed: " + err.Error(), }, }, err } return ForwardHTTPResponse{ Status: httpResp.StatusCode, Headers: httpResp.Header, Body: encodeBase64(respBody), }, nil } // indexByte 找 rune b 在 s 的第一個位置;沒找到回 -1。 // 另寫是為了不 import strings / bytes,避免 forwardOverTunnel 附近多一個 dep。 func indexByte(s string, b byte) int { for i := 0; i < len(s); i++ { if s[i] == b { return i } } return -1 } // bytesReader 建立 io.Reader;body 為 nil / 空時回 nil(http.NewRequest 接受)。 func bytesReader(body []byte) io.Reader { if len(body) == 0 { return nil } return bytes.NewReader(body) } // decodeBase64 解碼 Forward request body;空字串回 nil。 func decodeBase64(s string) ([]byte, error) { if s == "" { return nil, nil } return base64.StdEncoding.DecodeString(s) } // encodeBase64 編碼 Forward response body。 func encodeBase64(b []byte) string { if len(b) == 0 { return "" } return base64.StdEncoding.EncodeToString(b) }