Phase 0.8 把 kneron_model_converter 的轉檔功能整合進 visionA Cloud。
visionA backend 當 streaming proxy(upload)+ delegated download token broker(download)+
ownership trust boundary,converter / FAA / MC 三方零修改。
新增 internal/conversion/ 套件(8 個檔,~10,000 行 prod+test,117+ test cases,race -count=3 全綠):
- conversion.go:Service interface 5 method、Job/PromoteResult/InitJobInput types
- errors.go:13+ sentinel errors + ErrorCode/HTTPStatus mapping,對齊 conversion.md §6
- mc_token_client.go:service-to-service token (client_credentials grant) + DCL cache
(exp - 15s 重取,per-scope cache),IssueDelegatedDownload(MC delegated download token)
錯誤分 idp_misconfigured (4xx) / idp_unavailable (5xx) / download_token_failed / mc_token_unavailable
- converter_client.go:對 converter scheduler 4 method(InitJob multipart streaming /
GetJob / Promote / ListInProgressJobs),InitJob 不 retry 5xx(streaming body 無法 replay)
- faa_client.go:對 FAA GET /files/{key} server-to-server pull,Phase A retry(GET 無 body
可 replay)對齊 §9.1 retry 矩陣,streaming io.ReadCloser 透傳避 OOM
- ownership.go:in-memory job_id → user_id map + per-user mutex 防 thundering herd lazy rebuild
(不同 user 平行 fetch,同 user 100 caller 收斂成 1 次),visionA 重啟靠 converter
ListInProgressJobs(user) 重建
- flow.go:Service interface 整合層(5 method 串接 converter/FAA/MC/ownership)
- InitJob 用 io.Pipe + multipart.Reader/Writer 重組 streaming proxy(黑名單 client user_id
+ 灌入 OIDC sub)
- DownloadRedirectURL 自動觸發 promote(spec §1 Stage 3b),用 ensurePromoted helper
- PromoteToModels 冪等(modelStore.FindBySourceJobID 為 source-of-truth)
- OwnershipMismatch → ErrJobNotFound 不 forbidden(§7.2 防枚舉)
- storage / modelStore 失敗包 ErrStorageUnavailable / ErrModelStoreUnavailable
(視為 visionA 自身 500 而非 502 gateway,SRE alarm 才打對 team)
新增 internal/api/conversion.go(5 endpoint handler + main.go wire):
- POST /api/conversion/init(multipart streaming proxy,不呼叫 c.MultipartForm())
- GET /api/conversion/active(lazy rebuild ownership)
- GET /api/conversion/{job_id}(poll status)
- POST /api/conversion/{job_id}/promote-to-models(FAA pull → models 三段式)
- GET /api/conversion/{job_id}/download(server-side HTTP 302 → FAA,token 不過 frontend
JS,仿 FAA TestSite DownloadFileDirect pattern;Cache-Control: no-store)
5 個 endpoint 全部走 OIDC AuthMiddleware;user_id 從 cookie session 灌(trust boundary),
從不接受 client multipart form / JSON / query 的 user_id。
TestAllAPIEndpointsRequire401WithoutCookie 自動覆蓋新 5 endpoint regression 防呆。
新增 cmd/api-server/conversion_e2e_test.go(4 個 e2e 場景):
- TestConversionE2E_StreamingProxy(10MB body + trust boundary regression)
- TestConversionE2E_LazyRebuildAfterRestart(visionA 重啟仍能 /active)
- TestConversionE2E_Download302Redirect(驗 302 + Location header + token 不在 body)
- TestConversionE2E_ActiveJobConflict(409 + active_job 詳情)
修改 internal/config/{config,load}.go:新增 ConversionConfig 5 欄位
(ConverterBaseURL / FAABaseURL / TenantID / ServiceClientID / ServiceClientSecret)+
Enabled() helper(雙非空判定)。
修改 cmd/api-server/main.go:條件 wire(cfg.Conversion.Enabled() 為 true 才建 client + Service;
否則 Deps.Conversion=nil,handler 自動回 501)。
修改 .env.example:新增 Phase 0.8 區塊註解。
新增 cmd/api-server/conversion_adapters.go:narrow interface adapter(接既有
internal/model.Repository / internal/storage.Store → conversion.ModelStore / Storage,避免 import cycle)。
驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning / go build 成功。
對齊文件:
- .autoflow/04-architecture/adr/adr-014-conversion-integration.md
- .autoflow/04-architecture/conversion.md (TDD)
- .autoflow/04-architecture/api/api-conversion.md
- .autoflow/02-prd/features/feature-converter-integration.md
- .autoflow/03-design/wireframes/wireframe-conversion.md
- .autoflow/03-design/flows/flow-conversion.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
625 lines
24 KiB
Go
625 lines
24 KiB
Go
// MC token client — visionA-backend 對 Member Center 取兩種 token:
|
||
// - service token(client_credentials grant):自己呼叫 converter / FAA 用,per-scope cache
|
||
// - delegated download token:給 user 換 short-lived FAA download URL(不 cache,每次新簽)
|
||
//
|
||
// 設計參考:
|
||
// - kneron_model_converter/apps/task-scheduler/src/auth/oauthClient.js(Node 版同模式,
|
||
// 已在 production 跑過;這裡 Go 版改用 sync.Mutex + DCL,不用 promise dedup)
|
||
// - 本檔案搭配 .autoflow/04-architecture/conversion.md §2.4 / §5 / §9.1 retry 矩陣
|
||
//
|
||
// 安全:
|
||
// - **絕不**把 client_secret / access_token / Authorization header 內容寫進 log
|
||
// - 錯誤訊息只揭露 status + 是否 retry,不揭露 server 端細節
|
||
//
|
||
// Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.4 / §5)
|
||
package conversion
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"net/http"
|
||
"net/url"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// ==========================================================================
|
||
// 對外 type / interface
|
||
// ==========================================================================
|
||
|
||
// MCTokenClient 對 Member Center 取兩種 token。
|
||
//
|
||
// 兩個 method 的錯誤處理策略對齊 conversion.md §6:
|
||
//
|
||
// - `ServiceToken`(打 MC `/oauth/token`,client_credentials grant):
|
||
// 401/403 → ErrServiceClientUnauthorized(500/idp_misconfigured 對外)
|
||
// 其他 4xx → ErrIDPMisconfigured(500/idp_misconfigured,i18n=idp_misconfig)
|
||
// 5xx / network 持續失敗 → ErrIDPUnavailable(503/idp_unavailable,i18n=idp_down)
|
||
//
|
||
// - `IssueDelegatedDownload`(打 MC `/file-access/download-tokens`):
|
||
// 401/403 → ErrServiceClientUnauthorized
|
||
// 其他 4xx → ErrDownloadTokenFailed(502/download_token_failed,i18n=token_failed)
|
||
// 5xx / network 持續失敗 → ErrMCTokenUnavailable(502/mc_token_unavailable,i18n=token_failed)
|
||
//
|
||
// 兩 endpoint 的 4xx / 5xx 用不同 sentinel — 因為 §6 的 i18n 訊息設計區分了
|
||
// 「IDP 設定錯誤」「IDP 暫時不可用」「下載授權失敗」「MC 不可達」四種不同的 user-facing 提示
|
||
// (前者引導使用者「聯絡支援」,後者引導「稍後再試」)。
|
||
//
|
||
// goroutine-safe:cache 用 sync.Mutex,DCL 確保併發 fetch 只發一次 request。
|
||
type MCTokenClient interface {
|
||
// ServiceToken 取一個 access token(client_credentials grant),可 cache 重用。
|
||
//
|
||
// scope 範例:
|
||
// "converter:job.write converter:job.read files:download.read files:download.delegate"
|
||
// (多 scope 用空白分隔,依 RFC 6749 §3.3)
|
||
//
|
||
// cache 行為(見 §5.2):
|
||
// - per-scope cache(不同 scope 各自獨立)
|
||
// - 過期判斷:now() >= exp - 15s(提前 15 秒 refresh 避免邊界 race)
|
||
// - 失敗不 cache,下一次呼叫會重試
|
||
// - DCL 防併發爆量(100 個 caller 同時要 token,只 fetch 一次)
|
||
ServiceToken(ctx context.Context, scope string) (string, error)
|
||
|
||
// IssueDelegatedDownload 跟 MC 換 browser 直連 FAA 用的 opaque token。
|
||
//
|
||
// 流程:
|
||
// 1. 先取 service token(scope=files:download.delegate)— 內部呼 ServiceToken
|
||
// 2. POST {issuer}/file-access/download-tokens
|
||
// 3. 回 opaque token + 過期時間
|
||
//
|
||
// caller 通常是 flow.DownloadRedirectURL,拿到後組
|
||
// https://<faa>/files/<key>?access_token=<token>
|
||
// 走 server-side 302 redirect 給 browser(見 conversion.md §10.4)。
|
||
IssueDelegatedDownload(ctx context.Context, in IssueDownloadReq) (*DelegatedDownloadToken, error)
|
||
}
|
||
|
||
// IssueDownloadReq 是 IssueDelegatedDownload 的輸入。
|
||
//
|
||
// 欄位來源(trust boundary 見 conversion.md §7):
|
||
// - TenantID / UserID / ObjectKey 由 visionA-backend 內部產生(OIDC sub + promote 結果),
|
||
// 不接受 client 傳入
|
||
// - ExpiresInSeconds 預設 300(5 分鐘),可在 caller 指定(範圍由 caller 自行檢查)
|
||
type IssueDownloadReq struct {
|
||
TenantID string
|
||
UserID string
|
||
ObjectKey string
|
||
ExpiresInSeconds int // <= 0 時自動套用預設 300
|
||
}
|
||
|
||
// DelegatedDownloadToken 是 MC 簽出來的 short-lived token。
|
||
//
|
||
// Token 是 opaque(FAA 收到後再對 MC validate),visionA-backend 不解碼。
|
||
type DelegatedDownloadToken struct {
|
||
Token string
|
||
ExpiresAt time.Time
|
||
}
|
||
|
||
// MCTokenClientOpts 是 NewMCTokenClient 的依賴注入。
|
||
//
|
||
// HTTPClient / Now / Logger 為 optional(nil 自動填預設)— 方便 unit test 注入 fake。
|
||
type MCTokenClientOpts struct {
|
||
// Issuer 是 MC issuer URL(不帶結尾斜線)。
|
||
// 會打:
|
||
// POST {Issuer}/oauth/token
|
||
// POST {Issuer}/file-access/download-tokens
|
||
Issuer string
|
||
|
||
// ClientID / ClientSecret 是 visionA service client 在 MC 的註冊資訊。
|
||
// **禁止 commit 進 repo**;由 main.go 從 env var 讀進 config 後注入。
|
||
ClientID string
|
||
ClientSecret string
|
||
|
||
// HTTPClient 為 optional;nil 用預設(timeout 10s)。測試會注入 httptest.Server.Client()。
|
||
HTTPClient *http.Client
|
||
|
||
// Now 為 optional;nil 用 time.Now。測試會注入 fake clock 控制 cache 過期。
|
||
Now func() time.Time
|
||
|
||
// Logger 為 optional;nil 用 slog.Default()。
|
||
Logger *slog.Logger
|
||
}
|
||
|
||
// ==========================================================================
|
||
// 內部實作
|
||
// ==========================================================================
|
||
|
||
// 內部固定常數(不對外,避免 caller hardcode)。
|
||
const (
|
||
// tokenRefreshSkew 是 cache 過期判斷的緩衝;now() >= exp - skew 視為過期。
|
||
// 15s 對齊 conversion.md §2.4 / §5.2。
|
||
tokenRefreshSkew = 15 * time.Second
|
||
|
||
// httpTimeout 是預設 HTTP client timeout(dialer + response 整體)。
|
||
httpTimeout = 10 * time.Second
|
||
|
||
// maxRetries 是 5xx / network / timeout 的最大重試次數(不含第一次)。
|
||
// 對齊 conversion.md §9.1:MC oauth/token 與 file-access/download-tokens 都 max 2 次。
|
||
maxRetries = 2
|
||
|
||
// retryBaseDelay 是指數退避的 base(1s, 2s)。
|
||
retryBaseDelay = 1 * time.Second
|
||
|
||
// defaultDelegatedTTL 是 IssueDelegatedDownload 預設 TTL(caller 不傳就 300)。
|
||
defaultDelegatedTTL = 300
|
||
)
|
||
|
||
// cachedToken 是 ServiceToken cache 內部結構。
|
||
type cachedToken struct {
|
||
token string
|
||
expiresAt time.Time
|
||
}
|
||
|
||
// mcTokenClient 是 MCTokenClient 的預設實作。
|
||
//
|
||
// 套件內 unexported struct(caller 拿 interface),讓未來換實作不影響 caller。
|
||
type mcTokenClient struct {
|
||
issuer string
|
||
clientID string
|
||
clientSecret string
|
||
http *http.Client
|
||
now func() time.Time
|
||
logger *slog.Logger
|
||
|
||
// cache 由 mu 保護;key=scope(multi-scope string 直接當 key,
|
||
// 不做 normalize — caller 應傳穩定排序的 scope 字串)。
|
||
mu sync.Mutex // sync.Mutex 比 RWMutex 簡單;fetch 路徑 IO bound,RWMutex 沒有實質好處
|
||
cache map[string]cachedToken
|
||
}
|
||
|
||
// NewMCTokenClient 建立一個 MCTokenClient 實例。
|
||
//
|
||
// 必填:Issuer / ClientID / ClientSecret。其他 optional。
|
||
// 注意:constructor 不會驗 Issuer 連線,第一次 ServiceToken 呼叫才會打網路。
|
||
func NewMCTokenClient(opts MCTokenClientOpts) MCTokenClient {
|
||
httpClient := opts.HTTPClient
|
||
if httpClient == nil {
|
||
httpClient = &http.Client{Timeout: httpTimeout}
|
||
}
|
||
now := opts.Now
|
||
if now == nil {
|
||
now = time.Now
|
||
}
|
||
logger := opts.Logger
|
||
if logger == nil {
|
||
logger = slog.Default()
|
||
}
|
||
return &mcTokenClient{
|
||
issuer: strings.TrimRight(opts.Issuer, "/"),
|
||
clientID: opts.ClientID,
|
||
clientSecret: opts.ClientSecret,
|
||
http: httpClient,
|
||
now: now,
|
||
logger: logger,
|
||
cache: make(map[string]cachedToken),
|
||
}
|
||
}
|
||
|
||
// ==========================================================================
|
||
// ServiceToken 實作(含 DCL cache)
|
||
// ==========================================================================
|
||
|
||
// ServiceToken 實作 MCTokenClient.ServiceToken。
|
||
//
|
||
// DCL 流程:
|
||
// 1. 拿鎖 → 看 cache → 還新鮮就 unlock 後 return(fast path)
|
||
// 2. cache 過期 → 持鎖直接 fetch(在鎖內執行 HTTP request)
|
||
//
|
||
// 鎖內 fetch 的取捨:
|
||
// - 優點:實作極簡,無 in-flight Promise / sync.Once dance;併發 100 個 caller 全部
|
||
// 在同一個 mutex 上排隊,第一個 fetch 完寫 cache 後,後續 caller 走 fast path
|
||
// - 缺點:fetch 期間(最多 10s timeout + 2 retries = 最壞 ~13s)所有同 scope 的
|
||
// caller 全部 block;不同 scope 因為共用同一個 mu,也會 block(比 per-scope 鎖差)
|
||
//
|
||
// 為什麼不用 per-scope 鎖:
|
||
// - Phase 0.8 同時只用 1-2 個 scope,per-scope 鎖的好處邊際
|
||
// - 簡單性 > 微優化;若未來 profiling 顯示瓶頸再改 sync.Map + per-scope mutex
|
||
//
|
||
// 為什麼不用 sync.Once:
|
||
// - sync.Once 不能 reset(cache 過期後要重 fetch)— 不適用
|
||
func (c *mcTokenClient) ServiceToken(ctx context.Context, scope string) (string, error) {
|
||
if scope == "" {
|
||
return "", fmt.Errorf("conversion/mc_token_client: scope is required")
|
||
}
|
||
|
||
c.mu.Lock()
|
||
defer c.mu.Unlock()
|
||
|
||
// fast path:cache hit 且仍新鮮
|
||
if entry, ok := c.cache[scope]; ok && c.isStillFresh(entry) {
|
||
return entry.token, nil
|
||
}
|
||
|
||
// cache miss / 過期 → fetch(在鎖內執行)
|
||
token, exp, err := c.fetchServiceToken(ctx, scope)
|
||
if err != nil {
|
||
// 失敗不寫 cache;下次重試
|
||
return "", err
|
||
}
|
||
|
||
c.cache[scope] = cachedToken{
|
||
token: token,
|
||
expiresAt: exp,
|
||
}
|
||
return token, nil
|
||
}
|
||
|
||
// isStillFresh 判斷 cache entry 是否還能用。
|
||
// 真正的過期時間是 expiresAt - tokenRefreshSkew(提前 15s 視為過期)。
|
||
func (c *mcTokenClient) isStillFresh(entry cachedToken) bool {
|
||
if entry.token == "" {
|
||
return false
|
||
}
|
||
return c.now().Before(entry.expiresAt.Add(-tokenRefreshSkew))
|
||
}
|
||
|
||
// fetchServiceToken 真正打 MC oauth/token endpoint 取 token。
|
||
// 已 retry 過所有可重試錯誤;回傳 error 時 caller 應視為 fatal(這次取不到)。
|
||
func (c *mcTokenClient) fetchServiceToken(ctx context.Context, scope string) (string, time.Time, error) {
|
||
tokenURL := c.issuer + "/oauth/token"
|
||
|
||
form := url.Values{}
|
||
form.Set("grant_type", "client_credentials")
|
||
form.Set("scope", scope)
|
||
|
||
body, err := c.doWithRetry(ctx, endpointKindServiceToken, scope, func() (*http.Request, error) {
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, tokenURL,
|
||
strings.NewReader(form.Encode()))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||
req.Header.Set("Accept", "application/json")
|
||
// RFC 6749 §2.3.1 推薦:client credentials 走 Basic auth header(比 body 安全)
|
||
req.SetBasicAuth(c.clientID, c.clientSecret)
|
||
return req, nil
|
||
})
|
||
if err != nil {
|
||
return "", time.Time{}, err
|
||
}
|
||
|
||
// 解析 token endpoint response shape(RFC 6749 §5.1)
|
||
var resp struct {
|
||
AccessToken string `json:"access_token"`
|
||
TokenType string `json:"token_type"`
|
||
ExpiresIn int `json:"expires_in"`
|
||
Scope string `json:"scope,omitempty"`
|
||
}
|
||
if err := json.Unmarshal(body, &resp); err != nil {
|
||
c.logger.Warn("conversion.mc_token.parse_failed",
|
||
slog.String("endpoint", endpointKindServiceToken),
|
||
slog.String("scope", scope),
|
||
// 不 log body(可能含 access_token),只 log 錯誤訊息
|
||
slog.String("err", truncate(err.Error(), 100)))
|
||
// IdP 回了 200 但 body 不是合法 JSON — 視為服務暫時失常(503/idp_unavailable)
|
||
return "", time.Time{}, fmt.Errorf("%w: parse service token response: %v",
|
||
ErrIDPUnavailable, err)
|
||
}
|
||
if resp.AccessToken == "" || resp.ExpiresIn <= 0 {
|
||
c.logger.Warn("conversion.mc_token.invalid_shape",
|
||
slog.String("endpoint", endpointKindServiceToken),
|
||
slog.String("scope", scope),
|
||
slog.Int("access_token_length", len(resp.AccessToken)),
|
||
slog.Int("expires_in", resp.ExpiresIn))
|
||
// IdP 回了 200 但 shape 不對 — 同上視為 503/idp_unavailable
|
||
return "", time.Time{}, fmt.Errorf("%w: invalid service token response shape",
|
||
ErrIDPUnavailable)
|
||
}
|
||
|
||
expiresAt := c.now().Add(time.Duration(resp.ExpiresIn) * time.Second)
|
||
|
||
// 不 log token 本身;只 log 長度 + 過期時間(給除錯用)
|
||
c.logger.Info("conversion.mc_token.obtained",
|
||
slog.String("endpoint", endpointKindServiceToken),
|
||
slog.String("scope", scope),
|
||
slog.Int("expires_in_sec", resp.ExpiresIn),
|
||
slog.Int("token_len", len(resp.AccessToken)))
|
||
|
||
return resp.AccessToken, expiresAt, nil
|
||
}
|
||
|
||
// ==========================================================================
|
||
// IssueDelegatedDownload 實作
|
||
// ==========================================================================
|
||
|
||
// IssueDelegatedDownload 實作 MCTokenClient.IssueDelegatedDownload。
|
||
//
|
||
// 流程:
|
||
// 1. ServiceToken(ctx, "files:download.delegate") 取 service token
|
||
// 2. POST {issuer}/file-access/download-tokens (Bearer)
|
||
// 3. 回 opaque token + 過期時間
|
||
//
|
||
// 不 cache(每次都新簽)— delegated token TTL 短(5 分鐘預設),cache 沒意義。
|
||
func (c *mcTokenClient) IssueDelegatedDownload(ctx context.Context, in IssueDownloadReq) (*DelegatedDownloadToken, error) {
|
||
if in.TenantID == "" || in.UserID == "" || in.ObjectKey == "" {
|
||
return nil, fmt.Errorf("conversion/mc_token_client: tenant_id / user_id / object_key required")
|
||
}
|
||
ttl := in.ExpiresInSeconds
|
||
if ttl <= 0 {
|
||
ttl = defaultDelegatedTTL
|
||
}
|
||
|
||
// 1. 取 service token(注意:這個呼叫本身可能 fetch,會走 cache fast path 或 fetch + retry)
|
||
// ServiceToken 內部已依 §6 mapping 失敗(ErrServiceClientUnauthorized / ErrIDPMisconfigured /
|
||
// ErrIDPUnavailable)— 這裡用 fmt.Errorf("%w") 透傳,不再二次包裝,避免錯誤碼被「升級」成
|
||
// ErrMCTokenUnavailable 而失去原本的 i18n 區分(idp_misconfig vs idp_down)。
|
||
serviceToken, err := c.ServiceToken(ctx, "files:download.delegate")
|
||
if err != nil {
|
||
return nil, fmt.Errorf("conversion: get service token for delegated download: %w", err)
|
||
}
|
||
|
||
endpoint := c.issuer + "/file-access/download-tokens"
|
||
|
||
reqBody, err := json.Marshal(map[string]any{
|
||
"tenant_id": in.TenantID,
|
||
"user_id": in.UserID,
|
||
"object_key": in.ObjectKey,
|
||
"method": "GET",
|
||
"expires_in_seconds": ttl,
|
||
})
|
||
if err != nil {
|
||
// 本地 marshal 失敗(理論不會發生)— 視為 MC 不可達(502/mc_token_unavailable)
|
||
return nil, fmt.Errorf("%w: marshal delegated download request: %v",
|
||
ErrMCTokenUnavailable, err)
|
||
}
|
||
|
||
body, err := c.doWithRetry(ctx, endpointKindDelegatedDownload, in.ObjectKey, func() (*http.Request, error) {
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint,
|
||
strings.NewReader(string(reqBody)))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
req.Header.Set("Content-Type", "application/json")
|
||
req.Header.Set("Accept", "application/json")
|
||
req.Header.Set("Authorization", "Bearer "+serviceToken)
|
||
return req, nil
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// MC delegated download token response shape:
|
||
// {"token": "<opaque>", "expires_at": "<ISO8601>"}
|
||
// 若 MC 改用 expires_in_seconds,這裡 fallback 處理。
|
||
var resp struct {
|
||
Token string `json:"token"`
|
||
ExpiresAt time.Time `json:"expires_at"`
|
||
ExpiresInSeconds int `json:"expires_in_seconds,omitempty"`
|
||
}
|
||
if err := json.Unmarshal(body, &resp); err != nil {
|
||
c.logger.Warn("conversion.mc_token.parse_failed",
|
||
slog.String("endpoint", endpointKindDelegatedDownload),
|
||
slog.String("err", truncate(err.Error(), 100)))
|
||
// MC 回 200 但 body 不是合法 JSON — 視為 MC 不可達(502/mc_token_unavailable)
|
||
return nil, fmt.Errorf("%w: parse delegated download response: %v",
|
||
ErrMCTokenUnavailable, err)
|
||
}
|
||
if resp.Token == "" {
|
||
c.logger.Warn("conversion.mc_token.invalid_shape",
|
||
slog.String("endpoint", endpointKindDelegatedDownload))
|
||
// 同上:shape 不對視為 502/mc_token_unavailable
|
||
return nil, fmt.Errorf("%w: invalid delegated download response shape",
|
||
ErrMCTokenUnavailable)
|
||
}
|
||
expiresAt := resp.ExpiresAt
|
||
if expiresAt.IsZero() && resp.ExpiresInSeconds > 0 {
|
||
expiresAt = c.now().Add(time.Duration(resp.ExpiresInSeconds) * time.Second)
|
||
}
|
||
if expiresAt.IsZero() {
|
||
// 都沒有 → 用 caller 傳入 ttl 推算(best-effort)
|
||
expiresAt = c.now().Add(time.Duration(ttl) * time.Second)
|
||
}
|
||
|
||
c.logger.Info("conversion.mc_token.delegated_obtained",
|
||
slog.String("endpoint", endpointKindDelegatedDownload),
|
||
slog.Int("ttl_sec", ttl),
|
||
slog.Int("token_len", len(resp.Token)))
|
||
|
||
return &DelegatedDownloadToken{
|
||
Token: resp.Token,
|
||
ExpiresAt: expiresAt,
|
||
}, nil
|
||
}
|
||
|
||
// ==========================================================================
|
||
// HTTP 共用:retry / 錯誤分類
|
||
// ==========================================================================
|
||
|
||
// endpointKind 常數 — doWithRetry / doOnce 用來區分 4xx/5xx 該映射到哪個 sentinel。
|
||
//
|
||
// Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §6)
|
||
const (
|
||
endpointKindServiceToken = "service_token" // MC /oauth/token
|
||
endpointKindDelegatedDownload = "delegated_download" // MC /file-access/download-tokens
|
||
)
|
||
|
||
// errClient4xx 取得「其他 4xx(非 401/403)」對應的 sentinel error。
|
||
// service_token endpoint → ErrIDPMisconfigured(IDP grant 設定錯誤)
|
||
// delegated_download endpoint → ErrDownloadTokenFailed(換下載 token 失敗)
|
||
func errClient4xx(endpointKind string) error {
|
||
if endpointKind == endpointKindServiceToken {
|
||
return ErrIDPMisconfigured
|
||
}
|
||
return ErrDownloadTokenFailed
|
||
}
|
||
|
||
// errServer5xxOrNetwork 取得「5xx / network / timeout」對應的 sentinel error。
|
||
// service_token endpoint → ErrIDPUnavailable(認證服務暫時不可用,503)
|
||
// delegated_download endpoint → ErrMCTokenUnavailable(MC 不可達,502)
|
||
func errServer5xxOrNetwork(endpointKind string) error {
|
||
if endpointKind == endpointKindServiceToken {
|
||
return ErrIDPUnavailable
|
||
}
|
||
return ErrMCTokenUnavailable
|
||
}
|
||
|
||
// doWithRetry 執行一次 HTTP request;遇到 5xx / network / timeout 時依
|
||
// conversion.md §9.1 退避重試。每次 retry 之間檢查 ctx.Done()。
|
||
//
|
||
// reqBuilder 是「每次 attempt 都重新建一個 *http.Request」的 closure
|
||
// — 因為 request body 可能在 retry 時已被讀完,必須重建。caller 內部用
|
||
// strings.NewReader 等可重建的 body source。
|
||
//
|
||
// 4xx 不 retry,直接 mapping 後 return。
|
||
//
|
||
// endpointKind 是 log 用的標記("service_token" / "delegated_download")。
|
||
// label 給 log 額外 context(scope or object_key)。
|
||
func (c *mcTokenClient) doWithRetry(
|
||
ctx context.Context,
|
||
endpointKind, label string,
|
||
reqBuilder func() (*http.Request, error),
|
||
) ([]byte, error) {
|
||
var lastErr error
|
||
for attempt := 0; attempt <= maxRetries; attempt++ {
|
||
// retry 前檢查 ctx
|
||
if attempt > 0 {
|
||
select {
|
||
case <-ctx.Done():
|
||
// ctx cancel/deadline → 立即 return(不 retry,不包成 ErrMCTokenUnavailable)
|
||
return nil, ctx.Err()
|
||
case <-time.After(retryBackoff(attempt)):
|
||
}
|
||
}
|
||
|
||
req, err := reqBuilder()
|
||
if err != nil {
|
||
// 建 request 失敗(例如 URL parse error)— 視為「打不出去」的網路類問題,
|
||
// 依 endpoint 種類映射到對應 sentinel。
|
||
return nil, fmt.Errorf("%w: build request: %v",
|
||
errServer5xxOrNetwork(endpointKind), err)
|
||
}
|
||
|
||
body, classifiedErr, retryable := c.doOnce(req, endpointKind, label, attempt)
|
||
if classifiedErr == nil {
|
||
return body, nil
|
||
}
|
||
lastErr = classifiedErr
|
||
if !retryable {
|
||
// 4xx / 401-403 / ctx cancel:直接 return,不再 retry
|
||
return nil, classifiedErr
|
||
}
|
||
// retryable 5xx / network / timeout:繼續下一輪
|
||
}
|
||
// 用完 retry 額度
|
||
c.logger.Warn("conversion.mc_token.retry_exhausted",
|
||
slog.String("endpoint", endpointKind),
|
||
slog.String("label", label),
|
||
slog.Int("attempts", maxRetries+1))
|
||
return nil, lastErr
|
||
}
|
||
|
||
// doOnce 執行一次 HTTP request,回傳 body(成功時)+ 分類好的 error + 是否可重試。
|
||
//
|
||
// 回傳 retryable=false 表示 caller 不應 retry:
|
||
// - ctx 已 cancel
|
||
// - 4xx response(client error,retry 沒用)
|
||
// - JSON parse 失敗只在 caller 處理,不在這裡分類
|
||
func (c *mcTokenClient) doOnce(
|
||
req *http.Request,
|
||
endpointKind, label string,
|
||
attempt int,
|
||
) (body []byte, err error, retryable bool) {
|
||
startedAt := c.now()
|
||
res, err := c.http.Do(req)
|
||
duration := c.now().Sub(startedAt)
|
||
if err != nil {
|
||
// network / timeout / context cancel
|
||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||
c.logger.Warn("conversion.mc_token.ctx_cancelled",
|
||
slog.String("endpoint", endpointKind),
|
||
slog.String("label", label),
|
||
slog.Int("attempt", attempt+1),
|
||
slog.Duration("duration", duration))
|
||
return nil, err, false
|
||
}
|
||
c.logger.Warn("conversion.mc_token.network_error",
|
||
slog.String("endpoint", endpointKind),
|
||
slog.String("label", label),
|
||
slog.Int("attempt", attempt+1),
|
||
slog.Duration("duration", duration),
|
||
// err.Error() 不會含 secret(http.Client 錯誤訊息只有 URL + 連線層 errno),
|
||
// 但仍 truncate 防 log 爆量
|
||
slog.String("err", truncate(err.Error(), 200)))
|
||
return nil, fmt.Errorf("%w: %s network error: %v",
|
||
errServer5xxOrNetwork(endpointKind), endpointKind, err), true
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
bodyBytes, readErr := io.ReadAll(res.Body)
|
||
if readErr != nil {
|
||
c.logger.Warn("conversion.mc_token.body_read_failed",
|
||
slog.String("endpoint", endpointKind),
|
||
slog.String("label", label),
|
||
slog.Int("status", res.StatusCode),
|
||
slog.String("err", truncate(readErr.Error(), 200)))
|
||
// body read 失敗視為網路問題,可重試(依 endpoint 映射)
|
||
return nil, fmt.Errorf("%w: read response body: %v",
|
||
errServer5xxOrNetwork(endpointKind), readErr), true
|
||
}
|
||
|
||
// 成功 2xx
|
||
if res.StatusCode >= 200 && res.StatusCode < 300 {
|
||
c.logger.Debug("conversion.mc_token.success",
|
||
slog.String("endpoint", endpointKind),
|
||
slog.String("label", label),
|
||
slog.Int("status", res.StatusCode),
|
||
slog.Int("attempt", attempt+1),
|
||
slog.Duration("duration", duration))
|
||
return bodyBytes, nil, false
|
||
}
|
||
|
||
// 錯誤分類(不寫 body 進 log — error_description 可能含 client_id / requestId)
|
||
c.logger.Warn("conversion.mc_token.endpoint_error",
|
||
slog.String("endpoint", endpointKind),
|
||
slog.String("label", label),
|
||
slog.Int("status", res.StatusCode),
|
||
slog.Int("attempt", attempt+1),
|
||
slog.Duration("duration", duration))
|
||
|
||
// 401 / 403:client 認證失敗 — 不可重試(重試也會繼續 401)
|
||
// 兩個 endpoint 都用同一個 sentinel(caller 可用 errors.Is 做精細處理,
|
||
// 例如 cache invalidate;對外仍透過 ErrorCode mask 成 idp_misconfigured/500)
|
||
if res.StatusCode == http.StatusUnauthorized || res.StatusCode == http.StatusForbidden {
|
||
return nil, fmt.Errorf("%w: %s endpoint returned %d",
|
||
ErrServiceClientUnauthorized, endpointKind, res.StatusCode), false
|
||
}
|
||
|
||
// 其他 4xx:不可重試 — 依 endpoint 種類映射到對應 sentinel:
|
||
// service_token → ErrIDPMisconfigured (500/idp_misconfigured)
|
||
// delegated_download → ErrDownloadTokenFailed (502/download_token_failed)
|
||
if res.StatusCode >= 400 && res.StatusCode < 500 {
|
||
return nil, fmt.Errorf("%w: %s endpoint returned %d",
|
||
errClient4xx(endpointKind), endpointKind, res.StatusCode), false
|
||
}
|
||
|
||
// 5xx:可重試 — 依 endpoint 種類映射到對應 sentinel:
|
||
// service_token → ErrIDPUnavailable (503/idp_unavailable)
|
||
// delegated_download → ErrMCTokenUnavailable (502/mc_token_unavailable)
|
||
return nil, fmt.Errorf("%w: %s endpoint returned %d",
|
||
errServer5xxOrNetwork(endpointKind), endpointKind, res.StatusCode), true
|
||
}
|
||
|
||
// retryBackoff 回傳第 n 次 retry(n 從 1 開始)的等待時間。
|
||
// 1 → 1s, 2 → 2s(對齊 conversion.md §9.1)
|
||
//
|
||
// 不加 jitter — Phase 0.8 預期同時 fetch 的 caller 已被 DCL 收斂到單一執行,
|
||
// 不會有大量併發打 MC,jitter 邊際效益低。
|
||
func retryBackoff(attempt int) time.Duration {
|
||
if attempt < 1 {
|
||
return retryBaseDelay
|
||
}
|
||
return retryBaseDelay * time.Duration(attempt)
|
||
}
|
||
|
||
// truncate 把字串截到 max 長度(避免 log 太長)。
|
||
func truncate(s string, max int) string {
|
||
if len(s) <= max {
|
||
return s
|
||
}
|
||
return s[:max] + "...(truncated)"
|
||
}
|