diff --git a/docs/autoflow/04-architecture/conversion.md b/docs/autoflow/04-architecture/conversion.md index f101f40..a365ede 100644 --- a/docs/autoflow/04-architecture/conversion.md +++ b/docs/autoflow/04-architecture/conversion.md @@ -166,7 +166,7 @@ type Service interface { // DownloadStream — 「下載」流程(Phase 0.8b v0.6:server-side stream proxy + converter `GET /api/v1/jobs/{id}/result`): // 1. ownership 檢查 - // 2. ensurePromoted(冪等 cache 對 converter;NEF 確認已在 converter MinIO + FAA) + // 2. ensurePromoted(對 converter 冪等呼叫;NEF 確認已在 converter MinIO + FAA) // 3. converter.GetResult(ctx, jobID) — 直接打 converter GET result endpoint // Authorization: Bearer (同其他 converter API method) // converter response 200 + NEF binary stream + Content-Length + Content-Disposition @@ -297,7 +297,7 @@ type Flow struct { // DownloadStream(v0.6 流程): // 1. ownership.Check(userID, jobID) -// 2. _, _ := flow.ensurePromoted(ctx, jobID) // 冪等 cache,確保 converter 端 promote 完成(NEF 已在 MinIO + FAA) +// 2. _, _ := flow.ensurePromoted(ctx, jobID) // 對 converter 冪等呼叫(converter 端 idempotent),確保 NEF 已在 MinIO + FAA // 3. stream, meta, _ := flow.converter.GetResult(ctx, jobID) // 內部:GET {ConverterBaseURL}/api/v1/jobs/{jobID}/result // Authorization: Bearer @@ -319,7 +319,24 @@ type Flow struct { // 主要 method 對應 Service interface(v0.6 流程已在 struct 上方註解寫出;此處保留結構說明) ``` -**冪等性**:`flow.ensurePromoted(jobID)` 內部用 `sync.Map` 記 `job_id → target_object_key`;同 job 第二次 promote 直接回 cache,不打 converter。`target_object_key` 在 v0.6 仍由 promote response 拿到(給 log / debug 用)、但 visionA 不再用它直接打 FAA(converter `GetResult` 內部知道哪個 object 屬於哪個 jobID)。 +**冪等性(Phase 0.8 簡化)**:`flow.ensurePromoted(jobID)` **每次呼叫都直接打 converter `POST /api/v1/jobs/{id}/promote`、不在 visionA 端 cache**。`target_object_key` 由 visionA 端 `buildTargetObjectKey(userID, jobID)` 構造(規則 `models//.nef`,固定)、promote response 回的 `target_object_key` 用於 log / debug,visionA 不再用它直接打 FAA(converter `GetResult` 內部知道哪個 object 屬於哪個 jobID)。 + +**為什麼 Phase 0.8 不實作 cache**(4 個理由): + +1. **converter 端 promote 本身就 idempotent**(ADR-016 §1.6):同 `job_id` + 同 `target_object_key` 重複呼叫 → converter 內部已 ensure(同步保留 MinIO 物件 + PUT FAA 兩端都是 set semantic)、無副作用、只多 1 個 network round-trip。 +2. **cache 只省 round-trip、不省 promote 本身的 work**:converter 端真正的成本在 MinIO + FAA PUT;visionA 端 sync.Map cache hit 只省「visionA → converter」這 1 跳(~10-50ms LAN)、不影響 user-perceived latency 大頭。 +3. **visionA restart 後 cache 清空、first request 仍重 promote、沒長期收益**:MVP 部署頻繁、cache hit rate 低;且既有 `modelStore.FindBySourceJobID` 在 promote-to-models 已有「真正的」冪等檢查(基於 DB record)——download 路徑沒這層、但 ensurePromoted 對 converter 重複呼叫等價於那個 check。 +4. **Phase 0.8 MVP 流量小、沒觀察到 promote 流量問題**:每個 user 一個 active job、download 觸發頻率「每 job 1-N 次(N 通常 ≤ 3,user 看到 success card 後可能下載 + 同時 promote-to-models)」。 + +**Phase 1+ 升級路徑**:如果 production 觀察到 promote 流量問題(converter promote endpoint p99 飆高 / FAA PUT 對 converter 端是瓶頸)、再加 cache 不遲。三個選項: + +| 選項 | 描述 | 優點 | 缺點 | +|------|------|------|------| +| A | visionA 端 in-memory `sync.Map[jobID]bool` cache | 實作最簡(10 行)、無外部依賴 | restart 後失效、多 instance 部署時各自 cache(不共享) | +| B | DB / Redis 持久化 promoted 狀態(新增 `conversion_jobs.promoted_at` 欄位或 Redis SET) | restart 友善、多 instance 共享 | 多一個外部依賴、寫入路徑多一跳 | +| C | 從既有 `model store` 的 `source_job_id` 推論 promoted | 不需新欄位 / 新結構、復用 promote-to-models 的 source-of-truth | 只 cover「已 promote-to-models」的 case、純 download 未 promote-to-models 的 job 仍每次重 promote(download 路徑覆蓋率不完整) | + +選哪個視 production 觀測結果決定:流量集中在 download-only flow → 選 A 或 B;流量集中在 promote-to-models flow → C 已自然 cover。 **為什麼移除 delegated token 邏輯**:v0.5 規劃「IssueDelegatedDownload + DownloadWithDelegated」依賴 MC 有對應 endpoint 才 work;對 MC source 驗證後確認該 endpoint **從未存在**——v0.5 的設計是 fictional、永遠跑不通。v0.6 把整條鏈撤回、改走 converter 中轉(converter 自己用 OAuth 推 FAA、後續 download 從 converter MinIO 拉)。詳見 [ADR-016](./adr/adr-016-download-via-converter.md)。 @@ -1167,7 +1184,7 @@ frontend 用 `` 觸發時,若失敗 browser 會把錯誤頁顯示在 ### 10.7 Race condition - 同 user 同時兩 tab init → 第一個成功寫 ownership / converter 接受;第二個 pre-check 通過但 converter 409 -- 兩 tab 同時 promote-to-models → 第一個寫 model record 成功;第二個重複呼叫 ensurePromoted(cache hit)→ **converter.GetResult 拉 NEF 兩次**(接受的取捨;converter MinIO 端冪等讀)→ models repo 寫入時可能撞 model_id 衝突 — 改用 model_id 在 finalize 前 SELECT 檢查 +- 兩 tab 同時 promote-to-models → 第一個寫 model record 成功;第二個重複呼叫 ensurePromoted(visionA 端無 cache、直接打 converter,converter 端 idempotent;§2.5)→ **converter.GetResult 拉 NEF 兩次**(接受的取捨;converter MinIO 端冪等讀)→ models repo 寫入時可能撞 model_id 衝突 — 改用 model_id 在 finalize 前 SELECT 檢查 - 兩 tab 同時 download → visionA backend 各自獨立 converter.GetResult(無 cache);兩條 stream 同時跑、兩條都成功(converter MinIO 端冪等讀)— Phase 0.8b 可接受,量大時再加 server-side stream cache 或方向 A(converter Phase 2 download-tokens 讓 browser 直連 converter) ### 10.8 DoS 防護(最小集,Phase 1 強化) @@ -1262,4 +1279,5 @@ frontend 用 `` 觸發時,若失敗 browser 會把錯誤頁顯示在 | 2026-05-11 | 0.4 | **Phase 0.8b**:服務間認證從 OAuth `client_credentials` 改為 pre-shared API key(對應 [ADR-015](./adr/adr-015-server-to-server-api-key.md))。主要變更:(1) §1 端對端 sequence 拿掉 MC node;(2) §2 砍 `mc_token_client.go` 整個檔;(3) §3 新增「服務間認證(API key)」章節(原 §5 OAuth 章節整段刪除,章節編號 4→5);(4) §4.1 `/download` handler 從 `c.Redirect(302)` 改 server-side stream proxy(Service interface `DownloadRedirectURL` → `DownloadStream`);(5) §6 錯誤碼 mapping 移除 MC 4 個 code、新增 `converter_auth_failed` / `faa_auth_failed`;(6) §9.1 retry 矩陣移除 MC 2 row、所有下游 401/403 不重試;(7) §10.2 刪除 delegated token TTL、§10.3 改為 pre-shared API key 保護、§10.4 改為 server-side stream proxy 安全模型;(8) 變更影響清單列出 backend agent 後續實作要動的 .go 檔。OIDC user login 完全不動。 | | 2026-05-15 | 0.4.1 | 修 §4.1 `/download` handler `Content-Disposition` filename 來源描述歧義(T4 Reviewer M-3)— 原註釋「filename 來自 promote 結果」可被誤讀為「FAA promote response 直接給 filename」;改為明確標示「visionA backend 在 service 層由 `defaultDownloadFilename(cj)` 從 conversion job metadata 構造(規則 `_.nef`),對齊 wireframe success card 顯示範例」、並補充「FAA 端的 object_key 是 `models//.nef` 對 user 不友善」的對比說明。純文字釐清、無實作行為變更。 | | 2026-05-16 | 0.5 | **對應 ADR-015 v2.0 範圍縮限**:撤回 v0.4「visionA → FAA 改 API key」決定、FAA 線回到 ADR-014 §2 原設計(MC service token + delegated download token);visionA → converter API key 路線(v0.4)**維持**。主要變更:(1) §1 整體 flow sequence 加回 MC node、download path 改回「MC issue delegated token → visionA 帶 delegated token 打 FAA」;(2) §2 模組設計 — mc_token_client.go 部分復活(保留 service token cache + IssueDelegatedDownload 邏輯)、faa_client.go 改 `DownloadWithDelegated(ctx, delegatedToken, objectKey)`、flow.go 加回 `tokens *MCTokenClient` 欄位、DownloadStream / PromoteToModels 流程加回 IssueDelegatedDownload 步驟;(3) §3 拆成 §3.1 visionA → converter(API key)+ §3.2 visionA → FAA(service token + delegated download token),§3.2 詳述 FAA dual-auth 設計與為什麼 download endpoint 強制用 delegated token;(4) §4.1 download handler 流程改回「ensurePromoted → IssueDelegatedDownload → DownloadWithDelegated」(保留 server-side stream proxy 不退回 302);(5) §6 錯誤碼回收 `mc_token_unavailable` / `download_token_failed` 兩個 code,撤回 v0.4 加的 `faa_auth_failed`;(6) §9 retry 矩陣回收 MC 兩 row、FAA row 改回 service token + delegated;(7) §10 安全考量 — §10.2 delegated token TTL 回收、§10.3 API key 保護縮限至 converter、新增 §10.4 MC service token + delegated download token 保護、§10.6 三方對比加 v0.5 column;(8) 變更影響清單列出 backend agent 下次任務範圍(從 v0.4 回退 FAA 線 + mc_token_client 部分復活)。**本次純文件修訂、source code 改造留給 backend agent 下次任務**。OIDC user login 完全不動。 | +| 2026-05-16 | 0.6.1 | 修 §2.5 ensurePromoted cache 描述歧義(T2 review M-2)— 原本寫「實作 `sync.Map[jobID]bool` cache、同 job 第二次 promote 直接回 cache」但 visionA backend 實際 implementation(flow.go `ensurePromoted`)沒實作這個 cache、每次都直接打 converter `POST /promote`;改為明確標示「Phase 0.8 簡化 — 不實作 cache」並補 4 個簡化理由(converter 端 promote idempotent / cache 只省 round-trip / restart 後失效 / MVP 流量小) + 3 個 Phase 1+ 升級選項(in-memory sync.Map / DB-or-Redis 持久化 / 從 model store source_job_id 推論)。code 行為不變、純文件對齊。 | | 2026-05-16 | 0.6 | **對應 [ADR-016](./adr/adr-016-download-via-converter.md)**:撤回 v0.5「visionA → FAA 線回到 MC service token + delegated download token」**全部規劃**。原因:對 MC source 全 grep 驗證後確認 MC **沒有** `POST /file-access/download-tokens` endpoint、也沒有 FAA `MemberCenterDelegatedDownloadTokenValidator` assume 的 introspection endpoint—— ADR-014 §2 與 ADR-015 v2.0 §2 的 delegated token 鏈是 fictional(從 2026-05-02 寫定起未曾 e2e 跑通)。**v0.6 新設計**:visionA download 改走 **converter 新增的 `GET /api/v1/jobs/{id}/result` + visionA stream 中轉**;visionA 端不再有任何 visionA → MC / visionA → FAA 路徑、server-to-server 認證收斂為單條 visionA → converter(API key);promote 仍走 visionA → converter `POST /promote`(converter 內部 PUT FAA 與 visionA 無關,不變)。主要變更:(1) §1 整體 flow sequence 移除 MC node、download path 改成「converter.GetResult」;(2) §2 模組設計 — mc_token_client.go 維持砍除(撤回 v0.5 部分復活)、faa_client.go 改名為 converter_result_client.go(或併入 converter_client.go),新增 `GetResult` method、flow.go 移除 `tokens *MCTokenClient` 欄位、DownloadStream / PromoteToModels 都改走 converter.GetResult;(3) §3 整段重寫 — §3.1 visionA → converter API key(不變、新增同 key 用於 GetResult endpoint)+ §3.2 visionA → FAA 整段撤回(§3.2.1~§3.2.5 全部標 v0.6 撤回);(4) §4.1 download handler 流程改成「ensurePromoted → converter.GetResult」(保留 server-side stream proxy 不退回 302);(5) §6 錯誤碼撤回 `faa_unavailable` / `mc_token_unavailable` / `download_token_failed` 三個 code、新增 `result_not_found` / `result_expired` 兩個 code;(6) §9 retry 矩陣移除 MC 兩 row、FAA row 全部撤回、新增 Converter GetResult row;(7) §10 安全考量 — §10.2 delegated token TTL 整段撤回、§10.3 API key 保護維持縮限至 converter 同時新增「同一把用於 GetResult」說明、§10.4 MC service token + delegated download token 保護整段撤回、§10.6 加 v0.6 column 對比、§10.7 race condition 與 §10.8 DoS(重編號)更新;(8) 變更影響清單列出 backend agent 下次任務範圍(從 v0.5 規劃撤回 + 新增 converter.GetResult + 跨 repo converter scheduler 加 endpoint)。**本次純文件修訂、source code 改造留給 backend agent 下次任務 + converter 跨 repo 由 jimchen 自行處理**。OIDC user login 完全不動。 | diff --git a/visionA-backend/cmd/api-server/conversion_e2e_test.go b/visionA-backend/cmd/api-server/conversion_e2e_test.go index f985178..72d3f91 100644 --- a/visionA-backend/cmd/api-server/conversion_e2e_test.go +++ b/visionA-backend/cmd/api-server/conversion_e2e_test.go @@ -79,10 +79,11 @@ import ( // mockConverter 模擬 kneron_model_converter 的 task-scheduler。 // -// 對應 4 個 endpoint(converter_client.go 註解列表): +// 對應 5 個 endpoint(converter_client.go 註解列表;Phase 0.8b v0.6 新增 GetResult): // - POST /api/v1/jobs — InitJob(multipart streaming) // - GET /api/v1/jobs/{id} — GetJob // - POST /api/v1/jobs/{id}/promote — Promote +// - GET /api/v1/jobs/{id}/result — GetResult(v0.6 新增;ADR-016 §1) // - GET /api/v1/jobs?user_id=&status=in_progress — ListInProgressJobs // // 透過 atomic.Int32 counter 計數每個 endpoint 被打幾次(給場景 #2 lazy rebuild 驗證用)。 @@ -96,10 +97,11 @@ type mockConverter struct { userActive map[string][]string // observed:紀錄關鍵事件以便驗證 - initCallCount atomic.Int32 - getJobCallCount atomic.Int32 - promoteCallCount atomic.Int32 - listJobsCallCount atomic.Int32 + initCallCount atomic.Int32 + getJobCallCount atomic.Int32 + promoteCallCount atomic.Int32 + listJobsCallCount atomic.Int32 + getResultCallCount atomic.Int32 // v0.6(ADR-016 §1) // initBodyBytes:場景 #1 驗 streaming forward 收到的真實 body(mock 端 ReadAll 後保留) initBodyMu sync.Mutex @@ -110,6 +112,11 @@ type mockConverter struct { // nextInitBehavior:給場景 #4 用 — 若設為 conflictUserID,第二次 init 對該 user // 直接回 409 user_has_active_job nextInitConflict atomic.Int32 // 0=正常;>0=回 409 / 後續 decrement + + // v0.6:getResult endpoint 用的 NEF binary payload 與 last auth header(取代原 mock FAA) + resultMu sync.Mutex + resultPayload []byte // nil → 預設一個小 marker payload + resultLastAuthHeader string } // initBodyMust 把 mock 收到的 init body 取出(test caller 用)。 @@ -325,6 +332,13 @@ func (m *mockConverter) handleJobsByID(w http.ResponseWriter, r *http.Request) { return } m.handlePromote(w, r, jobID) + case "result": + // v0.6(ADR-016 §1):GET /api/v1/jobs/{id}/result — visionA download path 改走此 endpoint + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + m.handleGetResult(w, r, jobID) default: http.NotFound(w, r) } @@ -382,6 +396,67 @@ func (m *mockConverter) handlePromote(w http.ResponseWriter, r *http.Request, jo }) } +// handleGetResult 模擬 GET /api/v1/jobs/{id}/result(v0.6 / ADR-016 §1)。 +// +// 行為: +// - 紀錄收到的 Authorization header(給 E2E 驗證 visionA 帶上 converter API key) +// - job 不存在 → 404 `job_not_found` +// - job 存在但 status != completed → 409 `job_not_completed` +// - 否則 200 + Content-Type/Length/Disposition + body +// (payload 由 setResultPayload 控制;nil 用 default marker) +func (m *mockConverter) handleGetResult(w http.ResponseWriter, r *http.Request, jobID string) { + m.getResultCallCount.Add(1) + + m.resultMu.Lock() + m.resultLastAuthHeader = r.Header.Get("Authorization") + payload := m.resultPayload + m.resultMu.Unlock() + if payload == nil { + payload = []byte("mock-converter-result-default-payload") + } + + m.mu.Lock() + j := m.jobs[jobID] + m.mu.Unlock() + if j == nil { + writeJSON(w, http.StatusNotFound, map[string]any{ + "error": map[string]any{"code": "job_not_found"}, + }) + return + } + if j.Status != "completed" { + writeJSON(w, http.StatusConflict, map[string]any{ + "error": map[string]any{"code": "job_not_completed"}, + "status": j.Status, + }) + return + } + + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", strconv.Itoa(len(payload))) + // converter 給的 filename 是 object_key 派生(對 user 不直觀);visionA backend 端會用 + // defaultDownloadFilename(cj) 覆寫成 _.nef 對齊 wireframe + w.Header().Set("Content-Disposition", `attachment; filename="`+jobID+`.nef"`) + w.Header().Set("Cache-Control", "no-store, no-cache, must-revalidate, max-age=0") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(payload) +} + +// setResultPayload 設定 GET /result 回的 binary 內容(測試端控制 byte-perfect 比對)。 +func (m *mockConverter) setResultPayload(payload []byte) { + m.resultMu.Lock() + defer m.resultMu.Unlock() + m.resultPayload = payload +} + +// getResultLastAuthHeader 取最後一次 GET /result 收到的 Authorization header +// (測試驗 visionA 帶上正確 Bearer )。 +func (m *mockConverter) getResultLastAuthHeader() string { + m.resultMu.Lock() + defer m.resultMu.Unlock() + return m.resultLastAuthHeader +} + // markJobCompleted 把 mock 端 jobID 推進到 completed 狀態(給場景 #3 download 用)。 func (m *mockConverter) markJobCompleted(jobID string) { m.mu.Lock() @@ -853,34 +928,36 @@ func TestConversionE2E_LazyRebuildAfterRestart(t *testing.T) { // E2E #3:Download server-side stream proxy(Phase 0.8b) // ========================================================================== -// TestConversionE2E_DownloadStream 驗 Phase 0.8b 後 download 端對端行為: +// TestConversionE2E_DownloadStream 驗 Phase 0.8b v0.6 後 download 端對端行為: // // 1. user X 對 completed job 打 /download → status 200 OK // 2. response header: // - Content-Type: application/octet-stream // - Content-Disposition: attachment; filename="..."(filename 經 sanitize) // - Cache-Control: no-store, no-cache, must-revalidate, max-age=0 -// 3. response body bytes 與 mock FAA 寫的 binary 一致(byte-perfect) -// 4. mock FAA 收到 visionA 帶的 Authorization: Bearer -// (驗 visionA 端真的用 API key wire 對下游發 request) +// 3. response body bytes 與 mock converter `/result` 寫的 binary 一致(byte-perfect) +// 4. mock converter 端 `/api/v1/jobs/{id}/result` 收到 visionA 帶的 Authorization: +// Bearer (驗 visionA 端真的用 API key wire 對 converter 發 request) // 5. **沒有** 302 / Location header / token 結構性流經 frontend // (Phase 0.8b 設計核心:server-side proxy 取代 delegated token redirect) +// 6. **v0.6 新增**:visionA 端**不再直接打 FAA**(mock FAA `/files` 的 getCallCount 應為 0) +// —— 對應 ADR-016 撤回 visionA → FAA 直接呼叫的設計 // -// 對齊 api-conversion.md §4 (Phase 0.8b) + conversion.md §4.1 + ADR-015 §7。 +// 對齊 api-conversion.md §4 (Phase 0.8b v0.6) + conversion.md §4.1 + ADR-016 §1。 // // 流程: -// - 起 fixture(mock FAA 預設回 small NEF binary marker) +// - 起 fixture(mock converter `/result` 預設回 small NEF binary marker) // - user X init 一個 job → mock converter 自動建 running job // - markJobCompleted(jobID) 把 mock job 推進 completed // - 對 /download 打 GET — client 設 ErrUseLastResponse 防止意外 follow(雖預期非 302) -// - 驗以上 5 點 +// - 驗以上 6 點 func TestConversionE2E_DownloadStream(t *testing.T) { f := setupConversionFixture(t) defer f.Close() - // 設定 mock FAA 回的 NEF binary(測試端控制 byte-perfect 比對) - const wantNEFContent = "PHASE-0.8b-MOCK-NEF-BINARY-PAYLOAD-FROM-FAA-STREAM-1234567890" - f.faa.setNEFPayload([]byte(wantNEFContent)) + // v0.6:設定 mock converter `/result` 回的 NEF binary(取代原 mock FAA setNEFPayload) + const wantNEFContent = "PHASE-0.8b-v0.6-MOCK-NEF-BINARY-PAYLOAD-FROM-CONVERTER-RESULT-12345" + f.conv.setResultPayload([]byte(wantNEFContent)) const wantSub = "user-download-003" client := f.AuthenticatedClient(t, wantSub, "download@e2e.local") @@ -924,8 +1001,12 @@ func TestConversionE2E_DownloadStream(t *testing.T) { assert.NotContains(t, cd, "\n", "Content-Disposition 不應含 \\n(CRLF injection 防護)") // filename 應對齊 wireframe §8.1:_.nef // (mock converter 建的 job source_filename=yolov5s.onnx + platform=720 → yolov5s_kl720.nef) + // — v0.6:mock converter `/result` 給的 filename 對 user 不直觀,visionA backend 用 + // defaultDownloadFilename(cj) 覆寫成 _.nef assert.Contains(t, cd, ".nef", "filename 應以 .nef 結尾(NEF 結果檔),得 %s", cd) + assert.Contains(t, cd, "yolov5s_kl720.nef", + "v0.6:filename 應由 visionA defaultDownloadFilename(cj) 覆寫成 _.nef,得 %s", cd) cc := resp.Header.Get("Cache-Control") assert.Contains(t, cc, "no-store", @@ -935,16 +1016,17 @@ func TestConversionE2E_DownloadStream(t *testing.T) { assert.Contains(t, cc, "must-revalidate", "Cache-Control 應含 must-revalidate,得 %s", cc) - // === 斷言 3:response body byte-perfect 對齊 mock FAA 寫的 binary === + // === 斷言 3:response body byte-perfect 對齊 mock converter `/result` 寫的 binary === assert.Equal(t, wantNEFContent, string(bodyBytes), - "response body 應等於 mock FAA 寫的 NEF binary(byte-perfect stream proxy)") + "response body 應等於 mock converter `/result` 寫的 NEF binary(byte-perfect stream proxy)") - // === 斷言 4:mock FAA 收到 visionA 帶的 Authorization Bearer === - authHeader := f.faa.getLastAuthHeader() + // === 斷言 4:mock converter `/result` 收到 visionA 帶的 Authorization Bearer === + // v0.6:取代原本驗 mock FAA 收到 FAA API key 的斷言 + authHeader := f.conv.getResultLastAuthHeader() assert.True(t, strings.HasPrefix(authHeader, "Bearer "), - "mock FAA 應收到 Bearer 開頭的 Authorization header,得 %q", authHeader) - assert.Contains(t, authHeader, "fixture-faa-api-key-do-not-use-in-prod", - "mock FAA 應收到 fixture FAA API key(驗 visionA 端 wire 正確)") + "mock converter `/result` 應收到 Bearer 開頭的 Authorization header,得 %q", authHeader) + assert.Contains(t, authHeader, "fixture-converter-api-key-do-not-use-in-prod", + "mock converter `/result` 應收到 fixture converter API key(驗 visionA 端 wire 正確)") // === 斷言 5:沒有 302 / Location header(Phase 0.8b 結構性無 redirect)=== assert.NotEqual(t, http.StatusFound, resp.StatusCode, @@ -952,9 +1034,21 @@ func TestConversionE2E_DownloadStream(t *testing.T) { assert.Empty(t, resp.Header.Get("Location"), "Phase 0.8b 不應有 Location header(無 redirect 流程)") - // 驗 mock FAA 真的被打到(防 mock 路徑 wire 錯) - assert.GreaterOrEqual(t, int(f.faa.getCallCount.Load()), 1, - "mock FAA GET /files 應至少被打一次") + // === 斷言 6(v0.6 新增):visionA 端不再直接打 FAA === + // + // 這條斷言是 **ADR-016 §1 設計約束的 regression 防護**:visionA 端不再直接呼叫 FAA, + // 整條 download path 改走 converter MinIO。萬一未來某 agent 不小心把 `f.faa.GetFile` + // 加回 production code(例如「optimize: 直接打 FAA 跳一層」),此 e2e 立即 fail。 + // 比依賴 reviewer 抓更可靠。 + // + // **T3 計畫**:T3 砍 faa_client.go 整檔後,mock FAA + `f.faa.getCallCount` 計數器都會 + // 一起砍;此 assertion 應改為「wire 點不存在 FAA dependency」的編譯期靜態斷言 + // (e.g. `var _ = (*conversion.FAAClient)(nil)` 不應 compile),維持同等強度的 regression 防護。 + assert.Equal(t, int32(0), f.faa.getCallCount.Load(), + "v0.6:visionA 端不應再直接打 FAA(download path 改走 converter `/result`)") + // 驗 mock converter `/result` 真的被打到(防 wire 路徑錯) + assert.GreaterOrEqual(t, int(f.conv.getResultCallCount.Load()), 1, + "mock converter GET /api/v1/jobs/{id}/result 應至少被打一次") } // ========================================================================== diff --git a/visionA-backend/internal/conversion/conversion.go b/visionA-backend/internal/conversion/conversion.go index 2796d9b..6b20a01 100644 --- a/visionA-backend/internal/conversion/conversion.go +++ b/visionA-backend/internal/conversion/conversion.go @@ -67,41 +67,70 @@ type Service interface { // PromoteToModels 執行「加到模型庫」流程。 // - // 步驟(見 conversion.md §1 Stage 3a + §2.5): + // 步驟(Phase 0.8b v0.6,對齊 ADR-016 / conversion.md §1 Stage 3a + §2.5; + // 實作見 internal/conversion/flow.go PromoteToModels): // 1. ownership.Check(userID, jobID) - // 2. ensurePromoted(jobID) — 冪等:若已 promote 過用 cache,否則打 converter - // 3. faa.Download(promotedKey) — 用 service token (scope=files:download.read) server-to-server pull - // 4. 走既有 /api/models/init + /api/models/finalize(不繞過既有 handler 邏輯) - // 5. 回填 model.Source="converted" + model.SourceJobID=jobID(schema 已預埋) + // 2. converter.GetJob — 確認 status=completed(否則 ErrJobNotCompleted) + // 3. 冪等檢查:modelStore.FindBySourceJobID — 已有 model 直接回(避免重複 promote) + // 4. converter.Promote — 觸發 NEF 推到 FAA 並保留在 converter MinIO,拿到 target_object_key + // / size / checksum + // 5. converter.GetResult(jobID) — streaming pull NEF binary from converter MinIO + // (v0.6:取代原 faa.Download(promotedKey);visionA 端不再直接打 FAA) + // 6. storage.Put — streaming 寫進 visionA storage(不 ReadAll;size 用 promoteRes.Size 為權威值, + // 避免 converter response chunked transfer 時 Content-Length = -1 的邊界) + // 7. modelStore.Save — 建 model record(Source="converted"、SourceJobID=jobID;schema 已預埋) // // 冪等性:對同一 jobID 重複呼叫;若已建過 model record,回既有 modelID 而非新建。 // + // 安全模型演進(見 conversion.md §10.6 + ADR-016 後果): + // - Phase 0.8:visionA backend 用 service token (scope=files:download.read) server-to-server + // pull from FAA + // - Phase 0.8b v0.4:visionA backend 中轉 stream from FAA(API key) + // - **Phase 0.8b v0.6**:visionA backend 中轉 stream from **converter MinIO**;visionA 端 + // 完全不接觸 MC / FAA;認證鏈只剩單條 visionA → converter API key + // // `name` 是 Design Phase 0.8 wireframe §7.1 的單一欄位(不含 description)。 PromoteToModels(ctx context.Context, userID, jobID, name string) (*PromoteResult, error) - // DownloadStream 產出「下載」的 server-side stream proxy(Phase 0.8b 變更,對應 ADR-015 §7)。 + // DownloadStream 產出「下載」的 server-side stream proxy(Phase 0.8b v0.6 變更, + // 對應 ADR-016 §1 + conversion.md §2.5 / §4.1)。 // - // 流程(見 conversion.md §1 Stage 3b + §4.1): + // 流程(Phase 0.8b v0.6,ADR-016 §1 + conversion.md §2.5 / §4.1; + // 實作見 internal/conversion/flow.go DownloadStream): // 1. ownership 檢查(不符 → ErrJobNotFound,§7.2 防枚舉) - // 2. converter.GetJob 確認 status=completed(否則 ErrJobNotCompleted) - // 3. ensurePromoted(與 PromoteToModels 共用同一個 converter promote endpoint,冪等) - // 4. faa.GetFile(targetObjectKey) — 用 pre-shared API key 直接拉 NEF stream + // 2. converter.GetJob — 確認 status=completed(否則 ErrJobNotCompleted) + // 3. ensurePromoted — 自動觸發 promote 確保 converter MinIO 內有 NEF(converter 端冪等) + // 4. converter.GetResult(jobID) — 從 converter MinIO streaming pull NEF binary + // (v0.6:取代原 faa.GetFile(targetObjectKey);visionA 端不再直接打 FAA) + // 5. 回傳 (io.ReadCloser, *DownloadMetadata, nil);caller(handler)負責 io.CopyN 到 client + Close // - // Phase 0.8 → 0.8b 差異: - // - Phase 0.8:visionA → MC 換 delegated token → 組 FAA URL → handler 回 302, - // browser 直連 FAA。 - // - Phase 0.8b:MC 認證鏈取消(ADR-015)→ 沒有 delegated token → visionA backend - // 用 API key 直接拉 FAA → 中轉 stream 給 browser(server-side proxy)。 + // filename 來源(對齊 conversion.md §4.1):DownloadMetadata.Filename **由 visionA backend + // 覆寫**為 `_.nef`(如 `yolov5s_kl720.nef`); + // 不沿用 converter response 的 Content-Disposition filename(converter 給的 filename 可能是 + // object_key 派生、對 user 不直觀;conversion.md §4.1 註明 visionA 為 source-of-truth)。 // - // 安全(見 conversion.md §10.4): + // 安全模型演進(見 conversion.md §10.6 + ADR-016 後果): + // - Phase 0.8:MC simple delegated token + 302 redirect → browser 直連 FAA(token 短暫流經 browser) + // - Phase 0.8b v0.4:visionA backend 中轉 stream from FAA(API key) + // - Phase 0.8b v0.5:同 v0.4 設計但 token 來源改 MC delegated(fictional、從未跑通) + // - **Phase 0.8b v0.6**:visionA backend 中轉 stream from **converter MinIO**;visionA 端 + // 完全不接觸 MC / FAA;認證鏈只剩單條 visionA → converter API key + // + // 安全(見 conversion.md §10.4 + §10.6): // - 沒有 token 結構性存在於任何 frontend response(API key 永遠在 server side) - // - object_key 不對 frontend 揭露(filename 取自 promote 結果,由 visionA 命名) - // - 不需 FAA CORS(visionA → FAA 是 server-side outbound HTTP call,不適用 CORS) + // - object_key 不對 frontend 揭露(filename 由 visionA 從 conversion job metadata 重新構造) + // - 不需 FAA CORS(visionA → converter,不需 FAA CORS;visionA 端不再接觸 FAA) + // + // 錯誤透傳(對齊 ADR-016 §1.3 / conversion.md §6): + // - converter 401 / 403 → ErrConverterAuthFailed(handler 透 ErrorCode mask 成 converter_unavailable / 502) + // - converter 404 → ErrJobNotFound(與 ownership 找不到共用文字 / 404) + // - converter 410 → ErrResultExpired(v0.6 新增;frontend 顯示「請重新轉檔」CTA / 410) + // - converter 5xx → ErrConverterUnavailable(converter / MinIO 暫時失常 / 502) // // Caller(handler)責任: // - 取得 stream 後**必須 defer stream.Close()**,否則 keep-alive connection 不會回 pool // - 設好 response header(Content-Type / Content-Disposition / Cache-Control / Content-Length), - // 用 io.Copy(w, stream) streaming 寫到 client + // 用 io.CopyN(w, stream, MaxDownloadStreamBytes) streaming 寫到 client(含 size cap) // - 中途錯誤無法再改 status(已 200 + part of body),由 ctx 控制 caller 端 cleanup DownloadStream(ctx context.Context, userID, jobID string) (stream io.ReadCloser, meta *DownloadMetadata, err error) diff --git a/visionA-backend/internal/conversion/converter_client.go b/visionA-backend/internal/conversion/converter_client.go index 294be56..746f709 100644 --- a/visionA-backend/internal/conversion/converter_client.go +++ b/visionA-backend/internal/conversion/converter_client.go @@ -32,6 +32,8 @@ import ( "fmt" "io" "log/slog" + "mime" + "net" "net/http" "net/url" "strings" @@ -78,6 +80,42 @@ type ConverterClient interface { // // 預期 0 或 1 筆(同 user 同時只能 1 active job),但回 slice 保留 future-proof。 ListInProgressJobs(ctx context.Context, userID string) ([]*ConverterJob, error) + + // GetResult 拉 converter `GET /api/v1/jobs/{id}/result` 的 NEF binary stream + // (Phase 0.8b v0.6 新增,ADR-016 §1)。 + // + // 回傳的 *DownloadMetadata.Filename 直接由 converter response 的 Content-Disposition + // 解出;caller(flow.go)通常會用自己的 `defaultDownloadFilename(cj)` 覆寫,給 user + // 比較友善的檔名(converter 給的 filename 可能是 object_key 派生、對 user 不直觀)。 + // + // 回傳的 io.ReadCloser 是 streaming response body;**caller 必須 Close**, + // 不然底層 http.Response.Body 不會釋放、connection 也回不了 pool。 + // + // 推薦 pattern: + // + // stream, meta, err := client.GetResult(ctx, jobID) + // if err != nil { return err } + // defer stream.Close() + // io.CopyN(dst, stream, MaxDownloadStreamBytes) // streaming + size cap + // + // 重試行為(Phase A retry only,對齊 conversion.md §9.1): + // - dial / TLS / response header 階段的 5xx / network / timeout: + // 線性退避重試 max 2 次(1s, 2s — `resultRetryBackoff` 採 base × attempt;目前 max + // retry = 2 下與指數退避結果一致,未來若擴 retry 次數需重新對齊 conversion.md §9.1) + // — GET 沒 request body 完全 idempotent + // - 401 / 403 / 404 / 409 / 410 / 其他 4xx:不重試,立即 return 對應 sentinel + // - ctx cancel / deadline:立即 return ctx.Err() + // - 一旦拿到 200 response(進 Phase B,body streaming):return *DownloadMetadata + // + stream,body 由 caller 自己讀;中斷不再 retry(streaming response 不可 replay) + // + // 錯誤映射(對齊 ADR-016 §1.3 + conversion.md §6): + // - 401 / 403 → ErrConverterAuthFailed(不 retry;對外 mask 成 converter_unavailable / 502) + // - 404 → ErrJobNotFound(不 retry;對外 not_found / 404) + // - 409 → ErrJobNotCompleted(不 retry;對外 job_not_completed / 409) + // - 410 → ErrResultExpired(不 retry;對外 result_expired / 410,新增) + // - 其他 4xx → ErrValidationFailed(不 retry;極罕見) + // - 5xx exhausted / network exhausted → ErrConverterUnavailable(對外 converter_unavailable / 502) + GetResult(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) } // InitConverterJobReq 是 InitJob 的輸入;body 為 streaming(io.Reader 一次性消費)。 @@ -165,6 +203,20 @@ type ConverterClientOpts struct { // 與 HTTPClient 分開避免互相影響:GetJob 在 polling 場景頻繁呼叫,timeout 短才合理。 InitHTTPClient *http.Client + // StreamHTTPClient 為 optional;nil 用預設(無整體 Timeout,自訂 dial / response header + // timeout)— GetResult NEF binary stream 大檔下載專用。 + // + // 為什麼預設 client 不設 Timeout: + // 500MB NEF 在慢網路下 download 可能 5-10 分鐘;http.Client.Timeout 是「整體 timeout」 + // 涵蓋「dial + response header + body 讀完」三段,會在大檔下載中途斷線。 + // 改用 transport 層的 DialTimeout + ResponseHeaderTimeout(10s 各自)— 連線階段卡死才算 fail, + // body streaming 階段交給 ctx.Done() 控制(caller 用帶 deadline 的 ctx 即可)。 + // + // 設計同 faa_client.go newDefaultFAAHTTPClient(v0.6 砍 faa_client.go 後將 pattern 收進此處)。 + // + // Phase 0.8b v0.6 conversion (見 ADR-016 §1.2 / conversion.md §2.3) + StreamHTTPClient *http.Client + // Now 為 optional;nil 用 time.Now。測試會注入 fake clock。 Now func() time.Time @@ -185,10 +237,32 @@ const ( converterMaxRetriesGet = 2 // GetJob max 3 attempts (1 + 2 retries) converterMaxRetriesPromote = 2 // Promote max 3 attempts (1 + 2 retries) converterMaxRetriesList = 1 // List max 2 attempts (1 + 1 retry) + // Phase 0.8b v0.6:GetResult 對齊 §9.1 v0.6 新增 row「max 2 attempts、1s, 2s 退避」 + // (與 Promote 同等級;NEF 拉取頻率低、failure 多半是 converter MinIO 暫時故障可 retry) + converterMaxRetriesResult = 2 // 退避 base converterRetryBase = 500 * time.Millisecond + // resultRetryBaseDelay 是 GetResult 線性退避的 base(attempt=1→1s, attempt=2→2s)— 對齊 §9.1 v0.6。 + // `resultRetryBackoff` 採 base × attempt;目前 max retry = 2 下與指數退避結果一致, + // 未來若擴 retry 次數需重新對齊 conversion.md §9.1。 + // + // 與 doWithRetry 用的 converterRetryBase(500ms)不同:result 是大檔下載、Phase A retry + // 期間 server 可能還在 MinIO 暫時故障,延長 base 給 converter 多一點時間恢復。 + resultRetryBaseDelay = 1 * time.Second + + // resultDialTimeout 是 GetResult 連線階段的 timeout(TCP / TLS 握手)。 + resultDialTimeout = 10 * time.Second + // resultResponseHeaderTimeout 是「送完 request → 收到 response status 行」的 timeout。 + // 涵蓋 converter 內部 MinIO lookup + auth validate;10s 足以涵蓋常態。 + // **不涵蓋 body streaming 階段**(body 由 ctx 控制)。 + resultResponseHeaderTimeout = 10 * time.Second + + // resultErrorBodyReadCap 是失敗 response 從 body 讀進 io.Discard 的最大量(4KB)。 + // 失敗時讀少量 body 讓 keep-alive 能 reuse connection。 + resultErrorBodyReadCap = 4 * 1024 + // promote 預設 source(Phase 0.8 visionA 一律取 nef) promoteDefaultSource = "nef" ) @@ -207,12 +281,13 @@ var ErrConverterAPIKeyNotConfigured = errors.New("conversion/converter_client: A // // 套件內 unexported struct(caller 拿 interface),讓未來換實作不影響 caller。 type converterClient struct { - baseURL string - apiKey string // Phase 0.8b:pre-shared API key,建構時 fail-fast 不允許空字串 - http *http.Client - httpInit *http.Client - now func() time.Time - logger *slog.Logger + baseURL string + apiKey string // Phase 0.8b:pre-shared API key,建構時 fail-fast 不允許空字串 + http *http.Client + httpInit *http.Client + httpStream *http.Client // Phase 0.8b v0.6:GetResult NEF binary stream 專用(無整體 Timeout) + now func() time.Time + logger *slog.Logger } // NewConverterClient 建立一個 ConverterClient 實例。 @@ -222,9 +297,6 @@ type converterClient struct { // // **Fail-fast**:若 opts.APIKey 為空字串,此函式 panic。理由是 Phase 0.8b 不允許 server 在 // 「未認證」狀態下啟動 — 對齊 ADR-015 §3.5.3 部署檢查清單 #1。 -// -// `opts.Tokens` 是 Phase 0.8 廢棄欄位(見 ConverterClientOpts.Tokens 註解),即使非 nil 也不被 -// 內部使用;T5 切換 wire 點後從 struct 移除。 func NewConverterClient(opts ConverterClientOpts) ConverterClient { if opts.APIKey == "" { panic(ErrConverterAPIKeyNotConfigured) @@ -237,6 +309,10 @@ func NewConverterClient(opts ConverterClientOpts) ConverterClient { if httpInit == nil { httpInit = &http.Client{Timeout: converterInitHTTPTimeout} } + httpStream := opts.StreamHTTPClient + if httpStream == nil { + httpStream = newDefaultStreamHTTPClient() + } now := opts.Now if now == nil { now = time.Now @@ -246,12 +322,42 @@ func NewConverterClient(opts ConverterClientOpts) ConverterClient { logger = slog.Default() } return &converterClient{ - baseURL: strings.TrimRight(opts.BaseURL, "/"), - apiKey: opts.APIKey, - http: httpClient, - httpInit: httpInit, - now: now, - logger: logger, + baseURL: strings.TrimRight(opts.BaseURL, "/"), + apiKey: opts.APIKey, + http: httpClient, + httpInit: httpInit, + httpStream: httpStream, + now: now, + logger: logger, + } +} + +// newDefaultStreamHTTPClient 建一個適合 streaming download 的預設 http.Client。 +// +// 為什麼自訂 transport(同 v0.5 之前 faa_client.go 的 newDefaultFAAHTTPClient): +// - http.Client.Timeout 不適用大檔下載(會中斷 body streaming) +// - 需要分別控制 dial / response header timeout,body streaming 不限制(由 ctx 控) +// +// transport 其餘參數沿用 net/http DefaultTransport 的合理預設(MaxIdleConns 等)。 +// +// Phase 0.8b v0.6 conversion (見 ADR-016 §1.2 / conversion.md §2.3) +func newDefaultStreamHTTPClient() *http.Client { + transport := &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: resultDialTimeout, + KeepAlive: 30 * time.Second, + }).DialContext, + ResponseHeaderTimeout: resultResponseHeaderTimeout, + // 沿用 DefaultTransport 的合理預設 + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + return &http.Client{ + Transport: transport, + // **不設 Timeout** — body streaming 階段由 ctx 控制 } } @@ -867,6 +973,235 @@ func parseListJobs(body []byte) ([]*ConverterJob, error) { return out, nil } +// ========================================================================== +// GetResult — Phase 0.8b v0.6:streaming NEF pull from converter MinIO +// ========================================================================== + +// GetResult 實作 ConverterClient.GetResult(Phase 0.8b v0.6,ADR-016 §1)。 +// +// 流程: +// 1. 組 URL:`{baseURL}/api/v1/jobs/{jobID}/result` +// 2. doStreamWithRetry:max (1 + converterMaxRetriesResult) attempts;每 attempt 重新 c.httpStream.Do +// - 拿到 200:解 Content-Length / Content-Type / Content-Disposition filename,return stream + metadata +// - 拿到 4xx:close body 後依 status mapping 對應 sentinel,不 retry +// - 拿到 5xx:close body,等 backoff 後 retry +// - network / dial / responseHeader timeout:等 backoff 後 retry +// - ctx cancel / deadline:立即 return ctx.Err() +// +// 為什麼不沿用 c.doWithRetry(GetJob / Promote 用的那個): +// - 那個 helper 在 success 時 ReadAll body 進 []byte(不適合 stream) +// - 那個 helper 用 c.http(10s timeout,會中斷大檔下載) +// - 那個 helper 把 close body 的責任收進來(不能透傳給 caller) +// +// GetResult 是 streaming pull、結構與 v0.5 之前 faa_client.go 的 GetFile 完全對等 +// (只是 endpoint 換 converter、auth 換 c.apiKey);本檔內保留獨立 retry helper。 +func (c *converterClient) GetResult(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + if jobID == "" { + return nil, nil, fmt.Errorf("conversion/converter_client: GetResult jobID is required") + } + + endpoint := c.baseURL + "/api/v1/jobs/" + url.PathEscape(jobID) + "/result" + + return c.doStreamWithRetry(ctx, jobID, endpoint) +} + +// doStreamWithRetry 是 GetResult 的 Phase A retry 執行器。 +// +// 結構與 v0.5 之前 faa_client.go doWithRetry 相同;對齊 ADR-016 §1.6(converter 端 +// handler 用 pipeline(minioStream, res) 直接 stream、visionA 端用 ResponseBody as-is 透傳)。 +func (c *converterClient) doStreamWithRetry( + ctx context.Context, + jobID, endpoint string, +) (io.ReadCloser, *DownloadMetadata, error) { + var lastErr error + for attempt := 0; attempt <= converterMaxRetriesResult; attempt++ { + // retry 前等待退避;ctx cancel 立即中斷 + if attempt > 0 { + select { + case <-ctx.Done(): + // ctx cancel/deadline → 立即 return(不 retry,不包成 sentinel) + return nil, nil, ctx.Err() + case <-time.After(resultRetryBackoff(attempt)): + } + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + // 建 request 失敗(極罕見:URL parse 異常)— 不可 retry + return nil, nil, fmt.Errorf("%w: build get_result request: %v", ErrConverterUnavailable, err) + } + req.Header.Set("Accept", "application/octet-stream") + // Phase 0.8b:直接帶 pre-shared API key(不查 cache、不打 MC) + req.Header.Set("Authorization", "Bearer "+c.apiKey) + + stream, meta, classifiedErr, retryable := c.doStreamOnce(req, jobID, attempt) + if classifiedErr == nil { + // 成功 — stream 含未 close 的 body,由 caller 接手 + return stream, meta, nil + } + lastErr = classifiedErr + if !retryable { + // 4xx / 401-403 / 404 / 409 / 410 / ctx cancel:直接 return,不再 retry + return nil, nil, classifiedErr + } + // retryable 5xx / network / timeout:繼續下一輪 + } + // 用完 retry 額度 + c.logger.Warn("conversion.converter.get_result_retry_exhausted", + slog.String("job_id", jobID), + slog.Int("attempts", converterMaxRetriesResult+1)) + return nil, nil, lastErr +} + +// doStreamOnce 執行一次 GetResult Phase A:發 request → 等 response header → 分類結果。 +// +// 回傳: +// - 成功(2xx):stream != nil(含未 close 的 streaming body), meta != nil, classifiedErr=nil, retryable=false +// - 失敗:stream=nil, meta=nil, classifiedErr 為 sentinel-wrapped error, retryable 表示是否該重試 +// +// 重要:成功時 caller(doStreamWithRetry)會直接把 stream 透傳出去 — 這層**不 close body**。 +// 失敗時這層**會 close body**(讀少量讓 keep-alive reuse connection)。 +func (c *converterClient) doStreamOnce( + req *http.Request, + jobID string, + attempt int, +) (stream io.ReadCloser, meta *DownloadMetadata, err error, retryable bool) { + startedAt := c.now() + res, doErr := c.httpStream.Do(req) + duration := c.now().Sub(startedAt) + if doErr != nil { + // network / dial / response header timeout / ctx cancel + if errors.Is(doErr, context.Canceled) || errors.Is(doErr, context.DeadlineExceeded) { + c.logger.Warn("conversion.converter.get_result_ctx_cancelled", + slog.String("job_id", jobID), + slog.Int("attempt", attempt+1), + slog.Duration("duration", duration)) + return nil, nil, doErr, false + } + c.logger.Warn("conversion.converter.get_result_network_error", + slog.String("job_id", jobID), + slog.Int("attempt", attempt+1), + slog.Duration("duration", duration), + // err.Error() 不會含 secret(http.Client 錯誤訊息只有 URL + 連線層 errno), + // 但仍 truncate 防 log 爆量 + slog.String("err", truncate(doErr.Error(), 200))) + return nil, nil, fmt.Errorf("%w: get_result network error: %v", ErrConverterUnavailable, doErr), true + } + + // 成功(2xx):解 headers,把 res.Body 透傳給 caller streaming 消費 — **不在這裡 close**! + // 注意:成功路徑沒 defer res.Body.Close() — body 的所有權交給 caller。 + if res.StatusCode >= 200 && res.StatusCode < 300 { + contentType := res.Header.Get("Content-Type") + if contentType == "" { + // converter 未設 → 給安全預設(octet-stream 必觸發 browser download dialog) + contentType = "application/octet-stream" + } + filename := parseFilenameFromContentDisposition(res.Header.Get("Content-Disposition")) + md := &DownloadMetadata{ + Filename: filename, + ContentType: contentType, + ContentLength: res.ContentLength, + } + c.logger.Info("conversion.converter.get_result_success", + slog.String("job_id", jobID), + slog.Int("status", res.StatusCode), + slog.Int("attempt", attempt+1), + slog.Int64("content_length", res.ContentLength), + slog.String("content_type", contentType), + slog.String("filename", filename), + slog.Duration("duration", duration)) + return res.Body, md, nil, false + } + + // 失敗(非 2xx):讀少量 body 做 log(避免 5xx 帶大 body 爆 log),然後 close + // 讀進 io.Discard 而不是真的存下來: + // - 不寫進 log(converter 錯誤 body 可能含 requestId / 路徑等內部資訊) + // - 只是讓 keep-alive 能 reuse connection(read-to-EOF or close) + defer res.Body.Close() + _, _ = io.CopyN(io.Discard, res.Body, resultErrorBodyReadCap) + + c.logger.Warn("conversion.converter.get_result_endpoint_error", + slog.String("job_id", jobID), + slog.Int("status", res.StatusCode), + slog.Int("attempt", attempt+1), + slog.Duration("duration", duration)) + + mappedErr, isRetryable := c.mapGetResultError(res.StatusCode) + return nil, nil, mappedErr, isRetryable +} + +// mapGetResultError 把 converter `GET /api/v1/jobs/{id}/result` 的非 2xx 對應到 sentinel +// + 是否 retryable。對齊 ADR-016 §1.3 + conversion.md §6。 +// +// Phase 0.8b v0.6 新增 mapping: +// - 401 / 403 → ErrConverterAuthFailed(不 retry — API key 不對齊;運維事件;對外 mask 成 converter_unavailable) +// - 404 → ErrJobNotFound(不 retry — job_id 不存在 / 已被 GC;對外 not_found) +// - 409 → ErrJobNotCompleted(不 retry — job 尚未 completed;理論上 visionA flow.go ensurePromoted +// 階段已先確認 completed 才打 GetResult,不應發生;保留 mapping 防 converter 端 race) +// - 410 → ErrResultExpired(**新**,不 retry — job completed 但 NEF 已過 7d expires_at; +// 對外 result_expired / 410,給 frontend 顯示「請重新轉檔」CTA) +// - 其他 4xx → ErrValidationFailed(不 retry) +// - 5xx → ErrConverterUnavailable(**可 retry**:converter / MinIO 暫時失常) +// +// **設計取捨:404 共用既有 ErrJobNotFound 而非新增 ErrResultNotFound** +// (對齊 T1 Reviewer Minor #M-1): +// - conversion.md §6 對 converter 404 `result_not_found` 規定「i18n 與 `not_found` +// 共用文字(轉檔任務不存在)」— 對外語意層面與 ownership 找不到的 404 同樣處理; +// - 新增 sentinel 只會讓 errors.go 多出一個與 ErrJobNotFound 行為完全相同的 code path、 +// handler / i18n / metric 全要再加 case,over-engineering; +// - 若 Phase 1+ 想對「converter 404 vs ownership 404」做分流(例如不同的 retry 策略 / +// 不同的 SRE alarm),再評估時引入新 sentinel;目前統一 mapping。 +func (c *converterClient) mapGetResultError(status int) (err error, retryable bool) { + switch { + case status == http.StatusUnauthorized || status == http.StatusForbidden: + return fmt.Errorf("%w: get_result %d", ErrConverterAuthFailed, status), false + case status == http.StatusNotFound: + return fmt.Errorf("%w: get_result %d", ErrJobNotFound, status), false + case status == http.StatusConflict: + return fmt.Errorf("%w: get_result %d", ErrJobNotCompleted, status), false + case status == http.StatusGone: + // Phase 0.8b v0.6 新增:converter MinIO 內 NEF 已被 GC(7d expires_at 過期) + return fmt.Errorf("%w: get_result %d", ErrResultExpired, status), false + case status >= 400 && status < 500: + // 其他 4xx:不可 retry + return fmt.Errorf("%w: get_result %d", ErrValidationFailed, status), false + default: + // 5xx:可 retry + return fmt.Errorf("%w: get_result %d", ErrConverterUnavailable, status), true + } +} + +// resultRetryBackoff 回傳第 n 次 retry(n 從 1 開始)的等待時間。 +// 1 → 1s, 2 → 2s(對齊 conversion.md §9.1 v0.6 新增 row) +// +// 不加 jitter — Phase 0.8b download 路徑由 user 主動觸發頻率不高,並發競爭機率低; +// jitter 邊際效益低。同 v0.5 之前 faa_client.go faaRetryBackoff 設計。 +func resultRetryBackoff(attempt int) time.Duration { + if attempt < 1 { + return resultRetryBaseDelay + } + return resultRetryBaseDelay * time.Duration(attempt) +} + +// parseFilenameFromContentDisposition 從 Content-Disposition header 解 filename 屬性。 +// +// 範例輸入:`attachment; filename="yolov5s_kl720.nef"` +// +// 解析失敗或缺 filename 屬性 → 回空字串(caller 端通常用 defaultDownloadFilename(cj) +// 覆寫;converter 給的 filename 只是 fallback / 觀測用)。 +// +// 用 mime.ParseMediaType 處理 quoted-string、escape、param 順序等細節,比手撕 split 穩。 +func parseFilenameFromContentDisposition(cd string) string { + if cd == "" { + return "" + } + _, params, err := mime.ParseMediaType(cd) + if err != nil { + return "" + } + return params["filename"] +} + // parseConverterPromoteResult 解 POST /api/v1/jobs/{id}/promote 的 response。 // // 對齊 openapi.yaml `PromoteResponse`:取 promoted[0](Phase 0.8 一次只 promote 1 target)。 diff --git a/visionA-backend/internal/conversion/converter_client_test.go b/visionA-backend/internal/conversion/converter_client_test.go index 1d337a9..8c8fd15 100644 --- a/visionA-backend/internal/conversion/converter_client_test.go +++ b/visionA-backend/internal/conversion/converter_client_test.go @@ -927,6 +927,376 @@ func TestListInProgressJobs_AuthFailed401_NoRetry(t *testing.T) { assert.Equal(t, int32(1), attempts.Load(), "401 不應 retry") } +// ========================================================================== +// GetResult tests(Phase 0.8b v0.6,ADR-016 §1) +// ========================================================================== + +// TestGetResult_Success:mock 接受 GET /api/v1/jobs/{id}/result,回 200 + binary body +// + Content-Length / Content-Type / Content-Disposition。 +// +// Phase 0.8b v0.6:驗 visionA → converter 端送 `Authorization: Bearer ` + +// metadata 正確解出 + body 透傳成功(不被 ReadAll 進 RAM)。 +func TestGetResult_Success(t *testing.T) { + t.Parallel() + + const wantBody = "FAKE_NEF_BINARY_CONTENT_FOR_TEST" + var serverAuth string + var serverPath string + + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-xyz/result", func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodGet, r.Method) + serverAuth = r.Header.Get("Authorization") + serverPath = r.URL.Path + + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", "32") + w.Header().Set("Content-Disposition", `attachment; filename="yolov5s_kl720.nef"`) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(wantBody)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + stream, meta, err := cc.GetResult(context.Background(), "job-xyz") + require.NoError(t, err) + require.NotNil(t, stream) + require.NotNil(t, meta) + defer stream.Close() + + assert.Equal(t, "Bearer "+fakeConverterAPIKey, serverAuth, + "Phase 0.8b:server 端應收到 pre-shared API key(visionA → converter API key 路徑)") + assert.Equal(t, "/api/v1/jobs/job-xyz/result", serverPath) + assert.Equal(t, int64(32), meta.ContentLength) + assert.Equal(t, "application/octet-stream", meta.ContentType) + assert.Equal(t, "yolov5s_kl720.nef", meta.Filename) + + body, err := io.ReadAll(stream) + require.NoError(t, err) + assert.Equal(t, wantBody, string(body)) +} + +// TestGetResult_StreamingBody_NotBuffered:用 8MB body,驗 client 拿到 stream +// 而不是 ReadAll 進 RAM。 +// +// 策略:用 countingReader 包 8MB 假 body,count 一次回應裡 reader 被 Read 的次數; +// 然後在 caller 端只讀前 1KB 就 close,驗 client 端對 body 是 lazy read(剩下的不會被讀完)。 +func TestGetResult_StreamingBody_NotBuffered(t *testing.T) { + t.Parallel() + + const totalBytes = 8 * 1024 * 1024 // 8MB + var serverReadCalls int64 + + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-big/result", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", "8388608") + w.WriteHeader(http.StatusOK) + + // 用 countingReader 包來源(驗 server 端 io.Copy 是 streaming) + src := &countingReader{R: io.LimitReader(zerosReader{}, totalBytes)} + _, _ = io.Copy(w, src) + atomic.AddInt64(&serverReadCalls, src.calls) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + stream, meta, err := cc.GetResult(context.Background(), "job-big") + require.NoError(t, err) + require.NotNil(t, stream) + require.NotNil(t, meta) + + // caller 只讀前 1KB 就 close — 驗 client 確實是 streaming + // (非 ReadAll,否則 client 收到 8MB 完整 body 後才回給 caller) + buf := make([]byte, 1024) + n, _ := io.ReadFull(stream, buf) + assert.Equal(t, 1024, n, "caller 應能在 client 還在 streaming 時就拿到部分 byte") + require.NoError(t, stream.Close()) + + assert.Equal(t, int64(8*1024*1024), meta.ContentLength, + "meta.ContentLength 應正確反映 server 端 Content-Length(不是 -1)") +} + +// TestGetResult_AuthFailed401:mock 回 401 → ErrConverterAuthFailed,不 retry, +// 且對外 mask 為 converter_unavailable / 502。 +// +// 對齊 T1 Reviewer Minor #M-3:補 mask 行為驗證(ErrorCode / HTTPStatus); +// 結構與 TestInitJob_AuthFailed401 對稱。 +func TestGetResult_AuthFailed401(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-401/result", func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + w.WriteHeader(http.StatusUnauthorized) + _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + stream, meta, err := cc.GetResult(context.Background(), "job-401") + + require.Error(t, err) + assert.Nil(t, stream) + assert.Nil(t, meta) + assert.True(t, errors.Is(err, ErrConverterAuthFailed), + "401 必須 mapping 到 ErrConverterAuthFailed(API key 不對齊運維事件;對外 mask 成 converter_unavailable)") + assert.Equal(t, int32(1), attempts.Load(), "401 不應 retry") + // 對外 ErrorCode mask 成 converter_unavailable(不洩漏「API key 不對」內部運維狀態) + assert.Equal(t, "converter_unavailable", ErrorCode(err), + "ErrorCode 應 mask 成 converter_unavailable(與 TestInitJob_AuthFailed401 對稱)") + assert.Equal(t, 502, HTTPStatus(err), + "HTTPStatus 應為 502 — auth_failed 與『服務不可達』對外同層") +} + +// TestGetResult_AuthFailed403:對稱 — mock 回 403 → 同樣 ErrConverterAuthFailed、不 retry, +// 對外仍 mask 為 converter_unavailable / 502。 +// +// mapGetResultError 將 401 / 403 mapping 到同一個 sentinel(API key 不對齊運維事件), +// 此 test 對稱補上 403 case,比照 TestInitJob_AuthFailed403 結構。 +func TestGetResult_AuthFailed403(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-403/result", func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + w.WriteHeader(http.StatusForbidden) + _, _ = w.Write([]byte(`{"error":"unauthorized"}`)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + stream, meta, err := cc.GetResult(context.Background(), "job-403") + + require.Error(t, err) + assert.Nil(t, stream) + assert.Nil(t, meta) + assert.True(t, errors.Is(err, ErrConverterAuthFailed), + "403 必須與 401 共用 ErrConverterAuthFailed sentinel") + assert.Equal(t, int32(1), attempts.Load(), "403 不應 retry") + assert.Equal(t, "converter_unavailable", ErrorCode(err), + "ErrorCode 對 403 也 mask 成 converter_unavailable") + assert.Equal(t, 502, HTTPStatus(err)) +} + +// TestGetResult_NotFound404:mock 回 404 → ErrJobNotFound,不 retry。 +// +// 對應 ADR-016 §1.3:job_id 不存在 / 已被 GC(converter 端 7d expires_at)。 +// 對外 visionA 端用既有 ErrJobNotFound(i18n key `conversion.error.not_found`, +// 與 ownership 找不到共用文字「任務不存在」)。 +func TestGetResult_NotFound404(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-404/result", func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + w.WriteHeader(http.StatusNotFound) + _, _ = w.Write([]byte(`{"error":"job_not_found"}`)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + _, _, err := cc.GetResult(context.Background(), "job-404") + + require.Error(t, err) + assert.True(t, errors.Is(err, ErrJobNotFound), + "404 必須 mapping 到 ErrJobNotFound(job_id 不存在 / 已被 GC)") + assert.Equal(t, int32(1), attempts.Load(), "404 不應 retry") +} + +// TestGetResult_NotCompleted409:mock 回 409 → ErrJobNotCompleted,不 retry。 +// +// 對應 ADR-016 §1.3:job 尚未 completed(理論上 visionA flow.go ensurePromoted 已先 +// 確認 completed 才打 GetResult,不應發生;保留 mapping 防 converter 端 race)。 +func TestGetResult_NotCompleted409(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-409/result", func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + w.WriteHeader(http.StatusConflict) + _, _ = w.Write([]byte(`{"error":"job_not_completed","status":"running"}`)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + _, _, err := cc.GetResult(context.Background(), "job-409") + + require.Error(t, err) + assert.True(t, errors.Is(err, ErrJobNotCompleted), + "409 必須 mapping 到 ErrJobNotCompleted(job 尚未 completed)") + assert.Equal(t, int32(1), attempts.Load(), "409 不應 retry") +} + +// TestGetResult_ResultExpired410:mock 回 410 → ErrResultExpired(v0.6 新增 sentinel),不 retry。 +// +// 對應 ADR-016 §1.3:job completed 但 converter MinIO 內 NEF 已過 7d expires_at 被 GC。 +// 對外 HTTP 410 / code `result_expired`,frontend 顯示「轉檔結果已過期,請重新轉檔」CTA。 +func TestGetResult_ResultExpired410(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-410/result", func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + w.WriteHeader(http.StatusGone) + _, _ = w.Write([]byte(`{"error":"result_expired"}`)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + _, _, err := cc.GetResult(context.Background(), "job-410") + + require.Error(t, err) + assert.True(t, errors.Is(err, ErrResultExpired), + "410 必須 mapping 到 ErrResultExpired(NEF 已被 converter MinIO GC)") + assert.Equal(t, "result_expired", ErrorCode(err), + "ErrorCode 應回 result_expired(與 converter_unavailable 區分,給 frontend 顯示精確過期訊息)") + assert.Equal(t, 410, HTTPStatus(err), + "HTTPStatus 應回 410 Gone(語意:曾經有、現在永遠沒了)") + assert.Equal(t, int32(1), attempts.Load(), "410 不應 retry(過期不會自己變回來)") +} + +// TestGetResult_5xx_RetryThenSuccess:mock 前 1 次回 503、第 2 次 200。 +// 驗 Phase A retry 機制:5xx 可 retry。 +func TestGetResult_5xx_RetryThenSuccess(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + const wantBody = "RECOVERED_NEF" + + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-503-then-200/result", func(w http.ResponseWriter, r *http.Request) { + n := attempts.Add(1) + if n == 1 { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"storage_unavailable"}`)) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", "13") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(wantBody)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + // 用較短 retry base 加速 test(注入 fake stream client 不適用,因為 transport + // 行為與 retry base 不同 — 改用 short-circuit ctx 或直接接受 1s 等候)。 + // resultRetryBackoff(1) = 1s;test 接受。 + cc := newConverterClientForTest(t, srv.URL) + stream, meta, err := cc.GetResult(context.Background(), "job-503-then-200") + require.NoError(t, err) + require.NotNil(t, stream) + defer stream.Close() + + body, err := io.ReadAll(stream) + require.NoError(t, err) + assert.Equal(t, wantBody, string(body)) + assert.Equal(t, int64(13), meta.ContentLength) + assert.Equal(t, int32(2), attempts.Load(), "5xx 應 retry 1 次後成功(共 2 attempts)") +} + +// TestGetResult_5xx_Exhausted:mock 一直 503 → 用完 retry 額度後回 ErrConverterUnavailable。 +func TestGetResult_5xx_Exhausted(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-always-503/result", func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte(`{"error":"storage_unavailable"}`)) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + _, _, err := cc.GetResult(context.Background(), "job-always-503") + require.Error(t, err) + assert.True(t, errors.Is(err, ErrConverterUnavailable), + "5xx exhausted 必須 mapping 到 ErrConverterUnavailable") + assert.Equal(t, int32(converterMaxRetriesResult+1), attempts.Load(), + "應該打滿 max retry 額度(1 + converterMaxRetriesResult = 3 attempts)") +} + +// TestGetResult_ContextCancel:開 ctx 然後 cancel → 收 ctx.Err(不 retry)。 +// +// 場景:server 一開始就 503,但在第 1 次 retry 退避(1s)期間 caller cancel ctx +// → 應該立即 return ctx.Err(),不再進下一輪 attempt。 +func TestGetResult_ContextCancel(t *testing.T) { + t.Parallel() + + var attempts atomic.Int32 + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/jobs/job-cancel/result", func(w http.ResponseWriter, r *http.Request) { + attempts.Add(1) + w.WriteHeader(http.StatusServiceUnavailable) + }) + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + + cc := newConverterClientForTest(t, srv.URL) + ctx, cancel := context.WithCancel(context.Background()) + // 100ms 後 cancel — 在第 1 次 503 後、第 1 次 retry 退避(1s)期間觸發 + time.AfterFunc(100*time.Millisecond, cancel) + + _, _, err := cc.GetResult(ctx, "job-cancel") + require.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled), + "ctx cancel 應立即透傳,不繼續 retry") + // attempts 可能是 1(第 1 次 503 後在 backoff sleep 收到 cancel) + assert.LessOrEqual(t, attempts.Load(), int32(2), + "ctx cancel 後不應繼續 retry 到打滿額度") +} + +// TestGetResult_EmptyJobID:jobID 空字串 → 立即 return error,不打網路。 +func TestGetResult_EmptyJobID(t *testing.T) { + t.Parallel() + + cc := newConverterClientForTest(t, "http://this-should-never-be-called") + _, _, err := cc.GetResult(context.Background(), "") + require.Error(t, err) + assert.Contains(t, err.Error(), "jobID is required") +} + +// TestParseFilenameFromContentDisposition:cover parser 的 happy / empty / malformed case。 +func TestParseFilenameFromContentDisposition(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + cd string + want string + }{ + {"happy_path", `attachment; filename="yolov5s_kl720.nef"`, "yolov5s_kl720.nef"}, + {"no_quotes", `attachment; filename=yolov5s_kl720.nef`, "yolov5s_kl720.nef"}, + {"empty_header", ``, ""}, + {"malformed_no_attachment", `;;;`, ""}, + {"missing_filename_param", `attachment`, ""}, + {"inline_disposition", `inline; filename="foo.bin"`, "foo.bin"}, + } + for _, tc := range tests { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got := parseFilenameFromContentDisposition(tc.cd) + assert.Equal(t, tc.want, got) + }) + } +} + // ========================================================================== // 共用:interface 契約 + helpers // ========================================================================== diff --git a/visionA-backend/internal/conversion/errors.go b/visionA-backend/internal/conversion/errors.go index e4950c1..69d10b9 100644 --- a/visionA-backend/internal/conversion/errors.go +++ b/visionA-backend/internal/conversion/errors.go @@ -60,6 +60,24 @@ var ( // 對應 HTTP 503 / code "service_busy"。 ErrServiceBusy = errors.New("conversion: service busy") + // ErrResultExpired — converter `GET /api/v1/jobs/{id}/result` 回 410(v0.6 新增)。 + // + // 觸發情境(ADR-016 §1.3 / conversion.md §6): + // - job 已 completed,但 converter MinIO 內 NEF 已被 GC(超過 7 天 expires_at) + // - user 隔了一週以上才回來按「下載」/「加到模型庫」 + // + // 對外 HTTP 410 / code "result_expired"。frontend 顯示「轉檔結果已過期,請重新轉檔」 + // 並提供重新轉檔 CTA(與 502 converter_unavailable 文字明確區分,給 user 行動指引)。 + // + // 設計選擇(與 ErrJobNotFound 區分): + // - ErrJobNotFound(404):「任務本身不存在」(job_id 從未建立 / converter 端 job record 被 GC) + // - ErrResultExpired(410):「任務存在過、有完成過,但結果檔被 GC」(converter 端 job + // record 仍可查 status,但 MinIO 物件已被清)— user 角度語意不同(前者是 url + // 錯了,後者是過期了),HTTP 410 也明確語意「曾經有、現在永遠沒了」(vs 404「不存在」) + // + // Phase 0.8b v0.6 conversion (見 ADR-016 §1.3 / conversion.md §6 / api-conversion.md §4) + ErrResultExpired = errors.New("conversion: converter result expired") + // Phase 0.8b T3 移除(5 個僅 mc_token_client 用的 sentinel,MC 路徑取消後不再有觸發點): // - ErrDownloadTokenFailed — MC delegated token 4xx // - ErrMCTokenUnavailable — MC 5xx / network 持續失敗 @@ -233,6 +251,11 @@ func ErrorCode(err error) string { return "faa_unavailable" case errors.Is(err, ErrServiceBusy): return "service_busy" + case errors.Is(err, ErrResultExpired): + // Phase 0.8b v0.6:converter `GET /result` 410;對應 api-conversion.md §錯誤碼總覽 + // 新增 `result_expired` code(與 `converter_unavailable` 區分,給 frontend 顯示 + // 「轉檔結果已過期,請重新轉檔」CTA)。 + return "result_expired" case errors.Is(err, ErrConverterAuthFailed): // Phase 0.8b:對外刻意 mask 成 converter_unavailable(不揭露「API key 不對」內部狀態); // caller 想做精細處理用 errors.Is(err, ErrConverterAuthFailed) 直接判斷(log / metric)。 @@ -277,6 +300,10 @@ func HTTPStatus(err error) int { return 500 case errors.Is(err, ErrServiceBusy): return 503 + case errors.Is(err, ErrResultExpired): + // Phase 0.8b v0.6:HTTP 410 Gone — converter result endpoint 過期 + // (ADR-016 §1.3)。語意比 404 更明確:「曾經有、現在永遠沒了、不要再 retry」。 + return 410 default: return 500 } diff --git a/visionA-backend/internal/conversion/flow.go b/visionA-backend/internal/conversion/flow.go index 1b8b2e7..85a3b16 100644 --- a/visionA-backend/internal/conversion/flow.go +++ b/visionA-backend/internal/conversion/flow.go @@ -107,9 +107,26 @@ type Storage interface { // - 移除 tenantID:MC delegated download token 機制取消,不再需要 tenant 概念 // - 移除 faaBaseURL:DownloadStream 走 faa.GetFile(FAAClient 內含 baseURL),不再自組 FAA URL // - 移除 delegatedTTLSeconds:delegated download token 取消 +// +// Phase 0.8b v0.6 變更(ADR-016 / conversion.md §2.5 / §4.1): +// - DownloadStream / PromoteToModels 改走 `converter.GetResult` 從 converter MinIO 拉 NEF stream; +// visionA 端**不再直接呼叫 FAA**(撤回 v0.5 設計缺口) +// - faa 欄位仍保留(T3 才砍整檔 faa_client.go),但本 struct 的 method 內部不再使用 f.faa; +// `FlowOpts.FAA` 仍是必填以維持 wire 點向後相容(T3 / T5 再清) +// +// **T3 預期清單**(給接手的 backend agent;reviewer s-1 補): +// (a) 刪 internal/conversion/faa_client.go 整檔 +// (b) 砍 FAAClient interface(type assertion / mock 都連帶刪) +// (c) 砍 FlowOpts.FAA 欄位 + NewService 對 FAA 的必填校驗 +// (d) 砍 flow.faa 欄位本身 +// (e) 砍 cmd/api-server/main.go wire 點對 FAAClient 的注入 +// (f) 砍 cmd/api-server/conversion_e2e_test.go mockFAA + setupConversionFixture 對 FAA 的 wire; +// 保留 e2e negative assertion 路徑、改驗「wire 點不存在 FAA dependency」(編譯期靜態斷言) +// (g) 砍 internal/conversion/flow_test.go flowStubFAA + flowFixture.faa 欄位(同步 T3) +// (h) 跑 grep 確認沒有殘留 `f\.faa\.` / `FAAClient` reference(含 godoc / 註解) type flow struct { converter ConverterClient - faa FAAClient + faa FAAClient // v0.6:保留欄位以維持 NewService 向後相容;method 內不再使用(T3 砍) ownership Ownership modelStore ModelStore @@ -522,12 +539,14 @@ func (f *flow) ActiveJob(ctx context.Context, userID string) (*Job, error) { // PromoteToModels 對齊 conversion.md §1 Stage 3a + §2.5 + api-conversion.md §3。 // -// 流程: +// 流程(Phase 0.8b v0.6,對齊 ADR-016 / conversion.md §2.5 PromoteToModels 流程): // 1. ownership 驗(不符 → ErrJobNotFound) // 2. converter.GetJob — 確認 status=completed(否則 ErrJobNotCompleted) // 3. 冪等檢查:modelStore.FindBySourceJobID — 已有 model 直接回(避免重複 promote) -// 4. converter.Promote — 拿到 target_object_key -// 5. faa.GetFile(target_object_key) — streaming pull NEF +// 4. converter.Promote — 觸發 NEF 推到 FAA 並保留在 converter MinIO,拿到 target_object_key +// / size / checksum +// 5. converter.GetResult(jobID) — streaming pull NEF binary from converter MinIO +// (v0.6 取代原 faa.GetFile(targetObjectKey) — visionA 端不再直接打 FAA) // 6. storage.Put — streaming 寫進 visionA storage(不 ReadAll) // 7. modelStore.Save — 建 model record(Source="converted"、SourceJobID=jobID) // 8. return PromoteResult @@ -596,12 +615,24 @@ func (f *flow) PromoteToModels(ctx context.Context, userID, jobID, name string) return nil, err } - // 5. faa.GetFile streaming pull - file, err := f.faa.GetFile(ctx, promoteRes.TargetObjectKey) + // 5. converter.GetResult — streaming pull NEF binary from converter MinIO + // + // v0.6 變更(ADR-016 §1):原 faa.GetFile(targetObjectKey) 替換成 converter.GetResult(jobID); + // converter promote 已把 NEF 同步保留在 converter MinIO,GetResult 從 MinIO get object + // 後 stream 回來。visionA 端不再有對 FAA 的直接呼叫。 + // + // converter.GetResult 的 stream 與 size: + // - stream:caller 必須 Close(同 v0.5 之前 FAA stream 的責任,未變) + // - meta:含 ContentLength(從 converter response Content-Length 解出)與 Filename + // (從 Content-Disposition 解出);本 method 對 filename 不使用(model record 用 + // `finalName`,PromoteResult 對 user 不揭露 NEF 檔名);對 ContentLength 由 Storage.Put + // 用 promoteRes.Size(converter promote 已確定的權威值)— 兩者通常一致、但 promoteRes + // 是 source-of-truth(meta.ContentLength 走 HTTP header、若 chunked transfer 為 -1) + stream, _, err := f.converter.GetResult(ctx, jobID) if err != nil { return nil, err } - defer file.Body.Close() + defer stream.Close() // 6. storage.Put streaming write modelID := f.modelStore.GenerateID() @@ -611,7 +642,7 @@ func (f *flow) PromoteToModels(ctx context.Context, userID, jobID, name string) "source_job_id": jobID, "target_chip": normalizeTargetChip(cj.Platform), } - if err := f.storage.Put(ctx, storageKey, file.Body, file.ContentLength, storageMeta); err != nil { + if err := f.storage.Put(ctx, storageKey, stream, promoteRes.Size, storageMeta); err != nil { f.logger.WarnContext(ctx, "conversion.flow.promote_storage_put_failed", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), @@ -666,27 +697,40 @@ func (f *flow) PromoteToModels(ctx context.Context, userID, jobID, name string) // DownloadStream — 對應 GET /api/conversion/{job_id}/download(Phase 0.8b server-side proxy) // ========================================================================== -// DownloadStream 對齊 conversion.md §1 Stage 3b + §4.1 + api-conversion.md §4 + ADR-015 §7。 +// DownloadStream 對齊 conversion.md §1 Stage 3b + §4.1 + api-conversion.md §4 + ADR-016。 // -// 流程: +// 流程(Phase 0.8b v0.6,ADR-016 §1 + conversion.md §2.5 / §4.1): // 1. ownership 驗(不符 → ErrJobNotFound,§7.2 防枚舉) // 2. converter.GetJob — 確認 status=completed(否則 ErrJobNotCompleted) -// 3. ensurePromoted — 自動觸發 promote(若還沒 promote 過),拿到 target_object_key +// 3. ensurePromoted — 自動觸發 promote 確保 converter MinIO 內有 NEF // - 設計選擇(沿用 Phase 0.8):自動觸發。理由:api-conversion.md §4 註解說 // 「兩條路徑(promote-to-models / download)都拿同一個 target_object_key」+ // 「不會與 promote-to-models 衝突;兩者內部都會 ensurePromoted(冪等)」— // 要求 user 先按 promote-to-models 才能下載會違背「下載」按鈕的直覺語意。 -// 4. faa.GetFile — 用 pre-shared API key 直接拉 NEF stream(不再經 MC delegated token) +// - v0.6 同時保留此步驟的另一個理由:converter `GET /api/v1/jobs/{id}/result` 從 +// converter MinIO get object;promote 是把 NEF 同步保留在 MinIO + 推到 FAA 的步驟, +// 兩者順序固定(promote 先、GetResult 後)。 +// 4. converter.GetResult(jobID) — 從 converter MinIO streaming pull NEF binary +// (v0.6 取代原 faa.GetFile(targetObjectKey) — visionA 端不再直接打 FAA) // 5. 回傳 (io.ReadCloser, *DownloadMetadata, nil);caller(handler)負責 io.Copy 到 client + Close // -// Phase 0.8b vs Phase 0.8 安全模型差異(見 conversion.md §10.4 + ADR-015 §10.4): +// 安全模型演進(見 conversion.md §10.6 + ADR-016 後果): // - Phase 0.8:MC simple delegated token + 302 redirect → browser 直連 FAA(token 短暫流經 browser) -// - Phase 0.8b:visionA backend 中轉 stream → 沒有 token 結構性存在於任何 frontend response +// - Phase 0.8b v0.4:visionA backend 中轉 stream from FAA(API key) +// - Phase 0.8b v0.5:同 v0.4 設計但 token 來源改 MC delegated(fictional、從未跑通) +// - **Phase 0.8b v0.6**:visionA backend 中轉 stream from **converter MinIO**;visionA 端 +// 完全不接觸 MC / FAA;認證鏈只剩單條 visionA → converter API key // // Caller(handler)責任(避免 fd / goroutine leak): // - **必須 defer stream.Close()** // - 設好 response header(Content-Type / Content-Length / Content-Disposition / Cache-Control: no-store) // - 用 io.Copy(w, stream) streaming 寫;不要 ReadAll 進 RAM(單檔 NEF 可達 50MB+) +// +// 錯誤透傳(對齊 ADR-016 §1.3 / conversion.md §6): +// - converter 401/403 → ErrConverterAuthFailed(handler 透 ErrorCode mask 成 converter_unavailable) +// - converter 404 → ErrJobNotFound(與 ownership 找不到共用文字) +// - converter 410 → ErrResultExpired(v0.6 新增;frontend 顯示「請重新轉檔」CTA) +// - converter 5xx → ErrConverterUnavailable(converter / MinIO 暫時失常) func (f *flow) DownloadStream(ctx context.Context, userID, jobID string) (io.ReadCloser, *DownloadMetadata, error) { if userID == "" { return nil, nil, errors.New("conversion: DownloadStream requires userID") @@ -718,40 +762,46 @@ func (f *flow) DownloadStream(ctx context.Context, userID, jobID string) (io.Rea return nil, nil, fmt.Errorf("%w: status=%s", ErrJobNotCompleted, cj.Status) } - // 3. ensurePromoted — 自動觸發 promote 拿 target_object_key(converter 端冪等) + // 3. ensurePromoted — 自動觸發 promote 確保 converter MinIO 內有 NEF(converter 端冪等) + // 回傳的 targetObjectKey 在 v0.6 只用於 log(visionA 端不再用它打 FAA) targetObjectKey, err := f.ensurePromoted(ctx, userID, jobID, cj) if err != nil { return nil, nil, err } - // 4. faa.GetFile — 用 pre-shared API key streaming pull - file, err := f.faa.GetFile(ctx, targetObjectKey) + // 4. converter.GetResult — 從 converter MinIO streaming pull NEF + // (v0.6:取代原 faa.GetFile(targetObjectKey);visionA 端不再直接打 FAA) + stream, resultMeta, err := f.converter.GetResult(ctx, jobID) if err != nil { return nil, nil, err } - // 5. 組 metadata;filename 沿用 PromoteToModels 的命名規則(`_.nef`) - contentType := file.ContentType + // 5. 組對外 metadata;filename 沿用 visionA 自己的命名規則(`_.nef`), + // 覆寫 converter 給的 filename(converter Content-Disposition 給的 filename 可能是 + // object_key 派生、對 user 不直觀;conversion.md §4.1 註明 visionA 為 source-of-truth) + contentType := resultMeta.ContentType if contentType == "" { - // FAA 未設 → 給安全預設(octet-stream 必觸發 browser download dialog) + // converter 未設 → 給安全預設(octet-stream 必觸發 browser download dialog); + // 注意:converter_client.go doStreamOnce 已對 Content-Type 空字串補 octet-stream, + // 這層保留 fallback 為深防(defence in depth) contentType = "application/octet-stream" } meta := &DownloadMetadata{ Filename: defaultDownloadFilename(cj), ContentType: contentType, - ContentLength: file.ContentLength, + ContentLength: resultMeta.ContentLength, } f.logger.InfoContext(ctx, "conversion.flow.download_stream_opened", slog.String("user_hash", hashUserID(userID)), slog.String("job_id", jobID), slog.String("object_key_hash", hashObjectKey(targetObjectKey)), - slog.Int64("content_length", file.ContentLength), + slog.Int64("content_length", resultMeta.ContentLength), slog.String("filename", meta.Filename), ) // caller 拿 io.ReadCloser 後**必須 defer Close**;本層不負責 close(透傳給 handler) - return file.Body, meta, nil + return stream, meta, nil } // defaultDownloadFilename 產 DownloadStream 的對外 filename。 diff --git a/visionA-backend/internal/conversion/flow_test.go b/visionA-backend/internal/conversion/flow_test.go index 46135d6..1b75e47 100644 --- a/visionA-backend/internal/conversion/flow_test.go +++ b/visionA-backend/internal/conversion/flow_test.go @@ -10,12 +10,18 @@ // - InitJob 同 user 已有 active → ActiveJobError // - PromoteToModels 已 promote 過 → 回既有 model_id(idempotent) // - PromoteToModels job 沒 succeeded → ErrJobNotCompleted -// - DownloadStream 從 FAA stream 拉到正確 metadata(Phase 0.8b:取代原 DownloadRedirectURL URL 組裝) +// - DownloadStream 從 converter MinIO stream 拉到正確 metadata(v0.6 取代原 FAA stream) // - ActiveJob converter 回 404 → ownership.Delete + (nil, nil) // -// Phase 0.8 conversion (見 .autoflow/04-architecture/conversion.md §2.7) +// Phase 0.8 conversion (見 docs/autoflow/04-architecture/conversion.md §2.7) // Phase 0.8b T4:DownloadRedirectURL → DownloadStream + 砍 flowStubMCToken // (見 ADR-015 §6 + conversion.md §3 / §4.1) +// Phase 0.8b v0.6 T2:DownloadStream / PromoteToModels 改走 converter.GetResult +// (見 ADR-016 + conversion.md §2.5 / §4.1 / §6) +// - flowStubFAA struct **保留**(T3 才整檔砍 faa_client.go)但 test 不再透過它走 +// download / promote 路徑;改用 flowStubConverter.getResultFunc hook +// - fixture 自動安裝 default getResultFunc(回 defaultStubNEFBody)讓既有 happy-path +// test 不必個別設 hook;需 override 行為的 test 直接覆寫 fix.converter.getResultFunc package conversion import ( @@ -57,12 +63,15 @@ type flowStubConverter struct { getJobFunc func(ctx context.Context, jobID string) (*ConverterJob, error) promoteFunc func(ctx context.Context, jobID string, req PromoteReq) (*ConverterPromoteResult, error) listInProgressJobsFunc func(ctx context.Context, userID string) ([]*ConverterJob, error) + // Phase 0.8b v0.6(ADR-016 §1):GetResult stream hook + getResultFunc func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) // 各 method 呼叫次數(atomic) initJobCalls atomic.Int32 getJobCalls atomic.Int32 promoteCalls atomic.Int32 listInProgressJobsCalls atomic.Int32 + getResultCalls atomic.Int32 // 紀錄 InitJob 收到的 body(驗證 multipart user_id 注入) lastInitBody []byte @@ -140,6 +149,23 @@ func (s *flowStubConverter) ListInProgressJobs(ctx context.Context, userID strin return nil, nil } +// GetResult 是 Phase 0.8b v0.6 新增(ADR-016 §1)— flow.DownloadStream / PromoteToModels +// 在 T2 起改走此 method。 +// +// 預設行為:若 getResultFunc 為 nil,回 ErrConverterUnavailable wrapped error,避免 silent nil +// 觸發 caller NPE(讓 test 立即看到「忘設 hook」訊號)。 +// +// fixture(newFlowFixture)會在建立時自動安裝一個 default getResultFunc 回 defaultStubNEFBody, +// 所以多數 happy-path test 不必個別設 hook;需 override 的 test 直接覆寫 fix.converter.getResultFunc。 +func (s *flowStubConverter) GetResult(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + s.getResultCalls.Add(1) + if s.getResultFunc != nil { + return s.getResultFunc(ctx, jobID) + } + // 預設:回 error stream(caller 若沒設 hook 表示不該觸發 GetResult) + return nil, nil, fmt.Errorf("%w: flowStubConverter.GetResult called without getResultFunc hook", ErrConverterUnavailable) +} + var _ ConverterClient = (*flowStubConverter)(nil) // flowStubFAA 是 FAAClient stub。 @@ -281,6 +307,17 @@ type flowFixture struct { // Phase 0.8b T4:mcToken 欄位已移除(flow 不再依賴 MCTokenClient);FlowOpts 也砍 4 個欄位 // (MCToken / TenantID / FAABaseURL / DelegatedTTLSeconds)。 +// +// Phase 0.8b v0.6(ADR-016 / T2):DownloadStream / PromoteToModels 改走 converter.GetResult; +// fixture 自動安裝 default getResultFunc(回 `defaultStubNEFBody`)讓既有 happy-path test 不必 +// 個別設 hook。需要 override 行為(specific Content-Length / error)的 test 直接覆寫 +// `fix.converter.getResultFunc` 即可。 +// +// **Phase 0.8b v0.6 (T2)** — FAA 欄位/stub 過渡狀態:FAA 欄位保留作為 T3 過渡(method 內無 caller、 +// godoc flow.go:111-118 明示「T3 砍」);`flowStubFAA` 仍保留並維持 wire 進 FlowOpts.FAA,以 +// 驗 e2e negative assertion「FAA 0 命中」(conversion_e2e_test.go:1037-1040)。T3 砍 +// faa_client.go 整檔時同步砍:(a) `flowStubFAA` type;(b) `flowFixture.faa` 欄位; +// (c) FlowOpts.FAA 必填;(d) e2e mockFAA + setupConversionFixture 對 FAA 的 wire。 func newFlowFixture(t *testing.T) *flowFixture { t.Helper() conv := newFlowStubConverter() @@ -289,6 +326,16 @@ func newFlowFixture(t *testing.T) *flowFixture { storage := newFlowStubStorage() own := NewOwnership(conv, newSilentLogger()) + // v0.6:default getResultFunc — 回固定 NEF stub bytes + octet-stream + filename + // (與 v0.5 之前 flowStubFAA default 行為對等;test 仍可覆寫) + conv.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + return io.NopCloser(strings.NewReader(defaultStubNEFBody)), &DownloadMetadata{ + Filename: "converter-stub.nef", + ContentType: "application/octet-stream", + ContentLength: int64(len(defaultStubNEFBody)), + }, nil + } + svc, err := NewService(FlowOpts{ Converter: conv, FAA: faa, @@ -311,6 +358,10 @@ func newFlowFixture(t *testing.T) *flowFixture { } } +// defaultStubNEFBody 是 fixture default getResultFunc 回的 stub NEF body 內容(穩定 byte sequence +// 讓 test 可比對 streaming 正確性)。沿用 v0.5 之前 flowStubFAA default 行為("nef-bytes-stub")。 +const defaultStubNEFBody = "nef-bytes-stub" + // makeMultipartBody 建一個合法的 multipart/form-data body 給 InitJob 測試用。 // // 包含:model_id / version / platform / model(fake .onnx file)+ 故意塞一個 client user_id(測黑名單)。 @@ -828,6 +879,10 @@ func TestPromoteToModels_HappyPath(t *testing.T) { fix.storage.mu.Lock() expectedKey := fmt.Sprintf("models/user-alice/%s.nef", res.ModelID) assert.Contains(t, fix.storage.objects, expectedKey) + // 驗 storage 寫進去的內容是 default getResultFunc 回的 NEF stream + // (v0.6:取代原本 flowStubFAA default body 驗證;確保 stream byte-perfect 透傳) + assert.Equal(t, defaultStubNEFBody, string(fix.storage.objects[expectedKey]), + "storage 寫進去的 byte 應與 converter.GetResult 回的 stream 完全一致") fix.storage.mu.Unlock() // 驗 model store 真的有寫 @@ -835,9 +890,12 @@ func TestPromoteToModels_HappyPath(t *testing.T) { require.NotNil(t, rec) assert.Equal(t, res.ModelID, rec.ID) - // 驗 promote / faa 各被打 1 次 + // 驗 promote / converter.GetResult 各被打 1 次(v0.6:取代原 faa.getCalls) assert.Equal(t, int32(1), fix.converter.promoteCalls.Load()) - assert.Equal(t, int32(1), fix.faa.getCalls.Load()) + assert.Equal(t, int32(1), fix.converter.getResultCalls.Load(), + "PromoteToModels 應呼叫 1 次 converter.GetResult(v0.6 取代 faa.GetFile)") + assert.Equal(t, int32(0), fix.faa.getCalls.Load(), + "v0.6:visionA 端不再直接打 FAA,faa.GetFile 不該被呼叫") } // TestPromoteToModels_DefaultName:caller 傳空 name 應走 fallback `_kl`。 @@ -874,9 +932,10 @@ func TestPromoteToModels_Idempotent(t *testing.T) { require.NoError(t, err) require.NotNil(t, first) - // 第二次:應該不再打 converter.Promote / faa.GetFile / storage.Put + // 第二次:應該不再打 converter.Promote / converter.GetResult / storage.Put + // (v0.6:getResultCalls 取代原 faaCallsBefore) convPromoteBefore := fix.converter.promoteCalls.Load() - faaCallsBefore := fix.faa.getCalls.Load() + getResultBefore := fix.converter.getResultCalls.Load() storagePutBefore := fix.storage.putCalls.Load() second, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "v2") @@ -886,8 +945,8 @@ func TestPromoteToModels_Idempotent(t *testing.T) { assert.Equal(t, convPromoteBefore, fix.converter.promoteCalls.Load(), "二次 promote 不應再打 converter.Promote") - assert.Equal(t, faaCallsBefore, fix.faa.getCalls.Load(), - "二次 promote 不應再打 faa.GetFile") + assert.Equal(t, getResultBefore, fix.converter.getResultCalls.Load(), + "二次 promote 不應再打 converter.GetResult(冪等 hit 在 FindBySourceJobID 階段短路)") assert.Equal(t, storagePutBefore, fix.storage.putCalls.Load(), "二次 promote 不應再寫 storage") } @@ -922,8 +981,12 @@ func TestPromoteToModels_OwnershipMismatch(t *testing.T) { assert.True(t, errors.Is(err, ErrJobNotFound)) } -// TestPromoteToModels_FAAError_Propagation:FAA 失敗透傳。 -func TestPromoteToModels_FAAError_Propagation(t *testing.T) { +// TestPromoteToModels_ConverterGetResultError_Propagation:converter.GetResult 失敗透傳。 +// +// v0.6(ADR-016 / T2):取代原 TestPromoteToModels_FAAError_Propagation。 +// PromoteToModels 第 5 步改走 converter.GetResult、不再 pull FAA;對應的失敗 sentinel +// 也從 ErrFAAUnavailable 改為 ErrConverterUnavailable(converter MinIO 5xx)。 +func TestPromoteToModels_ConverterGetResultError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) @@ -932,15 +995,49 @@ func TestPromoteToModels_FAAError_Propagation(t *testing.T) { SourceFilename: "x.onnx", Platform: "720", }) fix.ownership.Set("j1", "user-alice") - fix.faa.getFileFunc = func(ctx context.Context, objectKey string) (*FAAFile, error) { - return nil, fmt.Errorf("%w: faa 502", ErrFAAUnavailable) + fix.converter.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + return nil, nil, fmt.Errorf("%w: converter result 502", ErrConverterUnavailable) } _, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "x") require.Error(t, err) - assert.True(t, errors.Is(err, ErrFAAUnavailable)) + assert.True(t, errors.Is(err, ErrConverterUnavailable)) - // model record 不應被建(FAA 失敗在 storage 寫入前) + // model record 不應被建(converter.GetResult 失敗在 storage 寫入前) + rec, _ := fix.models.FindBySourceJobID(context.Background(), "user-alice", "j1") + assert.Nil(t, rec) + + // v0.6:visionA 端不再直接打 FAA + assert.Equal(t, int32(0), fix.faa.getCalls.Load()) +} + +// TestPromoteToModels_ConverterGetResultExpired_Propagation:v0.6 新增—— +// converter.GetResult 回 410 `result_expired`(job completed 但 NEF 已過 7 天 expires_at 被 GC) +// 應透傳給 caller,handler 層會 mapping 到 HTTP 410 / code `result_expired`,給 frontend 顯示 +// 「轉檔結果已過期,請重新轉檔」CTA。 +// +// 對齊 ADR-016 §1.3 + conversion.md §6 + api-conversion.md §4。 +func TestPromoteToModels_ConverterGetResultExpired_Propagation(t *testing.T) { + t.Parallel() + fix := newFlowFixture(t) + + fix.converter.setJob(&ConverterJob{ + JobID: "j1", Status: "completed", CreatedAt: time.Now(), + SourceFilename: "x.onnx", Platform: "720", + }) + fix.ownership.Set("j1", "user-alice") + fix.converter.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + return nil, nil, fmt.Errorf("%w: get_result 410", ErrResultExpired) + } + + _, err := fix.svc.PromoteToModels(context.Background(), "user-alice", "j1", "x") + require.Error(t, err) + assert.True(t, errors.Is(err, ErrResultExpired), + "converter 410 → ErrResultExpired 必須透傳給 handler") + assert.Equal(t, "result_expired", ErrorCode(err)) + assert.Equal(t, 410, HTTPStatus(err)) + + // model record 不應被建 rec, _ := fix.models.FindBySourceJobID(context.Background(), "user-alice", "j1") assert.Nil(t, rec) } @@ -1015,10 +1112,30 @@ func TestPromoteToModels_ModelStoreError(t *testing.T) { // 6. FAAError_Propagation(取代 MCError):FAA pull 失敗透傳 // TestDownloadStream_HappyPath:成功 → 拿到 io.ReadCloser + DownloadMetadata 正確。 +// +// v0.6(ADR-016 / T2):DownloadStream 改走 converter.GetResult;驗證點調整為: +// - converter.GetResult 被叫 1 次、傳入正確 jobID +// - stream byte-perfect 透傳(與 default getResultFunc 回的 body 一致) +// - filename 仍由 visionA 端的 defaultDownloadFilename(cj) 覆寫(source-of-truth), +// **不** 用 converter response Content-Disposition 給的值 +// - visionA 端不再直接呼叫 FAA(faa.getCalls == 0) func TestDownloadStream_HappyPath(t *testing.T) { t.Parallel() fix := newFlowFixture(t) + // 覆寫 getResultFunc 紀錄收到的 jobID,並驗 converter response Content-Disposition 給的 + // filename 會被 visionA 端覆寫掉 + var capturedJobID string + fix.converter.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + capturedJobID = jobID + return io.NopCloser(strings.NewReader(defaultStubNEFBody)), &DownloadMetadata{ + // converter response Content-Disposition 給的 filename — 預期會被 visionA 覆寫 + Filename: "converter-raw-object-key.nef", + ContentType: "application/octet-stream", + ContentLength: int64(len(defaultStubNEFBody)), + }, nil + } + fix.converter.setJob(&ConverterJob{ JobID: "j1", Status: "completed", @@ -1034,22 +1151,25 @@ func TestDownloadStream_HappyPath(t *testing.T) { require.NotNil(t, meta) defer stream.Close() - // metadata 對齊 stub FAA 的 default 行為(faa_client_test.go newFlowStubFAA) + // metadata:filename 由 visionA defaultDownloadFilename(cj) 覆寫,**不**用 converter 給的 assert.Equal(t, "yolov5s_kl720.nef", meta.Filename, - "filename = _.nef,對齊 wireframe §8.1 + defaultDownloadFilename") + "filename = visionA 自己的 _.nef,覆寫 converter response 的 filename(source-of-truth)") assert.Equal(t, "application/octet-stream", meta.ContentType) - assert.Equal(t, int64(len("nef-bytes-stub")), meta.ContentLength) + assert.Equal(t, int64(len(defaultStubNEFBody)), meta.ContentLength) - // stream 內容與 stub FAA 預設 body 一致(streaming pull) + // stream 內容與 default getResultFunc 回的 body 一致(byte-perfect 透傳) body, err := io.ReadAll(stream) require.NoError(t, err) - assert.Equal(t, "nef-bytes-stub", string(body)) + assert.Equal(t, defaultStubNEFBody, string(body)) - // 驗 faa.GetFile 帶到的 object_key(buildTargetObjectKey 規則:models//.nef) - fix.faa.mu.Lock() - gotKey := fix.faa.lastKey - fix.faa.mu.Unlock() - assert.Equal(t, "models/user-alice/j1.nef", gotKey) + // converter.GetResult 收到的 jobID 正確 + assert.Equal(t, "j1", capturedJobID, "converter.GetResult 應收到 jobID(不再用 object_key)") + assert.Equal(t, int32(1), fix.converter.getResultCalls.Load(), + "DownloadStream 應呼叫 1 次 converter.GetResult") + + // v0.6:visionA 端不再直接打 FAA + assert.Equal(t, int32(0), fix.faa.getCalls.Load(), + "v0.6:DownloadStream 不該呼叫 faa.GetFile") } // TestDownloadStream_FilenameFromConverterJob:filename 取自 cj.SourceFilename + Platform, @@ -1073,18 +1193,20 @@ func TestDownloadStream_FilenameFromConverterJob(t *testing.T) { assert.Equal(t, "my_model_kl520.nef", meta.Filename) } -// TestDownloadStream_DefaultsContentType:FAA 沒給 Content-Type → 預設 octet-stream -// (確保 browser download dialog 仍會觸發)。 +// TestDownloadStream_DefaultsContentType:converter.GetResult 回 empty Content-Type +// → flow.DownloadStream 應 fallback 為 octet-stream(深防:converter_client 也有同樣 fallback)。 +// +// v0.6(ADR-016 / T2):改用 getResultFunc 模擬 converter 端缺 Content-Type;驗證 flow.go +// 的雙層 fallback(converter_client doStreamOnce 已 fallback,flow.go DownloadStream 再保險一次)。 func TestDownloadStream_DefaultsContentType(t *testing.T) { t.Parallel() fix := newFlowFixture(t) - fix.faa.getFileFunc = func(ctx context.Context, objectKey string) (*FAAFile, error) { - return &FAAFile{ - Body: io.NopCloser(strings.NewReader("nef")), + fix.converter.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + return io.NopCloser(strings.NewReader("nef")), &DownloadMetadata{ + Filename: "ignored.nef", ContentLength: 3, - ContentType: "", // 故意空白 - ETag: "etag", + ContentType: "", // 故意空白 — 模擬下游缺 Content-Type 的 edge case }, nil } fix.converter.setJob(&ConverterJob{ @@ -1096,7 +1218,7 @@ func TestDownloadStream_DefaultsContentType(t *testing.T) { _, meta, err := fix.svc.DownloadStream(context.Background(), "user-alice", "j1") require.NoError(t, err) assert.Equal(t, "application/octet-stream", meta.ContentType, - "FAA 沒給 Content-Type 時應 fallback 為 application/octet-stream") + "converter.GetResult 沒給 Content-Type 時 flow.DownloadStream 應 fallback 為 application/octet-stream") } // TestDownloadStream_OwnershipMismatch:別 user 的 job → ErrJobNotFound(防枚舉)。 @@ -1113,8 +1235,10 @@ func TestDownloadStream_OwnershipMismatch(t *testing.T) { assert.Nil(t, stream) assert.Nil(t, meta) - // FAA 不該被打到(ownership 不符在 FAA call 之前) - assert.Equal(t, int32(0), fix.faa.getCalls.Load()) + // converter.GetResult 不該被打到(ownership 不符在 GetResult 之前) + // v0.6:取代原 faa.getCalls assert + assert.Equal(t, int32(0), fix.converter.getResultCalls.Load(), + "ownership 不符應在 converter.GetResult 之前短路") } // TestDownloadStream_JobNotCompleted:still running → ErrJobNotCompleted。 @@ -1147,50 +1271,81 @@ func TestDownloadStream_PromoteError_Propagation(t *testing.T) { require.Error(t, err) assert.True(t, errors.Is(err, ErrConverterUnavailable)) - // FAA 不該被打到(promote 失敗在 FAA call 之前) - assert.Equal(t, int32(0), fix.faa.getCalls.Load()) + // converter.GetResult 不該被打到(promote 失敗在 GetResult 之前) + // v0.6:取代原 faa.getCalls assert + assert.Equal(t, int32(0), fix.converter.getResultCalls.Load(), + "promote 失敗應在 converter.GetResult 之前短路") } -// TestDownloadStream_FAAError_Propagation:FAA pull 5xx 透傳(取代原 MCError test)。 +// TestDownloadStream_ConverterGetResultError_Propagation:converter.GetResult 5xx 透傳 +// (v0.6 取代原 TestDownloadStream_FAAError_Propagation;download path 改走 converter MinIO)。 // -// Phase 0.8b 後 download path 不再經 MC,FAA stream 失敗是最常見的失敗模式 -// (API key 不對齊 → ErrFAAAuthFailed;FAA 服務不可達 → ErrFAAUnavailable)。 -func TestDownloadStream_FAAError_Propagation(t *testing.T) { +// v0.6 後 download path:visionA → converter(API key)→ converter MinIO; +// 失敗模式從 FAA 失敗變成 converter / MinIO 失敗。 +func TestDownloadStream_ConverterGetResultError_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "completed", CreatedAt: time.Now()}) fix.ownership.Set("j1", "user-alice") - fix.faa.getFileFunc = func(ctx context.Context, objectKey string) (*FAAFile, error) { - return nil, fmt.Errorf("%w: faa 502", ErrFAAUnavailable) + fix.converter.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + return nil, nil, fmt.Errorf("%w: get_result 502", ErrConverterUnavailable) } stream, meta, err := fix.svc.DownloadStream(context.Background(), "user-alice", "j1") require.Error(t, err) - assert.True(t, errors.Is(err, ErrFAAUnavailable)) + assert.True(t, errors.Is(err, ErrConverterUnavailable)) assert.Nil(t, stream) assert.Nil(t, meta) } -// TestDownloadStream_FAAAuthFailed_Propagation:FAA API key 不對齊 → ErrFAAAuthFailed -// 透傳(handler 層會 mask 成 faa_unavailable 對外)。 -func TestDownloadStream_FAAAuthFailed_Propagation(t *testing.T) { +// TestDownloadStream_ConverterAuthFailed_Propagation:converter API key 不對齊 +// → ErrConverterAuthFailed 透傳(handler 層會 mask 成 converter_unavailable 對外)。 +// +// v0.6 取代原 TestDownloadStream_FAAAuthFailed_Propagation。 +func TestDownloadStream_ConverterAuthFailed_Propagation(t *testing.T) { t.Parallel() fix := newFlowFixture(t) fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "completed", CreatedAt: time.Now()}) fix.ownership.Set("j1", "user-alice") - fix.faa.getFileFunc = func(ctx context.Context, objectKey string) (*FAAFile, error) { - return nil, fmt.Errorf("%w: faa 401", ErrFAAAuthFailed) + fix.converter.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + return nil, nil, fmt.Errorf("%w: get_result 401", ErrConverterAuthFailed) } _, _, err := fix.svc.DownloadStream(context.Background(), "user-alice", "j1") require.Error(t, err) - assert.True(t, errors.Is(err, ErrFAAAuthFailed), - "flow 層 sentinel 仍是 ErrFAAAuthFailed;handler 層才 mask 對外") - // 驗 sentinel 可被 errors.As 解出(handler 用 conversion.HTTPStatus / ErrorCode 處理) - assert.Equal(t, "faa_unavailable", ErrorCode(err), - "ErrorCode helper 對 ErrFAAAuthFailed 應 mask 成 faa_unavailable(對外不洩漏 auth_failed)") + assert.True(t, errors.Is(err, ErrConverterAuthFailed), + "flow 層 sentinel 仍是 ErrConverterAuthFailed;handler 層才 mask 對外") + // 驗對外 mask + assert.Equal(t, "converter_unavailable", ErrorCode(err), + "ErrorCode 對 ErrConverterAuthFailed 應 mask 成 converter_unavailable(不洩漏 auth_failed)") + assert.Equal(t, 502, HTTPStatus(err)) +} + +// TestDownloadStream_ConverterResultExpired_Propagation:v0.6 新增—— +// converter.GetResult 回 410 ErrResultExpired 應透傳給 handler,對外 HTTP 410 + code result_expired。 +// +// 對齊 ADR-016 §1.3:「job completed 但 NEF 已過 7 天 expires_at 被 GC」場景; +// frontend 收到 410 後顯示「轉檔結果已過期,請重新轉檔」CTA(與 404 not_found 區分)。 +func TestDownloadStream_ConverterResultExpired_Propagation(t *testing.T) { + t.Parallel() + fix := newFlowFixture(t) + + fix.converter.setJob(&ConverterJob{JobID: "j1", Status: "completed", CreatedAt: time.Now()}) + fix.ownership.Set("j1", "user-alice") + fix.converter.getResultFunc = func(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + return nil, nil, fmt.Errorf("%w: get_result 410", ErrResultExpired) + } + + stream, meta, err := fix.svc.DownloadStream(context.Background(), "user-alice", "j1") + require.Error(t, err) + assert.True(t, errors.Is(err, ErrResultExpired)) + assert.Nil(t, stream) + assert.Nil(t, meta) + // 對外 ErrorCode / HTTPStatus 對齊 conversion.md §6 + api-conversion.md §4 + assert.Equal(t, "result_expired", ErrorCode(err)) + assert.Equal(t, 410, HTTPStatus(err)) } // ========================================================================== diff --git a/visionA-backend/internal/conversion/ownership_test.go b/visionA-backend/internal/conversion/ownership_test.go index 57ff641..5cf41f6 100644 --- a/visionA-backend/internal/conversion/ownership_test.go +++ b/visionA-backend/internal/conversion/ownership_test.go @@ -137,6 +137,12 @@ func (s *stubConverterClient) Promote(ctx context.Context, jobID string, req Pro panic("stubConverterClient.Promote: not used in ownership_test") } +// GetResult — Phase 0.8b v0.6 新增(ADR-016 §1);ownership 路徑不會打 GetResult, +// 撞到 panic 反而好 debug。 +func (s *stubConverterClient) GetResult(ctx context.Context, jobID string) (io.ReadCloser, *DownloadMetadata, error) { + panic("stubConverterClient.GetResult: not used in ownership_test") +} + // 確保 stubConverterClient 滿足 ConverterClient interface(編譯期驗)。 var _ ConverterClient = (*stubConverterClient)(nil)