jim800121chen 1231bf0ed2 feat(visionA-backend): Phase 0.8 conversion package — 5 endpoint + 8 個內部模組
Phase 0.8 把 kneron_model_converter 的轉檔功能整合進 visionA Cloud。
visionA backend 當 streaming proxy(upload)+ delegated download token broker(download)+
ownership trust boundary,converter / FAA / MC 三方零修改。

新增 internal/conversion/ 套件(8 個檔,~10,000 行 prod+test,117+ test cases,race -count=3 全綠):

- conversion.go:Service interface 5 method、Job/PromoteResult/InitJobInput types
- errors.go:13+ sentinel errors + ErrorCode/HTTPStatus mapping,對齊 conversion.md §6
- mc_token_client.go:service-to-service token (client_credentials grant) + DCL cache
  (exp - 15s 重取,per-scope cache),IssueDelegatedDownload(MC delegated download token)
  錯誤分 idp_misconfigured (4xx) / idp_unavailable (5xx) / download_token_failed / mc_token_unavailable
- converter_client.go:對 converter scheduler 4 method(InitJob multipart streaming /
  GetJob / Promote / ListInProgressJobs),InitJob 不 retry 5xx(streaming body 無法 replay)
- faa_client.go:對 FAA GET /files/{key} server-to-server pull,Phase A retry(GET 無 body
  可 replay)對齊 §9.1 retry 矩陣,streaming io.ReadCloser 透傳避 OOM
- ownership.go:in-memory job_id → user_id map + per-user mutex 防 thundering herd lazy rebuild
  (不同 user 平行 fetch,同 user 100 caller 收斂成 1 次),visionA 重啟靠 converter
  ListInProgressJobs(user) 重建
- flow.go:Service interface 整合層(5 method 串接 converter/FAA/MC/ownership)
  - InitJob 用 io.Pipe + multipart.Reader/Writer 重組 streaming proxy(黑名單 client user_id
    + 灌入 OIDC sub)
  - DownloadRedirectURL 自動觸發 promote(spec §1 Stage 3b),用 ensurePromoted helper
  - PromoteToModels 冪等(modelStore.FindBySourceJobID 為 source-of-truth)
  - OwnershipMismatch → ErrJobNotFound 不 forbidden(§7.2 防枚舉)
  - storage / modelStore 失敗包 ErrStorageUnavailable / ErrModelStoreUnavailable
    (視為 visionA 自身 500 而非 502 gateway,SRE alarm 才打對 team)

新增 internal/api/conversion.go(5 endpoint handler + main.go wire):
- POST /api/conversion/init(multipart streaming proxy,不呼叫 c.MultipartForm())
- GET  /api/conversion/active(lazy rebuild ownership)
- GET  /api/conversion/{job_id}(poll status)
- POST /api/conversion/{job_id}/promote-to-models(FAA pull → models 三段式)
- GET  /api/conversion/{job_id}/download(server-side HTTP 302 → FAA,token 不過 frontend
  JS,仿 FAA TestSite DownloadFileDirect pattern;Cache-Control: no-store)

5 個 endpoint 全部走 OIDC AuthMiddleware;user_id 從 cookie session 灌(trust boundary),
從不接受 client multipart form / JSON / query 的 user_id。
TestAllAPIEndpointsRequire401WithoutCookie 自動覆蓋新 5 endpoint regression 防呆。

新增 cmd/api-server/conversion_e2e_test.go(4 個 e2e 場景):
- TestConversionE2E_StreamingProxy(10MB body + trust boundary regression)
- TestConversionE2E_LazyRebuildAfterRestart(visionA 重啟仍能 /active)
- TestConversionE2E_Download302Redirect(驗 302 + Location header + token 不在 body)
- TestConversionE2E_ActiveJobConflict(409 + active_job 詳情)

修改 internal/config/{config,load}.go:新增 ConversionConfig 5 欄位
(ConverterBaseURL / FAABaseURL / TenantID / ServiceClientID / ServiceClientSecret)+
Enabled() helper(雙非空判定)。
修改 cmd/api-server/main.go:條件 wire(cfg.Conversion.Enabled() 為 true 才建 client + Service;
否則 Deps.Conversion=nil,handler 自動回 501)。
修改 .env.example:新增 Phase 0.8 區塊註解。
新增 cmd/api-server/conversion_adapters.go:narrow interface adapter(接既有
internal/model.Repository / internal/storage.Store → conversion.ModelStore / Storage,避免 import cycle)。

驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning / go build 成功。

對齊文件:
- .autoflow/04-architecture/adr/adr-014-conversion-integration.md
- .autoflow/04-architecture/conversion.md (TDD)
- .autoflow/04-architecture/api/api-conversion.md
- .autoflow/02-prd/features/feature-converter-integration.md
- .autoflow/03-design/wireframes/wireframe-conversion.md
- .autoflow/03-design/flows/flow-conversion.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 13:56:07 +08:00

315 lines
11 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Ownership store — visionA-backend 對 conversion job 的擁有權追蹤。
//
// 動機:
// - converter 端只認 user_idOIDC sub不認 visionA 的 OIDC cookie session
// - visionA-backend 處於 trust boundary每個 GET / promote / download / promote-to-models
// 都必須先檢查「這個 jobID 是不是當前 userID 的」,不符 → 403 forbidden
// - 對齊 conversion.md §7.2 ownership 檢查 + §2.6.1 lazy rebuild
//
// 設計:
// - in-memory mapjob_id → user_id
// - 重啟即失接受的取捨MVP 階段 — 見 conversion.md §9.2 graceful degradation
// - 重啟後第一次某 user 進 GET /api/conversion/active 或 GET /{job_id} →
// 從 converter 的 GET /api/v1/jobs?user_id=&status=in_progress 拿 in-progress jobs
// 重建該 user 的 ownershiplazy rebuild不啟動時 batch
//
// 為什麼 lazy 而非 startup batch
// - startup batch 對 converter 是 hammer重啟頻繁時尤甚且大部分 jobs 重啟期間
// 使用者根本沒在等
// - lazy 的 cost 對應 user 行為cost 上限 = 線上同時在 /conversion 頁面的 user 數
//
// Phase 1 follow-up
// - DB-backedPostgres / Redis讓重啟不失資料 — progress.md 已記
// - 加 user → []job_id 的反向索引ActiveJobOf O(1)
//
// Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.6.1)
package conversion
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"log/slog"
"sync"
"time"
)
// ==========================================================================
// 對外 interface
// ==========================================================================
// Ownership 是 visionA-backend 對 conversion job 的擁有權追蹤。
//
// In-memory map: job_id → user_id。重啟即失需 lazy rebuild
// 第一次某 user 進 GET /api/conversion/{id} 或 /active 時,若 cache 沒
// 該 user 任何項,從 converter list 拿 in_progress jobs 重建。
//
// goroutine-safe
// - jobToUser map 用 RWMutex高頻 Get / 低頻 Set / Delete
// - rebuilt 用 per-user mutexDCL pattern仿 mc_token_client 但不同 user 不互相阻塞)
type Ownership interface {
// Set 註冊 job 屬於 userinit 完成時呼叫)。
Set(jobID, userID string)
// Get 查 job 屬於誰;不在 cache 回 ("", false)。
Get(jobID string) (userID string, ok bool)
// Delete 從 cache 移除reset / job expired 時)。
Delete(jobID string)
// EnsureRebuilt 確保該 user 的 ownership 已從 converter rebuild 過。
//
// 第一次呼叫該 user 時,從 converter list 拿所有 in_progress jobs 寫進 cache。
// 後續呼叫該 user 是 noopfast path
//
// 失敗處理:
// - converter 5xx / network → 回傳 error不標 rebuilt下次重試
// - ctx cancel → 立即 return ctx.Err()
// - rebuild 內部對 converter 的呼叫帶 5s timeout用 context.WithTimeout 包裝
// ctx避免單一 caller 無限阻塞同 user 其他 caller
EnsureRebuilt(ctx context.Context, userID string) error
// ActiveJobOf 列出該 user 目前有的 active job_id從 cache不重新 rebuild
//
// caller 應先呼叫 EnsureRebuilt 確保 cache 有資料。
//
// Phase 0.8 同 user 同時最多 1 個 active jobreturn slice 通常是 0 or 1。
// Phase 1 加反向 user → []job_id 索引讓這變成 O(1)。
ActiveJobOf(userID string) []string
}
// ==========================================================================
// 內部常數
// ==========================================================================
const (
// rebuildTimeout 是單次 rebuild 對 converter 呼叫的 timeout 上限。
// 防止某個 caller 卡死同 user 其他 callerper-user mutex 的 head-of-line blocking
// 對齊 conversion.md §9.1 retry 矩陣的 list endpointmax 1 retry + 0.5s 退避,
// 加上 converter HTTP timeout 10s最壞約 ~10.5s — 設 5s 是因為 lazy rebuild 在
// frontend pre-check 路徑上UX 可接受失敗 + retry。caller (flow.go) 失敗時會 fallback。
rebuildTimeout = 5 * time.Second
)
// ==========================================================================
// 預設實作
// ==========================================================================
// ownership 是 Ownership 的 in-memory 實作。
//
// 兩組鎖分離:
// - mu: 保護 jobToUser mapSet/Get/Delete/ActiveJobOf
// - perUserLocks: 每個 user 一把 mutex避免 EnsureRebuilt 互相阻塞
//
// 為什麼不用單一全域 rebuiltMu
// - 全域 rebuiltMu 會讓 user A 的 rebuild block user B 的 rebuildrebuild 包 5s
// timeout最壞 100 user 同時進來變 500s 排隊)
// - per-user mutex 用 sync.Map 自動 lazy-init不同 user 並行進 rebuild 互不干擾
type ownership struct {
mu sync.RWMutex
jobToUser map[string]string // job_id → user_id
// perUserLocks: user_id → *sync.Mutex
// 用 sync.Map 自動處理 lazy init + 移除Phase 0.8 不主動 evict — user 量級小)
perUserLocks sync.Map
// rebuilt: user_id → 已 rebuild 過?
// 用獨立 mutex 而非 sync.Map.LoadOrStore — 因為 set 與 fetch converter 必須原子
// fetch 失敗不 setsync.Map 的 LoadOrStore 不適合這個語意
rebuiltMu sync.RWMutex
rebuilt map[string]bool
converter ConverterClient
logger *slog.Logger
}
// NewOwnership 建立一個 Ownership 實例。
//
// converter 必填lazy rebuild 依賴logger 為 optionalnil 用 slog.Default()。
func NewOwnership(converter ConverterClient, logger *slog.Logger) Ownership {
if logger == nil {
logger = slog.Default()
}
return &ownership{
jobToUser: make(map[string]string),
rebuilt: make(map[string]bool),
converter: converter,
logger: logger,
}
}
// ==========================================================================
// Set / Get / Delete — RWMutex 標準 map 保護
// ==========================================================================
// Set 寫入 ownership空字串視為 no-op防呆
func (o *ownership) Set(jobID, userID string) {
if jobID == "" || userID == "" {
return
}
o.mu.Lock()
o.jobToUser[jobID] = userID
o.mu.Unlock()
}
// Get 讀取 ownership不存在回 ("", false)。
func (o *ownership) Get(jobID string) (string, bool) {
if jobID == "" {
return "", false
}
o.mu.RLock()
userID, ok := o.jobToUser[jobID]
o.mu.RUnlock()
return userID, ok
}
// Delete 移除 ownership不存在 no-op。
func (o *ownership) Delete(jobID string) {
if jobID == "" {
return
}
o.mu.Lock()
delete(o.jobToUser, jobID)
o.mu.Unlock()
}
// ==========================================================================
// EnsureRebuilt — DCL + per-user mutex
// ==========================================================================
// EnsureRebuilt 確保 userID 的 ownership 已從 converter rebuild。
//
// DCLdouble-checked locking流程
// 1. fast path先用 RLock 看 rebuilt[userID],已 rebuild → 直接 return nil
// 2. 取該 user 的 per-user mutex不同 user 並行;同 user 序列化)
// 3. slow path拿 mutex 後再次 check rebuilt[userID](其他 caller 可能剛 rebuild 完)
// 4. 真正 fetch converter帶 rebuildTimeout
// 5. 成功 → 寫 jobToUser + 標 rebuilt[userID]=true
// 6. 失敗 → 不標,下次重試
//
// 為什麼 fast path 不直接 returnsync.Map.Load 比 RWMutex.RLock 快但 race 條件需小心;
// 這裡用 RWMutex 對 rebuilt map 一致 protect與 slow path 寫入互斥)。
func (o *ownership) EnsureRebuilt(ctx context.Context, userID string) error {
if userID == "" {
return errors.New("conversion/ownership: userID is required")
}
// fast path
o.rebuiltMu.RLock()
done := o.rebuilt[userID]
o.rebuiltMu.RUnlock()
if done {
return nil
}
// 取該 user 的 per-user mutexlazy init via sync.Map
mu := o.lockForUser(userID)
mu.Lock()
defer mu.Unlock()
// 進入 critical section 前再 check ctxcaller 可能已 cancel
if err := ctx.Err(); err != nil {
return err
}
// slow path 內部再 check其他 caller 可能剛 rebuild 完)
o.rebuiltMu.RLock()
done = o.rebuilt[userID]
o.rebuiltMu.RUnlock()
if done {
return nil
}
// 真正 fetch converter帶 rebuild timeout避免單 caller 無限阻塞同 user 其他 caller
fetchCtx, cancel := context.WithTimeout(ctx, rebuildTimeout)
defer cancel()
jobs, err := o.converter.ListInProgressJobs(fetchCtx, userID)
if err != nil {
// 失敗不標 rebuilt — 下次重試
o.logger.WarnContext(ctx, "ownership: lazy rebuild failed",
slog.String("user_hash", hashUserID(userID)),
slog.String("err", err.Error()),
)
return err
}
// 寫入 jobToUser拿 jobToUser 的 write lock
o.mu.Lock()
for _, j := range jobs {
if j == nil || j.JobID == "" {
continue
}
o.jobToUser[j.JobID] = userID
}
o.mu.Unlock()
// 標 rebuilt
o.rebuiltMu.Lock()
o.rebuilt[userID] = true
o.rebuiltMu.Unlock()
o.logger.InfoContext(ctx, "ownership: lazy rebuild done",
slog.String("user_hash", hashUserID(userID)),
slog.Int("jobs_found", len(jobs)),
)
return nil
}
// lockForUser 取(或 lazy 建立)該 user 的 mutex。
//
// 用 sync.Map.LoadOrStore併發 100 個 goroutine 同時對同 user 取 mutex
// LoadOrStore 保證所有 goroutine 拿到同一個 *sync.Mutex 實例(其他丟棄)。
func (o *ownership) lockForUser(userID string) *sync.Mutex {
if existing, ok := o.perUserLocks.Load(userID); ok {
return existing.(*sync.Mutex)
}
// LoadOrStore若不存在則寫入新建的回傳現存或新建的
// loaded=true 代表已有他人寫入,我們新建的這把丟棄
actual, _ := o.perUserLocks.LoadOrStore(userID, &sync.Mutex{})
return actual.(*sync.Mutex)
}
// ==========================================================================
// ActiveJobOf — 反查 jobToUser
// ==========================================================================
// ActiveJobOf 從 jobToUser map 反查 user 擁有的 jobID 清單。
//
// O(N) 掃描Phase 0.8 同 user 最多 1 active job、整體 jobToUser 規模也不大(內部
// 使用者 < 100 並發可接受。Phase 1 加反向索引變 O(1)。
//
// caller 應先呼叫 EnsureRebuilt這裡不主動 rebuild避免雙寫競態
func (o *ownership) ActiveJobOf(userID string) []string {
if userID == "" {
return nil
}
o.mu.RLock()
defer o.mu.RUnlock()
// 預先 alloc 0 cap 的 slice — 大多數 user 是 0 或 1 個 job
result := make([]string, 0, 1)
for jobID, uid := range o.jobToUser {
if uid == userID {
result = append(result, jobID)
}
}
return result
}
// ==========================================================================
// helpers
// ==========================================================================
// hashUserID 對 user_id 做 SHA-256 取前 8 hex char給 log 用PII 保護)。
//
// 不存原始 user_id 進 log避免 log file 洩漏 OIDC sub。
func hashUserID(userID string) string {
if userID == "" {
return ""
}
sum := sha256.Sum256([]byte(userID))
return hex.EncodeToString(sum[:])[:8]
}