// Ownership store 單元測試。 // // 測試策略: // - Set/Get/Delete 用 race detector 驗 concurrent safety // - EnsureRebuilt 用 stub ConverterClient(atomic counter 紀錄 fetch 次數) // 驗:first-call fetches / second-call noop / per-user 並行 / thundering herd 收斂 // - 失敗路徑驗:error 不標 rebuilt → 下次再 fetch // // Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.6.1) package conversion import ( "context" "errors" "fmt" "io" "log/slog" "strconv" "sync" "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // ========================================================================== // stub ConverterClient — 只實作 ListInProgressJobs,其他 panic(測試不用) // ========================================================================== // stubConverterClient 是 test 用的 fake ConverterClient。 // // 只實作 ListInProgressJobs(其他 method 測試不用,panic 防呆);用 atomic counter // 紀錄各 user 被呼叫次數。 type stubConverterClient struct { mu sync.Mutex // jobsByUser: user_id → 該 user 的 in_progress jobs(若 nil → 空 slice) jobsByUser map[string][]*ConverterJob // errByUser: user_id → 強制回傳的錯誤(用在失敗路徑測試) errByUser map[string]error // callCountByUser: user_id → ListInProgressJobs 被呼叫次數(atomic counter) callCountByUser sync.Map // map[string]*atomic.Int32 // fetchDelay 模擬慢 fetch(讓併發測試有機會競態) fetchDelay time.Duration // blockSignal 若非 nil,每次 ListInProgressJobs 進入時發 signal(用在 timeout 測試) blockSignal chan struct{} // blockUntil 若非 nil,會 block 在 ctx.Done 或這個 channel 任一觸發 blockUntil chan struct{} } func newStubConverterClient() *stubConverterClient { return &stubConverterClient{ jobsByUser: make(map[string][]*ConverterJob), errByUser: make(map[string]error), } } func (s *stubConverterClient) setJobs(userID string, jobs []*ConverterJob) { s.mu.Lock() defer s.mu.Unlock() s.jobsByUser[userID] = jobs } func (s *stubConverterClient) setError(userID string, err error) { s.mu.Lock() defer s.mu.Unlock() s.errByUser[userID] = err } // callCount 取某個 user 被呼叫的次數。 func (s *stubConverterClient) callCount(userID string) int32 { v, ok := s.callCountByUser.Load(userID) if !ok { return 0 } return v.(*atomic.Int32).Load() } func (s *stubConverterClient) ListInProgressJobs(ctx context.Context, userID string) ([]*ConverterJob, error) { // atomic counter cnt, _ := s.callCountByUser.LoadOrStore(userID, &atomic.Int32{}) cnt.(*atomic.Int32).Add(1) // 通知 caller 已進入(給 thundering herd 測試用) if s.blockSignal != nil { select { case s.blockSignal <- struct{}{}: default: } } // 若有 blockUntil,等到 signal 或 ctx.Done 才 return(模擬慢 / cancel) if s.blockUntil != nil { select { case <-s.blockUntil: case <-ctx.Done(): return nil, ctx.Err() } } if s.fetchDelay > 0 { select { case <-time.After(s.fetchDelay): case <-ctx.Done(): return nil, ctx.Err() } } s.mu.Lock() err := s.errByUser[userID] jobs := s.jobsByUser[userID] s.mu.Unlock() if err != nil { return nil, err } if jobs == nil { jobs = []*ConverterJob{} } return jobs, nil } // 其他 method panic(測試不會呼叫,撞到 panic 反而好 debug)。 func (s *stubConverterClient) InitJob(ctx context.Context, req InitConverterJobReq) (*ConverterJob, error) { panic("stubConverterClient.InitJob: not used in ownership_test") } func (s *stubConverterClient) GetJob(ctx context.Context, jobID string) (*ConverterJob, error) { panic("stubConverterClient.GetJob: not used in ownership_test") } func (s *stubConverterClient) Promote(ctx context.Context, jobID string, req PromoteReq) (*ConverterPromoteResult, error) { panic("stubConverterClient.Promote: not used in ownership_test") } // 確保 stubConverterClient 滿足 ConverterClient interface(編譯期驗)。 var _ ConverterClient = (*stubConverterClient)(nil) // ========================================================================== // helper:建立靜默 logger(避免測試 stdout 噪音) // ========================================================================== func newSilentLogger() *slog.Logger { return slog.New(slog.NewTextHandler(io.Discard, nil)) } // ========================================================================== // 基本 Set / Get / Delete // ========================================================================== // TestSet_Get_Delete_Basic:write / read / delete 標準操作。 func TestSet_Get_Delete_Basic(t *testing.T) { t.Parallel() stub := newStubConverterClient() o := NewOwnership(stub, newSilentLogger()) // Set + Get o.Set("job-1", "alice") uid, ok := o.Get("job-1") assert.True(t, ok) assert.Equal(t, "alice", uid) // 覆寫 o.Set("job-1", "bob") uid, _ = o.Get("job-1") assert.Equal(t, "bob", uid, "Set 同 jobID 應覆寫") // Delete o.Delete("job-1") _, ok = o.Get("job-1") assert.False(t, ok, "Delete 後 Get 應回 false") // 不存在的 jobID _, ok = o.Get("ghost") assert.False(t, ok) // 防呆:空字串不寫入 o.Set("", "alice") o.Set("job-empty-uid", "") _, ok = o.Get("") assert.False(t, ok) _, ok = o.Get("job-empty-uid") assert.False(t, ok, "空 userID 不應寫入") } // TestDelete_RemovesFromCache:Delete 後 Get 回 false(規範必含)。 func TestDelete_RemovesFromCache(t *testing.T) { t.Parallel() stub := newStubConverterClient() o := NewOwnership(stub, newSilentLogger()) o.Set("job-1", "alice") o.Delete("job-1") _, ok := o.Get("job-1") assert.False(t, ok) // 重複 Delete 不該 panic o.Delete("job-1") o.Delete("never-existed") } // TestSet_Concurrent:100 goroutine 同時 Set 不同 job → race detector 通過。 // // 規範必含:跑 go test -race -count=3 必綠。 func TestSet_Concurrent(t *testing.T) { t.Parallel() stub := newStubConverterClient() o := NewOwnership(stub, newSilentLogger()) const N = 100 var wg sync.WaitGroup wg.Add(N) for i := 0; i < N; i++ { go func(idx int) { defer wg.Done() jobID := "job-" + strconv.Itoa(idx) userID := "user-" + strconv.Itoa(idx%10) // 10 種 user o.Set(jobID, userID) // 立即 Get 驗 not lost uid, ok := o.Get(jobID) assert.True(t, ok) assert.Equal(t, userID, uid) }(i) } wg.Wait() // 驗 100 個都進去了 for i := 0; i < N; i++ { jobID := "job-" + strconv.Itoa(i) _, ok := o.Get(jobID) assert.True(t, ok) } } // TestSet_Get_Delete_Concurrent_Mixed:併發 mixed write/read/delete,race detector 驗。 func TestSet_Get_Delete_Concurrent_Mixed(t *testing.T) { t.Parallel() stub := newStubConverterClient() o := NewOwnership(stub, newSilentLogger()) const N = 50 var wg sync.WaitGroup wg.Add(N * 3) for i := 0; i < N; i++ { jobID := "job-" + strconv.Itoa(i) go func() { defer wg.Done(); o.Set(jobID, "alice") }() go func() { defer wg.Done(); _, _ = o.Get(jobID) }() go func() { defer wg.Done(); o.Delete(jobID) }() } wg.Wait() // 不驗結果(race 驗 deadlock / 共享 state corruption 即可) } // ========================================================================== // EnsureRebuilt // ========================================================================== // TestEnsureRebuilt_FirstCall_Fetches:第一次該 user 真的打 converter(規範必含)。 func TestEnsureRebuilt_FirstCall_Fetches(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.setJobs("alice", []*ConverterJob{ {JobID: "j-1", Status: "running"}, }) o := NewOwnership(stub, newSilentLogger()) err := o.EnsureRebuilt(context.Background(), "alice") require.NoError(t, err) assert.Equal(t, int32(1), stub.callCount("alice"), "首次應打 converter 1 次") // 驗 jobToUser 已寫入 uid, ok := o.Get("j-1") assert.True(t, ok) assert.Equal(t, "alice", uid) } // TestEnsureRebuilt_SecondCall_NoOp:第二次該 user noop(atomic counter 驗,規範必含)。 func TestEnsureRebuilt_SecondCall_NoOp(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.setJobs("alice", []*ConverterJob{{JobID: "j-1"}}) o := NewOwnership(stub, newSilentLogger()) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) assert.Equal(t, int32(1), stub.callCount("alice"), "成功 rebuild 後同 user 後續呼叫應 noop") } // TestEnsureRebuilt_DifferentUsers_EachFetch:不同 user 各自 fetch 一次(規範必含)。 func TestEnsureRebuilt_DifferentUsers_EachFetch(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.setJobs("alice", []*ConverterJob{{JobID: "j-a"}}) stub.setJobs("bob", []*ConverterJob{{JobID: "j-b"}}) stub.setJobs("carol", []*ConverterJob{}) o := NewOwnership(stub, newSilentLogger()) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) require.NoError(t, o.EnsureRebuilt(context.Background(), "bob")) require.NoError(t, o.EnsureRebuilt(context.Background(), "carol")) assert.Equal(t, int32(1), stub.callCount("alice")) assert.Equal(t, int32(1), stub.callCount("bob")) assert.Equal(t, int32(1), stub.callCount("carol")) // 二次呼叫 noop require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) require.NoError(t, o.EnsureRebuilt(context.Background(), "bob")) assert.Equal(t, int32(1), stub.callCount("alice")) assert.Equal(t, int32(1), stub.callCount("bob")) } // TestEnsureRebuilt_Concurrent_OnlyOneFetch:同 user 100 goroutine 同時 EnsureRebuilt // → atomic counter 驗只 fetch 一次(規範必含 — thundering herd 收斂)。 func TestEnsureRebuilt_Concurrent_OnlyOneFetch(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.setJobs("alice", []*ConverterJob{{JobID: "j-1"}}) stub.fetchDelay = 50 * time.Millisecond // 故意讓 fetch 慢,放大 race window o := NewOwnership(stub, newSilentLogger()) const N = 100 var wg sync.WaitGroup wg.Add(N) errs := make(chan error, N) for i := 0; i < N; i++ { go func() { defer wg.Done() if err := o.EnsureRebuilt(context.Background(), "alice"); err != nil { errs <- err } }() } wg.Wait() close(errs) for err := range errs { t.Errorf("EnsureRebuilt 失敗: %v", err) } assert.Equal(t, int32(1), stub.callCount("alice"), "同 user 100 個併發 caller 應只 fetch 1 次(DCL 收斂)") } // TestEnsureRebuilt_Concurrent_DifferentUsers_NotBlocked:不同 user 並行 rebuild // 互不阻塞(per-user mutex 設計驗證)。 func TestEnsureRebuilt_Concurrent_DifferentUsers_NotBlocked(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.fetchDelay = 200 * time.Millisecond const N = 10 for i := 0; i < N; i++ { stub.setJobs("u-"+strconv.Itoa(i), []*ConverterJob{}) } o := NewOwnership(stub, newSilentLogger()) start := time.Now() var wg sync.WaitGroup wg.Add(N) for i := 0; i < N; i++ { uid := "u-" + strconv.Itoa(i) go func() { defer wg.Done() _ = o.EnsureRebuilt(context.Background(), uid) }() } wg.Wait() elapsed := time.Since(start) // 若 per-user mutex 失效退化成全域鎖:N=10 * 200ms = 2s // 並行情況:應該接近單次 fetch 200ms(加上少量 schedule overhead) // 用 1s 當判斷線(給 CI 足夠寬裕) assert.Less(t, elapsed, time.Second, "不同 user rebuild 應並行(per-user mutex),elapsed=%v", elapsed) } // TestEnsureRebuilt_ConverterError_NotMarkedRebuilt:converter 5xx → 不標 rebuilt // → 下次再 fetch(規範必含)。 func TestEnsureRebuilt_ConverterError_NotMarkedRebuilt(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.setError("alice", ErrConverterUnavailable) o := NewOwnership(stub, newSilentLogger()) // 第一次 fetch 失敗 err := o.EnsureRebuilt(context.Background(), "alice") require.Error(t, err) assert.True(t, errors.Is(err, ErrConverterUnavailable)) assert.Equal(t, int32(1), stub.callCount("alice")) // 第二次仍會 fetch(不標 rebuilt) err = o.EnsureRebuilt(context.Background(), "alice") require.Error(t, err) assert.Equal(t, int32(2), stub.callCount("alice"), "上次失敗後應再次 fetch") // 第三次成功 → 後續才會 noop stub.setError("alice", nil) stub.setJobs("alice", []*ConverterJob{{JobID: "j-1"}}) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) assert.Equal(t, int32(3), stub.callCount("alice")) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) assert.Equal(t, int32(3), stub.callCount("alice"), "成功後才標 rebuilt") } // TestEnsureRebuilt_ContextCancel:ctx cancel 立即 return(規範必含)。 func TestEnsureRebuilt_ContextCancel(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.blockUntil = make(chan struct{}) // 永遠不放 → 強迫等 ctx stub.setJobs("alice", []*ConverterJob{}) o := NewOwnership(stub, newSilentLogger()) ctx, cancel := context.WithCancel(context.Background()) done := make(chan error, 1) go func() { done <- o.EnsureRebuilt(ctx, "alice") }() // 等 50ms 確保 goroutine 已進到 fetch(block 在 blockUntil) time.Sleep(50 * time.Millisecond) cancel() select { case err := <-done: require.Error(t, err, "ctx cancel 應 return error") assert.True(t, errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded), "err 應為 context.Canceled 或 DeadlineExceeded,got: %v", err, ) case <-time.After(2 * time.Second): t.Fatal("ctx cancel 後 EnsureRebuilt 沒有及時 return") } // 不標 rebuilt — 下次重試 close(stub.blockUntil) // 解除 block stub.blockUntil = nil // 後續不再 block stub.setJobs("alice", []*ConverterJob{{JobID: "j-1"}}) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) } // TestEnsureRebuilt_Timeout:rebuild 內部 timeout(converter 慢 > 5s)→ return // timeout error,不標 rebuilt。 // // 為避免測試本身跑 5s+,把 fetchDelay 設 100ms 但用 ctx WithTimeout 50ms 模擬同樣語意: // 驗 ctx cancel path 即可(ownership.go 的 rebuildTimeout 邏輯與此相同)。 func TestEnsureRebuilt_ParentCtxTimeout(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.fetchDelay = 200 * time.Millisecond stub.setJobs("alice", []*ConverterJob{}) o := NewOwnership(stub, newSilentLogger()) ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() err := o.EnsureRebuilt(ctx, "alice") require.Error(t, err) assert.True(t, errors.Is(err, context.DeadlineExceeded), "parent ctx timeout 應透傳, got: %v", err) } // TestEnsureRebuilt_EmptyUserID:空 userID return error。 func TestEnsureRebuilt_EmptyUserID(t *testing.T) { t.Parallel() stub := newStubConverterClient() o := NewOwnership(stub, newSilentLogger()) err := o.EnsureRebuilt(context.Background(), "") require.Error(t, err) } // ========================================================================== // ActiveJobOf // ========================================================================== // TestActiveJobOf_AfterRebuild:rebuild 後從 jobToUser 反查到 in_progress 的 job_id(規範必含)。 func TestActiveJobOf_AfterRebuild(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.setJobs("alice", []*ConverterJob{ {JobID: "j-active-1", Status: "running"}, }) o := NewOwnership(stub, newSilentLogger()) // rebuild 前 ActiveJobOf 應空(cache 沒資料) jobs := o.ActiveJobOf("alice") assert.Len(t, jobs, 0) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) // rebuild 後反查 jobs = o.ActiveJobOf("alice") require.Len(t, jobs, 1) assert.Equal(t, "j-active-1", jobs[0]) } // TestActiveJobOf_Empty_NoJobs:user 沒任何 job → 空 slice(規範必含)。 func TestActiveJobOf_Empty_NoJobs(t *testing.T) { t.Parallel() stub := newStubConverterClient() stub.setJobs("alice", []*ConverterJob{}) // 沒 active job o := NewOwnership(stub, newSilentLogger()) require.NoError(t, o.EnsureRebuilt(context.Background(), "alice")) jobs := o.ActiveJobOf("alice") assert.NotNil(t, jobs, "回非 nil 空 slice 給 caller 安全 range") assert.Len(t, jobs, 0) } // TestActiveJobOf_OtherUser_NotIncluded:反查只回該 user 的,不會混到別 user。 func TestActiveJobOf_OtherUser_NotIncluded(t *testing.T) { t.Parallel() stub := newStubConverterClient() o := NewOwnership(stub, newSilentLogger()) o.Set("j-alice", "alice") o.Set("j-bob", "bob") o.Set("j-alice-2", "alice") aliceJobs := o.ActiveJobOf("alice") assert.ElementsMatch(t, []string{"j-alice", "j-alice-2"}, aliceJobs) bobJobs := o.ActiveJobOf("bob") assert.ElementsMatch(t, []string{"j-bob"}, bobJobs) // 不存在的 user jobs := o.ActiveJobOf("nobody") assert.Len(t, jobs, 0) // 空 user_id jobs = o.ActiveJobOf("") assert.Nil(t, jobs) } // TestActiveJobOf_AfterDelete:Delete 後反查不回該 job。 func TestActiveJobOf_AfterDelete(t *testing.T) { t.Parallel() stub := newStubConverterClient() o := NewOwnership(stub, newSilentLogger()) o.Set("j-1", "alice") o.Set("j-2", "alice") assert.Len(t, o.ActiveJobOf("alice"), 2) o.Delete("j-1") jobs := o.ActiveJobOf("alice") require.Len(t, jobs, 1) assert.Equal(t, "j-2", jobs[0]) } // ========================================================================== // 壓力測試 — 全 method 併發 race + 不死鎖 // ========================================================================== // TestStress_AllMethods_Concurrent:所有 method 同時跑,race detector 驗 + 完成不 timeout。 func TestStress_AllMethods_Concurrent(t *testing.T) { t.Parallel() stub := newStubConverterClient() for i := 0; i < 5; i++ { uid := "u-" + strconv.Itoa(i) stub.setJobs(uid, []*ConverterJob{ {JobID: fmt.Sprintf("j-%d-a", i)}, }) } o := NewOwnership(stub, newSilentLogger()) const ROUNDS = 50 var wg sync.WaitGroup for i := 0; i < ROUNDS; i++ { uid := "u-" + strconv.Itoa(i%5) jobID := "set-" + strconv.Itoa(i) wg.Add(5) go func() { defer wg.Done(); o.Set(jobID, uid) }() go func() { defer wg.Done(); _, _ = o.Get(jobID) }() go func() { defer wg.Done(); _ = o.EnsureRebuilt(context.Background(), uid) }() go func() { defer wg.Done(); _ = o.ActiveJobOf(uid) }() go func() { defer wg.Done(); o.Delete(jobID) }() } doneCh := make(chan struct{}) go func() { wg.Wait(); close(doneCh) }() select { case <-doneCh: // ok case <-time.After(5 * time.Second): t.Fatal("壓力測試 5s 沒結束 — 疑似 deadlock") } }