// integration_test.go — AB6:tunnel ↔ 內部 HTTP server 的端對端轉發驗證。 // // 覆蓋 TDD §4.2 / §5.4 / tunnel.md §3(資料流)完整鏈路: // // fake relay (WebSocket + yamux.Server) 端 // ↓ yamux.OpenStream() // ↓ 寫入 HTTP request bytes (http.Request.Write) // tunnel.Client.handleStream(agent 這側) // ↓ http.ReadRequest → req.URL.Host = localAddr // ↓ http.DefaultTransport.RoundTrip // local HTTP server(httptest.NewServer 綁 127.0.0.1) // ↓ handler 處理後 response // 回寫 yamux stream → relay 端 http.ReadResponse 解析 // // **這個 test 通過 = 整個 agent 的核心鏈路通了**。 // // 設計要點: // - 不依賴真 ServerController / server 子行程,用 httptest.NewServer // 扮演 local server 的角色(綁 127.0.0.1:0,由 OS 分配 random port) // - fakeRelay 已由 manager_test.go 提供;此檔新增 helper waitForSession // 暴露 yamux.Session 以便從 relay 側 OpenStream // - 測試情境涵蓋:happy path、多 requests(Concurrency)、local server down(502)、 // stream broken 時的清理 // // 參考: // - .autoflow/04-architecture/visiona-agent-tdd.md §4.2 (啟動時序) / §5.4 (stream 配對) // - .autoflow/04-architecture/tunnel.md §3 (資料流 / yamux stream 格式) package tunnel import ( "bufio" "context" "io" "net" "net/http" "net/http/httptest" "strings" "sync" "testing" "time" ) // --------------------------------------------------------------------- // 工具:從 fake relay 端開 yamux stream,送 HTTP request,讀 response // --------------------------------------------------------------------- // sendHTTPViaRelay 從 relay 端透過 yamux 向 agent 送一個 HTTP request, // 並回傳 agent 經由 tunnel forward 本機 server 後回來的 response。 // // 這是「從雲端側主動對 agent 發 HTTP」的模擬;agent 那頭的 // Client.handleStream 會收到後 RoundTrip 到本機 server,再把 response // 原封寫回 stream。 func sendHTTPViaRelay(t *testing.T, session relayOpener, req *http.Request) (*http.Response, error) { t.Helper() stream, err := session.Open() if err != nil { return nil, err } // 寫 request 到 stream(等同 remote-proxy 的 handleInternalForward) req.RequestURI = "" // http.Request.Write 要求清空 // 設合理的超時避免測試 hang _ = stream.SetDeadline(time.Now().Add(5 * time.Second)) if err := req.Write(stream); err != nil { _ = stream.Close() return nil, err } // 讀 response br := bufio.NewReader(stream) resp, err := http.ReadResponse(br, req) if err != nil { _ = stream.Close() return nil, err } // 包裝 Body 以在 Close 時同時關 stream resp.Body = &streamBody{ReadCloser: resp.Body, stream: stream} return resp, nil } // relayOpener 抽象 fakeRelay 背後的 yamux.Session.Open 能力。 // 實務上 *yamux.Session 直接滿足這個介面。 type relayOpener interface { Open() (net.Conn, error) } type streamBody struct { io.ReadCloser stream net.Conn } func (b *streamBody) Close() error { _ = b.ReadCloser.Close() return b.stream.Close() } // newLocalHTTPServer 起一個 httptest 本機 server 扮演「agent 內部 HTTP server」。 // 回傳的 server 綁在 127.0.0.1:,呼叫端 Close() 時會自動回收。 // // Handler 規格(給 test 用): // - GET /healthz → 200 "ok" // - GET /api/echo?q=xxx → 200 echo query // - POST /api/echo → 200 echo body // - GET /api/fail500 → 500 "server error" func newLocalHTTPServer(t *testing.T) *httptest.Server { t.Helper() mux := http.NewServeMux() mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Backend", "local") w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) mux.HandleFunc("/api/echo", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") if r.Method == http.MethodGet { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte(`{"q":"` + r.URL.Query().Get("q") + `"}`)) return } if r.Method == http.MethodPost { body, _ := io.ReadAll(r.Body) w.WriteHeader(http.StatusOK) _, _ = w.Write(body) return } w.WriteHeader(http.StatusMethodNotAllowed) }) mux.HandleFunc("/api/fail500", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) _, _ = w.Write([]byte("server error")) }) srv := httptest.NewServer(mux) return srv } // localAddrOf 從 httptest.Server.URL 取 "host:port"。 func localAddrOf(srv *httptest.Server) string { // URL 格式:http://127.0.0.1:PORT return strings.TrimPrefix(srv.URL, "http://") } // --------------------------------------------------------------------- // TEST: Happy path — relay 開 stream → agent forward 到 local server → response // --------------------------------------------------------------------- // TestForwardHappyPath 驗證 tunnel 收到雲端請求後,正確 forward 到本機 server。 // 這是 AB6 的核心整合測試 — 通過代表鏈路全通。 func TestForwardHappyPath(t *testing.T) { // 1. 起真 local HTTP server localSrv := newLocalHTTPServer(t) defer localSrv.Close() // 2. 起 fake remote-proxy(WebSocket endpoint) fr := newFakeRelay(t) defer fr.close() // 3. 啟動 Tunnel Manager(連 fake relay,轉發到本機 server) m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_integration", LocalAddr: localAddrOf(localSrv), }) defer m.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := m.Start(ctx); err != nil { t.Fatalf("Manager.Start: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) // 4. 從 fake relay 拿到 yamux session session := fr.waitForSession(t, 3*time.Second) // 5. 情境 A:GET /healthz req, _ := http.NewRequest(http.MethodGet, "http://"+localAddrOf(localSrv)+"/healthz", nil) resp, err := sendHTTPViaRelay(t, session, req) if err != nil { t.Fatalf("sendHTTPViaRelay healthz: %v", err) } if resp.StatusCode != http.StatusOK { t.Errorf("healthz status = %d, want 200", resp.StatusCode) } if got := resp.Header.Get("X-Backend"); got != "local" { t.Errorf("healthz X-Backend = %q, want 'local' (證明 response 是由 local server 產生而非中間層)", got) } body, _ := io.ReadAll(resp.Body) resp.Body.Close() if string(body) != "ok" { t.Errorf("healthz body = %q, want 'ok'", string(body)) } // 6. 情境 B:GET /api/echo?q=hello(query string 要能正確穿過 tunnel) req2, _ := http.NewRequest(http.MethodGet, "http://"+localAddrOf(localSrv)+"/api/echo?q=hello", nil) resp2, err := sendHTTPViaRelay(t, session, req2) if err != nil { t.Fatalf("sendHTTPViaRelay echo: %v", err) } body2, _ := io.ReadAll(resp2.Body) resp2.Body.Close() if got, want := string(body2), `{"q":"hello"}`; got != want { t.Errorf("echo body = %q, want %q", got, want) } // 7. 情境 C:POST /api/echo with body(request body 要能穿過) req3, _ := http.NewRequest(http.MethodPost, "http://"+localAddrOf(localSrv)+"/api/echo", strings.NewReader(`{"msg":"pong"}`)) req3.Header.Set("Content-Type", "application/json") resp3, err := sendHTTPViaRelay(t, session, req3) if err != nil { t.Fatalf("sendHTTPViaRelay post: %v", err) } body3, _ := io.ReadAll(resp3.Body) resp3.Body.Close() if got, want := string(body3), `{"msg":"pong"}`; got != want { t.Errorf("post echo body = %q, want %q", got, want) } } // TestForwardConcurrentRequests:多個 yamux stream 並行 forward 不互相干擾。 // yamux 本身支援多 stream 並行,這個 test 驗證 agent 端的 handleStream 能正確 // 處理並行 stream(每個 stream 各自一個 goroutine,不共享狀態)。 func TestForwardConcurrentRequests(t *testing.T) { localSrv := newLocalHTTPServer(t) defer localSrv.Close() fr := newFakeRelay(t) defer fr.close() m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_concurrent", LocalAddr: localAddrOf(localSrv), }) defer m.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := m.Start(ctx); err != nil { t.Fatalf("Start: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) session := fr.waitForSession(t, 3*time.Second) // 開 20 個並行 request,每個帶不同的 q 參數,驗證每個回傳的 body 對應自己的 q。 const N = 20 var wg sync.WaitGroup errs := make([]error, N) bodies := make([]string, N) for i := 0; i < N; i++ { wg.Add(1) go func(idx int) { defer wg.Done() q := "v" + string(rune('a'+idx%26)) req, _ := http.NewRequest(http.MethodGet, "http://"+localAddrOf(localSrv)+"/api/echo?q="+q, nil) resp, err := sendHTTPViaRelay(t, session, req) if err != nil { errs[idx] = err return } b, _ := io.ReadAll(resp.Body) resp.Body.Close() bodies[idx] = string(b) }(i) } wg.Wait() for i := 0; i < N; i++ { if errs[i] != nil { t.Errorf("req %d err: %v", i, errs[i]) continue } wantQ := "v" + string(rune('a'+i%26)) want := `{"q":"` + wantQ + `"}` if bodies[i] != want { t.Errorf("req %d body = %q, want %q (stream 間有交錯?)", i, bodies[i], want) } } } // TestForwardLocalServerDown:local server 不可達時,tunnel 應回 502 Bad Gateway。 // // 這是錯誤處理中最關鍵的一條路徑:local server 在 tunnel 已建立之後才掛掉(例如 // server 子行程 crash),雲端請求進來時 RoundTrip 會失敗,handleStream 應寫一個 // 502 給 stream,而不是讓 stream 直接斷(那樣雲端會以為 tunnel 壞掉)。 func TestForwardLocalServerDown(t *testing.T) { // 1. 先起 local server,讓 tunnel 連上時能 ready // 但 SessionToken / LocalAddr 指向一個「已關閉」的 local server port。 localSrv := newLocalHTTPServer(t) deadAddr := localAddrOf(localSrv) // 立刻關掉,port 會釋出但沒人 listen — 等同 server 掛掉的情境 localSrv.Close() // 稍等 TCP 狀態清理 time.Sleep(50 * time.Millisecond) fr := newFakeRelay(t) defer fr.close() m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_down", LocalAddr: deadAddr, }) defer m.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := m.Start(ctx); err != nil { t.Fatalf("Start: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) session := fr.waitForSession(t, 3*time.Second) // 發一個 request,預期拿到 502(local RoundTrip 失敗 → handleStream 寫 502 回 stream) req, _ := http.NewRequest(http.MethodGet, "http://"+deadAddr+"/healthz", nil) resp, err := sendHTTPViaRelay(t, session, req) if err != nil { t.Fatalf("sendHTTPViaRelay: %v (預期應拿到 502 而非 stream 錯誤)", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusBadGateway { t.Errorf("status = %d, want %d (502 — local server down 的訊號)", resp.StatusCode, http.StatusBadGateway) } } // TestForwardUpstream500:local server 回 500 時,tunnel 要原封轉回(不當作自己的錯誤)。 // 確保 agent 不會吃掉 local server 的 error response — RoundTrip 對 5xx 不算 err。 func TestForwardUpstream500(t *testing.T) { localSrv := newLocalHTTPServer(t) defer localSrv.Close() fr := newFakeRelay(t) defer fr.close() m := NewManager(Config{ RelayURL: fr.wsURL(), SessionToken: "vAs_500", LocalAddr: localAddrOf(localSrv), }) defer m.Close() ctx, cancel := context.WithCancel(context.Background()) defer cancel() if err := m.Start(ctx); err != nil { t.Fatalf("Start: %v", err) } waitForState(t, m, StateOnline, 3*time.Second) session := fr.waitForSession(t, 3*time.Second) req, _ := http.NewRequest(http.MethodGet, "http://"+localAddrOf(localSrv)+"/api/fail500", nil) resp, err := sendHTTPViaRelay(t, session, req) if err != nil { t.Fatalf("sendHTTPViaRelay: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusInternalServerError { t.Errorf("status = %d, want 500(local server 的錯誤應原封轉回)", resp.StatusCode) } body, _ := io.ReadAll(resp.Body) if string(body) != "server error" { t.Errorf("body = %q, want 'server error'", string(body)) } }