// Flow — Service interface 的具體實作(T6 整合層)。 // // 整合 T2 (mc_token_client) / T3 (converter_client) / T4 (faa_client) / T5 (ownership) // 成為對 handler 暴露的單一 Service。對齊: // - .autoflow/04-architecture/conversion.md §2.7 整體流程協調 + §4.3.1/§4.3.2 // - .autoflow/04-architecture/api/api-conversion.md(5 個 endpoint 規格) // - .autoflow/04-architecture/adr/adr-014-conversion-integration.md // // 設計原則: // - flow 不直接 import internal/model / internal/storage, // 改用 narrow interface(ModelStore / Storage)— 避免 import cycle, // 讓 main.go 在 wire 時做 adapter,符合 Go 慣例(accept interfaces, return structs) // - 所有 method 第一步都做 ownership 檢查(trust boundary,§7.2) // - 多次 promote 冪等:以 modelStore 已有對應 source_job_id 為「已處理」 // 的 source-of-truth,避免重複 promote / 重複建 model record // // Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.7) package conversion import ( "context" "crypto/rand" "encoding/hex" "errors" "fmt" "io" "log/slog" "mime" "mime/multipart" "net/url" "path" "strings" "time" ) // ========================================================================== // Narrow interfaces(避免 import cycle;caller 在 main.go 做 adapter) // ========================================================================== // ModelStore 是 flow 對 internal/model.Repository 的最小依賴子集。 // // 設計選擇(FAANG 慣例):consumer 定義介面,不直接 import internal/model; // main.go 在 wire 時把 *model.InMemoryRepository(或未來的 PostgresRepository) // 包成 adapter 傳進來。這樣: // - flow_test.go 可以用 in-package stub 測試,不必拉 model package // - 未來 model.Repository 介面再擴充也不影響 flow(除非 flow 真的要用新 method) // - 不引入 import cycle(model 不需 import conversion) // // 具體 method 對應 internal/model.Repository: // - Save: model.Repository.Save // - FindBySourceJobID: 既有 List + filter SourceJobID(adapter 在 main.go 寫) // - GenerateID: 由 adapter 注入(model_id 命名邏輯沿用既有專案規則) type ModelStore interface { // Save 新增或更新一筆 Model 紀錄。對齊 model.Repository.Save semantics。 Save(ctx context.Context, m *ModelRecord) error // FindBySourceJobID 查找該 user 是否已有對應某 conversion job 的 model record。 // 用於 PromoteToModels 冪等檢查:同 jobID 重複 promote 直接回既有 model。 // // 找不到回 (nil, nil);找到回 (*ModelRecord, nil);其他錯誤回 err。 FindBySourceJobID(ctx context.Context, ownerUserID, sourceJobID string) (*ModelRecord, error) // GenerateID 產一個新的 model_id(對齊既有 model package 的命名)。 GenerateID() string } // ModelRecord 是 flow 與 ModelStore 之間的 DTO,避免 flow 直接 import internal/model。 // // adapter(在 main.go)負責 ModelRecord ↔ model.Model 的轉換。 // // 欄位對齊 internal/model.Model 的子集(Phase 0.8 promote-to-models 寫入需要的)。 type ModelRecord struct { ID string OwnerUserID string Name string Description string StorageKey string FileSize int64 FileChecksum string TargetChip string Source string // 永遠 "converted" SourceJobID string CreatedAt time.Time UpdatedAt time.Time } // Storage 是 flow 對 internal/storage.Store 的最小依賴子集。 // // Phase 0.8 promote-to-models 流程只需要 Put(streaming 寫進 storage); // 其他 method(Get / List / Presigned)由 internal/api/models.go 既有 handler 處理。 type Storage interface { // Put streaming 寫一個 object。實作對齊 internal/storage.Store.Put: // - r 為 streaming reader,實作不應 ReadAll 進記憶體 // - size 為預期大小(bytes);若未知傳 -1 // - meta 可為 nil Put(ctx context.Context, key string, r io.Reader, size int64, meta map[string]string) error } // ========================================================================== // Service 實作 // ========================================================================== // flow 是 Service interface 的預設實作(不對外 export,caller 拿 interface)。 type flow struct { converter ConverterClient faa FAAClient mcToken MCTokenClient ownership Ownership modelStore ModelStore storage Storage tenantID string faaBaseURL string defaultJobExpiryDuration time.Duration delegatedTTLSeconds int logger *slog.Logger now func() time.Time } // FlowOpts 是 NewService 的依賴注入。 // // 必填:Converter / FAA / MCToken / Ownership / ModelStore / Storage / TenantID / FAABaseURL。 // 其他 optional(nil/0 自動填合理預設)。 type FlowOpts struct { // 4 個 client(T2-T5) Converter ConverterClient FAA FAAClient MCToken MCTokenClient Ownership Ownership // 既有 visionA 套件的 narrow adapter ModelStore ModelStore Storage Storage // MC delegated download 用的 tenant id(visionA 在 MC 的 tenant 識別) TenantID string // FAA base URL;組 download URL 用(http://192.168.0.130:5081 等)。 // 不帶結尾斜線,constructor 自動 trim。 FAABaseURL string // converter 沒回 expires_at 時自行推算的 fallback duration(預設 7 天)。 DefaultJobExpiryDuration time.Duration // MC delegated download token TTL(秒)。0 → 預設 300(5 分鐘)。 // 對齊 conversion.md §10.2,建議範圍 60-900。 DelegatedTTLSeconds int Logger *slog.Logger Now func() time.Time } // NewService 建立一個 Service 實例。 // // 回傳 interface 而非 concrete struct(DI 友善 + 未來實作替換不影響 caller)。 func NewService(opts FlowOpts) (Service, error) { if opts.Converter == nil { return nil, errors.New("conversion: FlowOpts.Converter is required") } if opts.FAA == nil { return nil, errors.New("conversion: FlowOpts.FAA is required") } if opts.MCToken == nil { return nil, errors.New("conversion: FlowOpts.MCToken is required") } if opts.Ownership == nil { return nil, errors.New("conversion: FlowOpts.Ownership is required") } if opts.ModelStore == nil { return nil, errors.New("conversion: FlowOpts.ModelStore is required") } if opts.Storage == nil { return nil, errors.New("conversion: FlowOpts.Storage is required") } if opts.TenantID == "" { return nil, errors.New("conversion: FlowOpts.TenantID is required") } if opts.FAABaseURL == "" { return nil, errors.New("conversion: FlowOpts.FAABaseURL is required") } expiry := opts.DefaultJobExpiryDuration if expiry <= 0 { expiry = 7 * 24 * time.Hour // 對齊 converter 7 天 GC(§2.6.2) } ttl := opts.DelegatedTTLSeconds if ttl <= 0 { ttl = 300 } logger := opts.Logger if logger == nil { logger = slog.Default() } nowFn := opts.Now if nowFn == nil { nowFn = time.Now } return &flow{ converter: opts.Converter, faa: opts.FAA, mcToken: opts.MCToken, ownership: opts.Ownership, modelStore: opts.ModelStore, storage: opts.Storage, tenantID: opts.TenantID, faaBaseURL: strings.TrimRight(opts.FAABaseURL, "/"), defaultJobExpiryDuration: expiry, delegatedTTLSeconds: ttl, logger: logger, now: nowFn, }, nil } // 編譯時檢查:確保 *flow 實作 Service interface。 var _ Service = (*flow)(nil) // ========================================================================== // InitJob — 對應 POST /api/conversion/init // ========================================================================== // InitJob 對齊 conversion.md §4.2 streaming proxy + §2.7 整體流程。 // // 實作流程: // 1. ownership.EnsureRebuilt(避免 cache 殘留 / 重啟後該 user 第一次進) // 2. 同 user active job pre-check:有 → 回 *ActiveJobError 帶 active job 細節 // 3. 用 io.Pipe + multipart.Reader/Writer 重組 multipart body // - 黑名單 client 帶來的 user_id field(§4.2 / §7.3) // - 注入 visionA-backend 從 OIDC sub 取得的 UserID // 4. converter.InitJob 同步等到 201(不 early-return;對齊 §4.3.1 選項 A) // 5. 寫 ownership.Set(jobID, userID) // 6. 失敗時的 cleanup 行為(§4.3.2): // - converter Phase 1 **沒有實作** `POST /api/v1/jobs/{id}/cancel` endpoint // (已驗證:apps/task-scheduler 的 routes/v1/jobs.js 只有 POST '/'、GET '/'、 // GET '/:id'、POST '/:id/download-tokens'、DELETE '/:id')。 // - Phase 0.8 採「socket close 自然 abort」策略:streaming body 中斷時 // converter multer 拋錯 → 該 job 留 `failed` 狀態 + error_code=invalid_multipart // → converter 對 active_job 邏輯視為已結束 → 下次 init 不會撞 409。 // - flow.go 不主動發 cancel(沒有對應 endpoint 可發);只在 InitJob 失敗時 log。 // - **Phase 1+ 升級**:當 converter 補上 `/cancel` 後,T3 ConverterClient // 新增 `CancelJob(ctx, jobID) error`,flow.go 在 InitJob 失敗時開獨立 5s // timeout context(不繼承已 cancel 的 ctx)做 best-effort 主動 cancel。 // 見 conversion.md §4.3.2 + ./05-implementation/phase-0.8-T6.md follow-ups。 func (f *flow) InitJob(ctx context.Context, in InitJobInput) (*Job, error) { if in.UserID == "" { return nil, errors.New("conversion: InitJob requires UserID") } if in.Body == nil { return nil, errors.New("conversion: InitJob requires Body") } if in.ContentType == "" { return nil, errors.New("conversion: InitJob requires ContentType (must contain multipart boundary)") } // 1. ownership lazy rebuild — 確保該 user 的 active jobs 有從 converter 拉回來 if err := f.ownership.EnsureRebuilt(ctx, in.UserID); err != nil { // rebuild 失敗:不 hard fail(converter 可能短暫不可達),讓 pre-check 走 stale cache // — 後面真正打 converter.InitJob 時若 converter 已恢復則照常通過;若仍異常會回 502。 // 但需要記 log,方便除錯。 f.logger.WarnContext(ctx, "conversion.flow.init_ownership_rebuild_failed", slog.String("user_hash", hashUserID(in.UserID)), slog.String("err", err.Error()), ) } // 2. 同 user active job pre-check(§9.3) // 避免 visionA 已知 active 但仍打 converter 浪費一次 round-trip if existing, err := f.checkActiveJob(ctx, in.UserID); err != nil { return nil, err } else if existing != nil { return nil, &ActiveJobError{Job: existing} } // 3. 重組 multipart:注入 user_id、黑名單 client 帶來的 user_id(§4.2 / §7.3) pr, pw := io.Pipe() mw := multipart.NewWriter(pw) // goroutine 解析 client multipart 並重寫到 pw;converter 端從 pr 讀 // // Close 順序(Reviewer M-2): // 單一 close 路徑、根據 rebuild err 決定 pw.Close vs pw.CloseWithError — // 不可用 `defer pw.Close()` 配 `pw.CloseWithError(err)`(defer LIFO 會在 // CloseWithError 之後跑,把 err 蓋成 nil EOF,converter 端拿到截斷 stream // 而不是 rebuild 錯誤訊號) // - mw.Close 必須先(送 final boundary 給 reader),再用 err 決定關 pw 的方式 // - rebuildErrCh 在 close 之後送,確保主流程拿到 err 時 pipe 已收尾 rebuildErrCh := make(chan error, 1) go func() { err := rebuildMultipart(in.UserID, in.ContentType, in.Body, mw) // mw.Close 寫 final boundary;即使 rebuild 失敗也要關(避免 mw 內部 buffer 殘留) if mwErr := mw.Close(); mwErr != nil && err == nil { err = fmt.Errorf("close multipart writer: %w", mwErr) } // 用單一路徑決定 pw 怎麼關 if err != nil { _ = pw.CloseWithError(err) } else { _ = pw.Close() } rebuildErrCh <- err }() // 4. POST converter — 同步等到 201(streaming proxy;不 early-return,對齊 §4.3.1) cj, err := f.converter.InitJob(ctx, InitConverterJobReq{ UserID: in.UserID, Body: pr, BodyContentType: mw.FormDataContentType(), }) // 等 goroutine 結束(pw.Close 已觸發 EOF;rebuild 邏輯已 write 完) rebuildErr := <-rebuildErrCh // 若 converter 沒回 error,但 rebuild goroutine 失敗 → 也視為 init 失敗 if err == nil && rebuildErr != nil { err = fmt.Errorf("%w: rebuild multipart: %v", ErrConverterUnavailable, rebuildErr) } if err != nil { // converter 4xx / 5xx / network → 已分類成 sentinel // Cleanup 策略(§4.3.2,已驗證 converter Phase 1 沒實作 /cancel endpoint): // 不主動打 cancel —— 靠 converter multer 收 socket close 自然 abort // (streaming 中斷 → multer 拋錯 → job 留 failed → 下次 init 不會撞 409)。 // Phase 1+ 等 converter 補 /cancel 後再升級為 best-effort 主動 cancel。 f.logger.WarnContext(ctx, "conversion.flow.init_failed", slog.String("user_hash", hashUserID(in.UserID)), slog.String("err", err.Error()), ) return nil, err } // 5. 寫 ownership f.ownership.Set(cj.JobID, in.UserID) job := f.toJob(cj) f.logger.InfoContext(ctx, "conversion.flow.init_success", slog.String("user_hash", hashUserID(in.UserID)), slog.String("job_id", cj.JobID), slog.String("status", cj.Status), slog.String("source_filename", cj.SourceFilename), ) return job, nil } // rebuildMultipart 解 client 端 multipart,重新寫到 mw。 // // 規則(§4.2 / §7.3): // 1. 先寫 user_id field(從 visionA-backend 注入,唯一可信來源) // 2. client 帶來的 user_id field 一律忽略(黑名單) // 3. 其他 form field / file part 透傳 func rebuildMultipart(userID, contentType string, body io.Reader, mw *multipart.Writer) error { // 解析 boundary _, params, err := mime.ParseMediaType(contentType) if err != nil { return fmt.Errorf("parse content type: %w", err) } boundary := params["boundary"] if boundary == "" { return errors.New("missing multipart boundary") } // 先寫 user_id(重點:在 file part 之前,§4.2 註解說明:避免 converter multer // 解析時 user_id 還沒到就拒絕) if err := mw.WriteField("user_id", userID); err != nil { return fmt.Errorf("write user_id field: %w", err) } mr := multipart.NewReader(body, boundary) for { part, err := mr.NextPart() if err == io.EOF { return nil } if err != nil { return fmt.Errorf("read next part: %w", err) } name := part.FormName() // 黑名單 user_id:忽略 client 自己塞的(§4.2) if name == "user_id" { _ = part.Close() continue } if part.FileName() == "" { // form field:直接複製 fw, err := mw.CreateFormField(name) if err != nil { _ = part.Close() return fmt.Errorf("create form field %q: %w", name, err) } if _, err := io.Copy(fw, part); err != nil { _ = part.Close() return fmt.Errorf("copy form field %q: %w", name, err) } } else { // file part:streaming copy(不 buffer 全 RAM) fw, err := mw.CreateFormFile(name, part.FileName()) if err != nil { _ = part.Close() return fmt.Errorf("create form file %q: %w", name, err) } if _, err := io.Copy(fw, part); err != nil { _ = part.Close() return fmt.Errorf("copy form file %q: %w", name, err) } } _ = part.Close() } } // checkActiveJob 看 user 是否已有 active job(pre-check)。 // // 流程: // 1. ownership.ActiveJobOf — 反查 cache 中該 user 的 jobs // 2. 取第一個(Phase 0.8 同 user 最多 1 個),用 converter.GetJob 確認狀態 // - 若狀態為 created/running → return 該 Job(給 caller 包成 ActiveJobError) // - 若 converter 回 404 / 該 job 已 completed / failed → 視為無 active,先清 cache 再 return nil // // 沒 active job 回 (nil, nil)。 func (f *flow) checkActiveJob(ctx context.Context, userID string) (*Job, error) { jobIDs := f.ownership.ActiveJobOf(userID) if len(jobIDs) == 0 { return nil, nil } jobID := jobIDs[0] cj, err := f.converter.GetJob(ctx, jobID) if err != nil { if errors.Is(err, ErrJobNotFound) { // converter 已 GC(7d 過期)— 清 cache 後視為無 active f.ownership.Delete(jobID) return nil, nil } // 其他錯誤(5xx / network)— 對 caller 透傳;caller 決定 502 return nil, err } // 只有 created / running 視為 active switch cj.Status { case "completed", "failed": // 已結束的 job 不算 active;不清 ownership(GetJob / Download 仍需要這個對應) return nil, nil default: return f.toJob(cj), nil } } // ========================================================================== // GetJob — 對應 GET /api/conversion/{job_id} // ========================================================================== // GetJob 對齊 conversion.md §2.7 + api-conversion.md §2。 // // 流程: // 1. ownership.EnsureRebuilt(確保 cache 已 lazy rebuild) // 2. ownership.Get(jobID) — 比對 owner;不符 → ErrJobNotFound(避免洩漏 job 存在性) // 3. converter.GetJob(jobID) // 4. 若 expires_at 為零,補 created_at + DefaultJobExpiryDuration // // 設計選擇:ownership 不符不回 forbidden,而是 not_found: // - 避免讓攻擊者用「forbidden vs not_found」差異枚舉合法 job_id // - 對齊 §7.2 安全考量 func (f *flow) GetJob(ctx context.Context, userID, jobID string) (*Job, error) { if userID == "" { return nil, errors.New("conversion: GetJob requires userID") } if jobID == "" { return nil, ErrJobNotFound } if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil { // rebuild 失敗:不視為 fatal,繼續走 cache(可能 stale);fail-soft f.logger.WarnContext(ctx, "conversion.flow.get_ownership_rebuild_failed", slog.String("user_hash", hashUserID(userID)), slog.String("err", err.Error()), ) } owner, ok := f.ownership.Get(jobID) if !ok || owner != userID { // 不符 → 視為 not_found(避免洩漏存在性) return nil, ErrJobNotFound } cj, err := f.converter.GetJob(ctx, jobID) if err != nil { return nil, err } return f.toJob(cj), nil } // ========================================================================== // ActiveJob — 對應 GET /api/conversion/active // ========================================================================== // ActiveJob 對齊 conversion.md §2.6.1 lazy rebuild + api-conversion.md §5。 // // 流程: // 1. ownership.EnsureRebuilt(從 converter ListInProgressJobs 重建 cache) // 2. ownership.ActiveJobOf — 反查 // 3. 沒有 → return (nil, nil)(不視為 error;對齊 has_active=false 語意) // 4. 取 [0](Phase 0.8 ≤ 1)→ converter.GetJob 拿即時狀態 // 5. converter 回 404(job 已過期被 GC)→ 清 cache + return (nil, nil) // // 重啟恢復場景:visionA-backend in-memory cache 全空時,EnsureRebuilt 會打 // converter ListInProgressJobs 把該 user 的 active job 重建進來,使用者看不出差別。 func (f *flow) ActiveJob(ctx context.Context, userID string) (*Job, error) { if userID == "" { return nil, errors.New("conversion: ActiveJob requires userID") } // 1. lazy rebuild(這個路徑不 fail-soft:rebuild 失敗 = 無法回答 has_active 問題, // 必須 propagate 給 caller 知道) if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil { return nil, err } // 2. 反查 jobIDs := f.ownership.ActiveJobOf(userID) if len(jobIDs) == 0 { return nil, nil } // 3. 取第一個,問 converter 即時狀態 jobID := jobIDs[0] cj, err := f.converter.GetJob(ctx, jobID) if err != nil { if errors.Is(err, ErrJobNotFound) { // converter 已 GC → 清 cache + 視為無 active f.ownership.Delete(jobID) return nil, nil } return nil, err } // 已 completed / failed 的 job 也不算 active(has_active=false) if cj.Status == "completed" || cj.Status == "failed" { return nil, nil } return f.toJob(cj), nil } // ========================================================================== // PromoteToModels — 對應 POST /api/conversion/{job_id}/promote-to-models // ========================================================================== // PromoteToModels 對齊 conversion.md §1 Stage 3a + §2.5 + api-conversion.md §3。 // // 流程: // 1. ownership 驗(不符 → ErrJobNotFound) // 2. converter.GetJob — 確認 status=completed(否則 ErrJobNotCompleted) // 3. 冪等檢查:modelStore.FindBySourceJobID — 已有 model 直接回(避免重複 promote) // 4. converter.Promote — 拿到 target_object_key // 5. faa.GetFile(target_object_key) — streaming pull NEF // 6. storage.Put — streaming 寫進 visionA storage(不 ReadAll) // 7. modelStore.Save — 建 model record(Source="converted"、SourceJobID=jobID) // 8. return PromoteResult // // 名稱:caller 從 wireframe §7.1 的 import Dialog 拿;空字串 fallback 為 // `_`(對齊 api-conversion.md §3)。 func (f *flow) PromoteToModels(ctx context.Context, userID, jobID, name string) (*PromoteResult, error) { if userID == "" { return nil, errors.New("conversion: PromoteToModels requires userID") } if jobID == "" { return nil, ErrJobNotFound } // 1. ownership rebuild + 驗 if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil { f.logger.WarnContext(ctx, "conversion.flow.promote_ownership_rebuild_failed", slog.String("user_hash", hashUserID(userID)), slog.String("err", err.Error()), ) } owner, ok := f.ownership.Get(jobID) if !ok || owner != userID { return nil, ErrJobNotFound } // 2. converter.GetJob 確認 completed cj, err := f.converter.GetJob(ctx, jobID) if err != nil { return nil, err } if cj.Status != "completed" { return nil, fmt.Errorf("%w: status=%s", ErrJobNotCompleted, cj.Status) } // 3. 冪等檢查 if existing, err := f.modelStore.FindBySourceJobID(ctx, userID, jobID); err != nil { f.logger.WarnContext(ctx, "conversion.flow.promote_find_existing_failed", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), slog.String("err", err.Error()), ) // 查 model store 失敗不 hard fail —— 仍嘗試 promote(最壞結果是重複建一個 model record) } else if existing != nil { f.logger.InfoContext(ctx, "conversion.flow.promote_idempotent_hit", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), slog.String("model_id", existing.ID), ) return modelRecordToPromoteResult(existing), nil } // 4. converter.Promote — 組目標 object_key(FAA 內部命名規則由 visionA 決定) finalName := name if finalName == "" { finalName = defaultModelName(cj) } targetObjectKey := buildTargetObjectKey(userID, jobID) promoteRes, err := f.converter.Promote(ctx, jobID, PromoteReq{ UserID: userID, Source: promoteDefaultSource, // "nef" TargetObjectKey: targetObjectKey, }) if err != nil { return nil, err } // 5. faa.GetFile streaming pull file, err := f.faa.GetFile(ctx, promoteRes.TargetObjectKey) if err != nil { return nil, err } defer file.Body.Close() // 6. storage.Put streaming write modelID := f.modelStore.GenerateID() storageKey := buildStorageKey(userID, modelID) storageMeta := map[string]string{ "source": "converted", "source_job_id": jobID, "target_chip": normalizeTargetChip(cj.Platform), } if err := f.storage.Put(ctx, storageKey, file.Body, file.ContentLength, storageMeta); err != nil { f.logger.WarnContext(ctx, "conversion.flow.promote_storage_put_failed", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), slog.String("storage_key", storageKey), slog.String("err", err.Error()), ) // visionA 自家 storage 失敗(disk full / S3 5xx / 權限錯誤) // — 不是 FAA / converter 問題,用獨立 sentinel 讓 SRE alarm 打對 team // (對齊 Reviewer M-1) return nil, fmt.Errorf("%w: storage.Put %s: %v", ErrStorageUnavailable, storageKey, err) } // 7. modelStore.Save now := f.now().UTC() rec := &ModelRecord{ ID: modelID, OwnerUserID: userID, Name: finalName, StorageKey: storageKey, FileSize: promoteRes.Size, FileChecksum: promoteRes.Checksum, TargetChip: normalizeTargetChip(cj.Platform), Source: "converted", SourceJobID: jobID, CreatedAt: now, UpdatedAt: now, } if err := f.modelStore.Save(ctx, rec); err != nil { f.logger.WarnContext(ctx, "conversion.flow.promote_model_save_failed", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), slog.String("model_id", modelID), slog.String("err", err.Error()), ) // model store save 失敗(in-memory 不會失敗;未來 Postgres 才會觸發) // — 不是 converter / FAA 問題,用獨立 sentinel 對齊 SRE alarm 分類(Reviewer M-1) // 已寫進 storage 但無 record 對應 → 等同孤立檔案;Phase 1 加 GC 機制清掃 return nil, fmt.Errorf("%w: modelStore.Save model_id=%s: %v", ErrModelStoreUnavailable, modelID, err) } f.logger.InfoContext(ctx, "conversion.flow.promote_success", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), slog.String("model_id", modelID), slog.Int64("file_size", promoteRes.Size), ) return modelRecordToPromoteResult(rec), nil } // ========================================================================== // DownloadRedirectURL — 對應 GET /api/conversion/{job_id}/download // ========================================================================== // DownloadRedirectURL 對齊 conversion.md §1 Stage 3b + §3.1 + api-conversion.md §4。 // // 流程: // 1. ownership 驗(不符 → ErrJobNotFound) // 2. converter.GetJob — 確認 status=completed // 3. ensurePromoted — 自動觸發 promote(若還沒 promote 過),拿到 target_object_key // - 設計選擇(task spec 詢問點):自動觸發。理由:api-conversion.md §4 註解說 // 「兩條路徑(promote-to-models / download)都拿同一個 target_object_key」+ // 「不會與 promote-to-models 衝突;兩者內部都會 ensurePromoted(冪等)」— // 要求 user 先按 promote-to-models 才能下載會違背「下載」按鈕的直覺語意。 // 4. mcToken.IssueDelegatedDownload — 換 opaque token (TTL 5min 預設) // 5. 組 https:///files/?access_token= // // 安全(§10.4): // - token 不出現在任何 JSON response(caller 走 server-side 302 redirect) // - object_key 不對 frontend 揭露 func (f *flow) DownloadRedirectURL(ctx context.Context, userID, jobID string) (string, error) { if userID == "" { return "", errors.New("conversion: DownloadRedirectURL requires userID") } if jobID == "" { return "", ErrJobNotFound } // 1. ownership 驗 if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil { f.logger.WarnContext(ctx, "conversion.flow.download_ownership_rebuild_failed", slog.String("user_hash", hashUserID(userID)), slog.String("err", err.Error()), ) } owner, ok := f.ownership.Get(jobID) if !ok || owner != userID { return "", ErrJobNotFound } // 2. converter.GetJob 確認 completed cj, err := f.converter.GetJob(ctx, jobID) if err != nil { return "", err } if cj.Status != "completed" { return "", fmt.Errorf("%w: status=%s", ErrJobNotCompleted, cj.Status) } // 3. ensurePromoted — 自動觸發 promote 拿 target_object_key // Phase 0.8 不 cache promoted_object_key(converter 端 promote 是冪等的, // 重複呼叫成本可接受 — 反正 download 路徑 user 主動觸發頻率不高) targetObjectKey, err := f.ensurePromoted(ctx, userID, jobID, cj) if err != nil { return "", err } // 4. mcToken 換 delegated download token delegated, err := f.mcToken.IssueDelegatedDownload(ctx, IssueDownloadReq{ TenantID: f.tenantID, UserID: userID, ObjectKey: targetObjectKey, ExpiresInSeconds: f.delegatedTTLSeconds, }) if err != nil { return "", err } // 5. 組 URL:FAA base + /files/?access_token= // - object_key 用 url.PathEscape 處理(含路徑分隔符的 key 安全 escape) // - token 用 url.QueryEscape(雖 opaque token 通常不含特殊字元,仍 escape 防呆) downloadURL := fmt.Sprintf("%s/files/%s?access_token=%s", f.faaBaseURL, escapeObjectKeyPath(targetObjectKey), url.QueryEscape(delegated.Token), ) f.logger.InfoContext(ctx, "conversion.flow.download_url_issued", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), slog.String("object_key_hash", hashObjectKey(targetObjectKey)), slog.Int("ttl_sec", f.delegatedTTLSeconds), ) return downloadURL, nil } // ensurePromoted 取 target_object_key — 若已 promote 過(model record 已存在)用 cache, // 否則打 converter.Promote 拿。 // // 用 modelStore.FindBySourceJobID 當 source-of-truth:若已有 model record 表示 // PromoteToModels 已成功跑過,可直接從 record 拿 storage_key 反推 target_object_key? // ✗ 不行:storage_key 是 visionA storage 的 key,不是 FAA 的 object_key。 // // 改用 converter.Promote 冪等性(§2.7:「promote 動作是冪等的,converter 端對同一 // job 重複 promote 接受」)— 直接打 converter,重複呼叫成本低(同步等 1-2s)。 // // 為什麼不用 sync.Map cache:Phase 0.8 download 路徑 user 主動觸發頻率不高(每 job 1-N 次), // 簡單性 > 微優化。Phase 1 量大再加 cache(progress.md 已記)。 func (f *flow) ensurePromoted(ctx context.Context, userID, jobID string, cj *ConverterJob) (string, error) { targetObjectKey := buildTargetObjectKey(userID, jobID) res, err := f.converter.Promote(ctx, jobID, PromoteReq{ UserID: userID, Source: promoteDefaultSource, TargetObjectKey: targetObjectKey, }) if err != nil { return "", err } return res.TargetObjectKey, nil } // ========================================================================== // helpers // ========================================================================== // toJob 把 ConverterJob(client 層中介 type)轉成對外的 Job(response shape)。 // // 補 expires_at fallback:converter 沒給 → created_at + DefaultJobExpiryDuration(§2.6.2)。 func (f *flow) toJob(cj *ConverterJob) *Job { if cj == nil { return nil } job := &Job{ JobID: cj.JobID, Status: cj.Status, Stage: cj.Stage, CreatedAt: cj.CreatedAt, UpdatedAt: cj.UpdatedAt, ExpiresAt: cj.ExpiresAt, SourceFilename: cj.SourceFilename, TargetChip: cj.Platform, ErrorCode: cj.ErrorCode, ErrorMessage: cj.ErrorMessage, } if cj.Progress != nil { job.Progress = *cj.Progress } if cj.StageProgress != nil { job.StageProgress = *cj.StageProgress } if job.ExpiresAt.IsZero() && !cj.CreatedAt.IsZero() { job.ExpiresAt = cj.CreatedAt.Add(f.defaultJobExpiryDuration) } return job } // modelRecordToPromoteResult 把 ModelRecord 轉成對外的 PromoteResult。 func modelRecordToPromoteResult(rec *ModelRecord) *PromoteResult { if rec == nil { return nil } return &PromoteResult{ ModelID: rec.ID, Source: rec.Source, SourceJobID: rec.SourceJobID, Name: rec.Name, TargetChip: rec.TargetChip, FileSize: rec.FileSize, Status: "ready", // visionA model 既有 status,promote 完即 ready CreatedAt: rec.CreatedAt, } } // buildTargetObjectKey 產 FAA 的 object_key(visionA 端命名規則)。 // // 命名:models/{user_id}/{job_id}.nef // 用 user_id 隔離;job_id 唯一性由 converter 保證(UUID)。 // // 對齊 conversion.md §10.4:「object_key 不對 frontend 揭露」— 命名只在 server-side 用。 func buildTargetObjectKey(userID, jobID string) string { // 注意:這裡不對 userID/jobID 做 escape — caller(visionA-backend handler) // 已從 OIDC sub / converter response 拿,皆為合法 ID 字元(UUID / OIDC sub)。 return fmt.Sprintf("models/%s/%s.nef", userID, jobID) } // buildStorageKey 產 visionA storage 的 key(不是 FAA 的)。 // // 沿用 internal/storage 既有命名慣例:models/{user_id}/{model_id}.nef // (storage.md §2 範例)。 func buildStorageKey(userID, modelID string) string { return fmt.Sprintf("models/%s/%s.nef", userID, modelID) } // escapeObjectKeyPath 對 object_key 做 path escape,但保留 '/' 為 path separator。 // // url.PathEscape 會把 '/' 也 escape 成 %2F — 對 FAA `/files/{**objectKey}` 來說 // 應該保留 '/' 為路徑分隔符,所以拆段後逐段 escape 再合回。 func escapeObjectKeyPath(objectKey string) string { parts := strings.Split(objectKey, "/") for i := range parts { parts[i] = url.PathEscape(parts[i]) } return strings.Join(parts, "/") } // normalizeTargetChip 把 converter 端 platform("520"/"720"/...)轉成 visionA model 的 // target_chip 表示法("kl520"/"kl720"/...)。 // // 對齊 api-conversion.md §3 注解:「conversion job 用 platform '720',model.target_chip 用 'kl720'」。 func normalizeTargetChip(platform string) string { p := strings.ToLower(strings.TrimSpace(platform)) if p == "" { return "" } if strings.HasPrefix(p, "kl") { return p } return "kl" + p } // defaultModelName 產 PromoteToModels caller 沒給 name 時的 fallback。 // // 規則:`_` — 對齊 api-conversion.md §3 預設值 // (wireframe §7.1 import Dialog 預設)。 func defaultModelName(cj *ConverterJob) string { // path.Base("") 會回 ".";先擋掉空 / "." / ".." 等無效 stem var stem string if cj.SourceFilename != "" { base := path.Base(cj.SourceFilename) if base != "." && base != "/" && base != ".." { stem = strings.TrimSuffix(base, path.Ext(base)) } } chip := strings.ToLower(strings.TrimSpace(cj.Platform)) switch { case stem != "" && chip != "": return fmt.Sprintf("%s_kl%s", stem, chip) case stem != "": return stem case chip != "": return fmt.Sprintf("converted_kl%s", chip) default: // 兜底:用 timestamp 避免空 name return fmt.Sprintf("converted_%d", time.Now().Unix()) } } // generateRandomID — 不對外暴露,用於測試或 ModelStore.GenerateID adapter 沒提供時的 fallback。 // // 16 hex chars (64-bit)。 // //nolint:unused // 保留供 main.go 的 adapter 在 fallback 時使用 func generateRandomID() string { b := make([]byte, 8) if _, err := rand.Read(b); err != nil { // crypto/rand 失敗極為罕見;用 timestamp 兜底 return fmt.Sprintf("%d", time.Now().UnixNano()) } return hex.EncodeToString(b) }