// FAA client — visionA-backend 對 File Access Agent 的 server-to-server HTTP client。 // // Phase 0.8 只用 GET /files/{object_key}(給 promote-to-models 流程從 FAA pull NEF 用)。 // 其他 endpoint(PUT / DELETE / HEAD / metadata)目前 visionA 不需要,未來再補。 // // 設計要點: // - 走 service token(scope=files:download.read);token 由注入的 MCTokenClient 提供 // - **回 streaming body**(io.ReadCloser)— 不 io.ReadAll,避免 500MB NEF 全進 RAM // - **Phase A retry**:dial → 拿到 response header 之間的 5xx / network / timeout 失敗 // 依 §9.1 指數退避重試 max 2 次(1s, 2s)。一旦拿到 200 response(進 Phase B: // streaming body 給 caller),這層責任就結束 — body 中斷由 caller 處理(不可 replay)。 // 詳見下方 GetFile doc comment 的「Phase A vs Phase B retry」段。 // - 4xx → 對應 sentinel(401/403 → ErrServiceClientUnauthorized;404 → ErrFAAFileNotFound; // 其他 4xx → ErrFAAUnavailable,避免新增更多 sentinel) // // 與 T3 InitJob 的對比(為什麼 T3 不 retry 但 T4 GetFile retry): // - T3 InitJob:multipart **request body** 是 streaming(io.Reader 來自上游 c.Body); // 一旦 http.Client.Do 開始送 request body,io.Reader 已被消費,retry 無法 rewind → // 從第一次 attempt 起就「不可重試」。 // - T4 GetFile:GET 沒有 request body,request 完全 idempotent;retry window 涵蓋 // dial → 拿到 response header(Phase A)。Phase A 結束後(200 已到),response body // 才是「不可 replay」的 streaming,但那不在本層責任範圍 — 本層拿到 200 就 return *FAAFile。 // // 安全: // - **絕不**寫 Authorization header / service token / response body 進 log // - object_key 過長時截斷(避免 log 膨脹;FAA object_key 由 visionA 內部組,不含 user 敏感資訊) // // Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.3 / §2.6 / §9.1) package conversion import ( "context" "crypto/sha256" "encoding/hex" "errors" "fmt" "io" "log/slog" "net" "net/http" "net/url" "strings" "time" ) // ========================================================================== // 對外 type / interface // ========================================================================== // FAAClient 對 File Access Agent 的 server-to-server client。 // // goroutine-safe:每次呼叫獨立 *http.Request;無內部 mutable state(cache 由注入的 MCTokenClient 管)。 type FAAClient interface { // GetFile 從 FAA pull 一個 object(server-to-server,用 service token)。 // // scope: files:download.read // // 回傳 *FAAFile.Body 是 streaming body(io.ReadCloser);**caller 必須 Close**, // 不然底層 http.Response.Body 不會釋放、connection 也回不了 pool(goroutine + fd leak)。 // 推薦 pattern: // // file, err := faa.GetFile(ctx, key) // if err != nil { return err } // defer file.Body.Close() // _, err = io.Copy(dst, file.Body) // streaming 寫進 visionA storage // // 重試行為(Phase A retry only,對齊 §9.1): // - dial / TLS / response header 階段的 5xx / network / timeout: // 指數退避重試 max 2 次(1s, 2s)— GET 沒 request body 完全 idempotent,可放心 retry // - 401 / 403 / 404 / 其他 4xx:不重試,立即 return 對應 sentinel // - ctx cancel / deadline:立即 return ctx.Err()(即使在 retry sleep 中也立即中斷) // - 一旦拿到 200 response(進 Phase B):return *FAAFile,body 由 caller 自己讀; // caller 在讀 body 時遇到網路中斷不再重試(streaming response 不可 replay) // // 錯誤映射(對齊 conversion.md §6 + errors.go): // - ctx cancel/deadline → 透傳 ctx.Err(不包成 sentinel) // - 401 / 403 → ErrServiceClientUnauthorized(對外 idp_misconfigured/500) // - 404 → ErrFAAFileNotFound(對外 faa_unavailable/502) // - 其他 4xx / 5xx exhausted / network exhausted → ErrFAAUnavailable(對外 faa_unavailable/502) GetFile(ctx context.Context, objectKey string) (*FAAFile, error) } // FAAFile 是 GetFile 成功回傳的 streaming response。 // // **caller 必須 Body.Close()**(即使中途 error,也應 defer Close)。 type FAAFile struct { // Body 是 streaming response body;caller 用 io.Copy 等方式 streaming 消費。 Body io.ReadCloser // ContentLength 對應 FAA response 的 Content-Length header。 // 若 FAA 走 chunked transfer 沒帶這個 header,值為 -1(net/http 慣例)。 ContentLength int64 // ContentType 對應 FAA response 的 Content-Type header(如 "application/octet-stream")。 ContentType string // ETag 對應 FAA response 的 ETag header(FAA 端取自 storage adapter)。 // 若 FAA 沒帶,為空字串。 ETag string } // FAAClientOpts 是 NewFAAClient 的依賴注入。 // // HTTPClient / Now / Logger 為 optional(nil 自動填預設)— 方便 unit test 注入 fake。 type FAAClientOpts struct { // BaseURL 是 FAA base URL(不帶結尾斜線)。 // 範例:http://192.168.0.130:5081 BaseURL string // Tokens 是 MCTokenClient(注入,non-nil 必填)— 用來取 service token。 Tokens MCTokenClient // HTTPClient 為 optional;nil 用預設(含 dial / response header timeout,但無整體 timeout)。 // 測試會注入 httptest.Server.Client()。 // // 為什麼預設 client 不設 Timeout: // 500MB NEF 在慢網路下 download 可能 5-10 分鐘;http.Client.Timeout 是「整體 timeout」 // 涵蓋「dial + response header + body 讀完」三段,會在大檔下載中途斷線。 // 改用 transport 層的 DialTimeout + ResponseHeaderTimeout(10s 各自)— 連線階段卡死才算 fail, // body streaming 階段交給 ctx.Done() 控制(caller 用帶 deadline 的 ctx 即可)。 HTTPClient *http.Client // Now 為 optional;nil 用 time.Now。測試會注入 fake clock。 Now func() time.Time // Logger 為 optional;nil 用 slog.Default()。 Logger *slog.Logger } // ========================================================================== // 內部固定常數 // ========================================================================== const ( // scopeFAADownloadRead 對齊 FAA README §「初步 API 邊界」與 FileAccessScopes.DownloadRead。 scopeFAADownloadRead = "files:download.read" // faaDialTimeout 是 dial 階段的 timeout(連 TCP / TLS 握手)。 // 連線一直建不起來通常是路由問題,10s 已足夠;超過視為 FAA 不可達。 faaDialTimeout = 10 * time.Second // faaResponseHeaderTimeout 是「送完 request → 收到 response status 行」的 timeout。 // 這段是 server-side 處理時間(FAA 找檔、auth validate);10s 對小檔 metadata 階段夠寬鬆。 // 注意:這個 timeout **不涵蓋 body streaming 階段**(body streaming 由 ctx 控制)。 faaResponseHeaderTimeout = 10 * time.Second // faaMaxRetries 是 Phase A 5xx / network / timeout 的最大重試次數(不含第一次)。 // 對齊 conversion.md §9.1:FAA GET /files/{key} max 2 retries(1s, 2s)。 faaMaxRetries = 2 // faaRetryBaseDelay 是指數退避的 base(1s, 2s)。 faaRetryBaseDelay = 1 * time.Second // objectKeyHashLen 是 log 中 object_key 的截短後 hash 長度(前 16 hex chars)。 objectKeyHashLen = 16 // faaErrorBodyReadCap 是失敗 response 從 body 讀進 io.Discard 的最大量(4KB)。 // 失敗時讀少量 body 主要是讓 keep-alive 能 reuse connection,避免空 body 留在 pipe。 faaErrorBodyReadCap = 4 * 1024 ) // faaEndpointKind 是 log / 錯誤分類用的 endpoint 標記(目前只有一個)。 const faaEndpointKind = "faa_get_file" // ========================================================================== // 構造 + 內部實作 // ========================================================================== // faaClient 是 FAAClient 的預設實作。 // // 套件內 unexported struct(caller 拿 interface),讓未來換實作不影響 caller。 type faaClient struct { baseURL string tokens MCTokenClient http *http.Client now func() time.Time logger *slog.Logger } // NewFAAClient 建立一個 FAAClient 實例。 // // 必填:BaseURL / Tokens。其他 optional。 // 注意:constructor 不會驗 BaseURL 連線,第一次 GetFile 才會打網路。 func NewFAAClient(opts FAAClientOpts) FAAClient { httpClient := opts.HTTPClient if httpClient == nil { httpClient = newDefaultFAAHTTPClient() } now := opts.Now if now == nil { now = time.Now } logger := opts.Logger if logger == nil { logger = slog.Default() } return &faaClient{ baseURL: strings.TrimRight(opts.BaseURL, "/"), tokens: opts.Tokens, http: httpClient, now: now, logger: logger, } } // newDefaultFAAHTTPClient 建一個適合 streaming download 的預設 http.Client。 // // 為什麼自訂 transport: // - http.Client.Timeout 不適用大檔下載(會中斷 body streaming) // - 需要分別控制 dial / response header timeout,body streaming 不限制(由 ctx 控) // // transport 其餘參數沿用 net/http DefaultTransport 的合理預設(MaxIdleConns 等)。 func newDefaultFAAHTTPClient() *http.Client { transport := &http.Transport{ DialContext: (&net.Dialer{ Timeout: faaDialTimeout, KeepAlive: 30 * time.Second, }).DialContext, ResponseHeaderTimeout: faaResponseHeaderTimeout, // 沿用 DefaultTransport 的合理預設 MaxIdleConns: 100, MaxIdleConnsPerHost: 10, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, } return &http.Client{ Transport: transport, // **不設 Timeout** — body streaming 階段由 ctx 控制 } } // ========================================================================== // GetFile — Phase A retry,Phase B 不 retry 的 streaming pull // ========================================================================== // GetFile 實作 FAAClient.GetFile。 // // 流程: // 1. 取 service token(透過 MCTokenClient;其錯誤透傳,不重新分類) // 2. 組 URL + 建 request // 3. doWithRetry:max (1 + faaMaxRetries) attempts;每 attempt 重新 c.http.Do // - 拿到 200:直接 return *FAAFile(不 close body) // - 拿到 4xx:close body 後依 status mapping 對應 sentinel,不 retry // - 拿到 5xx:close body,等 backoff 後 retry // - network / dial / responseHeader timeout:等 backoff 後 retry // - ctx cancel / deadline:立即 return ctx.Err() func (c *faaClient) GetFile(ctx context.Context, objectKey string) (*FAAFile, error) { if objectKey == "" { return nil, fmt.Errorf("conversion/faa_client: object_key is required") } keyHash := hashObjectKey(objectKey) // 1. 取 service token // ServiceToken 內部已依 §6 mapping 失敗(ErrServiceClientUnauthorized / ErrIDPMisconfigured / // ErrIDPUnavailable)— 這裡用 fmt.Errorf("%w") 透傳,不再二次包裝(避免錯誤碼被「升級」 // 成 ErrFAAUnavailable 而失去原本的 i18n 區分 idp_misconfig vs idp_down)。 token, err := c.tokens.ServiceToken(ctx, scopeFAADownloadRead) if err != nil { return nil, fmt.Errorf("conversion: get service token for faa download: %w", err) } // 2. 組 endpoint。注意 FAA 的 object_key 可能含路徑分隔符(如 "tenant/jobs/abc/output.nef")— // 用 ResolveReference 處理;net/http 內部會做 path escape,避免 "../" 等問題。 endpoint, err := c.buildFileURL(objectKey) if err != nil { return nil, fmt.Errorf("%w: build faa url: %v", ErrFAAUnavailable, err) } // 3. 進 retry loop(Phase A only) return c.doWithRetry(ctx, keyHash, endpoint, token) } // doWithRetry 是 GetFile 的 Phase A retry 執行器。 // // 與 mc_token_client.doWithRetry / converter_client.doWithRetry 結構類似,但有以下差異: // - 成功路徑回傳 *FAAFile(含未 close 的 streaming body),不是 []byte // - 沒有「每次 attempt 重新建 request」需求 — GET 沒 body,request 物件可重用, // 但為了讓 ctx-aware 行為一致(ctx cancel 後不重用舊 request),這裡每次都新建一個 // - reqBuilder 不接 token 參數 — token 在 GetFile 取一次,retry 期間沿用同一 token // (retry window 短:max 1+2+3=6s,token 不會在這段期間過期) // // 為什麼 retry 期間不重新取 token: // - 簡化:避免 token 取失敗 vs HTTP 失敗 兩種錯誤交織的處理 // - 安全:401 在這層被分類為「不可 retry」,不會走到「token expired 中途要 refresh」場景 // - 效能:cache hit 情境下成本低但仍多一次 mutex;6s window 內 token 不會 expire func (c *faaClient) doWithRetry( ctx context.Context, keyHash, endpoint, token string, ) (*FAAFile, error) { var lastErr error for attempt := 0; attempt <= faaMaxRetries; attempt++ { // retry 前等待退避;ctx cancel 立即中斷 if attempt > 0 { select { case <-ctx.Done(): // ctx cancel/deadline → 立即 return(不 retry,不包成 sentinel) return nil, ctx.Err() case <-time.After(faaRetryBackoff(attempt)): } } req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) if err != nil { // 建 request 失敗(極罕見:URL parse 異常)— 不可 retry return nil, fmt.Errorf("%w: build faa request: %v", ErrFAAUnavailable, err) } req.Header.Set("Accept", "application/octet-stream") req.Header.Set("Authorization", "Bearer "+token) file, classifiedErr, retryable := c.doOnce(req, keyHash, attempt) if classifiedErr == nil { // 成功 — file 含未 close 的 body,由 caller 接手 return file, nil } lastErr = classifiedErr if !retryable { // 4xx / 401-403 / 404 / ctx cancel:直接 return,不再 retry return nil, classifiedErr } // retryable 5xx / network / timeout:繼續下一輪 } // 用完 retry 額度 c.logger.Warn("conversion.faa.retry_exhausted", slog.String("endpoint", faaEndpointKind), slog.String("object_key_hash", keyHash), slog.Int("attempts", faaMaxRetries+1)) return nil, lastErr } // doOnce 執行一次 Phase A:發 request → 等 response header → 分類結果。 // // 回傳: // - 成功(2xx):file != nil(含未 close 的 streaming body), classifiedErr=nil, retryable=false // - 失敗:file=nil, classifiedErr 為 sentinel-wrapped error, retryable 表示是否該重試 // // 重要:成功時 caller(doWithRetry)會直接把 file 透傳出去 — 這層**不 close body**。 // 失敗時這層**會 close body**(讀少量讓 keep-alive reuse connection)。 func (c *faaClient) doOnce( req *http.Request, keyHash string, attempt int, ) (file *FAAFile, err error, retryable bool) { startedAt := c.now() res, doErr := c.http.Do(req) duration := c.now().Sub(startedAt) if doErr != nil { // network / dial / response header timeout / ctx cancel if errors.Is(doErr, context.Canceled) || errors.Is(doErr, context.DeadlineExceeded) { c.logger.Warn("conversion.faa.ctx_cancelled", slog.String("endpoint", faaEndpointKind), slog.String("object_key_hash", keyHash), slog.Int("attempt", attempt+1), slog.Duration("duration", duration)) return nil, doErr, false } c.logger.Warn("conversion.faa.network_error", slog.String("endpoint", faaEndpointKind), slog.String("object_key_hash", keyHash), slog.Int("attempt", attempt+1), slog.Duration("duration", duration), // err.Error() 不會含 secret(http.Client 錯誤訊息只有 URL + 連線層 errno), // 但仍 truncate 防 log 爆量 slog.String("err", truncate(doErr.Error(), 200))) return nil, fmt.Errorf("%w: faa network error: %v", ErrFAAUnavailable, doErr), true } // 成功(2xx):直接把 res.Body 透傳給 caller streaming 消費 — **不在這裡 close**! // 注意:成功路徑沒 defer res.Body.Close() — body 的所有權交給 *FAAFile.Body。 if res.StatusCode >= 200 && res.StatusCode < 300 { c.logger.Info("conversion.faa.get_success", slog.String("endpoint", faaEndpointKind), slog.String("object_key_hash", keyHash), slog.Int("status", res.StatusCode), slog.Int("attempt", attempt+1), slog.Int64("content_length", res.ContentLength), slog.Duration("duration", duration)) return &FAAFile{ Body: res.Body, // caller 責任 Close ContentLength: res.ContentLength, ContentType: res.Header.Get("Content-Type"), ETag: res.Header.Get("ETag"), }, nil, false } // 失敗(非 2xx):讀少量 body 做 log(避免 5xx 帶大 body 爆 log),然後 close // 讀進 io.Discard 而不是真的存下來: // - 不寫進 log(FAA 錯誤 body 可能含 requestId / 路徑等內部資訊) // - 只是讓 keep-alive 能 reuse connection(read-to-EOF or close) defer res.Body.Close() _, _ = io.CopyN(io.Discard, res.Body, faaErrorBodyReadCap) c.logger.Warn("conversion.faa.endpoint_error", slog.String("endpoint", faaEndpointKind), slog.String("object_key_hash", keyHash), slog.Int("status", res.StatusCode), slog.Int("attempt", attempt+1), slog.Duration("duration", duration)) mappedErr, isRetryable := c.mapGetFileError(res.StatusCode) return nil, mappedErr, isRetryable } // mapGetFileError 把 FAA `GET /files/{key}` 的非 2xx 對應到 sentinel + 是否 retryable。 // // 對齊 FAA Program.cs MapGet("/files/{**objectKey}") 的失敗回應: // - 401 invalid_token / validation_unavailable → ErrServiceClientUnauthorized(不 retry — secret 設定錯) // - 403 tenant_mismatch / object_key_mismatch / method_mismatch → ErrServiceClientUnauthorized(不 retry) // - 404 file_not_found → ErrFAAFileNotFound(不 retry — object 不存在) // - 400 invalid_object_key → ErrFAAUnavailable(不 retry — visionA 端 object_key 命名 bug) // - 其他 4xx → ErrFAAUnavailable(不 retry) // - 5xx → ErrFAAUnavailable(**可 retry**:FAA / 下游 storage 暫時失常) func (c *faaClient) mapGetFileError(status int) (err error, retryable bool) { switch { case status == http.StatusUnauthorized || status == http.StatusForbidden: return fmt.Errorf("%w: faa get file %d", ErrServiceClientUnauthorized, status), false case status == http.StatusNotFound: return fmt.Errorf("%w: faa get file %d", ErrFAAFileNotFound, status), false case status >= 400 && status < 500: // 400 / 其他 4xx:不可 retry return fmt.Errorf("%w: faa get file %d", ErrFAAUnavailable, status), false default: // 5xx:可 retry return fmt.Errorf("%w: faa get file %d", ErrFAAUnavailable, status), true } } // faaRetryBackoff 回傳第 n 次 retry(n 從 1 開始)的等待時間。 // 1 → 1s, 2 → 2s(對齊 conversion.md §9.1) // // 不加 jitter — Phase 0.8 同時打 FAA 的 caller 數量有限(promote-to-models 流程是 // 序列式 per-job 觸發),併發競爭機率低;jitter 的邊際效益低。 func faaRetryBackoff(attempt int) time.Duration { if attempt < 1 { return faaRetryBaseDelay } return faaRetryBaseDelay * time.Duration(attempt) } // buildFileURL 用 url.Parse + ResolveReference 組 GET /files/{objectKey} 的完整 URL。 // // 為什麼用 ResolveReference 而不是 string concat: // - object_key 可能含路徑分隔符("tenant/jobs/abc/output.nef") // - 直接 concat 容易踩 trailing-slash / encoding 雷 // - net/url 會做必要的 percent-escape(保留 '/' 為 path separator) func (c *faaClient) buildFileURL(objectKey string) (string, error) { base, err := url.Parse(c.baseURL) if err != nil { return "", fmt.Errorf("parse base url: %w", err) } // 用 url.URL{Path: ...} 避免手動 escape;net/url 會處理 path encoding。 // 注意:base.Path 可能為空或結尾帶 "/",ResolveReference 會處理。 ref := &url.URL{Path: "/files/" + objectKey} return base.ResolveReference(ref).String(), nil } // hashObjectKey 把 object_key 算 SHA-256 後取前 16 hex chars,當 log 用的穩定 hash。 // // 為什麼不直接 log object_key: // - object_key 可能含路徑("tenant/jobs/uuid/output.nef")— 過長 // - 目前 visionA 的 object_key 不直接含 user 敏感資訊,但保險起見統一 hash // - 16 chars hex(64-bit)對 visionA 內部 job 數量來說碰撞機率極低,足以追蹤單一 request func hashObjectKey(objectKey string) string { sum := sha256.Sum256([]byte(objectKey)) return hex.EncodeToString(sum[:])[:objectKeyHashLen] }