jim800121chen 3b7aa4c79a fix(task-scheduler): server.js 漏傳 minio dep 給 jobService(visionA e2e 撞到)
visionA 跑 Phase 0.8b e2e 時 POST /api/v1/jobs 回 502 storage_unavailable。
根因:server.js 建立 jobService 時沒把 minio facade 傳進去、
jobService.js 走 `deps.minio || null` fallback、writeInputToMinIO()
因為 minio=null throw「minio dep is required」、API 回 502。

修法:傳 minio facade 進 createJobService deps。
legacy CRUD 介面(不依賴 minio)行為不變—minio 是 optional dep。

Tests: 666/666 pass(無回歸)
Reviewer:  通過、correctness 軸無 Critical/Major

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 12:12:55 +08:00

144 lines
5.8 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Kneron Toolchain Task Scheduler — entry point
*
* 職責:
* 1. 啟動時 fail-fast 驗證 config(修 D3 — T1-deviations.md)
* 2. 建立各層 dependency(redis / minio / sseService / jobService)
* 3. 組裝 Express app(mount legacy 路由)
* 4. 在背景啟動 done queue listener
* 5. listen port
*
* **本檔不應再寫業務邏輯**。所有路由 / service / storage 細節都在 src/ 下。
*
* 重構說明(T4):
* src/redis.js — Redis client 與 helper
* src/storage/minio.js — MinIO facade
* src/storage/local.js — local volume helper
* src/services/sseService.js — SSE client 管理
* src/services/jobService.js — Job CRUD / advance / fail
* src/services/doneListener.js— done queue 背景監聽
* src/middleware/upload.js — multer 上傳設定
* src/routes/legacy.js — 既有 7 個路由
* src/app.js — Express app 組裝
*
* 既有 /jobs* 端點行為**完全不變**(byte-for-byte,除時間戳)。
* D3 修復:本檔在 require 階段即呼叫 loadConfig() — 必填 env 缺漏會 throw 並 exit(1)。
*/
'use strict';
/* eslint-disable no-console */
require('dotenv').config();
const { loadConfig } = require('./src/config');
const { createClients } = require('./src/redis');
const { createMinioFacade } = require('./src/storage/minio');
const { createSseService } = require('./src/services/sseService');
const { createJobService, STAGES } = require('./src/services/jobService');
const { ensureWorkerGroups, startListenDone } = require('./src/services/doneListener');
const { createUploader } = require('./src/middleware/upload');
const { createHealthService } = require('./src/services/healthService');
const { createApp } = require('./src/app');
/**
 * Load the validated configuration, or terminate the process.
 *
 * D3 fail-fast: when loadConfig() throws (e.g. a required env variable is
 * missing), the error message is printed and the process exits with code 1.
 *
 * @returns {object|undefined} the value returned by loadConfig(); undefined
 *   only if process.exit() returns (e.g. when it is stubbed).
 */
function loadConfigOrExit() {
  try {
    return loadConfig();
  } catch (configErr) {
    console.error('[Scheduler] Config validation failed:', configErr.message);
    process.exit(1);
  }
  return undefined;
}

const config = loadConfigOrExit();
// Pre-existing env settings, still read straight from process.env;
// to be folded into config.js in a later change.
const { env } = process;
const PORT = env.PORT || 4000;
const REDIS_URL = env.REDIS_URL || 'redis://localhost:6379';
const JOB_DATA_DIR = env.JOB_DATA_DIR || '/data/jobs';
const STORAGE_BACKEND = env.STORAGE_BACKEND || 'local';

// ---- Dependency wiring ----
const { redis, redisSub } = createClients(REDIS_URL);

const minio = createMinioFacade();
if (minio.client) {
  console.log(`[Scheduler] MinIO storage enabled: ${minio.endpoint}/${minio.bucket}`);
}

const sseService = createSseService();

// 2026-05-18 e2e bug fix: the v1 API `POST /api/v1/jobs` goes through
// jobService.writeInputToMinIO, which needs the minio facade.
// server.js used to omit the minio dep, so jobService.js:68 (`deps.minio || null`)
// fell back to null, writeInputToMinIO (line 358) threw "minio dep is required",
// and the API answered 502 storage_unavailable.
// Fix: pass the minio facade in. The legacy CRUD interface (no minio dependency)
// behaves as before — minio is an optional dep.
const jobServiceDeps = { redis, sseService, jobDataDir: JOB_DATA_DIR, minio };
const jobService = createJobService(jobServiceDeps);

// T10: the multer uploader takes its limits from config (fixes D5).
//   - maxFileSize  = MULTIPART_MODEL_MAX_BYTES (default 500MB)
//   - maxRefImages = MULTIPART_REF_IMAGES_MAX_COUNT (default 100)
// The per-file 10MB ref_image cap is enforced by the validator through
// config.multipart.refImageMaxBytes.
const uploaderLimits = {
  maxFileSize: config.multipart.modelMaxBytes,
  maxRefImages: config.multipart.refImagesMaxCount,
};
const uploader = createUploader(uploaderLimits);

// T8: build healthService here but do not start it yet; it is started
// after the done-queue listener has been launched.
const healthService = createHealthService({ redis, config });

const appDeps = { redis, jobService, sseService, minio, uploader, healthService };
const appOptions = { config, storageBackend: STORAGE_BACKEND };
const app = createApp(appDeps, appOptions);
/**
 * Boot sequence for the scheduler process.
 *
 * Steps, in order:
 *   1. await ensureWorkerGroups(redis)
 *   2. launch the done-queue listener in the background (not awaited);
 *      if its promise rejects, log the error and exit with code 1
 *   3. start health background polling (T8)
 *   4. register one-shot SIGTERM / SIGINT handlers
 *   5. start the HTTP server on PORT and print the effective settings
 *
 * @returns {Promise<void>} resolves after app.listen() has been called;
 *   rejects if ensureWorkerGroups() rejects or any step throws synchronously.
 */
async function start() {
  await ensureWorkerGroups(redis);

  // Done-queue listener runs in the background.
  startListenDone({ redis, redisSub, jobService })
    .start()
    .catch((err) => {
      console.error('[Scheduler] Done listener fatal error:', err);
      process.exit(1);
    });

  // T8: start health background polling (per the original note: every 30s,
  // with the first run triggered immediately).
  healthService.start();

  // Assigned when app.listen() returns below. The signal handlers are
  // registered first, so they must tolerate the server not existing yet.
  let server = null;

  // T8: graceful shutdown — on SIGTERM/SIGINT stop polling and stop accepting
  // HTTP connections so the process is able to wind down.
  const onShutdown = (signal) => {
    console.log(`[Scheduler] Received ${signal}, stopping health polling`);
    try {
      healthService.stop();
    } catch (err) {
      console.error('[Scheduler] healthService.stop error:', err);
    }

    // Installing a signal listener replaces Node's default terminate-on-signal
    // behaviour, so the process only ends once nothing keeps the event loop
    // alive. A listening HTTP server does keep it alive, so close it here.
    // The `listening` guard keeps a second signal from closing it twice.
    if (server && server.listening) {
      server.close((err) => {
        if (err) {
          console.error('[Scheduler] HTTP server close error:', err);
        }
      });
    }

    // Deliberately no process.exit() here: Node is left to exit on its own
    // (unref'd timers do not block exit).
    // NOTE(review): the open Redis connections (redis / redisSub) presumably
    // also keep the event loop alive — confirm, and close them here if so.
  };
  process.once('SIGTERM', () => onShutdown('SIGTERM'));
  process.once('SIGINT', () => onShutdown('SIGINT'));

  server = app.listen(PORT, () => {
    console.log(`[Scheduler] Running on port ${PORT}`);
    console.log(`[Scheduler] Redis: ${REDIS_URL}`);
    console.log(`[Scheduler] Job data dir: ${JOB_DATA_DIR}`);
    console.log(
      `[Scheduler] Storage: ${STORAGE_BACKEND}${minio.client ? ` (${minio.endpoint}/${minio.bucket})` : ''}`
    );
    console.log(`[Scheduler] Stages: ${STAGES.join(' -> ')}`);
    console.log(
      `[Scheduler] Auth config OK: issuer=${config.memberCenter.issuer}, audience=${config.converter.audience}`
    );
    // T10: print the multipart / concurrency settings so ops can confirm the
    // effective values (no secrets included).
    console.log(
      `[Scheduler] Multipart limits: model=${config.multipart.modelMaxBytes}B, ` +
        `ref_image=${config.multipart.refImageMaxBytes}B, ` +
        `ref_images_count=${config.multipart.refImagesMaxCount}`
    );
    console.log(
      `[Scheduler] Upload concurrency: max=${config.uploadConcurrency.maxConcurrent} ` +
        `(503 retry-after=${config.uploadConcurrency.retryAfterSeconds}s when full)`
    );
  });
}
/**
 * Last-resort handler for a rejected start(): log the failure and exit with code 1.
 *
 * @param {unknown} startErr - the rejection reason from start()
 */
function onStartFailure(startErr) {
  console.error('[Scheduler] Failed to start:', startErr);
  process.exit(1);
}

// Kick off the boot sequence as soon as the module is loaded.
start().catch(onStartFailure);

// The assembled Express app is the module's export.
module.exports = app;