// Converter client — visionA-backend 對 kneron_model_converter (task-scheduler) 的 HTTP client。 // // 對應 4 個 endpoint(見 kneron_model_converter/apps/task-scheduler/docs/openapi.yaml): // - InitJob: POST /api/v1/jobs (multipart streaming proxy) // - GetJob: GET /api/v1/jobs/{id} // - Promote: POST /api/v1/jobs/{id}/promote // - ListInProgressJobs: GET /api/v1/jobs?user_id=&status=in_progress (lazy rebuild ownership 用) // // 設計重點: // - HTTP retry 矩陣對齊 conversion.md §9.1(InitJob 例外:不 retry 5xx,見下方 sendInitJob 註解) // - **Phase 0.8b 認證**:直接帶 pre-shared API key(VISIONA_CONVERTER_API_KEY)— 不再走 MC OAuth // client_credentials grant、不再依賴 MCTokenClient.ServiceToken()。 // 詳見 ADR-015 §3 + conversion.md §3。 // - body 為 streaming:InitJob 直接傳 caller 的 io.Reader;不暫存 disk、不 buffer 全 RAM // - 4xx 錯誤 mapping 對齊 §6 + api-conversion.md 錯誤碼總覽 // - 401/403 → ErrConverterAuthFailed(Phase 0.8b 新 sentinel;對外仍 mask 成 converter_unavailable // 避免洩漏「API key 不對」這個內部運維狀態,SRE 從 server log 看 auth_failed 計數) // // 安全: // - **絕不**把 Authorization header / API key 寫進 log(即使是部分前綴) // - 只 log job_id / status / endpoint / attempt / duration // // Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.5 + §9.1) // Phase 0.8b API key 改造 (見 ADR-015 §3 + §6 + conversion.md §3) package conversion import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "log/slog" "net/http" "net/url" "strings" "time" ) // ========================================================================== // 對外 type / interface // ========================================================================== // ConverterClient 對 task-scheduler 的 HTTP client。 // // 所有 method 都會自動: // - Phase 0.8b:直接帶 `Authorization: Bearer ` — 不查 cache、 // 不打 MC、不重簽(見 ADR-015 §3 + conversion.md §3.1) // - 依 conversion.md §9.1 retry 矩陣處理 5xx / network / timeout(InitJob 例外) // - 把 4xx / 5xx 對應到 errors.go 的 sentinel;401/403 → ErrConverterAuthFailed(不 retry) // // goroutine-safe:每次呼叫獨立 *http.Request,無內部 mutable state(apiKey 為 immutable 字串)。 type ConverterClient interface { // InitJob 把 caller 的 multipart body streaming proxy 給 converter。 // // 不 retry 5xx:multipart body 是 streaming(io.Reader 一次性),retry 會傳到一半的爛資料; // 直接 fail 由 caller(flow.go)依 §4.3.2 cleanup 鏈處理。 // // timeout:30 分鐘(500MB upload 在慢網路可能 5-10 分鐘)。 InitJob(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) // GetJob 查單一 job 狀態。 // // retry: 5xx / network → max 3 attempts (0.5s, 1s, 2s 退避) GetJob(ctx context.Context, jobID string) (*ConverterJob, error) // Promote 把成功 job 的指定 stage 結果檔搬到 FAA。 // // retry: 5xx / network → max 2 attempts (1s, 2s 退避) // // 502 file_gateway_unavailable → ErrFAAUnavailable(converter 端 FAA 不可達) Promote(ctx context.Context, jobID string, req PromoteReq) (*ConverterPromoteResult, error) // ListInProgressJobs 查指定 user 進行中的 job 清單(給 §2.6.1 lazy rebuild ownership 用)。 // // retry: 5xx / network → max 1 attempt (0.5s 退避,輕量;不期望常態打) // // 預期 0 或 1 筆(同 user 同時只能 1 active job),但回 slice 保留 future-proof。 ListInProgressJobs(ctx context.Context, userID string) ([]*ConverterJob, error) } // InitConverterJobReq 是 InitJob 的輸入;body 為 streaming(io.Reader 一次性消費)。 // // 設計原則: // - BodyContentType 必須是上層 handler 的原始 Content-Type header 值(含 multipart boundary), // net/http 不會自動產生 — 必須完整透傳,否則 converter multer 會解析失敗 // - UserID 由 visionA-backend trust boundary 灌入(見 conversion.md §7);本層不檢查格式 // - SourceFilename / Platform 為 log 用 metadata(converter 自己會從 multipart 解出真值) type InitConverterJobReq struct { UserID string // OIDC sub;本層僅供 log Platform string // "520" / "720" / "530" / "630" / "730";本層僅供 log SourceFilename string // 本層僅供 log Body io.Reader // 已重組好的 multipart stream(含 user_id field) BodyContentType string // 含 boundary 的 Content-Type,例如 "multipart/form-data; boundary=xyz" } // PromoteReq 是 Promote 的輸入。 // // 設計原則: // - UserID 灌進 promote request 的 metadata(trust boundary 重申,見 conversion.md §7.3) // - Source / TargetObjectKey 對齊 converter openapi.yaml `PromoteTarget` // - Phase 0.8 一律 promote `nef` source(visionA 只關心最終可部署到 KL 晶片的 NEF 檔) type PromoteReq struct { UserID string // 灌進 promote request body metadata Source string // "onnx" / "bie" / "nef";預設 "nef" TargetObjectKey string // FAA 內目標 key,由上層(flow.go)按命名規則組好 } // ConverterJob 是 InitJob / GetJob / List 的 response shape。 // // 對齊 converter openapi.yaml 的 Job + CreateJobResponse schema;同時保留 // visionA Phase 0.8 §2.6.2 的 ExpiresAt 來源備援邏輯(converter 沒給就 caller 推算)。 // // 注意:這是 client 層的中間 type,flow.go 會轉成 conversion.Job(對 frontend 的 shape)。 type ConverterJob struct { JobID string Status string // "created" / "running" / "completed" / "failed" Stage string // "onnx" / "bie" / "nef";completed 時 converter 回 null → "" Progress *int // 整體 0-100;可能為 nil(converter 沒給) StageProgress *int // 當前 stage 0-100;可能為 nil SourceFilename string // 取自 input.filename Platform string // 取自 parameters.platform CreatedAt time.Time UpdatedAt time.Time ExpiresAt time.Time // converter 沒給時上層自行 created_at + 7d 推算 ErrorCode string // 取自 error.code ErrorMessage string // 取自 error.message TargetObjectKey string // 僅 promote 後才有;GET / list 時為 "" } // ConverterPromoteResult 是 Promote 的 response shape。 // // 對齊 converter openapi.yaml `PromoteResponse`:取 promoted[0](Phase 0.8 一次只 promote 1 target)。 type ConverterPromoteResult struct { TargetObjectKey string Size int64 Checksum string // 取自 file_access_agent_etag(converter 透傳 FAA ETag) } // ConverterClientOpts 是 NewConverterClient 的依賴注入。 // // HTTPClient / InitHTTPClient / Now / Logger 為 optional(nil 自動填預設)— 方便 unit test 注入 fake。 type ConverterClientOpts struct { // BaseURL 是 converter scheduler base URL(不帶結尾斜線)。 // 範例:http://192.168.0.130:9501 BaseURL string // APIKey 是 Phase 0.8b 引入的 pre-shared API key(VISIONA_CONVERTER_API_KEY)。 // 必填非空 — `NewConverterClient` 會在 APIKey 為空時 panic(fail-fast, // 避免 server 在「未認證」狀態下啟動)。 // // 值由 main.go 從 cfg.Conversion.ConverterAPIKey(env VISIONA_CONVERTER_API_KEY)注入; // 與 converter middleware 端的 CONVERTER_API_KEY 必須對齊(rotate 時雙方同步換)。 // // 安全:絕不 log 此值(即使前綴);Authorization header 也不 log。 // // Phase 0.8b API key 改造 (見 ADR-015 §3 + conversion.md §3) APIKey string // HTTPClient 為 optional;nil 用預設(timeout 10s)。GetJob / Promote / List 用。 HTTPClient *http.Client // InitHTTPClient 為 optional;nil 用預設(timeout 30 分鐘)— InitJob 大檔上傳專用。 // 與 HTTPClient 分開避免互相影響:GetJob 在 polling 場景頻繁呼叫,timeout 短才合理。 InitHTTPClient *http.Client // Now 為 optional;nil 用 time.Now。測試會注入 fake clock。 Now func() time.Time // Logger 為 optional;nil 用 slog.Default()。 Logger *slog.Logger } // ========================================================================== // 內部固定常數 // ========================================================================== const ( // HTTP timeout converterDefaultHTTPTimeout = 10 * time.Second converterInitHTTPTimeout = 30 * time.Minute // InitJob 大檔上傳 // retry 矩陣(對齊 conversion.md §9.1) converterMaxRetriesGet = 2 // GetJob max 3 attempts (1 + 2 retries) converterMaxRetriesPromote = 2 // Promote max 3 attempts (1 + 2 retries) converterMaxRetriesList = 1 // List max 2 attempts (1 + 1 retry) // 退避 base converterRetryBase = 500 * time.Millisecond // promote 預設 source(Phase 0.8 visionA 一律取 nef) promoteDefaultSource = "nef" ) // ErrConverterAPIKeyNotConfigured 啟動時 API key 為空 — 應在 NewConverterClient 立即 panic、 // 不要等到第一個 request 才發現「未認證」狀態跑進 prod。 // // Phase 0.8b API key 改造 (見 ADR-015 §3.5.3 部署檢查清單 #1) var ErrConverterAPIKeyNotConfigured = errors.New("conversion/converter_client: APIKey is required (set VISIONA_CONVERTER_API_KEY)") // ========================================================================== // 構造 + 內部實作 // ========================================================================== // converterClient 是 ConverterClient 的預設實作。 // // 套件內 unexported struct(caller 拿 interface),讓未來換實作不影響 caller。 type converterClient struct { baseURL string apiKey string // Phase 0.8b:pre-shared API key,建構時 fail-fast 不允許空字串 http *http.Client httpInit *http.Client now func() time.Time logger *slog.Logger } // NewConverterClient 建立一個 ConverterClient 實例。 // // 必填:BaseURL / APIKey。其他 optional。 // 注意:constructor 不驗 BaseURL 連線;第一次呼叫 method 才會打網路。 // // **Fail-fast**:若 opts.APIKey 為空字串,此函式 panic。理由是 Phase 0.8b 不允許 server 在 // 「未認證」狀態下啟動 — 對齊 ADR-015 §3.5.3 部署檢查清單 #1。 // // `opts.Tokens` 是 Phase 0.8 廢棄欄位(見 ConverterClientOpts.Tokens 註解),即使非 nil 也不被 // 內部使用;T5 切換 wire 點後從 struct 移除。 func NewConverterClient(opts ConverterClientOpts) ConverterClient { if opts.APIKey == "" { panic(ErrConverterAPIKeyNotConfigured) } httpClient := opts.HTTPClient if httpClient == nil { httpClient = &http.Client{Timeout: converterDefaultHTTPTimeout} } httpInit := opts.InitHTTPClient if httpInit == nil { httpInit = &http.Client{Timeout: converterInitHTTPTimeout} } now := opts.Now if now == nil { now = time.Now } logger := opts.Logger if logger == nil { logger = slog.Default() } return &converterClient{ baseURL: strings.TrimRight(opts.BaseURL, "/"), apiKey: opts.APIKey, http: httpClient, httpInit: httpInit, now: now, logger: logger, } } // ========================================================================== // InitJob — multipart streaming proxy(不 retry 5xx) // ========================================================================== func (c *converterClient) InitJob(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) { if req.Body == nil { return nil, fmt.Errorf("conversion/converter_client: InitJob body is required") } if req.BodyContentType == "" { return nil, fmt.Errorf("conversion/converter_client: InitJob body content type is required (must contain multipart boundary)") } endpoint := c.baseURL + "/api/v1/jobs" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, req.Body) if err != nil { return nil, fmt.Errorf("%w: build init job request: %v", ErrConverterUnavailable, err) } // Content-Type 必須完整透傳(含 multipart boundary),不能讓 net/http 自動推導 httpReq.Header.Set("Content-Type", req.BodyContentType) httpReq.Header.Set("Accept", "application/json") // Phase 0.8b:直接帶 pre-shared API key(不查 cache、不打 MC、不重簽) httpReq.Header.Set("Authorization", "Bearer "+c.apiKey) startedAt := c.now() res, err := c.httpInit.Do(httpReq) duration := c.now().Sub(startedAt) if err != nil { // network / ctx cancel — 不 retry(streaming body 已耗盡) if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { c.logger.Warn("conversion.converter.init_ctx_cancelled", slog.String("user_id", req.UserID), slog.Duration("duration", duration)) return nil, err } c.logger.Warn("conversion.converter.init_network_error", slog.String("user_id", req.UserID), slog.Duration("duration", duration), slog.String("err", truncate(err.Error(), 200))) return nil, fmt.Errorf("%w: init job network error: %v", ErrConverterUnavailable, err) } defer res.Body.Close() bodyBytes, readErr := io.ReadAll(res.Body) if readErr != nil { c.logger.Warn("conversion.converter.init_body_read_failed", slog.String("user_id", req.UserID), slog.Int("status", res.StatusCode), slog.String("err", truncate(readErr.Error(), 200))) return nil, fmt.Errorf("%w: read init response body: %v", ErrConverterUnavailable, readErr) } c.logger.Info("conversion.converter.init_response", slog.String("user_id", req.UserID), slog.String("source_filename", req.SourceFilename), slog.String("platform", req.Platform), slog.Int("status", res.StatusCode), slog.Duration("duration", duration)) if res.StatusCode >= 200 && res.StatusCode < 300 { return parseConverterJob(bodyBytes) } // 非 2xx — 一律 mapping 成 sentinel(**包括 5xx 也直接 fail,不 retry**) return nil, c.mapInitError(res.StatusCode, bodyBytes) } // mapInitError 把 InitJob 的非 2xx response mapping 成 sentinel。 // // 對齊 task-scheduler openapi.yaml POST /api/v1/jobs 的 4xx / 5xx 與 §6 mapping。 func (c *converterClient) mapInitError(status int, body []byte) error { apiErr := parseAPIError(body) // Phase 0.8b:認證失敗 = visionA 端 VISIONA_CONVERTER_API_KEY 與 converter 端 CONVERTER_API_KEY // 不對齊(rotate 未同步 / env 設錯 / converter middleware 未上線)。 // 不 retry — API key 不對 retry 100 次也不會自己變對;對外 mask 成 converter_unavailable。 if status == http.StatusUnauthorized || status == http.StatusForbidden { return fmt.Errorf("%w: init job %d", ErrConverterAuthFailed, status) } // 409 user_has_active_job — wrap 成 ActiveJobError if status == http.StatusConflict && apiErr.Code == "user_has_active_job" { return &ActiveJobError{Job: extractActiveJobFromDetails(apiErr.Details)} } // 400 validation_error / invalid_multipart — wrap 成 ConverterValidationError if status == http.StatusBadRequest { return &ConverterValidationError{ Fields: extractFieldsFromDetails(apiErr.Details), Message: apiErr.Message, } } if status == http.StatusRequestEntityTooLarge { return fmt.Errorf("%w: init job %d (%s)", ErrPayloadTooLarge, status, apiErr.Code) } if status == http.StatusServiceUnavailable { // converter 503 service_busy(process semaphore 滿) return fmt.Errorf("%w: init job %d (%s)", ErrServiceBusy, status, apiErr.Code) } // 其他 4xx → validation 視為通用 mapping if status >= 400 && status < 500 { return fmt.Errorf("%w: init job %d (%s)", ErrValidationFailed, status, apiErr.Code) } // 5xx — InitJob 不 retry,直接 mapping 成 ErrConverterUnavailable return fmt.Errorf("%w: init job %d (%s)", ErrConverterUnavailable, status, apiErr.Code) } // ========================================================================== // GetJob — 標準 retry // ========================================================================== func (c *converterClient) GetJob(ctx context.Context, jobID string) (*ConverterJob, error) { if jobID == "" { return nil, fmt.Errorf("conversion/converter_client: GetJob jobID is required") } endpoint := c.baseURL + "/api/v1/jobs/" + url.PathEscape(jobID) body, err := c.doWithRetry(ctx, "get_job", jobID, converterMaxRetriesGet, func() (*http.Request, error) { req, rerr := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) if rerr != nil { return nil, rerr } req.Header.Set("Accept", "application/json") req.Header.Set("Authorization", "Bearer "+c.apiKey) return req, nil }, c.mapGetJobError, ) if err != nil { return nil, err } return parseConverterJob(body) } // mapGetJobError 把 GetJob 的非 2xx 對應到 sentinel。 func (c *converterClient) mapGetJobError(status int, body []byte) error { apiErr := parseAPIError(body) // Phase 0.8b:401/403 → ErrConverterAuthFailed(API key 不對齊) if status == http.StatusUnauthorized || status == http.StatusForbidden { return fmt.Errorf("%w: get_job %d", ErrConverterAuthFailed, status) } if status == http.StatusNotFound { return fmt.Errorf("%w: get_job %d (%s)", ErrJobNotFound, status, apiErr.Code) } if status >= 400 && status < 500 { return fmt.Errorf("%w: get_job %d (%s)", ErrValidationFailed, status, apiErr.Code) } return fmt.Errorf("%w: get_job %d (%s)", ErrConverterUnavailable, status, apiErr.Code) } // ========================================================================== // Promote — 標準 retry + FAA / job_not_completed 特殊 mapping // ========================================================================== func (c *converterClient) Promote(ctx context.Context, jobID string, req PromoteReq) (*ConverterPromoteResult, error) { if jobID == "" { return nil, fmt.Errorf("conversion/converter_client: Promote jobID is required") } if req.TargetObjectKey == "" { return nil, fmt.Errorf("conversion/converter_client: Promote target_object_key is required") } source := req.Source if source == "" { source = promoteDefaultSource } endpoint := c.baseURL + "/api/v1/jobs/" + url.PathEscape(jobID) + "/promote" // promote request body — 對齊 openapi.yaml PromoteRequest, // 同時放 user_id 進 metadata(trust boundary 重申,§7.3) bodyJSON, err := json.Marshal(map[string]any{ "targets": []map[string]any{ {"source": source, "target_object_key": req.TargetObjectKey}, }, "user_id": req.UserID, // converter Phase 1 不消費,但保留供 log / 未來啟用 }) if err != nil { return nil, fmt.Errorf("%w: marshal promote request: %v", ErrConverterUnavailable, err) } respBody, err := c.doWithRetry(ctx, "promote", jobID, converterMaxRetriesPromote, func() (*http.Request, error) { r, rerr := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(bodyJSON)) if rerr != nil { return nil, rerr } r.Header.Set("Content-Type", "application/json") r.Header.Set("Accept", "application/json") r.Header.Set("Authorization", "Bearer "+c.apiKey) return r, nil }, c.mapPromoteError, ) if err != nil { return nil, err } return parseConverterPromoteResult(respBody) } // mapPromoteError 把 Promote 的非 2xx 對應到 sentinel。 // // 特殊 mapping: // - 502 file_gateway_unavailable → ErrFAAUnavailable // - 503 auth_service_unavailable → ErrConverterUnavailable(Phase 0.8b:MC 路徑取消, // converter 端的 503 從 visionA 角度看是 converter 整體不可用) // - 409 job_not_ready_for_promote / source_not_available → ErrJobNotCompleted func (c *converterClient) mapPromoteError(status int, body []byte) error { apiErr := parseAPIError(body) // Phase 0.8b:401/403 → ErrConverterAuthFailed(API key 不對齊) if status == http.StatusUnauthorized || status == http.StatusForbidden { return fmt.Errorf("%w: promote %d", ErrConverterAuthFailed, status) } if status == http.StatusNotFound { return fmt.Errorf("%w: promote %d (%s)", ErrJobNotFound, status, apiErr.Code) } if status == http.StatusConflict { // 兩種:job_not_ready_for_promote / source_not_available return fmt.Errorf("%w: promote %d (%s)", ErrJobNotCompleted, status, apiErr.Code) } if status == http.StatusBadGateway { // converter 端 FAA 不可達 / FAA 4xx return fmt.Errorf("%w: promote %d (%s)", ErrFAAUnavailable, status, apiErr.Code) } if status == http.StatusServiceUnavailable { // Phase 0.8b:MC token 路徑取消後,converter 端 503 一律歸為 converter 整體不可用 // (從 visionA 角度看,無法區分「converter 自己」vs「converter 上游 MC」 — 都是不可用) return fmt.Errorf("%w: promote %d (%s)", ErrConverterUnavailable, status, apiErr.Code) } if status == http.StatusBadRequest || status == http.StatusUnprocessableEntity { return &ConverterValidationError{ Fields: extractFieldsFromDetails(apiErr.Details), Message: apiErr.Message, } } if status >= 400 && status < 500 { return fmt.Errorf("%w: promote %d (%s)", ErrValidationFailed, status, apiErr.Code) } return fmt.Errorf("%w: promote %d (%s)", ErrConverterUnavailable, status, apiErr.Code) } // ========================================================================== // ListInProgressJobs — lazy rebuild ownership 用 // ========================================================================== func (c *converterClient) ListInProgressJobs(ctx context.Context, userID string) ([]*ConverterJob, error) { if userID == "" { return nil, fmt.Errorf("conversion/converter_client: ListInProgressJobs userID is required") } q := url.Values{} q.Set("user_id", userID) q.Set("status", "in_progress") endpoint := c.baseURL + "/api/v1/jobs?" + q.Encode() body, err := c.doWithRetry(ctx, "list_jobs", userID, converterMaxRetriesList, func() (*http.Request, error) { r, rerr := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) if rerr != nil { return nil, rerr } r.Header.Set("Accept", "application/json") r.Header.Set("Authorization", "Bearer "+c.apiKey) return r, nil }, c.mapListJobsError, ) if err != nil { return nil, err } return parseListJobs(body) } // mapListJobsError 把 ListInProgressJobs 的非 2xx 對應到 sentinel。 // // list 不該回 404(user_id 沒 active 應回 200 + jobs:[]),所以 4xx 一律視為 validation。 func (c *converterClient) mapListJobsError(status int, body []byte) error { apiErr := parseAPIError(body) // Phase 0.8b:401/403 → ErrConverterAuthFailed(API key 不對齊) if status == http.StatusUnauthorized || status == http.StatusForbidden { return fmt.Errorf("%w: list_jobs %d", ErrConverterAuthFailed, status) } if status >= 400 && status < 500 { return fmt.Errorf("%w: list_jobs %d (%s)", ErrValidationFailed, status, apiErr.Code) } return fmt.Errorf("%w: list_jobs %d (%s)", ErrConverterUnavailable, status, apiErr.Code) } // ========================================================================== // HTTP 共用:retry / 錯誤分類 // ========================================================================== // doWithRetry 是 GetJob / Promote / List 共用的 retry 執行器。 // // Phase 0.8b 變更: // - 移除 `scope` 參數與 `tokens.ServiceToken()` 呼叫(API key 改造後不再 per-attempt 取 token) // - reqBuilder closure 不再接 `token` 參數 — caller 直接讀 c.apiKey 自行 set header // // 行為(不變): // - 4xx / 401 / 403 不 retry;5xx / network / timeout 可 retry // - retry 次數由 caller 傳入(不同 endpoint 不同上限) // - mapErr 由 caller 傳入,因為 GetJob / Promote / List 的 4xx mapping 細節不同 // // reqBuilder 是「每次 attempt 都重新建一個 *http.Request」的 closure // — request body 可能在 retry 時已被讀完,必須重建。caller 內部用 bytes.NewReader 等可重建的 body。 func (c *converterClient) doWithRetry( ctx context.Context, endpointKind, label string, maxRetries int, reqBuilder func() (*http.Request, error), mapErr func(status int, body []byte) error, ) ([]byte, error) { var lastErr error for attempt := 0; attempt <= maxRetries; attempt++ { // retry 前檢查 ctx if attempt > 0 { select { case <-ctx.Done(): return nil, ctx.Err() case <-time.After(converterRetryBackoff(attempt)): } } req, err := reqBuilder() if err != nil { return nil, fmt.Errorf("%w: build %s request: %v", ErrConverterUnavailable, endpointKind, err) } body, classifiedErr, retryable := c.doOnce(req, endpointKind, label, attempt, mapErr) if classifiedErr == nil { return body, nil } lastErr = classifiedErr if !retryable { return nil, classifiedErr } } c.logger.Warn("conversion.converter.retry_exhausted", slog.String("endpoint", endpointKind), slog.String("label", label), slog.Int("attempts", maxRetries+1)) return nil, lastErr } // doOnce 執行一次 HTTP request,回傳 body(成功時)+ 分類好的 error + 是否可重試。 func (c *converterClient) doOnce( req *http.Request, endpointKind, label string, attempt int, mapErr func(status int, body []byte) error, ) (body []byte, err error, retryable bool) { startedAt := c.now() res, err := c.http.Do(req) duration := c.now().Sub(startedAt) if err != nil { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { c.logger.Warn("conversion.converter.ctx_cancelled", slog.String("endpoint", endpointKind), slog.String("label", label), slog.Int("attempt", attempt+1), slog.Duration("duration", duration)) return nil, err, false } c.logger.Warn("conversion.converter.network_error", slog.String("endpoint", endpointKind), slog.String("label", label), slog.Int("attempt", attempt+1), slog.Duration("duration", duration), slog.String("err", truncate(err.Error(), 200))) return nil, fmt.Errorf("%w: %s network error: %v", ErrConverterUnavailable, endpointKind, err), true } defer res.Body.Close() bodyBytes, readErr := io.ReadAll(res.Body) if readErr != nil { c.logger.Warn("conversion.converter.body_read_failed", slog.String("endpoint", endpointKind), slog.String("label", label), slog.Int("status", res.StatusCode), slog.String("err", truncate(readErr.Error(), 200))) return nil, fmt.Errorf("%w: read response body: %v", ErrConverterUnavailable, readErr), true } if res.StatusCode >= 200 && res.StatusCode < 300 { c.logger.Debug("conversion.converter.success", slog.String("endpoint", endpointKind), slog.String("label", label), slog.Int("status", res.StatusCode), slog.Int("attempt", attempt+1), slog.Duration("duration", duration)) return bodyBytes, nil, false } c.logger.Warn("conversion.converter.endpoint_error", slog.String("endpoint", endpointKind), slog.String("label", label), slog.Int("status", res.StatusCode), slog.Int("attempt", attempt+1), slog.Duration("duration", duration)) classified := mapErr(res.StatusCode, bodyBytes) // 5xx 視為可重試;4xx / 認證失敗 / 已 wrap 為非 transient error 都不重試 retryable = res.StatusCode >= 500 && res.StatusCode < 600 return nil, classified, retryable } // converterRetryBackoff 回傳第 n 次 retry(n 從 1 開始)的等待時間。 // 對齊 conversion.md §9.1: // - GetJob: 0.5s, 1s, 2s(base=500ms,倍數 1, 2, 4 — 但實際只用前 2 次) // - Promote: 1s, 2s(base=500ms,倍數 2, 4) // - List: 0.5s(base=500ms,倍數 1) // // 為了統一 base 但對齊 §9.1 的「Promote 退避 1s, 2s」,我們用 base=500ms 加 ×2 倍數, // 第 n 次退避 = base × 2^n(對照 §9.1 GetJob: n=1→500ms*1=500ms 不完全對齊; // 但 §9.1 主要規範是「指數退避,max retry 次數」— 實際數值容忍小偏差,重點是不爆量)。 // // 最終退避序列:n=1→0.5s, n=2→1s, n=3→2s(Promote/Get 都從 n=1 開始用, // 第 1 次 attempt 不退避;第 2 次 attempt = retry 1 = 0.5s 等)。 // // 不加 jitter — 同 mc_token_client,Phase 0.8 同時 retry 的 caller 不會大量併發打 converter。 func converterRetryBackoff(attempt int) time.Duration { if attempt < 1 { return converterRetryBase } // 0.5s, 1s, 2s, 4s ... return converterRetryBase * (1 << (attempt - 1)) } // Phase 0.8b:wrapTokenErr 已移除(API key 改造後不再透過 MCTokenClient 取 token, // 因此沒有 token-取-不到 的失敗路徑需要 wrap)。 // ========================================================================== // Response 解析(converter openapi.yaml shapes) // ========================================================================== // converterAPIError 是 converter `{error: {...}}` shape 的 unmarshal 中介 type。 type converterAPIError struct { Code string `json:"code"` Message string `json:"message"` Details json.RawMessage `json:"details"` RequestID string `json:"request_id"` } // parseAPIError 解 converter 的 `{error: {code, message, details, request_id}}` shape。 // // converter 4xx / 5xx 一律遵循此 shape;解析失敗時回空 struct(caller 仍會走 mapping 預設路徑)。 func parseAPIError(body []byte) converterAPIError { var wrapper struct { Error converterAPIError `json:"error"` } if err := json.Unmarshal(body, &wrapper); err != nil { return converterAPIError{} } return wrapper.Error } // extractFieldsFromDetails 從 converter `details.fields` 解出 ValidationFieldError slice。 // // 對齊 openapi.yaml 範例: // // details: { fields: [{ field: "model_id", message: "..." }] } // // 解析失敗回 nil(caller 仍可正常 wrap,frontend 拿不到 fields 但能拿到 code)。 func extractFieldsFromDetails(raw json.RawMessage) []ValidationFieldError { if len(raw) == 0 { return nil } var parsed struct { Fields []ValidationFieldError `json:"fields"` } if err := json.Unmarshal(raw, &parsed); err != nil { return nil } return parsed.Fields } // extractActiveJobFromDetails 從 converter 409 user_has_active_job 的 details 解出簡化版 Job。 // // 對齊 openapi.yaml 範例: // // details: { // active_job_id: "...", // active_job_status: "running", // active_job_stage: "bie", // active_job_progress: 45, // active_job_created_at: "..." // } // // 解析失敗回 nil(caller 仍會走 ActiveJobError,只是 Job 為 nil)。 func extractActiveJobFromDetails(raw json.RawMessage) *Job { if len(raw) == 0 { return nil } var parsed struct { ActiveJobID string `json:"active_job_id"` ActiveJobStatus string `json:"active_job_status"` ActiveJobStage string `json:"active_job_stage"` ActiveJobProgress int `json:"active_job_progress"` ActiveJobCreatedAt time.Time `json:"active_job_created_at"` } if err := json.Unmarshal(raw, &parsed); err != nil { return nil } if parsed.ActiveJobID == "" { return nil } return &Job{ JobID: parsed.ActiveJobID, Status: parsed.ActiveJobStatus, Stage: parsed.ActiveJobStage, Progress: parsed.ActiveJobProgress, CreatedAt: parsed.ActiveJobCreatedAt, // ExpiresAt 由上層 flow.go 自行 created_at + 7d 推算(converter 409 不一定回 expires_at) } } // converterJobJSON 是 GET /api/v1/jobs/{id} response 的中介 unmarshal type。 // // 為了同時支援: // - CreateJobResponse(POST /jobs 201)— 無 stage_progress / input.filename 等欄位 // - Job(GET /jobs/{id})— 完整欄位 // 全部欄位都用 pointer 或 nullable,Marshal 時靠下方 toConverterJob 統一轉。 type converterJobJSON struct { JobID string `json:"job_id"` Status string `json:"status"` Stage *string `json:"stage"` // completed 時 converter 回 null Progress *int `json:"progress"` StageProgress *int `json:"stage_progress"` CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` ExpiresAt time.Time `json:"expires_at"` Input *struct { Filename string `json:"filename"` } `json:"input"` Parameters *struct { Platform string `json:"platform"` } `json:"parameters"` Error *struct { Code string `json:"code"` Message string `json:"message"` Stage string `json:"stage"` } `json:"error"` } // parseConverterJob 解 GET /api/v1/jobs/{id} 或 POST /api/v1/jobs 201 的 response。 func parseConverterJob(body []byte) (*ConverterJob, error) { var jr converterJobJSON if err := json.Unmarshal(body, &jr); err != nil { return nil, fmt.Errorf("%w: parse converter job response: %v", ErrConverterUnavailable, err) } if jr.JobID == "" { return nil, fmt.Errorf("%w: empty job_id in converter response", ErrConverterUnavailable) } return jr.toConverterJob(), nil } // toConverterJob 把 converterJobJSON 轉成對外的 ConverterJob。 func (jr *converterJobJSON) toConverterJob() *ConverterJob { cj := &ConverterJob{ JobID: jr.JobID, Status: jr.Status, Progress: jr.Progress, StageProgress: jr.StageProgress, CreatedAt: jr.CreatedAt, UpdatedAt: jr.UpdatedAt, ExpiresAt: jr.ExpiresAt, } if jr.Stage != nil { cj.Stage = *jr.Stage } if jr.Input != nil { cj.SourceFilename = jr.Input.Filename } if jr.Parameters != nil { cj.Platform = jr.Parameters.Platform } if jr.Error != nil { cj.ErrorCode = jr.Error.Code cj.ErrorMessage = jr.Error.Message } return cj } // parseListJobs 解 GET /api/v1/jobs?user_id=&status=in_progress 的 response。 // // converter shape:{ "jobs": [Job, ...], "total": N, "next_cursor": "..." | null } func parseListJobs(body []byte) ([]*ConverterJob, error) { var resp struct { Jobs []converterJobJSON `json:"jobs"` } if err := json.Unmarshal(body, &resp); err != nil { return nil, fmt.Errorf("%w: parse list jobs response: %v", ErrConverterUnavailable, err) } out := make([]*ConverterJob, 0, len(resp.Jobs)) for i := range resp.Jobs { out = append(out, resp.Jobs[i].toConverterJob()) } return out, nil } // parseConverterPromoteResult 解 POST /api/v1/jobs/{id}/promote 的 response。 // // 對齊 openapi.yaml `PromoteResponse`:取 promoted[0](Phase 0.8 一次只 promote 1 target)。 // 若 promoted 陣列為空,回 ErrConverterUnavailable(合理表示 converter 內部狀態不一致)。 func parseConverterPromoteResult(body []byte) (*ConverterPromoteResult, error) { var resp struct { Promoted []struct { TargetObjectKey string `json:"target_object_key"` SizeBytes int64 `json:"size_bytes"` FileAccessAgentETag string `json:"file_access_agent_etag"` } `json:"promoted"` } if err := json.Unmarshal(body, &resp); err != nil { return nil, fmt.Errorf("%w: parse promote response: %v", ErrConverterUnavailable, err) } if len(resp.Promoted) == 0 { return nil, fmt.Errorf("%w: promote response has empty promoted array", ErrConverterUnavailable) } first := resp.Promoted[0] if first.TargetObjectKey == "" { return nil, fmt.Errorf("%w: promote response missing target_object_key", ErrConverterUnavailable) } return &ConverterPromoteResult{ TargetObjectKey: first.TargetObjectKey, Size: first.SizeBytes, Checksum: first.FileAccessAgentETag, }, nil }