// flow_test.go — Service interface 整合層的單元測試。 // // 測試策略: // - 各 client 用 in-package stub(不耦合 T2-T5 真實邏輯,純驗 flow 整合行為) // - 沿用 ownership_test.go 的 stubConverterClient(補上 InitJob/GetJob/Promote 實作) // - 用本檔案專屬的 stubFAAClient / stubMCTokenClient / stubModelStore / stubStorage // // 涵蓋 5 個 method × happy / ownership 失敗 / client 失敗 propagation + // task spec 額外要求: // - InitJob 同 user 已有 active → ActiveJobError // - PromoteToModels 已 promote 過 → 回既有 model_id(idempotent) // - PromoteToModels job 沒 succeeded → ErrJobNotCompleted // - DownloadRedirectURL URL 組裝正確(含 url.PathEscape / url.QueryEscape) // - ActiveJob converter 回 404 → ownership.Delete + (nil, nil) // // Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.7) package conversion import ( "bytes" "context" "errors" "fmt" "io" "mime/multipart" "strings" "sync" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // ========================================================================== // stubs — 補齊 ownership_test.go 沒實作的 method // ========================================================================== // flowStubConverter 是 flow_test 專用的 ConverterClient stub。 // // 與 ownership_test.go 的 stubConverterClient 區隔: // - ownership_test 只用 ListInProgressJobs,其他 method panic // - flow_test 需要 InitJob / GetJob / Promote / List 全套 // // 設計:行為由 functional fields(initJobFunc 等)控制,testcase 寫起來直觀。 type flowStubConverter struct { mu sync.Mutex // 預設行為:jobsByID 用於 GetJob lookup;initJobFunc 用於控制 InitJob 結果 jobsByID map[string]*ConverterJob // 各 method 的 hook(nil → 走預設行為) initJobFunc func(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) getJobFunc func(ctx context.Context, jobID string) (*ConverterJob, error) promoteFunc func(ctx context.Context, jobID string, req PromoteReq) (*ConverterPromoteResult, error) listInProgressJobsFunc func(ctx context.Context, userID string) ([]*ConverterJob, error) // 各 method 呼叫次數(atomic) initJobCalls atomic.Int32 getJobCalls atomic.Int32 promoteCalls atomic.Int32 listInProgressJobsCalls atomic.Int32 // 紀錄 InitJob 收到的 body(驗證 multipart user_id 注入) lastInitBody []byte lastInitBodyType string } func newFlowStubConverter() *flowStubConverter { return &flowStubConverter{ jobsByID: make(map[string]*ConverterJob), } } func (s *flowStubConverter) setJob(j *ConverterJob) { s.mu.Lock() defer s.mu.Unlock() s.jobsByID[j.JobID] = j } func (s *flowStubConverter) InitJob(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) { s.initJobCalls.Add(1) // 把 body 讀完(模擬 converter 收到 streaming body) if req.Body != nil { buf, _ := io.ReadAll(req.Body) s.mu.Lock() s.lastInitBody = buf s.lastInitBodyType = req.BodyContentType s.mu.Unlock() } if s.initJobFunc != nil { return s.initJobFunc(ctx, req) } // 預設:回一個 created job return &ConverterJob{ JobID: "stub-job-1", Status: "created", Stage: "onnx", CreatedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), SourceFilename: req.SourceFilename, Platform: req.Platform, }, nil } func (s *flowStubConverter) GetJob(ctx context.Context, jobID string) (*ConverterJob, error) { s.getJobCalls.Add(1) if s.getJobFunc != nil { return s.getJobFunc(ctx, jobID) } s.mu.Lock() defer s.mu.Unlock() if j, ok := s.jobsByID[jobID]; ok { jc := *j return &jc, nil } return nil, fmt.Errorf("%w: get_job 404 (not_found)", ErrJobNotFound) } func (s *flowStubConverter) Promote(ctx context.Context, jobID string, req PromoteReq) (*ConverterPromoteResult, error) { s.promoteCalls.Add(1) if s.promoteFunc != nil { return s.promoteFunc(ctx, jobID, req) } return &ConverterPromoteResult{ TargetObjectKey: req.TargetObjectKey, Size: 12345, Checksum: "stub-etag", }, nil } func (s *flowStubConverter) ListInProgressJobs(ctx context.Context, userID string) ([]*ConverterJob, error) { s.listInProgressJobsCalls.Add(1) if s.listInProgressJobsFunc != nil { return s.listInProgressJobsFunc(ctx, userID) } return nil, nil } var _ ConverterClient = (*flowStubConverter)(nil) // flowStubFAA 是 FAAClient stub。 type flowStubFAA struct { mu sync.Mutex getFileFunc func(ctx context.Context, objectKey string) (*FAAFile, error) getCalls atomic.Int32 lastKey string } func newFlowStubFAA() *flowStubFAA { return &flowStubFAA{} } func (s *flowStubFAA) GetFile(ctx context.Context, objectKey string) (*FAAFile, error) { s.getCalls.Add(1) s.mu.Lock() s.lastKey = objectKey s.mu.Unlock() if s.getFileFunc != nil { return s.getFileFunc(ctx, objectKey) } body := io.NopCloser(strings.NewReader("nef-bytes-stub")) return &FAAFile{ Body: body, ContentLength: int64(len("nef-bytes-stub")), ContentType: "application/octet-stream", ETag: "stub-etag", }, nil } var _ FAAClient = (*flowStubFAA)(nil) // flowStubMCToken 是 MCTokenClient stub。 type flowStubMCToken struct { serviceTokenFunc func(ctx context.Context, scope string) (string, error) issueDelegatedDownloadFunc func(ctx context.Context, in IssueDownloadReq) (*DelegatedDownloadToken, error) // 紀錄最後一次 IssueDelegatedDownload 收到的 input mu sync.Mutex lastIssueInput *IssueDownloadReq } func newFlowStubMCToken() *flowStubMCToken { return &flowStubMCToken{} } func (s *flowStubMCToken) ServiceToken(ctx context.Context, scope string) (string, error) { if s.serviceTokenFunc != nil { return s.serviceTokenFunc(ctx, scope) } return "stub-service-token", nil } func (s *flowStubMCToken) IssueDelegatedDownload(ctx context.Context, in IssueDownloadReq) (*DelegatedDownloadToken, error) { s.mu.Lock() cp := in s.lastIssueInput = &cp s.mu.Unlock() if s.issueDelegatedDownloadFunc != nil { return s.issueDelegatedDownloadFunc(ctx, in) } return &DelegatedDownloadToken{ Token: "opaque-stub-token-xyz", ExpiresAt: time.Now().Add(5 * time.Minute), }, nil } var _ MCTokenClient = (*flowStubMCToken)(nil) // flowStubModelStore 是 ModelStore stub。 type flowStubModelStore struct { mu sync.Mutex // records: model_id → ModelRecord records map[string]*ModelRecord // idCounter 給 GenerateID 用 idCounter atomic.Int32 // hook 控制(測試 model save 失敗用) saveErr error findErr error } func newFlowStubModelStore() *flowStubModelStore { return &flowStubModelStore{ records: make(map[string]*ModelRecord), } } func (s *flowStubModelStore) Save(ctx context.Context, m *ModelRecord) error { if s.saveErr != nil { return s.saveErr } s.mu.Lock() defer s.mu.Unlock() cp := *m s.records[m.ID] = &cp return nil } func (s *flowStubModelStore) FindBySourceJobID(ctx context.Context, ownerUserID, sourceJobID string) (*ModelRecord, error) { if s.findErr != nil { return nil, s.findErr } s.mu.Lock() defer s.mu.Unlock() for _, r := range s.records { if r.OwnerUserID == ownerUserID && r.SourceJobID == sourceJobID { cp := *r return &cp, nil } } return nil, nil } func (s *flowStubModelStore) GenerateID() string { n := s.idCounter.Add(1) return fmt.Sprintf("model-%03d", n) } var _ ModelStore = (*flowStubModelStore)(nil) // flowStubStorage 是 Storage stub。 type flowStubStorage struct { mu sync.Mutex // objects: key → bytes(驗證 streaming write 正確) objects map[string][]byte putErr error putCalls atomic.Int32 } func newFlowStubStorage() *flowStubStorage { return &flowStubStorage{ objects: make(map[string][]byte), } } func (s *flowStubStorage) Put(ctx context.Context, key string, r io.Reader, size int64, meta map[string]string) error { s.putCalls.Add(1) if s.putErr != nil { // 仍 read 防 io.Pipe 寫端 block _, _ = io.Copy(io.Discard, r) return s.putErr } buf, err := io.ReadAll(r) if err != nil { return err } s.mu.Lock() s.objects[key] = buf s.mu.Unlock() return nil } var _ Storage = (*flowStubStorage)(nil) // ========================================================================== // helper: 建立 flow service + 全套 stub // ========================================================================== type flowFixture struct { svc Service converter *flowStubConverter faa *flowStubFAA mcToken *flowStubMCToken models *flowStubModelStore storage *flowStubStorage ownership Ownership } func newFlowFixture(t *testing.T) *flowFixture { t.Helper() conv := newFlowStubConverter() faa := newFlowStubFAA() mcToken := newFlowStubMCToken() models := newFlowStubModelStore() storage := newFlowStubStorage() own := NewOwnership(conv, newSilentLogger()) svc, err := NewService(FlowOpts{ Converter: conv, FAA: faa, MCToken: mcToken, Ownership: own, ModelStore: models, Storage: storage, TenantID: "visiona-tenant", FAABaseURL: "https://faa.example.com", DefaultJobExpiryDuration: 7 * 24 * time.Hour, DelegatedTTLSeconds: 300, Logger: newSilentLogger(), Now: time.Now, }) require.NoError(t, err) return &flowFixture{ svc: svc, converter: conv, faa: faa, mcToken: mcToken, models: models, storage: storage, ownership: own, } } // makeMultipartBody 建一個合法的 multipart/form-data body 給 InitJob 測試用。 // // 包含:model_id / version / platform / model(fake .onnx file)+ 故意塞一個 client user_id(測黑名單)。 func makeMultipartBody(t *testing.T, clientUserID string) (body io.Reader, contentType string) { t.Helper() var buf bytes.Buffer mw := multipart.NewWriter(&buf) require.NoError(t, mw.WriteField("model_id", "1024")) require.NoError(t, mw.WriteField("version", "v1.0.0")) require.NoError(t, mw.WriteField("platform", "720")) if clientUserID != "" { require.NoError(t, mw.WriteField("user_id", clientUserID)) // 應被黑名單 } fw, err := mw.CreateFormFile("model", "yolov5s.onnx") require.NoError(t, err) _, err = fw.Write([]byte("fake-onnx-bytes")) require.NoError(t, err) require.NoError(t, mw.Close()) return &buf, mw.FormDataContentType() } // ========================================================================== // Constructor — 缺欄位驗證 // ========================================================================== func TestNewService_RequiredFields(t *testing.T) { t.Parallel() conv := newFlowStubConverter() faa := newFlowStubFAA() mc := newFlowStubMCToken() own := NewOwnership(conv, newSilentLogger()) mod := newFlowStubModelStore() st := newFlowStubStorage() tests := []struct { name string opts FlowOpts }{ {"missing converter", FlowOpts{FAA: faa, MCToken: mc, Ownership: own, ModelStore: mod, Storage: st, TenantID: "t", FAABaseURL: "https://x"}}, {"missing faa", FlowOpts{Converter: conv, MCToken: mc, Ownership: own, ModelStore: mod, Storage: st, TenantID: "t", FAABaseURL: "https://x"}}, {"missing mc", FlowOpts{Converter: conv, FAA: faa, Ownership: own, ModelStore: mod, Storage: st, TenantID: "t", FAABaseURL: "https://x"}}, {"missing ownership", FlowOpts{Converter: conv, FAA: faa, MCToken: mc, ModelStore: mod, Storage: st, TenantID: "t", FAABaseURL: "https://x"}}, {"missing modelstore", FlowOpts{Converter: conv, FAA: faa, MCToken: mc, Ownership: own, Storage: st, TenantID: "t", FAABaseURL: "https://x"}}, {"missing storage", FlowOpts{Converter: conv, FAA: faa, MCToken: mc, Ownership: own, ModelStore: mod, TenantID: "t", FAABaseURL: "https://x"}}, {"missing tenant", FlowOpts{Converter: conv, FAA: faa, MCToken: mc, Ownership: own, ModelStore: mod, Storage: st, FAABaseURL: "https://x"}}, {"missing faaurl", FlowOpts{Converter: conv, FAA: faa, MCToken: mc, Ownership: own, ModelStore: mod, Storage: st, TenantID: "t"}}, } for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() _, err := NewService(tt.opts) require.Error(t, err) }) } } func TestNewService_DefaultsApplied(t *testing.T) { t.Parallel() conv := newFlowStubConverter() faa := newFlowStubFAA() mc := newFlowStubMCToken() own := NewOwnership(conv, newSilentLogger()) mod := newFlowStubModelStore() st := newFlowStubStorage() svc, err := NewService(FlowOpts{ Converter: conv, FAA: faa, MCToken: mc, Ownership: own, ModelStore: mod, Storage: st, TenantID: "visiona", FAABaseURL: "https://faa.example.com/", // DefaultJobExpiryDuration / DelegatedTTLSeconds 留空 → 應 fallback }) require.NoError(t, err) require.NotNil(t, svc) f := svc.(*flow) assert.Equal(t, 7*24*time.Hour, f.defaultJobExpiryDuration) assert.Equal(t, 300, f.delegatedTTLSeconds) assert.Equal(t, "https://faa.example.com", f.faaBaseURL, "trailing slash 應被 trim") } // ========================================================================== // InitJob // ========================================================================== // TestInitJob_HappyPath:標準 init flow,黑名單 user_id 注入正確。 func TestInitJob_HappyPath(t *testing.T) { t.Parallel() fix := newFlowFixture(t) body, ct := makeMultipartBody(t, "fake-client-userid") job, err := fix.svc.InitJob(context.Background(), InitJobInput{ UserID: "user-alice", ContentType: ct, Body: body, }) require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, "stub-job-1", job.JobID) assert.Equal(t, "created", job.Status) assert.Equal(t, int32(1), fix.converter.initJobCalls.Load()) // 驗 ownership 已寫入 uid, ok := fix.ownership.Get("stub-job-1") assert.True(t, ok) assert.Equal(t, "user-alice", uid) // 驗 multipart body 中 user_id 是 visionA 灌的,client 帶的被黑名單 fix.converter.mu.Lock() gotBody := string(fix.converter.lastInitBody) fix.converter.mu.Unlock() assert.Contains(t, gotBody, "user-alice", "visionA-backend 注入的 user_id 應在 body 中") // fake-client-userid 不該出現(被黑名單) assert.NotContains(t, gotBody, "fake-client-userid", "client 帶的 user_id 應被黑名單,不應出現在送給 converter 的 body") } // TestInitJob_ActiveJobExists:同 user 已有 active job → ActiveJobError。 // // 這個 case 來自 task spec「額外要測」。 func TestInitJob_ActiveJobExists(t *testing.T) { t.Parallel() fix := newFlowFixture(t) // 預先在 cache 注入一個 active job createdAt := time.Now().UTC() fix.converter.setJob(&ConverterJob{ JobID: "existing-job", Status: "running", Stage: "bie", CreatedAt: createdAt, }) fix.ownership.Set("existing-job", "user-alice") body, ct := makeMultipartBody(t, "") _, err := fix.svc.InitJob(context.Background(), InitJobInput{ UserID: "user-alice", ContentType: ct, Body: body, }) require.Error(t, err) assert.True(t, errors.Is(err, ErrActiveJobExists)) var ae *ActiveJobError require.True(t, errors.As(err, &ae)) require.NotNil(t, ae.Job) assert.Equal(t, "existing-job", ae.Job.JobID) assert.Equal(t, "running", ae.Job.Status) // converter.InitJob 不該被呼叫(pre-check 攔截) assert.Equal(t, int32(0), fix.converter.initJobCalls.Load()) } // TestInitJob_ActiveJob_AlreadyCompleted_PassThrough:cache 中的 job 已 completed // → 視為無 active,正常 init。 func TestInitJob_ActiveJob_AlreadyCompleted_PassThrough(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "old-job", Status: "completed", CreatedAt: time.Now().UTC(), }) fix.ownership.Set("old-job", "user-alice") body, ct := makeMultipartBody(t, "") job, err := fix.svc.InitJob(context.Background(), InitJobInput{ UserID: "user-alice", ContentType: ct, Body: body, }) require.NoError(t, err) assert.Equal(t, "stub-job-1", job.JobID) } // TestInitJob_ConverterError_Propagation:converter 失敗應透傳 sentinel。 func TestInitJob_ConverterError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.initJobFunc = func(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) { // 仍 drain body 以免 io.Pipe 寫端 block _, _ = io.Copy(io.Discard, req.Body) return nil, fmt.Errorf("%w: simulated 502", ErrConverterUnavailable) } body, ct := makeMultipartBody(t, "") _, err := fix.svc.InitJob(context.Background(), InitJobInput{ UserID: "user-alice", ContentType: ct, Body: body, }) require.Error(t, err) assert.True(t, errors.Is(err, ErrConverterUnavailable)) // 失敗時 ownership 不應寫入 _, ok := fix.ownership.Get("stub-job-1") assert.False(t, ok) } // TestInitJob_RebuildBodyError_ConsumerSeesError:rebuild 中途 reader 失敗 // → converter 端從 pipe 讀時應拿到該 error(而非空的 EOF / 截斷 multipart)。 // // 對齊 Reviewer M-2:原本 `defer pw.Close()` 配 `pw.CloseWithError(err)` 的寫法 // 因 defer LIFO 會把錯誤訊號蓋成 nil EOF。修法後 converter 端應能透過 pipe 讀到 // rebuild 階段拋出的錯誤(例如 io.ErrUnexpectedEOF / 自訂錯誤)。 func TestInitJob_RebuildBodyError_ConsumerSeesError(t *testing.T) { t.Parallel() fix := newFlowFixture(t) // 在 converter stub 的 InitJob 中,主動讀 body — 驗證讀到的是「帶 rebuild error 的 pipe」 // 而不是「截斷的 EOF」 var readErr error fix.converter.initJobFunc = func(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) { // 讀完 body;若 rebuild 失敗,pipe 應拿到非 nil error(不是 EOF) _, readErr = io.Copy(io.Discard, req.Body) // 模擬 converter 因為收不完 body 回 5xx return nil, fmt.Errorf("%w: simulated bad multipart from rebuild", ErrConverterUnavailable) } // 故意給一個會在 rebuild 中失敗的 body:合法 boundary 但 part 內容讀到一半就 error body := &errReader{ // 先給足以讓 multipart.NewReader 找到第一個 boundary 的內容 content: []byte("--boundary123\r\nContent-Disposition: form-data; name=\"x\"\r\n\r\n"), errAt: 1024, // 讀到第 N byte 後拋錯 err: errors.New("simulated reader failure mid-stream"), } contentType := "multipart/form-data; boundary=boundary123" _, err := fix.svc.InitJob(context.Background(), InitJobInput{ UserID: "user-alice", ContentType: contentType, Body: body, }) require.Error(t, err) // 應透傳成 ErrConverterUnavailable(converter stub 回 5xx;或 rebuild 自身 wrap) assert.True(t, errors.Is(err, ErrConverterUnavailable), "rebuild + converter 雙失敗,最終應收斂成 ErrConverterUnavailable") // 關鍵 assert:converter 端讀 body 時,應拿到「非 nil error」而不是空 EOF // (原本 defer 順序錯時 readErr 會是 nil — 因為 pw.Close() 蓋掉 CloseWithError) assert.Error(t, readErr, "converter 端 io.Copy(req.Body) 應拿到 rebuild 階段的錯誤訊號,而不是 nil EOF") } // TestInitJob_RebuildHappyPath_ConsumerSeesEOF:正常完成時,consumer 端應拿到 EOF(非 error)。 // // 對齊 Reviewer M-2 的反向 case:成功路徑 pipe 應正常 EOF。 func TestInitJob_RebuildHappyPath_ConsumerSeesEOF(t *testing.T) { t.Parallel() fix := newFlowFixture(t) var readErr error fix.converter.initJobFunc = func(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) { _, readErr = io.Copy(io.Discard, req.Body) return &ConverterJob{ JobID: "stub-job-1", Status: "created", CreatedAt: time.Now(), }, nil } body, ct := makeMultipartBody(t, "") _, err := fix.svc.InitJob(context.Background(), InitJobInput{ UserID: "user-alice", ContentType: ct, Body: body, }) require.NoError(t, err) // happy path:pipe 應正常 EOF(io.Copy 對 EOF 不報 error) assert.NoError(t, readErr, "正常完成時 converter 端 io.Copy(req.Body) 應 nil error(io.Copy 把 EOF 視為正常結束)") } // errReader 在讀到 errAt bytes 後拋錯,用於模擬 rebuild 中途失敗。 type errReader struct { content []byte pos int read int errAt int err error } func (r *errReader) Read(p []byte) (int, error) { if r.read >= r.errAt { return 0, r.err } if r.pos >= len(r.content) { // 把剩餘 byte 補 0 直到 errAt — 模擬「讀到一半才出錯」 n := r.errAt - r.read if n > len(p) { n = len(p) } for i := 0; i < n; i++ { p[i] = 0 } r.read += n return n, nil } n := copy(p, r.content[r.pos:]) r.pos += n r.read += n return n, nil } // TestInitJob_RequiredFields:缺 UserID / Body / ContentType return error。 func TestInitJob_RequiredFields(t *testing.T) { t.Parallel() fix := newFlowFixture(t) _, err := fix.svc.InitJob(context.Background(), InitJobInput{ContentType: "x", Body: strings.NewReader("y")}) assert.Error(t, err) _, err = fix.svc.InitJob(context.Background(), InitJobInput{UserID: "u", ContentType: "x"}) assert.Error(t, err) _, err = fix.svc.InitJob(context.Background(), InitJobInput{UserID: "u", Body: strings.NewReader("y")}) assert.Error(t, err) } // ========================================================================== // GetJob // ========================================================================== // TestGetJob_HappyPath:ownership 有 → converter.GetJob → 回 *Job。 func TestGetJob_HappyPath(t *testing.T) { t.Parallel() fix := newFlowFixture(t) createdAt := time.Now().UTC() fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "running", Stage: "bie", CreatedAt: createdAt, UpdatedAt: createdAt, SourceFilename: "yolov5s.onnx", Platform: "720", }) fix.ownership.Set("j1", "user-alice") job, err := fix.svc.GetJob(context.Background(), "user-alice", "j1") require.NoError(t, err) assert.Equal(t, "j1", job.JobID) assert.Equal(t, "yolov5s.onnx", job.SourceFilename) assert.Equal(t, "720", job.TargetChip) // expires_at fallback:created_at + 7d assert.Equal(t, createdAt.Add(7*24*time.Hour), job.ExpiresAt) } // TestGetJob_OwnershipMismatch_ReturnsNotFound:ownership 不符回 ErrJobNotFound(避免洩漏)。 func TestGetJob_OwnershipMismatch_ReturnsNotFound(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "running", CreatedAt: time.Now()}) fix.ownership.Set("j1", "user-bob") _, err := fix.svc.GetJob(context.Background(), "user-alice", "j1") require.Error(t, err) assert.True(t, errors.Is(err, ErrJobNotFound), "ownership mismatch 應回 not_found 而非 forbidden(§7.2 防枚舉)") // converter.GetJob 不該被呼叫 assert.Equal(t, int32(0), fix.converter.getJobCalls.Load()) } // TestGetJob_OwnershipMissing_ReturnsNotFound:cache 中沒對應 jobID → not_found。 func TestGetJob_OwnershipMissing_ReturnsNotFound(t *testing.T) { t.Parallel() fix := newFlowFixture(t) _, err := fix.svc.GetJob(context.Background(), "user-alice", "ghost-job") require.Error(t, err) assert.True(t, errors.Is(err, ErrJobNotFound)) } // TestGetJob_ConverterError_Propagation:converter 5xx 透傳。 func TestGetJob_ConverterError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.ownership.Set("j1", "user-alice") fix.converter.getJobFunc = func(ctx context.Context, jobID string) (*ConverterJob, error) { return nil, fmt.Errorf("%w: simulated", ErrConverterUnavailable) } _, err := fix.svc.GetJob(context.Background(), "user-alice", "j1") require.Error(t, err) assert.True(t, errors.Is(err, ErrConverterUnavailable)) } // ========================================================================== // ActiveJob // ========================================================================== // TestActiveJob_HappyPath:lazy rebuild → ActiveJobOf → converter.GetJob → 回 *Job。 func TestActiveJob_HappyPath(t *testing.T) { t.Parallel() fix := newFlowFixture(t) createdAt := time.Now().UTC() fix.converter.listInProgressJobsFunc = func(ctx context.Context, userID string) ([]*ConverterJob, error) { if userID != "user-alice" { return nil, nil } return []*ConverterJob{ {JobID: "j-active", Status: "running", CreatedAt: createdAt}, }, nil } fix.converter.setJob(&ConverterJob{ JobID: "j-active", Status: "running", Stage: "bie", CreatedAt: createdAt, }) job, err := fix.svc.ActiveJob(context.Background(), "user-alice") require.NoError(t, err) require.NotNil(t, job) assert.Equal(t, "j-active", job.JobID) assert.Equal(t, "running", job.Status) } // TestActiveJob_NoActive:沒 active job 回 (nil, nil)。 func TestActiveJob_NoActive(t *testing.T) { t.Parallel() fix := newFlowFixture(t) job, err := fix.svc.ActiveJob(context.Background(), "user-alice") require.NoError(t, err) assert.Nil(t, job) } // TestActiveJob_ConverterReturns404_DeletesAndReturnsNil:cache 中有 job 但 converter 回 404 // → 清 ownership + (nil, nil)。task spec 額外要測 case。 func TestActiveJob_ConverterReturns404_DeletesAndReturnsNil(t *testing.T) { t.Parallel() fix := newFlowFixture(t) // 預先在 cache 中放一個 — 模擬 visionA 重啟 + lazy rebuild 從 converter 拉到, // 但中間 converter 又 GC 了 fix.converter.listInProgressJobsFunc = func(ctx context.Context, userID string) ([]*ConverterJob, error) { return []*ConverterJob{{JobID: "j-stale", Status: "running", CreatedAt: time.Now()}}, nil } fix.converter.getJobFunc = func(ctx context.Context, jobID string) (*ConverterJob, error) { return nil, fmt.Errorf("%w: simulated 404", ErrJobNotFound) } job, err := fix.svc.ActiveJob(context.Background(), "user-alice") require.NoError(t, err) assert.Nil(t, job, "converter 404 應視為無 active") // ownership 已清掉 _, ok := fix.ownership.Get("j-stale") assert.False(t, ok, "converter 404 後應呼叫 ownership.Delete") } // TestActiveJob_ConverterError_Propagation:converter 5xx 透傳給 caller(不 fail-soft)。 func TestActiveJob_ConverterError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.listInProgressJobsFunc = func(ctx context.Context, userID string) ([]*ConverterJob, error) { return nil, fmt.Errorf("%w: list 5xx", ErrConverterUnavailable) } _, err := fix.svc.ActiveJob(context.Background(), "user-alice") require.Error(t, err) assert.True(t, errors.Is(err, ErrConverterUnavailable)) } // TestActiveJob_CompletedJob_ReturnsNil:cache 中是 completed job → 不算 active。 func TestActiveJob_CompletedJob_ReturnsNil(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.listInProgressJobsFunc = func(ctx context.Context, userID string) ([]*ConverterJob, error) { return []*ConverterJob{{JobID: "j-done", Status: "running", CreatedAt: time.Now()}}, nil } // converter 即時狀態 = completed fix.converter.setJob(&ConverterJob{ JobID: "j-done", Status: "completed", CreatedAt: time.Now(), }) job, err := fix.svc.ActiveJob(context.Background(), "user-alice") require.NoError(t, err) assert.Nil(t, job) } // ========================================================================== // PromoteToModels // ========================================================================== // TestPromoteToModels_HappyPath:完整 pipeline。 func TestPromoteToModels_HappyPath(t *testing.T) { t.Parallel() fix := newFlowFixture(t) createdAt := time.Now().UTC() fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: createdAt, SourceFilename: "yolov5s.onnx", Platform: "720", }) fix.ownership.Set("j1", "user-alice") res, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "my-model") require.NoError(t, err) require.NotNil(t, res) assert.NotEmpty(t, res.ModelID) assert.Equal(t, "converted", res.Source) assert.Equal(t, "j1", res.SourceJobID) assert.Equal(t, "my-model", res.Name) assert.Equal(t, "kl720", res.TargetChip) assert.Equal(t, "ready", res.Status) assert.Equal(t, int64(12345), res.FileSize) // 驗 storage 真的有寫 assert.Equal(t, int32(1), fix.storage.putCalls.Load()) fix.storage.mu.Lock() expectedKey := fmt.Sprintf("models/user-alice/%s.nef", res.ModelID) assert.Contains(t, fix.storage.objects, expectedKey) fix.storage.mu.Unlock() // 驗 model store 真的有寫 rec, _ := fix.models.FindBySourceJobID(context.Background(), "user-alice", "j1") require.NotNil(t, rec) assert.Equal(t, res.ModelID, rec.ID) // 驗 promote / faa 各被打 1 次 assert.Equal(t, int32(1), fix.converter.promoteCalls.Load()) assert.Equal(t, int32(1), fix.faa.getCalls.Load()) } // TestPromoteToModels_DefaultName:caller 傳空 name 應走 fallback `_kl`。 func TestPromoteToModels_DefaultName(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: time.Now(), SourceFilename: "yolov5s.onnx", Platform: "520", }) fix.ownership.Set("j1", "user-alice") res, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "") require.NoError(t, err) assert.Equal(t, "yolov5s_kl520", res.Name) } // TestPromoteToModels_Idempotent:同 jobID 二次 promote 應回既有 model_id(task spec 要求)。 func TestPromoteToModels_Idempotent(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: time.Now(), SourceFilename: "x.onnx", Platform: "720", }) fix.ownership.Set("j1", "user-alice") first, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "v1") require.NoError(t, err) require.NotNil(t, first) // 第二次:應該不再打 converter.Promote / faa.GetFile / storage.Put convPromoteBefore := fix.converter.promoteCalls.Load() faaCallsBefore := fix.faa.getCalls.Load() storagePutBefore := fix.storage.putCalls.Load() second, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "v2") require.NoError(t, err) require.NotNil(t, second) assert.Equal(t, first.ModelID, second.ModelID, "二次 promote 應回既有 model_id") assert.Equal(t, convPromoteBefore, fix.converter.promoteCalls.Load(), "二次 promote 不應再打 converter.Promote") assert.Equal(t, faaCallsBefore, fix.faa.getCalls.Load(), "二次 promote 不應再打 faa.GetFile") assert.Equal(t, storagePutBefore, fix.storage.putCalls.Load(), "二次 promote 不應再寫 storage") } // TestPromoteToModels_JobNotCompleted:job 狀態 != completed → ErrJobNotCompleted(task spec 要求)。 func TestPromoteToModels_JobNotCompleted(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "running", CreatedAt: time.Now(), }) fix.ownership.Set("j1", "user-alice") _, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "x") require.Error(t, err) assert.True(t, errors.Is(err, ErrJobNotCompleted)) } // TestPromoteToModels_OwnershipMismatch:別 user 的 job → not_found。 func TestPromoteToModels_OwnershipMismatch(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: time.Now(), }) fix.ownership.Set("j1", "user-bob") _, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "x") require.Error(t, err) assert.True(t, errors.Is(err, ErrJobNotFound)) } // TestPromoteToModels_FAAError_Propagation:FAA 失敗透傳。 func TestPromoteToModels_FAAError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: time.Now(), SourceFilename: "x.onnx", Platform: "720", }) fix.ownership.Set("j1", "user-alice") fix.faa.getFileFunc = func(ctx context.Context, objectKey string) (*FAAFile, error) { return nil, fmt.Errorf("%w: faa 502", ErrFAAUnavailable) } _, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "x") require.Error(t, err) assert.True(t, errors.Is(err, ErrFAAUnavailable)) // model record 不應被建(FAA 失敗在 storage 寫入前) rec, _ := fix.models.FindBySourceJobID(context.Background(), "user-alice", "j1") assert.Nil(t, rec) } // TestPromoteToModels_StorageError:storage.Put 失敗 → 包成 ErrStorageUnavailable。 // // 對齊 Reviewer M-1:visionA 自家 storage(disk full / S3 5xx / 權限錯誤)失敗 // 不該被歸類為 FAA 或 converter 問題,避免 SRE alarm 打錯 team / i18n 訊息誤導。 func TestPromoteToModels_StorageError(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: time.Now(), SourceFilename: "x.onnx", Platform: "720", }) fix.ownership.Set("j1", "user-alice") fix.storage.putErr = errors.New("disk full") _, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "x") require.Error(t, err) assert.True(t, errors.Is(err, ErrStorageUnavailable), "storage.Put 失敗應歸類為 ErrStorageUnavailable,不是 ErrFAAUnavailable") // 確認沒被誤包成其他 sentinel assert.False(t, errors.Is(err, ErrFAAUnavailable), "storage 失敗不該被歸類為 FAA 問題(Reviewer M-1)") assert.False(t, errors.Is(err, ErrConverterUnavailable)) // model record 不應被建(storage 失敗在 modelStore.Save 前) rec, _ := fix.models.FindBySourceJobID(context.Background(), "user-alice", "j1") assert.Nil(t, rec) } // TestPromoteToModels_ModelStoreError:modelStore.Save 失敗 → 包成 ErrModelStoreUnavailable。 // // 對齊 Reviewer M-1:visionA 自家 model store 失敗不該被歸類為 converter 問題。 func TestPromoteToModels_ModelStoreError(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: time.Now(), SourceFilename: "x.onnx", Platform: "720", }) fix.ownership.Set("j1", "user-alice") fix.models.saveErr = errors.New("postgres connection refused") _, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "x") require.Error(t, err) assert.True(t, errors.Is(err, ErrModelStoreUnavailable), "modelStore.Save 失敗應歸類為 ErrModelStoreUnavailable,不是 ErrConverterUnavailable") assert.False(t, errors.Is(err, ErrConverterUnavailable), "modelStore 失敗不該被歸類為 converter 問題(Reviewer M-1)") } // ========================================================================== // DownloadRedirectURL // ========================================================================== // TestDownloadRedirectURL_HappyPath:URL 組裝正確(task spec 要求)。 func TestDownloadRedirectURL_HappyPath(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", CreatedAt: time.Now(), }) fix.ownership.Set("j1", "user-alice") url, err := fix.svc.DownloadRedirectURL(context.Background(), "user-alice", "j1") require.NoError(t, err) // FAA base + /files/?access_token= // key = "models/user-alice/j1.nef",token = "opaque-stub-token-xyz" assert.Equal(t, "https://faa.example.com/files/models/user-alice/j1.nef?access_token=opaque-stub-token-xyz", url, ) // 驗 IssueDelegatedDownload 帶到的參數 fix.mcToken.mu.Lock() in := fix.mcToken.lastIssueInput fix.mcToken.mu.Unlock() require.NotNil(t, in) assert.Equal(t, "visiona-tenant", in.TenantID) assert.Equal(t, "user-alice", in.UserID) assert.Equal(t, "models/user-alice/j1.nef", in.ObjectKey) assert.Equal(t, 300, in.ExpiresInSeconds) } // TestDownloadRedirectURL_EscapeSpecialChars:特殊字元的 user_id / job_id 走 escape。 func TestDownloadRedirectURL_EscapeSpecialChars(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.mcToken.issueDelegatedDownloadFunc = func(ctx context.Context, in IssueDownloadReq) (*DelegatedDownloadToken, error) { // 模擬 token 含特殊字元 return &DelegatedDownloadToken{ Token: "abc def+/=", ExpiresAt: time.Now().Add(5 * time.Minute), }, nil } // 用合法但帶 special char 的 user_id(OIDC sub 通常不會這樣,但要 defensive) userID := "user with space" fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "completed", CreatedAt: time.Now()}) fix.ownership.Set("j1", userID) url, err := fix.svc.DownloadRedirectURL(context.Background(), userID, "j1") require.NoError(t, err) // path 段 user_id 應 escape(' ' → %20) assert.Contains(t, url, "/files/models/user%20with%20space/j1.nef") // token 段應 query escape('+' / '=' / '/' / ' ') assert.Contains(t, url, "?access_token=abc+def%2B%2F%3D") } // TestDownloadRedirectURL_OwnershipMismatch:not_found。 func TestDownloadRedirectURL_OwnershipMismatch(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "completed", CreatedAt: time.Now()}) fix.ownership.Set("j1", "user-bob") _, err := fix.svc.DownloadRedirectURL(context.Background(), "user-alice", "j1") require.Error(t, err) assert.True(t, errors.Is(err, ErrJobNotFound)) } // TestDownloadRedirectURL_JobNotCompleted:still running → ErrJobNotCompleted。 func TestDownloadRedirectURL_JobNotCompleted(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "running", CreatedAt: time.Now()}) fix.ownership.Set("j1", "user-alice") _, err := fix.svc.DownloadRedirectURL(context.Background(), "user-alice", "j1") require.Error(t, err) assert.True(t, errors.Is(err, ErrJobNotCompleted)) } // TestDownloadRedirectURL_PromoteError_Propagation:promote 5xx 透傳。 func TestDownloadRedirectURL_PromoteError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "completed", CreatedAt: time.Now()}) fix.ownership.Set("j1", "user-alice") fix.converter.promoteFunc = func(ctx context.Context, jobID string, req PromoteReq) (*ConverterPromoteResult, error) { return nil, fmt.Errorf("%w: promote 502", ErrConverterUnavailable) } _, err := fix.svc.DownloadRedirectURL(context.Background(), "user-alice", "j1") require.Error(t, err) assert.True(t, errors.Is(err, ErrConverterUnavailable)) } // TestDownloadRedirectURL_MCError_Propagation:MC delegated 5xx 透傳。 func TestDownloadRedirectURL_MCError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "completed", CreatedAt: time.Now()}) fix.ownership.Set("j1", "user-alice") fix.mcToken.issueDelegatedDownloadFunc = func(ctx context.Context, in IssueDownloadReq) (*DelegatedDownloadToken, error) { return nil, fmt.Errorf("%w: mc 5xx", ErrMCTokenUnavailable) } _, err := fix.svc.DownloadRedirectURL(context.Background(), "user-alice", "j1") require.Error(t, err) assert.True(t, errors.Is(err, ErrMCTokenUnavailable)) } // ========================================================================== // helper functions tests // ========================================================================== func TestNormalizeTargetChip(t *testing.T) { t.Parallel() cases := []struct { in, want string }{ {"720", "kl720"}, {"520", "kl520"}, {"KL630", "kl630"}, {"kl730", "kl730"}, {"", ""}, {" 720 ", "kl720"}, } for _, c := range cases { assert.Equal(t, c.want, normalizeTargetChip(c.in), "input=%q", c.in) } } func TestDefaultModelName(t *testing.T) { t.Parallel() assert.Equal(t, "yolov5s_kl720", defaultModelName(&ConverterJob{ SourceFilename: "yolov5s.onnx", Platform: "720", })) assert.Equal(t, "yolov5s_kl520", defaultModelName(&ConverterJob{ SourceFilename: "/path/to/yolov5s.onnx", Platform: "520", })) // 沒 chip assert.Equal(t, "x", defaultModelName(&ConverterJob{SourceFilename: "x.tflite"})) // 沒 stem assert.Equal(t, "converted_kl720", defaultModelName(&ConverterJob{Platform: "720"})) } func TestEscapeObjectKeyPath(t *testing.T) { t.Parallel() assert.Equal(t, "models/user/file.nef", escapeObjectKeyPath("models/user/file.nef")) // space 在 path 中需 escape assert.Equal(t, "models/user%20space/file.nef", escapeObjectKeyPath("models/user space/file.nef")) // '/' 保留(path separator);其他 path-reserved 字元正常 escape assert.Equal(t, "a%3Fb/c", escapeObjectKeyPath("a?b/c")) // '+' 在 path 段是 valid,不會被 escape(與 query string 不同) assert.Equal(t, "a+b/c", escapeObjectKeyPath("a+b/c")) } func TestBuildTargetObjectKey(t *testing.T) { t.Parallel() assert.Equal(t, "models/u1/j1.nef", buildTargetObjectKey("u1", "j1")) } func TestBuildStorageKey(t *testing.T) { t.Parallel() assert.Equal(t, "models/u1/m1.nef", buildStorageKey("u1", "m1")) }