// Ownership store — visionA-backend 對 conversion job 的擁有權追蹤。 // // 動機: // - converter 端只認 user_id(OIDC sub),不認 visionA 的 OIDC cookie session // - visionA-backend 處於 trust boundary,每個 GET / promote / download / promote-to-models // 都必須先檢查「這個 jobID 是不是當前 userID 的」,不符 → 403 forbidden // - 對齊 conversion.md §7.2 ownership 檢查 + §2.6.1 lazy rebuild // // 設計: // - in-memory map:job_id → user_id // - 重啟即失(接受的取捨;MVP 階段 — 見 conversion.md §9.2 graceful degradation) // - 重啟後第一次某 user 進 GET /api/conversion/active 或 GET /{job_id} → // 從 converter 的 GET /api/v1/jobs?user_id=&status=in_progress 拿 in-progress jobs // 重建該 user 的 ownership(lazy rebuild,不啟動時 batch) // // 為什麼 lazy 而非 startup batch: // - startup batch 對 converter 是 hammer(重啟頻繁時尤甚),且大部分 jobs 重啟期間 // 使用者根本沒在等 // - lazy 的 cost 對應 user 行為,cost 上限 = 線上同時在 /conversion 頁面的 user 數 // // Phase 1 follow-up: // - DB-backed(Postgres / Redis)讓重啟不失資料 — progress.md 已記 // - 加 user → []job_id 的反向索引,ActiveJobOf O(1) // // Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.6.1) package conversion import ( "context" "crypto/sha256" "encoding/hex" "errors" "log/slog" "sync" "time" ) // ========================================================================== // 對外 interface // ========================================================================== // Ownership 是 visionA-backend 對 conversion job 的擁有權追蹤。 // // In-memory map: job_id → user_id。重啟即失,需 lazy rebuild: // 第一次某 user 進 GET /api/conversion/{id} 或 /active 時,若 cache 沒 // 該 user 任何項,從 converter list 拿 in_progress jobs 重建。 // // goroutine-safe: // - jobToUser map 用 RWMutex(高頻 Get / 低頻 Set / Delete) // - rebuilt 用 per-user mutex(DCL pattern,仿 mc_token_client 但不同 user 不互相阻塞) type Ownership interface { // Set 註冊 job 屬於 user(init 完成時呼叫)。 Set(jobID, userID string) // Get 查 job 屬於誰;不在 cache 回 ("", false)。 Get(jobID string) (userID string, ok bool) // Delete 從 cache 移除(reset / job expired 時)。 Delete(jobID string) // EnsureRebuilt 確保該 user 的 ownership 已從 converter rebuild 過。 // // 第一次呼叫該 user 時,從 converter list 拿所有 in_progress jobs 寫進 cache。 // 後續呼叫該 user 是 noop(fast path)。 // // 失敗處理: // - converter 5xx / network → 回傳 error,不標 rebuilt(下次重試) // - ctx cancel → 立即 return ctx.Err() // - rebuild 內部對 converter 的呼叫帶 5s timeout(用 context.WithTimeout 包裝 // ctx),避免單一 caller 無限阻塞同 user 其他 caller EnsureRebuilt(ctx context.Context, userID string) error // ActiveJobOf 列出該 user 目前有的 active job_id(從 cache,不重新 rebuild)。 // // caller 應先呼叫 EnsureRebuilt 確保 cache 有資料。 // // Phase 0.8 同 user 同時最多 1 個 active job,return slice 通常是 0 or 1。 // Phase 1 加反向 user → []job_id 索引讓這變成 O(1)。 ActiveJobOf(userID string) []string } // ========================================================================== // 內部常數 // ========================================================================== const ( // rebuildTimeout 是單次 rebuild 對 converter 呼叫的 timeout 上限。 // 防止某個 caller 卡死同 user 其他 caller(per-user mutex 的 head-of-line blocking)。 // 對齊 conversion.md §9.1 retry 矩陣的 list endpoint:max 1 retry + 0.5s 退避, // 加上 converter HTTP timeout 10s,最壞約 ~10.5s — 設 5s 是因為 lazy rebuild 在 // frontend pre-check 路徑上,UX 可接受失敗 + retry。caller (flow.go) 失敗時會 fallback。 rebuildTimeout = 5 * time.Second ) // ========================================================================== // 預設實作 // ========================================================================== // ownership 是 Ownership 的 in-memory 實作。 // // 兩組鎖分離: // - mu: 保護 jobToUser map(Set/Get/Delete/ActiveJobOf) // - perUserLocks: 每個 user 一把 mutex,避免 EnsureRebuilt 互相阻塞 // // 為什麼不用單一全域 rebuiltMu: // - 全域 rebuiltMu 會讓 user A 的 rebuild block user B 的 rebuild(rebuild 包 5s // timeout,最壞 100 user 同時進來變 500s 排隊) // - per-user mutex 用 sync.Map 自動 lazy-init;不同 user 並行進 rebuild 互不干擾 type ownership struct { mu sync.RWMutex jobToUser map[string]string // job_id → user_id // perUserLocks: user_id → *sync.Mutex // 用 sync.Map 自動處理 lazy init + 移除(Phase 0.8 不主動 evict — user 量級小) perUserLocks sync.Map // rebuilt: user_id → 已 rebuild 過? // 用獨立 mutex 而非 sync.Map.LoadOrStore — 因為 set 與 fetch converter 必須原子 // (fetch 失敗不 set),sync.Map 的 LoadOrStore 不適合這個語意 rebuiltMu sync.RWMutex rebuilt map[string]bool converter ConverterClient logger *slog.Logger } // NewOwnership 建立一個 Ownership 實例。 // // converter 必填(lazy rebuild 依賴);logger 為 optional,nil 用 slog.Default()。 func NewOwnership(converter ConverterClient, logger *slog.Logger) Ownership { if logger == nil { logger = slog.Default() } return &ownership{ jobToUser: make(map[string]string), rebuilt: make(map[string]bool), converter: converter, logger: logger, } } // ========================================================================== // Set / Get / Delete — RWMutex 標準 map 保護 // ========================================================================== // Set 寫入 ownership;空字串視為 no-op(防呆)。 func (o *ownership) Set(jobID, userID string) { if jobID == "" || userID == "" { return } o.mu.Lock() o.jobToUser[jobID] = userID o.mu.Unlock() } // Get 讀取 ownership;不存在回 ("", false)。 func (o *ownership) Get(jobID string) (string, bool) { if jobID == "" { return "", false } o.mu.RLock() userID, ok := o.jobToUser[jobID] o.mu.RUnlock() return userID, ok } // Delete 移除 ownership;不存在 no-op。 func (o *ownership) Delete(jobID string) { if jobID == "" { return } o.mu.Lock() delete(o.jobToUser, jobID) o.mu.Unlock() } // ========================================================================== // EnsureRebuilt — DCL + per-user mutex // ========================================================================== // EnsureRebuilt 確保 userID 的 ownership 已從 converter rebuild。 // // DCL(double-checked locking)流程: // 1. fast path:先用 RLock 看 rebuilt[userID],已 rebuild → 直接 return nil // 2. 取該 user 的 per-user mutex(不同 user 並行;同 user 序列化) // 3. slow path:拿 mutex 後再次 check rebuilt[userID](其他 caller 可能剛 rebuild 完) // 4. 真正 fetch converter(帶 rebuildTimeout) // 5. 成功 → 寫 jobToUser + 標 rebuilt[userID]=true // 6. 失敗 → 不標,下次重試 // // 為什麼 fast path 不直接 return:sync.Map.Load 比 RWMutex.RLock 快但 race 條件需小心; // 這裡用 RWMutex 對 rebuilt map 一致 protect(與 slow path 寫入互斥)。 func (o *ownership) EnsureRebuilt(ctx context.Context, userID string) error { if userID == "" { return errors.New("conversion/ownership: userID is required") } // fast path o.rebuiltMu.RLock() done := o.rebuilt[userID] o.rebuiltMu.RUnlock() if done { return nil } // 取該 user 的 per-user mutex(lazy init via sync.Map) mu := o.lockForUser(userID) mu.Lock() defer mu.Unlock() // 進入 critical section 前再 check ctx(caller 可能已 cancel) if err := ctx.Err(); err != nil { return err } // slow path 內部再 check(其他 caller 可能剛 rebuild 完) o.rebuiltMu.RLock() done = o.rebuilt[userID] o.rebuiltMu.RUnlock() if done { return nil } // 真正 fetch converter(帶 rebuild timeout,避免單 caller 無限阻塞同 user 其他 caller) fetchCtx, cancel := context.WithTimeout(ctx, rebuildTimeout) defer cancel() jobs, err := o.converter.ListInProgressJobs(fetchCtx, userID) if err != nil { // 失敗不標 rebuilt — 下次重試 o.logger.WarnContext(ctx, "ownership: lazy rebuild failed", slog.String("user_hash", hashUserID(userID)), slog.String("err", err.Error()), ) return err } // 寫入 jobToUser(拿 jobToUser 的 write lock) o.mu.Lock() for _, j := range jobs { if j == nil || j.JobID == "" { continue } o.jobToUser[j.JobID] = userID } o.mu.Unlock() // 標 rebuilt o.rebuiltMu.Lock() o.rebuilt[userID] = true o.rebuiltMu.Unlock() o.logger.InfoContext(ctx, "ownership: lazy rebuild done", slog.String("user_hash", hashUserID(userID)), slog.Int("jobs_found", len(jobs)), ) return nil } // lockForUser 取(或 lazy 建立)該 user 的 mutex。 // // 用 sync.Map.LoadOrStore:併發 100 個 goroutine 同時對同 user 取 mutex, // LoadOrStore 保證所有 goroutine 拿到同一個 *sync.Mutex 實例(其他丟棄)。 func (o *ownership) lockForUser(userID string) *sync.Mutex { if existing, ok := o.perUserLocks.Load(userID); ok { return existing.(*sync.Mutex) } // LoadOrStore:若不存在則寫入新建的,回傳現存或新建的; // loaded=true 代表已有他人寫入,我們新建的這把丟棄 actual, _ := o.perUserLocks.LoadOrStore(userID, &sync.Mutex{}) return actual.(*sync.Mutex) } // ========================================================================== // ActiveJobOf — 反查 jobToUser // ========================================================================== // ActiveJobOf 從 jobToUser map 反查 user 擁有的 jobID 清單。 // // O(N) 掃描;Phase 0.8 同 user 最多 1 active job、整體 jobToUser 規模也不大(內部 // 使用者 < 100 並發),可接受。Phase 1 加反向索引變 O(1)。 // // caller 應先呼叫 EnsureRebuilt(這裡不主動 rebuild,避免雙寫競態)。 func (o *ownership) ActiveJobOf(userID string) []string { if userID == "" { return nil } o.mu.RLock() defer o.mu.RUnlock() // 預先 alloc 0 cap 的 slice — 大多數 user 是 0 或 1 個 job result := make([]string, 0, 1) for jobID, uid := range o.jobToUser { if uid == userID { result = append(result, jobID) } } return result } // ========================================================================== // helpers // ========================================================================== // hashUserID 對 user_id 做 SHA-256 取前 8 hex char,給 log 用(PII 保護)。 // // 不存原始 user_id 進 log,避免 log file 洩漏 OIDC sub。 func hashUserID(userID string) string { if userID == "" { return "" } sum := sha256.Sum256([]byte(userID)) return hex.EncodeToString(sum[:])[:8] }