jim800121chen 1231bf0ed2 feat(visionA-backend): Phase 0.8 conversion package — 5 endpoint + 8 個內部模組
Phase 0.8 把 kneron_model_converter 的轉檔功能整合進 visionA Cloud。
visionA backend 當 streaming proxy(upload)+ delegated download token broker(download)+
ownership trust boundary,converter / FAA / MC 三方零修改。

新增 internal/conversion/ 套件(8 個檔,~10,000 行 prod+test,117+ test cases,race -count=3 全綠):

- conversion.go:Service interface 5 method、Job/PromoteResult/InitJobInput types
- errors.go:13+ sentinel errors + ErrorCode/HTTPStatus mapping,對齊 conversion.md §6
- mc_token_client.go:service-to-service token (client_credentials grant) + DCL cache
  (exp - 15s 重取,per-scope cache),IssueDelegatedDownload(MC delegated download token)
  錯誤分 idp_misconfigured (4xx) / idp_unavailable (5xx) / download_token_failed / mc_token_unavailable
- converter_client.go:對 converter scheduler 4 method(InitJob multipart streaming /
  GetJob / Promote / ListInProgressJobs),InitJob 不 retry 5xx(streaming body 無法 replay)
- faa_client.go:對 FAA GET /files/{key} server-to-server pull,Phase A retry(GET 無 body
  可 replay)對齊 §9.1 retry 矩陣,streaming io.ReadCloser 透傳避 OOM
- ownership.go:in-memory job_id → user_id map + per-user mutex 防 thundering herd lazy rebuild
  (不同 user 平行 fetch,同 user 100 caller 收斂成 1 次),visionA 重啟靠 converter
  ListInProgressJobs(user) 重建
- flow.go:Service interface 整合層(5 method 串接 converter/FAA/MC/ownership)
  - InitJob 用 io.Pipe + multipart.Reader/Writer 重組 streaming proxy(黑名單 client user_id
    + 灌入 OIDC sub)
  - DownloadRedirectURL 自動觸發 promote(spec §1 Stage 3b),用 ensurePromoted helper
  - PromoteToModels 冪等(modelStore.FindBySourceJobID 為 source-of-truth)
  - OwnershipMismatch → ErrJobNotFound 不 forbidden(§7.2 防枚舉)
  - storage / modelStore 失敗包 ErrStorageUnavailable / ErrModelStoreUnavailable
    (視為 visionA 自身 500 而非 502 gateway,SRE alarm 才打對 team)

新增 internal/api/conversion.go(5 endpoint handler + main.go wire):
- POST /api/conversion/init(multipart streaming proxy,不呼叫 c.MultipartForm())
- GET  /api/conversion/active(lazy rebuild ownership)
- GET  /api/conversion/{job_id}(poll status)
- POST /api/conversion/{job_id}/promote-to-models(FAA pull → models 三段式)
- GET  /api/conversion/{job_id}/download(server-side HTTP 302 → FAA,token 不過 frontend
  JS,仿 FAA TestSite DownloadFileDirect pattern;Cache-Control: no-store)

5 個 endpoint 全部走 OIDC AuthMiddleware;user_id 從 cookie session 灌(trust boundary),
從不接受 client multipart form / JSON / query 的 user_id。
TestAllAPIEndpointsRequire401WithoutCookie 自動覆蓋新 5 endpoint regression 防呆。

新增 cmd/api-server/conversion_e2e_test.go(4 個 e2e 場景):
- TestConversionE2E_StreamingProxy(10MB body + trust boundary regression)
- TestConversionE2E_LazyRebuildAfterRestart(visionA 重啟仍能 /active)
- TestConversionE2E_Download302Redirect(驗 302 + Location header + token 不在 body)
- TestConversionE2E_ActiveJobConflict(409 + active_job 詳情)

修改 internal/config/{config,load}.go:新增 ConversionConfig 5 欄位
(ConverterBaseURL / FAABaseURL / TenantID / ServiceClientID / ServiceClientSecret)+
Enabled() helper(雙非空判定)。
修改 cmd/api-server/main.go:條件 wire(cfg.Conversion.Enabled() 為 true 才建 client + Service;
否則 Deps.Conversion=nil,handler 自動回 501)。
修改 .env.example:新增 Phase 0.8 區塊註解。
新增 cmd/api-server/conversion_adapters.go:narrow interface adapter(接既有
internal/model.Repository / internal/storage.Store → conversion.ModelStore / Storage,避免 import cycle)。

驗證:go test -race -count=3 ./... 17 packages 全綠 / go vet 0 warning / go build 成功。

對齊文件:
- .autoflow/04-architecture/adr/adr-014-conversion-integration.md
- .autoflow/04-architecture/conversion.md (TDD)
- .autoflow/04-architecture/api/api-conversion.md
- .autoflow/02-prd/features/feature-converter-integration.md
- .autoflow/03-design/wireframes/wireframe-conversion.md
- .autoflow/03-design/flows/flow-conversion.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 13:56:07 +08:00

941 lines
34 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Flow — Service interface 的具體實作T6 整合層)。
//
// 整合 T2 (mc_token_client) / T3 (converter_client) / T4 (faa_client) / T5 (ownership)
// 成為對 handler 暴露的單一 Service。對齊
// - .autoflow/04-architecture/conversion.md §2.7 整體流程協調 + §4.3.1/§4.3.2
// - .autoflow/04-architecture/api/api-conversion.md5 個 endpoint 規格)
// - .autoflow/04-architecture/adr/adr-014-conversion-integration.md
//
// 設計原則:
// - flow 不直接 import internal/model / internal/storage
// 改用 narrow interfaceModelStore / Storage— 避免 import cycle
// 讓 main.go 在 wire 時做 adapter符合 Go 慣例accept interfaces, return structs
// - 所有 method 第一步都做 ownership 檢查trust boundary§7.2
// - 多次 promote 冪等:以 modelStore 已有對應 source_job_id 為「已處理」
// 的 source-of-truth避免重複 promote / 重複建 model record
//
// Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.7)
package conversion
import (
"context"
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
"log/slog"
"mime"
"mime/multipart"
"net/url"
"path"
"strings"
"time"
)
// ==========================================================================
// Narrow interfaces避免 import cyclecaller 在 main.go 做 adapter
// ==========================================================================
// ModelStore 是 flow 對 internal/model.Repository 的最小依賴子集。
//
// 設計選擇FAANG 慣例consumer 定義介面,不直接 import internal/model
// main.go 在 wire 時把 *model.InMemoryRepository或未來的 PostgresRepository
// 包成 adapter 傳進來。這樣:
// - flow_test.go 可以用 in-package stub 測試,不必拉 model package
// - 未來 model.Repository 介面再擴充也不影響 flow除非 flow 真的要用新 method
// - 不引入 import cyclemodel 不需 import conversion
//
// 具體 method 對應 internal/model.Repository
// - Save: model.Repository.Save
// - FindBySourceJobID: 既有 List + filter SourceJobIDadapter 在 main.go 寫)
// - GenerateID: 由 adapter 注入model_id 命名邏輯沿用既有專案規則)
type ModelStore interface {
// Save 新增或更新一筆 Model 紀錄。對齊 model.Repository.Save semantics。
Save(ctx context.Context, m *ModelRecord) error
// FindBySourceJobID 查找該 user 是否已有對應某 conversion job 的 model record。
// 用於 PromoteToModels 冪等檢查:同 jobID 重複 promote 直接回既有 model。
//
// 找不到回 (nil, nil);找到回 (*ModelRecord, nil);其他錯誤回 err。
FindBySourceJobID(ctx context.Context, ownerUserID, sourceJobID string) (*ModelRecord, error)
// GenerateID 產一個新的 model_id對齊既有 model package 的命名)。
GenerateID() string
}
// ModelRecord 是 flow 與 ModelStore 之間的 DTO避免 flow 直接 import internal/model。
//
// adapter在 main.go負責 ModelRecord ↔ model.Model 的轉換。
//
// 欄位對齊 internal/model.Model 的子集Phase 0.8 promote-to-models 寫入需要的)。
type ModelRecord struct {
ID string
OwnerUserID string
Name string
Description string
StorageKey string
FileSize int64
FileChecksum string
TargetChip string
Source string // 永遠 "converted"
SourceJobID string
CreatedAt time.Time
UpdatedAt time.Time
}
// Storage 是 flow 對 internal/storage.Store 的最小依賴子集。
//
// Phase 0.8 promote-to-models 流程只需要 Putstreaming 寫進 storage
// 其他 methodGet / List / Presigned由 internal/api/models.go 既有 handler 處理。
type Storage interface {
// Put streaming 寫一個 object。實作對齊 internal/storage.Store.Put
// - r 為 streaming reader實作不應 ReadAll 進記憶體
// - size 為預期大小bytes若未知傳 -1
// - meta 可為 nil
Put(ctx context.Context, key string, r io.Reader, size int64, meta map[string]string) error
}
// ==========================================================================
// Service 實作
// ==========================================================================
// flow 是 Service interface 的預設實作(不對外 exportcaller 拿 interface
type flow struct {
converter ConverterClient
faa FAAClient
mcToken MCTokenClient
ownership Ownership
modelStore ModelStore
storage Storage
tenantID string
faaBaseURL string
defaultJobExpiryDuration time.Duration
delegatedTTLSeconds int
logger *slog.Logger
now func() time.Time
}
// FlowOpts 是 NewService 的依賴注入。
//
// 必填Converter / FAA / MCToken / Ownership / ModelStore / Storage / TenantID / FAABaseURL。
// 其他 optionalnil/0 自動填合理預設)。
type FlowOpts struct {
// 4 個 clientT2-T5
Converter ConverterClient
FAA FAAClient
MCToken MCTokenClient
Ownership Ownership
// 既有 visionA 套件的 narrow adapter
ModelStore ModelStore
Storage Storage
// MC delegated download 用的 tenant idvisionA 在 MC 的 tenant 識別)
TenantID string
// FAA base URL組 download URL 用http://192.168.0.130:5081 等)。
// 不帶結尾斜線constructor 自動 trim。
FAABaseURL string
// converter 沒回 expires_at 時自行推算的 fallback duration預設 7 天)。
DefaultJobExpiryDuration time.Duration
// MC delegated download token TTL。0 → 預設 3005 分鐘)。
// 對齊 conversion.md §10.2,建議範圍 60-900。
DelegatedTTLSeconds int
Logger *slog.Logger
Now func() time.Time
}
// NewService 建立一個 Service 實例。
//
// 回傳 interface 而非 concrete structDI 友善 + 未來實作替換不影響 caller
func NewService(opts FlowOpts) (Service, error) {
if opts.Converter == nil {
return nil, errors.New("conversion: FlowOpts.Converter is required")
}
if opts.FAA == nil {
return nil, errors.New("conversion: FlowOpts.FAA is required")
}
if opts.MCToken == nil {
return nil, errors.New("conversion: FlowOpts.MCToken is required")
}
if opts.Ownership == nil {
return nil, errors.New("conversion: FlowOpts.Ownership is required")
}
if opts.ModelStore == nil {
return nil, errors.New("conversion: FlowOpts.ModelStore is required")
}
if opts.Storage == nil {
return nil, errors.New("conversion: FlowOpts.Storage is required")
}
if opts.TenantID == "" {
return nil, errors.New("conversion: FlowOpts.TenantID is required")
}
if opts.FAABaseURL == "" {
return nil, errors.New("conversion: FlowOpts.FAABaseURL is required")
}
expiry := opts.DefaultJobExpiryDuration
if expiry <= 0 {
expiry = 7 * 24 * time.Hour // 對齊 converter 7 天 GC§2.6.2
}
ttl := opts.DelegatedTTLSeconds
if ttl <= 0 {
ttl = 300
}
logger := opts.Logger
if logger == nil {
logger = slog.Default()
}
nowFn := opts.Now
if nowFn == nil {
nowFn = time.Now
}
return &flow{
converter: opts.Converter,
faa: opts.FAA,
mcToken: opts.MCToken,
ownership: opts.Ownership,
modelStore: opts.ModelStore,
storage: opts.Storage,
tenantID: opts.TenantID,
faaBaseURL: strings.TrimRight(opts.FAABaseURL, "/"),
defaultJobExpiryDuration: expiry,
delegatedTTLSeconds: ttl,
logger: logger,
now: nowFn,
}, nil
}
// 編譯時檢查:確保 *flow 實作 Service interface。
var _ Service = (*flow)(nil)
// ==========================================================================
// InitJob — 對應 POST /api/conversion/init
// ==========================================================================
// InitJob 對齊 conversion.md §4.2 streaming proxy + §2.7 整體流程。
//
// 實作流程:
// 1. ownership.EnsureRebuilt避免 cache 殘留 / 重啟後該 user 第一次進)
// 2. 同 user active job pre-check有 → 回 *ActiveJobError 帶 active job 細節
// 3. 用 io.Pipe + multipart.Reader/Writer 重組 multipart body
// - 黑名單 client 帶來的 user_id field§4.2 / §7.3
// - 注入 visionA-backend 從 OIDC sub 取得的 UserID
// 4. converter.InitJob 同步等到 201不 early-return對齊 §4.3.1 選項 A
// 5. 寫 ownership.Set(jobID, userID)
// 6. 失敗時的 cleanup 行為§4.3.2
// - converter Phase 1 **沒有實作** `POST /api/v1/jobs/{id}/cancel` endpoint
// 已驗證apps/task-scheduler 的 routes/v1/jobs.js 只有 POST '/'、GET '/'、
// GET '/:id'、POST '/:id/download-tokens'、DELETE '/:id')。
// - Phase 0.8 採「socket close 自然 abort」策略streaming body 中斷時
// converter multer 拋錯 → 該 job 留 `failed` 狀態 + error_code=invalid_multipart
// → converter 對 active_job 邏輯視為已結束 → 下次 init 不會撞 409。
// - flow.go 不主動發 cancel沒有對應 endpoint 可發);只在 InitJob 失敗時 log。
// - **Phase 1+ 升級**:當 converter 補上 `/cancel` 後T3 ConverterClient
// 新增 `CancelJob(ctx, jobID) error`flow.go 在 InitJob 失敗時開獨立 5s
// timeout context不繼承已 cancel 的 ctx做 best-effort 主動 cancel。
// 見 conversion.md §4.3.2 + ./05-implementation/phase-0.8-T6.md follow-ups。
func (f *flow) InitJob(ctx context.Context, in InitJobInput) (*Job, error) {
if in.UserID == "" {
return nil, errors.New("conversion: InitJob requires UserID")
}
if in.Body == nil {
return nil, errors.New("conversion: InitJob requires Body")
}
if in.ContentType == "" {
return nil, errors.New("conversion: InitJob requires ContentType (must contain multipart boundary)")
}
// 1. ownership lazy rebuild — 確保該 user 的 active jobs 有從 converter 拉回來
if err := f.ownership.EnsureRebuilt(ctx, in.UserID); err != nil {
// rebuild 失敗:不 hard failconverter 可能短暫不可達),讓 pre-check 走 stale cache
// — 後面真正打 converter.InitJob 時若 converter 已恢復則照常通過;若仍異常會回 502。
// 但需要記 log方便除錯。
f.logger.WarnContext(ctx, "conversion.flow.init_ownership_rebuild_failed",
slog.String("user_hash", hashUserID(in.UserID)),
slog.String("err", err.Error()),
)
}
// 2. 同 user active job pre-check§9.3
// 避免 visionA 已知 active 但仍打 converter 浪費一次 round-trip
if existing, err := f.checkActiveJob(ctx, in.UserID); err != nil {
return nil, err
} else if existing != nil {
return nil, &ActiveJobError{Job: existing}
}
// 3. 重組 multipart注入 user_id、黑名單 client 帶來的 user_id§4.2 / §7.3
pr, pw := io.Pipe()
mw := multipart.NewWriter(pw)
// goroutine 解析 client multipart 並重寫到 pwconverter 端從 pr 讀
//
// Close 順序Reviewer M-2
// 單一 close 路徑、根據 rebuild err 決定 pw.Close vs pw.CloseWithError —
// 不可用 `defer pw.Close()` 配 `pw.CloseWithError(err)`defer LIFO 會在
// CloseWithError 之後跑,把 err 蓋成 nil EOFconverter 端拿到截斷 stream
// 而不是 rebuild 錯誤訊號)
// - mw.Close 必須先(送 final boundary 給 reader再用 err 決定關 pw 的方式
// - rebuildErrCh 在 close 之後送,確保主流程拿到 err 時 pipe 已收尾
rebuildErrCh := make(chan error, 1)
go func() {
err := rebuildMultipart(in.UserID, in.ContentType, in.Body, mw)
// mw.Close 寫 final boundary即使 rebuild 失敗也要關(避免 mw 內部 buffer 殘留)
if mwErr := mw.Close(); mwErr != nil && err == nil {
err = fmt.Errorf("close multipart writer: %w", mwErr)
}
// 用單一路徑決定 pw 怎麼關
if err != nil {
_ = pw.CloseWithError(err)
} else {
_ = pw.Close()
}
rebuildErrCh <- err
}()
// 4. POST converter — 同步等到 201streaming proxy不 early-return對齊 §4.3.1
cj, err := f.converter.InitJob(ctx, InitConverterJobReq{
UserID: in.UserID,
Body: pr,
BodyContentType: mw.FormDataContentType(),
})
// 等 goroutine 結束pw.Close 已觸發 EOFrebuild 邏輯已 write 完)
rebuildErr := <-rebuildErrCh
// 若 converter 沒回 error但 rebuild goroutine 失敗 → 也視為 init 失敗
if err == nil && rebuildErr != nil {
err = fmt.Errorf("%w: rebuild multipart: %v", ErrConverterUnavailable, rebuildErr)
}
if err != nil {
// converter 4xx / 5xx / network → 已分類成 sentinel
// Cleanup 策略§4.3.2,已驗證 converter Phase 1 沒實作 /cancel endpoint
// 不主動打 cancel —— 靠 converter multer 收 socket close 自然 abort
// streaming 中斷 → multer 拋錯 → job 留 failed → 下次 init 不會撞 409
// Phase 1+ 等 converter 補 /cancel 後再升級為 best-effort 主動 cancel。
f.logger.WarnContext(ctx, "conversion.flow.init_failed",
slog.String("user_hash", hashUserID(in.UserID)),
slog.String("err", err.Error()),
)
return nil, err
}
// 5. 寫 ownership
f.ownership.Set(cj.JobID, in.UserID)
job := f.toJob(cj)
f.logger.InfoContext(ctx, "conversion.flow.init_success",
slog.String("user_hash", hashUserID(in.UserID)),
slog.String("job_id", cj.JobID),
slog.String("status", cj.Status),
slog.String("source_filename", cj.SourceFilename),
)
return job, nil
}
// rebuildMultipart 解 client 端 multipart重新寫到 mw。
//
// 規則§4.2 / §7.3
// 1. 先寫 user_id field從 visionA-backend 注入,唯一可信來源)
// 2. client 帶來的 user_id field 一律忽略(黑名單)
// 3. 其他 form field / file part 透傳
func rebuildMultipart(userID, contentType string, body io.Reader, mw *multipart.Writer) error {
// 解析 boundary
_, params, err := mime.ParseMediaType(contentType)
if err != nil {
return fmt.Errorf("parse content type: %w", err)
}
boundary := params["boundary"]
if boundary == "" {
return errors.New("missing multipart boundary")
}
// 先寫 user_id重點在 file part 之前§4.2 註解說明:避免 converter multer
// 解析時 user_id 還沒到就拒絕)
if err := mw.WriteField("user_id", userID); err != nil {
return fmt.Errorf("write user_id field: %w", err)
}
mr := multipart.NewReader(body, boundary)
for {
part, err := mr.NextPart()
if err == io.EOF {
return nil
}
if err != nil {
return fmt.Errorf("read next part: %w", err)
}
name := part.FormName()
// 黑名單 user_id忽略 client 自己塞的§4.2
if name == "user_id" {
_ = part.Close()
continue
}
if part.FileName() == "" {
// form field直接複製
fw, err := mw.CreateFormField(name)
if err != nil {
_ = part.Close()
return fmt.Errorf("create form field %q: %w", name, err)
}
if _, err := io.Copy(fw, part); err != nil {
_ = part.Close()
return fmt.Errorf("copy form field %q: %w", name, err)
}
} else {
// file partstreaming copy不 buffer 全 RAM
fw, err := mw.CreateFormFile(name, part.FileName())
if err != nil {
_ = part.Close()
return fmt.Errorf("create form file %q: %w", name, err)
}
if _, err := io.Copy(fw, part); err != nil {
_ = part.Close()
return fmt.Errorf("copy form file %q: %w", name, err)
}
}
_ = part.Close()
}
}
// checkActiveJob 看 user 是否已有 active jobpre-check
//
// 流程:
// 1. ownership.ActiveJobOf — 反查 cache 中該 user 的 jobs
// 2. 取第一個Phase 0.8 同 user 最多 1 個),用 converter.GetJob 確認狀態
// - 若狀態為 created/running → return 該 Job給 caller 包成 ActiveJobError
// - 若 converter 回 404 / 該 job 已 completed / failed → 視為無 active先清 cache 再 return nil
//
// 沒 active job 回 (nil, nil)。
func (f *flow) checkActiveJob(ctx context.Context, userID string) (*Job, error) {
jobIDs := f.ownership.ActiveJobOf(userID)
if len(jobIDs) == 0 {
return nil, nil
}
jobID := jobIDs[0]
cj, err := f.converter.GetJob(ctx, jobID)
if err != nil {
if errors.Is(err, ErrJobNotFound) {
// converter 已 GC7d 過期)— 清 cache 後視為無 active
f.ownership.Delete(jobID)
return nil, nil
}
// 其他錯誤5xx / network— 對 caller 透傳caller 決定 502
return nil, err
}
// 只有 created / running 視為 active
switch cj.Status {
case "completed", "failed":
// 已結束的 job 不算 active不清 ownershipGetJob / Download 仍需要這個對應)
return nil, nil
default:
return f.toJob(cj), nil
}
}
// ==========================================================================
// GetJob — 對應 GET /api/conversion/{job_id}
// ==========================================================================
// GetJob 對齊 conversion.md §2.7 + api-conversion.md §2。
//
// 流程:
// 1. ownership.EnsureRebuilt確保 cache 已 lazy rebuild
// 2. ownership.Get(jobID) — 比對 owner不符 → ErrJobNotFound避免洩漏 job 存在性)
// 3. converter.GetJob(jobID)
// 4. 若 expires_at 為零,補 created_at + DefaultJobExpiryDuration
//
// 設計選擇ownership 不符不回 forbidden而是 not_found
// - 避免讓攻擊者用「forbidden vs not_found」差異枚舉合法 job_id
// - 對齊 §7.2 安全考量
func (f *flow) GetJob(ctx context.Context, userID, jobID string) (*Job, error) {
if userID == "" {
return nil, errors.New("conversion: GetJob requires userID")
}
if jobID == "" {
return nil, ErrJobNotFound
}
if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil {
// rebuild 失敗:不視為 fatal繼續走 cache可能 stalefail-soft
f.logger.WarnContext(ctx, "conversion.flow.get_ownership_rebuild_failed",
slog.String("user_hash", hashUserID(userID)),
slog.String("err", err.Error()),
)
}
owner, ok := f.ownership.Get(jobID)
if !ok || owner != userID {
// 不符 → 視為 not_found避免洩漏存在性
return nil, ErrJobNotFound
}
cj, err := f.converter.GetJob(ctx, jobID)
if err != nil {
return nil, err
}
return f.toJob(cj), nil
}
// ==========================================================================
// ActiveJob — 對應 GET /api/conversion/active
// ==========================================================================
// ActiveJob 對齊 conversion.md §2.6.1 lazy rebuild + api-conversion.md §5。
//
// 流程:
// 1. ownership.EnsureRebuilt從 converter ListInProgressJobs 重建 cache
// 2. ownership.ActiveJobOf — 反查
// 3. 沒有 → return (nil, nil)(不視為 error對齊 has_active=false 語意)
// 4. 取 [0]Phase 0.8 ≤ 1→ converter.GetJob 拿即時狀態
// 5. converter 回 404job 已過期被 GC→ 清 cache + return (nil, nil)
//
// 重啟恢復場景visionA-backend in-memory cache 全空時EnsureRebuilt 會打
// converter ListInProgressJobs 把該 user 的 active job 重建進來,使用者看不出差別。
func (f *flow) ActiveJob(ctx context.Context, userID string) (*Job, error) {
if userID == "" {
return nil, errors.New("conversion: ActiveJob requires userID")
}
// 1. lazy rebuild這個路徑不 fail-softrebuild 失敗 = 無法回答 has_active 問題,
// 必須 propagate 給 caller 知道)
if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil {
return nil, err
}
// 2. 反查
jobIDs := f.ownership.ActiveJobOf(userID)
if len(jobIDs) == 0 {
return nil, nil
}
// 3. 取第一個,問 converter 即時狀態
jobID := jobIDs[0]
cj, err := f.converter.GetJob(ctx, jobID)
if err != nil {
if errors.Is(err, ErrJobNotFound) {
// converter 已 GC → 清 cache + 視為無 active
f.ownership.Delete(jobID)
return nil, nil
}
return nil, err
}
// 已 completed / failed 的 job 也不算 activehas_active=false
if cj.Status == "completed" || cj.Status == "failed" {
return nil, nil
}
return f.toJob(cj), nil
}
// ==========================================================================
// PromoteToModels — 對應 POST /api/conversion/{job_id}/promote-to-models
// ==========================================================================
// PromoteToModels 對齊 conversion.md §1 Stage 3a + §2.5 + api-conversion.md §3。
//
// 流程:
// 1. ownership 驗(不符 → ErrJobNotFound
// 2. converter.GetJob — 確認 status=completed否則 ErrJobNotCompleted
// 3. 冪等檢查modelStore.FindBySourceJobID — 已有 model 直接回(避免重複 promote
// 4. converter.Promote — 拿到 target_object_key
// 5. faa.GetFile(target_object_key) — streaming pull NEF
// 6. storage.Put — streaming 寫進 visionA storage不 ReadAll
// 7. modelStore.Save — 建 model recordSource="converted"、SourceJobID=jobID
// 8. return PromoteResult
//
// 名稱caller 從 wireframe §7.1 的 import Dialog 拿;空字串 fallback 為
// `<source_filename_stem>_<target_chip_lower>`(對齊 api-conversion.md §3
func (f *flow) PromoteToModels(ctx context.Context, userID, jobID, name string) (*PromoteResult, error) {
if userID == "" {
return nil, errors.New("conversion: PromoteToModels requires userID")
}
if jobID == "" {
return nil, ErrJobNotFound
}
// 1. ownership rebuild + 驗
if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil {
f.logger.WarnContext(ctx, "conversion.flow.promote_ownership_rebuild_failed",
slog.String("user_hash", hashUserID(userID)),
slog.String("err", err.Error()),
)
}
owner, ok := f.ownership.Get(jobID)
if !ok || owner != userID {
return nil, ErrJobNotFound
}
// 2. converter.GetJob 確認 completed
cj, err := f.converter.GetJob(ctx, jobID)
if err != nil {
return nil, err
}
if cj.Status != "completed" {
return nil, fmt.Errorf("%w: status=%s", ErrJobNotCompleted, cj.Status)
}
// 3. 冪等檢查
if existing, err := f.modelStore.FindBySourceJobID(ctx, userID, jobID); err != nil {
f.logger.WarnContext(ctx, "conversion.flow.promote_find_existing_failed",
slog.String("user_hash", hashUserID(userID)),
slog.String("job_id", jobID),
slog.String("err", err.Error()),
)
// 查 model store 失敗不 hard fail —— 仍嘗試 promote最壞結果是重複建一個 model record
} else if existing != nil {
f.logger.InfoContext(ctx, "conversion.flow.promote_idempotent_hit",
slog.String("user_hash", hashUserID(userID)),
slog.String("job_id", jobID),
slog.String("model_id", existing.ID),
)
return modelRecordToPromoteResult(existing), nil
}
// 4. converter.Promote — 組目標 object_keyFAA 內部命名規則由 visionA 決定)
finalName := name
if finalName == "" {
finalName = defaultModelName(cj)
}
targetObjectKey := buildTargetObjectKey(userID, jobID)
promoteRes, err := f.converter.Promote(ctx, jobID, PromoteReq{
UserID: userID,
Source: promoteDefaultSource, // "nef"
TargetObjectKey: targetObjectKey,
})
if err != nil {
return nil, err
}
// 5. faa.GetFile streaming pull
file, err := f.faa.GetFile(ctx, promoteRes.TargetObjectKey)
if err != nil {
return nil, err
}
defer file.Body.Close()
// 6. storage.Put streaming write
modelID := f.modelStore.GenerateID()
storageKey := buildStorageKey(userID, modelID)
storageMeta := map[string]string{
"source": "converted",
"source_job_id": jobID,
"target_chip": normalizeTargetChip(cj.Platform),
}
if err := f.storage.Put(ctx, storageKey, file.Body, file.ContentLength, storageMeta); err != nil {
f.logger.WarnContext(ctx, "conversion.flow.promote_storage_put_failed",
slog.String("user_hash", hashUserID(userID)),
slog.String("job_id", jobID),
slog.String("storage_key", storageKey),
slog.String("err", err.Error()),
)
// visionA 自家 storage 失敗disk full / S3 5xx / 權限錯誤)
// — 不是 FAA / converter 問題,用獨立 sentinel 讓 SRE alarm 打對 team
// (對齊 Reviewer M-1
return nil, fmt.Errorf("%w: storage.Put %s: %v", ErrStorageUnavailable, storageKey, err)
}
// 7. modelStore.Save
now := f.now().UTC()
rec := &ModelRecord{
ID: modelID,
OwnerUserID: userID,
Name: finalName,
StorageKey: storageKey,
FileSize: promoteRes.Size,
FileChecksum: promoteRes.Checksum,
TargetChip: normalizeTargetChip(cj.Platform),
Source: "converted",
SourceJobID: jobID,
CreatedAt: now,
UpdatedAt: now,
}
if err := f.modelStore.Save(ctx, rec); err != nil {
f.logger.WarnContext(ctx, "conversion.flow.promote_model_save_failed",
slog.String("user_hash", hashUserID(userID)),
slog.String("job_id", jobID),
slog.String("model_id", modelID),
slog.String("err", err.Error()),
)
// model store save 失敗in-memory 不會失敗;未來 Postgres 才會觸發)
// — 不是 converter / FAA 問題,用獨立 sentinel 對齊 SRE alarm 分類Reviewer M-1
// 已寫進 storage 但無 record 對應 → 等同孤立檔案Phase 1 加 GC 機制清掃
return nil, fmt.Errorf("%w: modelStore.Save model_id=%s: %v", ErrModelStoreUnavailable, modelID, err)
}
f.logger.InfoContext(ctx, "conversion.flow.promote_success",
slog.String("user_hash", hashUserID(userID)),
slog.String("job_id", jobID),
slog.String("model_id", modelID),
slog.Int64("file_size", promoteRes.Size),
)
return modelRecordToPromoteResult(rec), nil
}
// ==========================================================================
// DownloadRedirectURL — 對應 GET /api/conversion/{job_id}/download
// ==========================================================================
// DownloadRedirectURL 對齊 conversion.md §1 Stage 3b + §3.1 + api-conversion.md §4。
//
// 流程:
// 1. ownership 驗(不符 → ErrJobNotFound
// 2. converter.GetJob — 確認 status=completed
// 3. ensurePromoted — 自動觸發 promote若還沒 promote 過),拿到 target_object_key
// - 設計選擇task spec 詢問點自動觸發。理由api-conversion.md §4 註解說
// 「兩條路徑promote-to-models / download都拿同一個 target_object_key」+
// 「不會與 promote-to-models 衝突;兩者內部都會 ensurePromoted冪等」—
// 要求 user 先按 promote-to-models 才能下載會違背「下載」按鈕的直覺語意。
// 4. mcToken.IssueDelegatedDownload — 換 opaque token (TTL 5min 預設)
// 5. 組 https://<faa>/files/<key>?access_token=<token>
//
// 安全§10.4
// - token 不出現在任何 JSON responsecaller 走 server-side 302 redirect
// - object_key 不對 frontend 揭露
func (f *flow) DownloadRedirectURL(ctx context.Context, userID, jobID string) (string, error) {
if userID == "" {
return "", errors.New("conversion: DownloadRedirectURL requires userID")
}
if jobID == "" {
return "", ErrJobNotFound
}
// 1. ownership 驗
if err := f.ownership.EnsureRebuilt(ctx, userID); err != nil {
f.logger.WarnContext(ctx, "conversion.flow.download_ownership_rebuild_failed",
slog.String("user_hash", hashUserID(userID)),
slog.String("err", err.Error()),
)
}
owner, ok := f.ownership.Get(jobID)
if !ok || owner != userID {
return "", ErrJobNotFound
}
// 2. converter.GetJob 確認 completed
cj, err := f.converter.GetJob(ctx, jobID)
if err != nil {
return "", err
}
if cj.Status != "completed" {
return "", fmt.Errorf("%w: status=%s", ErrJobNotCompleted, cj.Status)
}
// 3. ensurePromoted — 自動觸發 promote 拿 target_object_key
// Phase 0.8 不 cache promoted_object_keyconverter 端 promote 是冪等的,
// 重複呼叫成本可接受 — 反正 download 路徑 user 主動觸發頻率不高)
targetObjectKey, err := f.ensurePromoted(ctx, userID, jobID, cj)
if err != nil {
return "", err
}
// 4. mcToken 換 delegated download token
delegated, err := f.mcToken.IssueDelegatedDownload(ctx, IssueDownloadReq{
TenantID: f.tenantID,
UserID: userID,
ObjectKey: targetObjectKey,
ExpiresInSeconds: f.delegatedTTLSeconds,
})
if err != nil {
return "", err
}
// 5. 組 URLFAA base + /files/<key>?access_token=<token>
// - object_key 用 url.PathEscape 處理(含路徑分隔符的 key 安全 escape
// - token 用 url.QueryEscape雖 opaque token 通常不含特殊字元,仍 escape 防呆)
downloadURL := fmt.Sprintf("%s/files/%s?access_token=%s",
f.faaBaseURL,
escapeObjectKeyPath(targetObjectKey),
url.QueryEscape(delegated.Token),
)
f.logger.InfoContext(ctx, "conversion.flow.download_url_issued",
slog.String("user_hash", hashUserID(userID)),
slog.String("job_id", jobID),
slog.String("object_key_hash", hashObjectKey(targetObjectKey)),
slog.Int("ttl_sec", f.delegatedTTLSeconds),
)
return downloadURL, nil
}
// ensurePromoted 取 target_object_key — 若已 promote 過model record 已存在)用 cache
// 否則打 converter.Promote 拿。
//
// 用 modelStore.FindBySourceJobID 當 source-of-truth若已有 model record 表示
// PromoteToModels 已成功跑過,可直接從 record 拿 storage_key 反推 target_object_key
// ✗ 不行storage_key 是 visionA storage 的 key不是 FAA 的 object_key。
//
// 改用 converter.Promote 冪等性§2.7「promote 動作是冪等的converter 端對同一
// job 重複 promote 接受」)— 直接打 converter重複呼叫成本低同步等 1-2s
//
// 為什麼不用 sync.Map cachePhase 0.8 download 路徑 user 主動觸發頻率不高(每 job 1-N 次),
// 簡單性 > 微優化。Phase 1 量大再加 cacheprogress.md 已記)。
func (f *flow) ensurePromoted(ctx context.Context, userID, jobID string, cj *ConverterJob) (string, error) {
targetObjectKey := buildTargetObjectKey(userID, jobID)
res, err := f.converter.Promote(ctx, jobID, PromoteReq{
UserID: userID,
Source: promoteDefaultSource,
TargetObjectKey: targetObjectKey,
})
if err != nil {
return "", err
}
return res.TargetObjectKey, nil
}
// ==========================================================================
// helpers
// ==========================================================================
// toJob 把 ConverterJobclient 層中介 type轉成對外的 Jobresponse shape
//
// 補 expires_at fallbackconverter 沒給 → created_at + DefaultJobExpiryDuration§2.6.2)。
func (f *flow) toJob(cj *ConverterJob) *Job {
if cj == nil {
return nil
}
job := &Job{
JobID: cj.JobID,
Status: cj.Status,
Stage: cj.Stage,
CreatedAt: cj.CreatedAt,
UpdatedAt: cj.UpdatedAt,
ExpiresAt: cj.ExpiresAt,
SourceFilename: cj.SourceFilename,
TargetChip: cj.Platform,
ErrorCode: cj.ErrorCode,
ErrorMessage: cj.ErrorMessage,
}
if cj.Progress != nil {
job.Progress = *cj.Progress
}
if cj.StageProgress != nil {
job.StageProgress = *cj.StageProgress
}
if job.ExpiresAt.IsZero() && !cj.CreatedAt.IsZero() {
job.ExpiresAt = cj.CreatedAt.Add(f.defaultJobExpiryDuration)
}
return job
}
// modelRecordToPromoteResult 把 ModelRecord 轉成對外的 PromoteResult。
func modelRecordToPromoteResult(rec *ModelRecord) *PromoteResult {
if rec == nil {
return nil
}
return &PromoteResult{
ModelID: rec.ID,
Source: rec.Source,
SourceJobID: rec.SourceJobID,
Name: rec.Name,
TargetChip: rec.TargetChip,
FileSize: rec.FileSize,
Status: "ready", // visionA model 既有 statuspromote 完即 ready
CreatedAt: rec.CreatedAt,
}
}
// buildTargetObjectKey 產 FAA 的 object_keyvisionA 端命名規則)。
//
// 命名models/{user_id}/{job_id}.nef
// 用 user_id 隔離job_id 唯一性由 converter 保證UUID
//
// 對齊 conversion.md §10.4「object_key 不對 frontend 揭露」— 命名只在 server-side 用。
func buildTargetObjectKey(userID, jobID string) string {
// 注意:這裡不對 userID/jobID 做 escape — callervisionA-backend handler
// 已從 OIDC sub / converter response 拿,皆為合法 ID 字元UUID / OIDC sub
return fmt.Sprintf("models/%s/%s.nef", userID, jobID)
}
// buildStorageKey 產 visionA storage 的 key不是 FAA 的)。
//
// 沿用 internal/storage 既有命名慣例models/{user_id}/{model_id}.nef
// storage.md §2 範例)。
func buildStorageKey(userID, modelID string) string {
return fmt.Sprintf("models/%s/%s.nef", userID, modelID)
}
// escapeObjectKeyPath 對 object_key 做 path escape但保留 '/' 為 path separator。
//
// url.PathEscape 會把 '/' 也 escape 成 %2F — 對 FAA `/files/{**objectKey}` 來說
// 應該保留 '/' 為路徑分隔符,所以拆段後逐段 escape 再合回。
func escapeObjectKeyPath(objectKey string) string {
parts := strings.Split(objectKey, "/")
for i := range parts {
parts[i] = url.PathEscape(parts[i])
}
return strings.Join(parts, "/")
}
// normalizeTargetChip 把 converter 端 platform"520"/"720"/...)轉成 visionA model 的
// target_chip 表示法("kl520"/"kl720"/...)。
//
// 對齊 api-conversion.md §3 注解「conversion job 用 platform '720'model.target_chip 用 'kl720'」。
func normalizeTargetChip(platform string) string {
p := strings.ToLower(strings.TrimSpace(platform))
if p == "" {
return ""
}
if strings.HasPrefix(p, "kl") {
return p
}
return "kl" + p
}
// defaultModelName 產 PromoteToModels caller 沒給 name 時的 fallback。
//
// 規則:`<source_filename_stem>_<target_chip_lower>` — 對齊 api-conversion.md §3 預設值
// wireframe §7.1 import Dialog 預設)。
func defaultModelName(cj *ConverterJob) string {
// path.Base("") 會回 ".";先擋掉空 / "." / ".." 等無效 stem
var stem string
if cj.SourceFilename != "" {
base := path.Base(cj.SourceFilename)
if base != "." && base != "/" && base != ".." {
stem = strings.TrimSuffix(base, path.Ext(base))
}
}
chip := strings.ToLower(strings.TrimSpace(cj.Platform))
switch {
case stem != "" && chip != "":
return fmt.Sprintf("%s_kl%s", stem, chip)
case stem != "":
return stem
case chip != "":
return fmt.Sprintf("converted_kl%s", chip)
default:
// 兜底:用 timestamp 避免空 name
return fmt.Sprintf("converted_%d", time.Now().Unix())
}
}
// generateRandomID — 不對外暴露,用於測試或 ModelStore.GenerateID adapter 沒提供時的 fallback。
//
// 16 hex chars (64-bit)。
//
//nolint:unused // 保留供 main.go 的 adapter 在 fallback 時使用
func generateRandomID() string {
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
// crypto/rand 失敗極為罕見;用 timestamp 兜底
return fmt.Sprintf("%d", time.Now().UnixNano())
}
return hex.EncodeToString(b)
}