package relay

import (
	"bufio"
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"io"
	"log/slog"
	"net"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
	"testing"
	"time"

	"github.com/gorilla/websocket"
	"github.com/hashicorp/yamux"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"visiona-backend/internal/session"
	"visiona-backend/internal/wsconn"
)

// testPairingToken is a pairing token with a valid format, used by the tests.
const testPairingToken = "vAc_0123456789abcdef0123456789abcdef"

// startFakeLocalAgent starts a "fake local agent" for tests:
//   - dials a WebSocket to relayURL, passing token as the "token" query
//     parameter
//   - creates a yamux client session on top of that WebSocket
//   - for every accepted stream, reads one request with http.ReadRequest,
//     runs handler against it, and writes the recorded response back on
//     the stream
//
// This simulates the tunnel-client role of the POC edge-ai-server and is used
// to verify the relay forwarding path.
//
// handler stands in for the "local server (127.0.0.1:3721)".
//
// Any setup failure aborts the test through require. The returned stop
// function closes the yamux session and the WebSocket and then waits for the
// accept loop to exit; per-stream goroutines are not awaited.
func startFakeLocalAgent(t *testing.T, relayURL string, token string, handler http.Handler) (stop func()) {
	t.Helper()

	u, err := url.Parse(relayURL)
	require.NoError(t, err)
	q := u.Query()
	q.Set("token", token)
	u.RawQuery = q.Encode()

	rawWS, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	require.NoError(t, err)

	netConn := wsconn.New(rawWS)
	ym, err := yamux.Client(netConn, yamux.DefaultConfig())
	if err != nil {
		// require.NoError aborts the test, so release the already-open
		// WebSocket first; nothing else would close it.
		_ = rawWS.Close()
		require.NoError(t, err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			stream, aerr := ym.Accept()
			if aerr != nil {
				// Accept fails once the session is closed; that ends the loop.
				return
			}
			go func(s net.Conn) {
				defer s.Close()

				req, rerr := http.ReadRequest(bufio.NewReader(s))
				if rerr != nil {
					return
				}

				// The handler needs a ResponseWriter, but the reply has to go
				// out on the raw stream: collect the response in an
				// httptest.ResponseRecorder and serialize it ourselves.
				rec := httptest.NewRecorder()
				handler.ServeHTTP(rec, req)
				result := rec.Result()
				defer result.Body.Close()

				// Best effort: the peer may already have closed the stream.
				_ = result.Write(s)
			}(stream)
		}
	}()

	return func() {
		// Best-effort teardown; close errors are not interesting to the tests.
		_ = ym.Close()
		_ = rawWS.Close()
		<-done
	}
}

// Note: the standard library net.Conn is used directly for accepted streams
// (the yamux session's Accept returns a net.Conn). No alias type is defined
// here; this remark exists only for test readability.

// TestServer_TunnelConnect_RejectsMissingToken verifies that an upgrade
// request without a token is rejected with 401.
func TestServer_TunnelConnect_RejectsMissingToken(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default())

	mux := http.NewServeMux()
	mux.HandleFunc("/tunnel/connect", srv.HandleTunnelConnect)
	ts := httptest.NewServer(mux)
	defer ts.Close()

	resp, err := http.Get(ts.URL + "/tunnel/connect")
	require.NoError(t, err)
	defer resp.Body.Close()

	assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
}

// TestServer_TunnelConnect_RejectsInvalidTokenFormat verifies that a token
// with an invalid format is rejected with 401.
func TestServer_TunnelConnect_RejectsInvalidTokenFormat(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default())

	mux := http.NewServeMux()
	mux.HandleFunc("/tunnel/connect", srv.HandleTunnelConnect)
	ts := httptest.NewServer(mux)
	defer ts.Close()

	resp, err := http.Get(ts.URL + "/tunnel/connect?token=garbage")
	require.NoError(t, err)
	defer resp.Body.Close()

	assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
}

// TestServer_TunnelConnect_RegistersAndUnregisters verifies that:
//   - a valid token upgrades successfully and the session is registered in
//     the store
//   - once the local agent disconnects, the session is removed from the store
func TestServer_TunnelConnect_RegistersAndUnregisters(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default(), Options{KeepAliveInterval: 500 * time.Millisecond})

	mux := http.NewServeMux()
	mux.HandleFunc("/tunnel/connect", srv.HandleTunnelConnect)
	ts := httptest.NewServer(mux)
	defer ts.Close()

	wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + "/tunnel/connect"
	stop := startFakeLocalAgent(t, wsURL, testPairingToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))

	// Wait for registration to complete.
	require.Eventually(t, func() bool {
		// The lookup error is ignored on purpose: only the boolean is polled.
		ok, _ := store.Exists(context.Background(), testPairingToken)
		return ok
	}, 2*time.Second, 20*time.Millisecond)

	// Disconnect.
	stop()

	// Wait for the session to be unregistered.
	require.Eventually(t, func() bool {
		// The lookup error is ignored on purpose: only the boolean is polled.
		ok, _ := store.Exists(context.Background(), testPairingToken)
		return !ok
	}, 2*time.Second, 20*time.Millisecond)
}

// TestServer_HandleProxy_ForwardsRequest verifies that:
//   - the handle is found through the session store
//   - a stream is opened and the HTTP request is forwarded
//   - the response produced by the local agent is written back to the caller
func TestServer_HandleProxy_ForwardsRequest(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default(), Options{KeepAliveInterval: 500 * time.Millisecond})

	mux := http.NewServeMux()
	mux.HandleFunc("/tunnel/connect", srv.HandleTunnelConnect)
	mux.HandleFunc("/proxy/", srv.HandleProxy)
	ts := httptest.NewServer(mux)
	defer ts.Close()

	// Fake local agent: replies with JSON {"ok": true, "path": <received path>}.
	wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + "/tunnel/connect"
	stop := startFakeLocalAgent(t, wsURL, testPairingToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"ok":true,"path":"` + r.URL.Path + `"}`))
	}))
	defer stop()

	require.Eventually(t, func() bool {
		// The lookup error is ignored on purpose: only the boolean is polled.
		ok, _ := store.Exists(context.Background(), testPairingToken)
		return ok
	}, 2*time.Second, 20*time.Millisecond)

	// Forward a request through HandleProxy.
	req, err := http.NewRequest(http.MethodGet, ts.URL+"/proxy/api/devices", nil)
	require.NoError(t, err)
	req.Header.Set("X-Relay-Token", testPairingToken)

	resp, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	defer resp.Body.Close()

	assert.Equal(t, http.StatusOK, resp.StatusCode)

	// Fail on a read error instead of asserting against a truncated body.
	body, err := io.ReadAll(resp.Body)
	require.NoError(t, err)
	assert.Contains(t, string(body), `"ok":true`)
}

// TestServer_HandleProxy_NoTunnel verifies that a 502 is returned when the
// given token has no session.
func TestServer_HandleProxy_NoTunnel(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default())

	mux := http.NewServeMux()
	mux.HandleFunc("/proxy/", srv.HandleProxy)
	ts := httptest.NewServer(mux)
	defer ts.Close()

	req, err := http.NewRequest(http.MethodGet, ts.URL+"/proxy/api/anything", nil)
	require.NoError(t, err)
	req.Header.Set("X-Relay-Token", testPairingToken)

	resp, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	defer resp.Body.Close()

	assert.Equal(t, http.StatusBadGateway, resp.StatusCode)
}

// TestServer_HandleRelayStatus_ReportsOnline verifies that
// /relay/status?token=... reports the connection state.
func TestServer_HandleRelayStatus_ReportsOnline(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default(), Options{KeepAliveInterval: 500 * time.Millisecond})

	mux := http.NewServeMux()
	mux.HandleFunc("/tunnel/connect", srv.HandleTunnelConnect)
	mux.HandleFunc("/relay/status", srv.HandleRelayStatus)
	ts := httptest.NewServer(mux)
	defer ts.Close()

	wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + "/tunnel/connect"
	stop := startFakeLocalAgent(t, wsURL, testPairingToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer stop()

	require.Eventually(t, func() bool {
		// The lookup error is ignored on purpose: only the boolean is polled.
		ok, _ := store.Exists(context.Background(), testPairingToken)
		return ok
	}, 2*time.Second, 20*time.Millisecond)

	resp, err := http.Get(ts.URL + "/relay/status?token=" + testPairingToken)
	require.NoError(t, err)
	defer resp.Body.Close()

	var body map[string]any
	require.NoError(t, json.NewDecoder(resp.Body).Decode(&body))
	assert.Equal(t, true, body["online"])
}

// TestInternalServer_ForwardHTTP verifies that the internal forward JSON API
// can forward an HTTP request.
// This is the key Phase 0 path for api-server → remote-proxy.
func TestInternalServer_ForwardHTTP(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default(), Options{KeepAliveInterval: 500 * time.Millisecond})
	internal := NewInternalServer(store, slog.Default())

	// Tunnel server.
	tunnelMux := http.NewServeMux()
	tunnelMux.HandleFunc("/tunnel/connect", srv.HandleTunnelConnect)
	tunnelSrv := httptest.NewServer(tunnelMux)
	defer tunnelSrv.Close()

	// Internal server.
	internalMux := http.NewServeMux()
	internal.Routes(internalMux)
	internalSrv := httptest.NewServer(internalMux)
	defer internalSrv.Close()

	wsURL := "ws" + strings.TrimPrefix(tunnelSrv.URL, "http") + "/tunnel/connect"
	stop := startFakeLocalAgent(t, wsURL, testPairingToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("X-Request-Path", r.URL.Path)
		w.WriteHeader(http.StatusOK)
		_, _ = io.WriteString(w, `{"forwarded":true}`)
	}))
	defer stop()

	require.Eventually(t, func() bool {
		// The lookup error is ignored on purpose: only the boolean is polled.
		ok, _ := store.Exists(context.Background(), testPairingToken)
		return ok
	}, 2*time.Second, 20*time.Millisecond)

	// Call the internal forward endpoint.
	payload := ForwardHTTPRequest{
		SessionToken: testPairingToken,
		Method:       http.MethodGet,
		Path:         "/api/devices",
		Headers:      map[string]string{"X-Test": "1"},
	}
	bb, err := json.Marshal(payload)
	require.NoError(t, err)

	resp, err := http.Post(internalSrv.URL+"/internal/forward/http", "application/json", bytes.NewReader(bb))
	require.NoError(t, err)
	defer resp.Body.Close()
	require.Equal(t, http.StatusOK, resp.StatusCode)

	var fr ForwardHTTPResponse
	require.NoError(t, json.NewDecoder(resp.Body).Decode(&fr))
	require.Nil(t, fr.Error, "error: %+v", fr.Error)
	assert.Equal(t, http.StatusOK, fr.Status)

	// The forwarded body is base64-decoded before its content is checked.
	decoded, err := base64.StdEncoding.DecodeString(fr.Body)
	require.NoError(t, err)
	assert.Contains(t, string(decoded), `"forwarded":true`)
}

// TestInternalServer_GetSession verifies that GET /internal/session/:token
// returns a session summary.
func TestInternalServer_GetSession(t *testing.T) {
	store := session.NewInMemoryStore()
	srv := NewServer(store, slog.Default(), Options{KeepAliveInterval: 500 * time.Millisecond})
	internal := NewInternalServer(store, slog.Default())

	tunnelMux := http.NewServeMux()
	tunnelMux.HandleFunc("/tunnel/connect", srv.HandleTunnelConnect)
	tunnelSrv := httptest.NewServer(tunnelMux)
	defer tunnelSrv.Close()

	internalMux := http.NewServeMux()
	internal.Routes(internalMux)
	internalSrv := httptest.NewServer(internalMux)
	defer internalSrv.Close()

	wsURL := "ws" + strings.TrimPrefix(tunnelSrv.URL, "http") + "/tunnel/connect"
	stop := startFakeLocalAgent(t, wsURL, testPairingToken, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	defer stop()

	require.Eventually(t, func() bool {
		// The lookup error is ignored on purpose: only the boolean is polled.
		ok, _ := store.Exists(context.Background(), testPairingToken)
		return ok
	}, 2*time.Second, 20*time.Millisecond)

	resp, err := http.Get(internalSrv.URL + "/internal/session/" + testPairingToken)
	require.NoError(t, err)
	defer resp.Body.Close()
	require.Equal(t, http.StatusOK, resp.StatusCode)

	var body map[string]any
	require.NoError(t, json.NewDecoder(resp.Body).Decode(&body))
	assert.Equal(t, testPairingToken, body["token"])
	assert.Equal(t, true, body["connected"])
}

// TestInternalServer_GetSession_NotFound verifies that requesting a token
// with no registered session yields 404.
func TestInternalServer_GetSession_NotFound(t *testing.T) {
	store := session.NewInMemoryStore()
	internal := NewInternalServer(store, slog.Default())

	mux := http.NewServeMux()
	internal.Routes(mux)
	ts := httptest.NewServer(mux)
	defer ts.Close()

	resp, err := http.Get(ts.URL + "/internal/session/vAc_ffffffffffffffffffffffffffffffff")
	require.NoError(t, err)
	defer resp.Body.Close()

	assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

// TestTokenHelpers verifies the small token helper functions.
func TestTokenHelpers(t *testing.T) {
	assert.True(t, isAcceptableToken(testPairingToken))
	assert.False(t, isAcceptableToken("not-a-token"))
	assert.Equal(t, "vAc_0123", tokenPrefix(testPairingToken))
	assert.Equal(t, "short", tokenPrefix("short"))
}