Phase 0.8 把 kneron_model_converter 的轉檔功能整合進 visionA Cloud。
visionA backend 當 streaming proxy(upload)+ delegated download token broker(download)+
ownership trust boundary,converter / FAA / MC 三方零修改。
新增 internal/conversion/ 套件(8 個檔,~10,000 行 prod+test,117+ test cases,race -count=3 全綠):
- conversion.go:Service interface 5 method、Job/PromoteResult/InitJobInput types
- errors.go:13+ sentinel errors + ErrorCode/HTTPStatus mapping,對齊 conversion.md §6
- mc_token_client.go:service-to-service token (client_credentials grant) + DCL cache
(exp - 15s 重取,per-scope cache),IssueDelegatedDownload(MC delegated download token)
錯誤分 idp_misconfigured (4xx) / idp_unavailable (5xx) / download_token_failed / mc_token_unavailable
- converter_client.go:對 converter scheduler 4 method(InitJob multipart streaming /
GetJob / Promote / ListInProgressJobs),InitJob 不 retry 5xx(streaming body 無法 replay)
- faa_client.go:對 FAA GET /files/{key} server-to-server pull,Phase A retry(GET 無 body
可 replay)對齊 §9.1 retry 矩陣,streaming io.ReadCloser 透傳避 OOM
- ownership.go:in-memory job_id → user_id map + per-user mutex 防 thundering herd lazy rebuild
(不同 user 平行 fetch,同 user 100 caller 收斂成 1 次),visionA 重啟靠 converter
ListInProgressJobs(user) 重建
- flow.go:Service interface 整合層(5 method 串接 converter/FAA/MC/ownership)
- InitJob 用 io.Pipe + multipart.Reader/Writer 重組 streaming proxy(黑名單 client user_id
+ 灌入 OIDC sub)
- DownloadRedirectURL 自動觸發 promote(spec §1 Stage 3b),用 ensurePromoted helper
- PromoteToModels 冪等(modelStore.FindBySourceJobID 為 source-of-truth)
- OwnershipMismatch → ErrJobNotFound 不 forbidden(§7.2 防枚舉)
- storage / modelStore 失敗包 ErrStorageUnavailable / ErrModelStoreUnavailable
(視為 visionA 自身 500 而非 502 gateway,SRE alarm 才打對 team)
新增 internal/api/conversion.go(5 endpoint handler + main.go wire):
- POST /api/conversion/init(multipart streaming proxy,不呼叫 c.MultipartForm())
- GET /api/conversion/active(lazy rebuild ownership)
- GET /api/conversion/{job_id}(poll status)
- POST /api/conversion/{job_id}/promote-to-models(FAA pull → models 三段式)
- GET /api/conversion/{job_id}/download(server-side HTTP 302 → FAA,token 不過 frontend
JS,仿 FAA TestSite DownloadFileDirect pattern;Cache-Control: no-store)
5 個 endpoint 全部走 OIDC AuthMiddleware;user_id 從 cookie session 灌(trust boundary),
從不接受 client multipart form / JSON / query 的 user_id。
TestAllAPIEndpointsRequire401WithoutCookie 自動覆蓋新 5 endpoint regression 防呆。
新增 cmd/api-server/conversion_e2e_test.go(4 個 e2e 場景):
- TestConversionE2E_StreamingProxy(10MB body + trust boundary regression)
- TestConversionE2E_LazyRebuildAfterRestart(visionA 重啟仍能 /active)
- TestConversionE2E_Download302Redirect(驗 302 + Location header + token 不在 body)
- TestConversionE2E_ActiveJobConflict(409 + active_job 詳情)
修改 internal/config/{config,load}.go:新增 ConversionConfig 5 欄位
(ConverterBaseURL / FAABaseURL / TenantID / ServiceClientID / ServiceClientSecret)+
Enabled() helper(雙非空判定)。
修改 cmd/api-server/main.go:條件 wire(cfg.Conversion.Enabled() 為 true 才建 client + Service;
否則 Deps.Conversion=nil,handler 自動回 501)。
修改 .env.example:新增 Phase 0.8 區塊註解。
新增 cmd/api-server/conversion_adapters.go:narrow interface adapter(接既有
internal/model.Repository / internal/storage.Store → conversion.ModelStore / Storage,避免 import cycle)。
驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning / go build 成功。
對齊文件:
- .autoflow/04-architecture/adr/adr-014-conversion-integration.md
- .autoflow/04-architecture/conversion.md (TDD)
- .autoflow/04-architecture/api/api-conversion.md
- .autoflow/02-prd/features/feature-converter-integration.md
- .autoflow/03-design/wireframes/wireframe-conversion.md
- .autoflow/03-design/flows/flow-conversion.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
315 lines
11 KiB
Go
315 lines
11 KiB
Go
// Ownership store — visionA-backend 對 conversion job 的擁有權追蹤。
|
||
//
|
||
// 動機:
|
||
// - converter 端只認 user_id(OIDC sub),不認 visionA 的 OIDC cookie session
|
||
// - visionA-backend 處於 trust boundary,每個 GET / promote / download / promote-to-models
|
||
// 都必須先檢查「這個 jobID 是不是當前 userID 的」,不符 → 403 forbidden
|
||
// - 對齊 conversion.md §7.2 ownership 檢查 + §2.6.1 lazy rebuild
|
||
//
|
||
// 設計:
|
||
// - in-memory map:job_id → user_id
|
||
// - 重啟即失(接受的取捨;MVP 階段 — 見 conversion.md §9.2 graceful degradation)
|
||
// - 重啟後第一次某 user 進 GET /api/conversion/active 或 GET /{job_id} →
|
||
// 從 converter 的 GET /api/v1/jobs?user_id=&status=in_progress 拿 in-progress jobs
|
||
// 重建該 user 的 ownership(lazy rebuild,不啟動時 batch)
|
||
//
|
||
// 為什麼 lazy 而非 startup batch:
|
||
// - startup batch 對 converter 是 hammer(重啟頻繁時尤甚),且大部分 jobs 重啟期間
|
||
// 使用者根本沒在等
|
||
// - lazy 的 cost 對應 user 行為,cost 上限 = 線上同時在 /conversion 頁面的 user 數
|
||
//
|
||
// Phase 1 follow-up:
|
||
// - DB-backed(Postgres / Redis)讓重啟不失資料 — progress.md 已記
|
||
// - 加 user → []job_id 的反向索引,ActiveJobOf O(1)
|
||
//
|
||
// Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.6.1)
|
||
package conversion
|
||
|
||
import (
|
||
"context"
|
||
"crypto/sha256"
|
||
"encoding/hex"
|
||
"errors"
|
||
"log/slog"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// ==========================================================================
|
||
// 對外 interface
|
||
// ==========================================================================
|
||
|
||
// Ownership 是 visionA-backend 對 conversion job 的擁有權追蹤。
|
||
//
|
||
// In-memory map: job_id → user_id。重啟即失,需 lazy rebuild:
|
||
// 第一次某 user 進 GET /api/conversion/{id} 或 /active 時,若 cache 沒
|
||
// 該 user 任何項,從 converter list 拿 in_progress jobs 重建。
|
||
//
|
||
// goroutine-safe:
|
||
// - jobToUser map 用 RWMutex(高頻 Get / 低頻 Set / Delete)
|
||
// - rebuilt 用 per-user mutex(DCL pattern,仿 mc_token_client 但不同 user 不互相阻塞)
|
||
type Ownership interface {
|
||
// Set 註冊 job 屬於 user(init 完成時呼叫)。
|
||
Set(jobID, userID string)
|
||
|
||
// Get 查 job 屬於誰;不在 cache 回 ("", false)。
|
||
Get(jobID string) (userID string, ok bool)
|
||
|
||
// Delete 從 cache 移除(reset / job expired 時)。
|
||
Delete(jobID string)
|
||
|
||
// EnsureRebuilt 確保該 user 的 ownership 已從 converter rebuild 過。
|
||
//
|
||
// 第一次呼叫該 user 時,從 converter list 拿所有 in_progress jobs 寫進 cache。
|
||
// 後續呼叫該 user 是 noop(fast path)。
|
||
//
|
||
// 失敗處理:
|
||
// - converter 5xx / network → 回傳 error,不標 rebuilt(下次重試)
|
||
// - ctx cancel → 立即 return ctx.Err()
|
||
// - rebuild 內部對 converter 的呼叫帶 5s timeout(用 context.WithTimeout 包裝
|
||
// ctx),避免單一 caller 無限阻塞同 user 其他 caller
|
||
EnsureRebuilt(ctx context.Context, userID string) error
|
||
|
||
// ActiveJobOf 列出該 user 目前有的 active job_id(從 cache,不重新 rebuild)。
|
||
//
|
||
// caller 應先呼叫 EnsureRebuilt 確保 cache 有資料。
|
||
//
|
||
// Phase 0.8 同 user 同時最多 1 個 active job,return slice 通常是 0 or 1。
|
||
// Phase 1 加反向 user → []job_id 索引讓這變成 O(1)。
|
||
ActiveJobOf(userID string) []string
|
||
}
|
||
|
||
// ==========================================================================
|
||
// 內部常數
|
||
// ==========================================================================
|
||
|
||
const (
|
||
// rebuildTimeout 是單次 rebuild 對 converter 呼叫的 timeout 上限。
|
||
// 防止某個 caller 卡死同 user 其他 caller(per-user mutex 的 head-of-line blocking)。
|
||
// 對齊 conversion.md §9.1 retry 矩陣的 list endpoint:max 1 retry + 0.5s 退避,
|
||
// 加上 converter HTTP timeout 10s,最壞約 ~10.5s — 設 5s 是因為 lazy rebuild 在
|
||
// frontend pre-check 路徑上,UX 可接受失敗 + retry。caller (flow.go) 失敗時會 fallback。
|
||
rebuildTimeout = 5 * time.Second
|
||
)
|
||
|
||
// ==========================================================================
|
||
// 預設實作
|
||
// ==========================================================================
|
||
|
||
// ownership 是 Ownership 的 in-memory 實作。
|
||
//
|
||
// 兩組鎖分離:
|
||
// - mu: 保護 jobToUser map(Set/Get/Delete/ActiveJobOf)
|
||
// - perUserLocks: 每個 user 一把 mutex,避免 EnsureRebuilt 互相阻塞
|
||
//
|
||
// 為什麼不用單一全域 rebuiltMu:
|
||
// - 全域 rebuiltMu 會讓 user A 的 rebuild block user B 的 rebuild(rebuild 包 5s
|
||
// timeout,最壞 100 user 同時進來變 500s 排隊)
|
||
// - per-user mutex 用 sync.Map 自動 lazy-init;不同 user 並行進 rebuild 互不干擾
|
||
type ownership struct {
|
||
mu sync.RWMutex
|
||
jobToUser map[string]string // job_id → user_id
|
||
|
||
// perUserLocks: user_id → *sync.Mutex
|
||
// 用 sync.Map 自動處理 lazy init + 移除(Phase 0.8 不主動 evict — user 量級小)
|
||
perUserLocks sync.Map
|
||
|
||
// rebuilt: user_id → 已 rebuild 過?
|
||
// 用獨立 mutex 而非 sync.Map.LoadOrStore — 因為 set 與 fetch converter 必須原子
|
||
// (fetch 失敗不 set),sync.Map 的 LoadOrStore 不適合這個語意
|
||
rebuiltMu sync.RWMutex
|
||
rebuilt map[string]bool
|
||
|
||
converter ConverterClient
|
||
logger *slog.Logger
|
||
}
|
||
|
||
// NewOwnership 建立一個 Ownership 實例。
|
||
//
|
||
// converter 必填(lazy rebuild 依賴);logger 為 optional,nil 用 slog.Default()。
|
||
func NewOwnership(converter ConverterClient, logger *slog.Logger) Ownership {
|
||
if logger == nil {
|
||
logger = slog.Default()
|
||
}
|
||
return &ownership{
|
||
jobToUser: make(map[string]string),
|
||
rebuilt: make(map[string]bool),
|
||
converter: converter,
|
||
logger: logger,
|
||
}
|
||
}
|
||
|
||
// ==========================================================================
|
||
// Set / Get / Delete — RWMutex 標準 map 保護
|
||
// ==========================================================================
|
||
|
||
// Set 寫入 ownership;空字串視為 no-op(防呆)。
|
||
func (o *ownership) Set(jobID, userID string) {
|
||
if jobID == "" || userID == "" {
|
||
return
|
||
}
|
||
o.mu.Lock()
|
||
o.jobToUser[jobID] = userID
|
||
o.mu.Unlock()
|
||
}
|
||
|
||
// Get 讀取 ownership;不存在回 ("", false)。
|
||
func (o *ownership) Get(jobID string) (string, bool) {
|
||
if jobID == "" {
|
||
return "", false
|
||
}
|
||
o.mu.RLock()
|
||
userID, ok := o.jobToUser[jobID]
|
||
o.mu.RUnlock()
|
||
return userID, ok
|
||
}
|
||
|
||
// Delete 移除 ownership;不存在 no-op。
|
||
func (o *ownership) Delete(jobID string) {
|
||
if jobID == "" {
|
||
return
|
||
}
|
||
o.mu.Lock()
|
||
delete(o.jobToUser, jobID)
|
||
o.mu.Unlock()
|
||
}
|
||
|
||
// ==========================================================================
|
||
// EnsureRebuilt — DCL + per-user mutex
|
||
// ==========================================================================
|
||
|
||
// EnsureRebuilt 確保 userID 的 ownership 已從 converter rebuild。
|
||
//
|
||
// DCL(double-checked locking)流程:
|
||
// 1. fast path:先用 RLock 看 rebuilt[userID],已 rebuild → 直接 return nil
|
||
// 2. 取該 user 的 per-user mutex(不同 user 並行;同 user 序列化)
|
||
// 3. slow path:拿 mutex 後再次 check rebuilt[userID](其他 caller 可能剛 rebuild 完)
|
||
// 4. 真正 fetch converter(帶 rebuildTimeout)
|
||
// 5. 成功 → 寫 jobToUser + 標 rebuilt[userID]=true
|
||
// 6. 失敗 → 不標,下次重試
|
||
//
|
||
// 為什麼 fast path 不直接 return:sync.Map.Load 比 RWMutex.RLock 快但 race 條件需小心;
|
||
// 這裡用 RWMutex 對 rebuilt map 一致 protect(與 slow path 寫入互斥)。
|
||
func (o *ownership) EnsureRebuilt(ctx context.Context, userID string) error {
|
||
if userID == "" {
|
||
return errors.New("conversion/ownership: userID is required")
|
||
}
|
||
|
||
// fast path
|
||
o.rebuiltMu.RLock()
|
||
done := o.rebuilt[userID]
|
||
o.rebuiltMu.RUnlock()
|
||
if done {
|
||
return nil
|
||
}
|
||
|
||
// 取該 user 的 per-user mutex(lazy init via sync.Map)
|
||
mu := o.lockForUser(userID)
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
|
||
// 進入 critical section 前再 check ctx(caller 可能已 cancel)
|
||
if err := ctx.Err(); err != nil {
|
||
return err
|
||
}
|
||
|
||
// slow path 內部再 check(其他 caller 可能剛 rebuild 完)
|
||
o.rebuiltMu.RLock()
|
||
done = o.rebuilt[userID]
|
||
o.rebuiltMu.RUnlock()
|
||
if done {
|
||
return nil
|
||
}
|
||
|
||
// 真正 fetch converter(帶 rebuild timeout,避免單 caller 無限阻塞同 user 其他 caller)
|
||
fetchCtx, cancel := context.WithTimeout(ctx, rebuildTimeout)
|
||
defer cancel()
|
||
|
||
jobs, err := o.converter.ListInProgressJobs(fetchCtx, userID)
|
||
if err != nil {
|
||
// 失敗不標 rebuilt — 下次重試
|
||
o.logger.WarnContext(ctx, "ownership: lazy rebuild failed",
|
||
slog.String("user_hash", hashUserID(userID)),
|
||
slog.String("err", err.Error()),
|
||
)
|
||
return err
|
||
}
|
||
|
||
// 寫入 jobToUser(拿 jobToUser 的 write lock)
|
||
o.mu.Lock()
|
||
for _, j := range jobs {
|
||
if j == nil || j.JobID == "" {
|
||
continue
|
||
}
|
||
o.jobToUser[j.JobID] = userID
|
||
}
|
||
o.mu.Unlock()
|
||
|
||
// 標 rebuilt
|
||
o.rebuiltMu.Lock()
|
||
o.rebuilt[userID] = true
|
||
o.rebuiltMu.Unlock()
|
||
|
||
o.logger.InfoContext(ctx, "ownership: lazy rebuild done",
|
||
slog.String("user_hash", hashUserID(userID)),
|
||
slog.Int("jobs_found", len(jobs)),
|
||
)
|
||
return nil
|
||
}
|
||
|
||
// lockForUser 取(或 lazy 建立)該 user 的 mutex。
|
||
//
|
||
// 用 sync.Map.LoadOrStore:併發 100 個 goroutine 同時對同 user 取 mutex,
|
||
// LoadOrStore 保證所有 goroutine 拿到同一個 *sync.Mutex 實例(其他丟棄)。
|
||
func (o *ownership) lockForUser(userID string) *sync.Mutex {
|
||
if existing, ok := o.perUserLocks.Load(userID); ok {
|
||
return existing.(*sync.Mutex)
|
||
}
|
||
// LoadOrStore:若不存在則寫入新建的,回傳現存或新建的;
|
||
// loaded=true 代表已有他人寫入,我們新建的這把丟棄
|
||
actual, _ := o.perUserLocks.LoadOrStore(userID, &sync.Mutex{})
|
||
return actual.(*sync.Mutex)
|
||
}
|
||
|
||
// ==========================================================================
|
||
// ActiveJobOf — 反查 jobToUser
|
||
// ==========================================================================
|
||
|
||
// ActiveJobOf 從 jobToUser map 反查 user 擁有的 jobID 清單。
|
||
//
|
||
// O(N) 掃描;Phase 0.8 同 user 最多 1 active job、整體 jobToUser 規模也不大(內部
|
||
// 使用者 < 100 並發),可接受。Phase 1 加反向索引變 O(1)。
|
||
//
|
||
// caller 應先呼叫 EnsureRebuilt(這裡不主動 rebuild,避免雙寫競態)。
|
||
func (o *ownership) ActiveJobOf(userID string) []string {
|
||
if userID == "" {
|
||
return nil
|
||
}
|
||
o.mu.RLock()
|
||
defer o.mu.RUnlock()
|
||
|
||
// 預先 alloc 0 cap 的 slice — 大多數 user 是 0 或 1 個 job
|
||
result := make([]string, 0, 1)
|
||
for jobID, uid := range o.jobToUser {
|
||
if uid == userID {
|
||
result = append(result, jobID)
|
||
}
|
||
}
|
||
return result
|
||
}
|
||
|
||
// ==========================================================================
|
||
// helpers
|
||
// ==========================================================================
|
||
|
||
// hashUserID 對 user_id 做 SHA-256 取前 8 hex char,給 log 用(PII 保護)。
|
||
//
|
||
// 不存原始 user_id 進 log,避免 log file 洩漏 OIDC sub。
|
||
func hashUserID(userID string) string {
|
||
if userID == "" {
|
||
return ""
|
||
}
|
||
sum := sha256.Sum256([]byte(userID))
|
||
return hex.EncodeToString(sum[:])[:8]
|
||
}
|