/**
 * Kneron Toolchain Task Scheduler
 *
 * Responsibilities:
 *   1. REST API — create jobs, query status, upload files, download results
 *   2. Job state — manage the job lifecycle as JSON records under Redis keys
 *   3. Queue dispatch — hand tasks to Workers via Redis Streams
 *   4. Done listener — consume Worker completion events and advance jobs to the next stage
 *   5. SSE — push job status updates to the frontend in real time
 */

const express = require('express');
const cors = require('cors');
const multer = require('multer');
const helmet = require('helmet');
const rateLimit = require('express-rate-limit');
const morgan = require('morgan');
const compression = require('compression');
const { v4: uuidv4 } = require('uuid');
const Redis = require('ioredis');
const path = require('path');
const fs = require('fs');
const { S3Client, PutObjectCommand, GetObjectCommand } = require('@aws-sdk/client-s3');
require('dotenv').config();

// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------
const PORT = process.env.PORT || 4000;
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';
const JOB_DATA_DIR = process.env.JOB_DATA_DIR || '/data/jobs';
const FRONTEND_URL = process.env.FRONTEND_URL || 'http://localhost:3000';

// MinIO config
const STORAGE_BACKEND = process.env.STORAGE_BACKEND || 'local';
const MINIO_ENDPOINT = process.env.MINIO_ENDPOINT_URL || 'http://192.168.0.130:9000';
const MINIO_BUCKET = process.env.MINIO_BUCKET || 'convertet-working-space';
const MINIO_ACCESS_KEY = process.env.MINIO_ACCESS_KEY || 'convuser';
const MINIO_SECRET_KEY = process.env.MINIO_SECRET_KEY || '';
const MINIO_REGION = process.env.MINIO_REGION || 'us-east-1';

let minio = null;
if (STORAGE_BACKEND === 'minio') {
  minio = new S3Client({
    endpoint: MINIO_ENDPOINT,
    region: MINIO_REGION,
    credentials: {
      accessKeyId: MINIO_ACCESS_KEY,
      secretAccessKey: MINIO_SECRET_KEY,
    },
    forcePathStyle: true, // required for MinIO's path-style URLs
  });
  console.log(`[Scheduler] MinIO storage enabled: ${MINIO_ENDPOINT}/${MINIO_BUCKET}`);
}

async function uploadToMinIO(key, body, contentType) {
  if (!minio) return;
  await minio.send(new PutObjectCommand({
    Bucket: MINIO_BUCKET,
    Key: key,
    Body: body,
    ContentType: contentType,
  }));
}

async function getFromMinIO(key) {
  if (!minio) return null;
  const response = await minio.send(new GetObjectCommand({
    Bucket: MINIO_BUCKET,
    Key: key,
  }));
  // In Node.js the AWS SDK v3 Body is a Readable stream (async iterable);
  // collect it into a Buffer before responding.
  const chunks = [];
  for await (const chunk of response.Body) {
    chunks.push(chunk);
  }
  return {
    body: Buffer.concat(chunks),
    contentLength: response.ContentLength,
  };
}
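/*
 * Object key layout used by this service (see POST /jobs and the download
 * route below): uploads land under jobs/<jobId>/input/, reference images
 * under jobs/<jobId>/input/ref_images/, and downloads resolve as
 * jobs/<jobId>/<filename>. A minimal usage sketch:
 *
 *   await uploadToMinIO(`jobs/${jobId}/input/model.onnx`, buffer, 'application/octet-stream');
 *   const { body } = await getFromMinIO(`jobs/${jobId}/output/model.nef`);
 *
 * ("output/model.nef" is an assumed example key — the actual result paths
 * are produced by the workers, not by this file.)
 */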
// Pipeline: fixed stage order
const STAGES = ['onnx', 'bie', 'nef'];
const STAGE_QUEUES = {
  onnx: 'queue:onnx',
  bie: 'queue:bie',
  nef: 'queue:nef',
};
const DONE_QUEUE = 'queue:done';
const DONE_GROUP = 'scheduler';

// ---------------------------------------------------------------------------
// Redis clients (one for commands, one for blocking reads)
// ---------------------------------------------------------------------------
const redis = new Redis(REDIS_URL);
const redisSub = new Redis(REDIS_URL);

redis.on('error', (err) => console.error('Redis error:', err));
redisSub.on('error', (err) => console.error('Redis subscriber error:', err));

// ---------------------------------------------------------------------------
// Express setup
// ---------------------------------------------------------------------------
const app = express();
app.use(helmet());
app.use(compression());
app.use(morgan('short'));
app.use(cors({ origin: FRONTEND_URL, credentials: true }));

const limiter = rateLimit({
  windowMs: 15 * 60 * 1000,
  max: 200,
  message: 'Too many requests, please try again later.',
});
// NOTE: the routes below are mounted at the root (/jobs, /health, ...), so
// this '/api' prefix matches none of them; mount without a path to enforce it.
app.use('/api', limiter);

app.use(express.json({ limit: '10mb' }));
app.use(express.urlencoded({ extended: true, limit: '10mb' }));

// File upload — buffered in memory, then persisted to MinIO or the job directory
const upload = multer({
  storage: multer.memoryStorage(),
  limits: { fileSize: 500 * 1024 * 1024 }, // 500 MB
});

// ---------------------------------------------------------------------------
// SSE: keep track of connected clients per job_id
// ---------------------------------------------------------------------------
const sseClients = new Map(); // job_id -> Set<res>

function sendSSE(jobId, data) {
  const clients = sseClients.get(jobId);
  if (!clients) return;
  const payload = `data: ${JSON.stringify(data)}\n\n`;
  for (const res of clients) {
    res.write(payload);
    // compression() buffers responses; flush so events are delivered immediately
    if (typeof res.flush === 'function') res.flush();
  }
}

// ---------------------------------------------------------------------------
// Helper: get / set job record in Redis
// ---------------------------------------------------------------------------
async function getJob(jobId) {
  const raw = await redis.get(`job:${jobId}`);
  if (!raw) return null;
  return JSON.parse(raw);
}

async function setJob(jobId, job) {
  job.updated_at = new Date().toISOString();
  await redis.set(`job:${jobId}`, JSON.stringify(job));
  // Notify SSE clients
  sendSSE(jobId, job);
}

// ---------------------------------------------------------------------------
// Helper: enqueue a task to a stage queue
// ---------------------------------------------------------------------------
async function enqueueStage(stage, job) {
  const queue = STAGE_QUEUES[stage];
  const message = {
    job_id: job.job_id,
    created_at: job.created_at,
    input_dir: path.join(JOB_DATA_DIR, job.job_id),
    parameters: job.parameters || {},
  };
  await redis.xadd(queue, '*', 'data', JSON.stringify(message));
  console.log(`[Scheduler] Enqueued job ${job.job_id} to ${queue}`);
}

// ---------------------------------------------------------------------------
// Helper: advance job to next stage or mark completed
// ---------------------------------------------------------------------------
async function advanceJob(jobId, completedStage) {
  const job = await getJob(jobId);
  if (!job) {
    console.warn(`[Scheduler] Job ${jobId} not found, ignoring done event`);
    return;
  }

  const currentIndex = STAGES.indexOf(completedStage);
  if (currentIndex < 0) {
    console.warn(`[Scheduler] Unknown stage: ${completedStage}`);
    return;
  }

  const nextIndex = currentIndex + 1;
  if (nextIndex < STAGES.length) {
    // Advance to next stage
    const nextStage = STAGES[nextIndex];
    job.status = nextStage.toUpperCase();
    job.stage = nextStage;
    job.progress = Math.round((nextIndex / STAGES.length) * 100);
    await setJob(jobId, job);
    await enqueueStage(nextStage, job);
  } else {
    // All stages completed
    job.status = 'COMPLETED';
    job.stage = null;
    job.progress = 100;
    await setJob(jobId, job);
    console.log(`[Scheduler] Job ${jobId} COMPLETED`);
  }
}

// ---------------------------------------------------------------------------
// Helper: mark job as failed
// ---------------------------------------------------------------------------
async function failJob(jobId, step, reason) {
  const job = await getJob(jobId);
  if (!job) return;
  job.status = 'FAILED';
  job.error = { step, reason };
  await setJob(jobId, job);
  console.log(`[Scheduler] Job ${jobId} FAILED at ${step}: ${reason}`);
}
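/*
 * Each stage queue entry is a single 'data' field holding the JSON built in
 * enqueueStage() above, e.g.:
 *
 *   XADD queue:onnx * data '{"job_id":"<uuid>","created_at":"<iso8601>",
 *     "input_dir":"/data/jobs/<uuid>","parameters":{"model_id":1,...}}'
 *
 * Note that input_dir always points at the shared-volume path; in MinIO mode
 * workers presumably resolve inputs from the jobs/<jobId>/ prefix instead.
 */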
// ---------------------------------------------------------------------------
// Done queue listener — runs in background
// ---------------------------------------------------------------------------
async function ensureConsumerGroup(queue, group) {
  try {
    await redis.xgroup('CREATE', queue, group, '0', 'MKSTREAM');
  } catch (err) {
    // Group already exists — OK
    if (!err.message.includes('BUSYGROUP')) throw err;
  }
}

async function listenDoneQueue() {
  const consumerName = `scheduler-${process.pid}`;
  await ensureConsumerGroup(DONE_QUEUE, DONE_GROUP);
  console.log(`[Scheduler] Listening on ${DONE_QUEUE} as ${consumerName}`);

  while (true) {
    try {
      const results = await redisSub.xreadgroup(
        'GROUP', DONE_GROUP, consumerName,
        'COUNT', 10,
        'BLOCK', 5000,
        'STREAMS', DONE_QUEUE, '>'
      );
      if (!results) continue;

      for (const [, messages] of results) {
        for (const [messageId, fields] of messages) {
          try {
            const data = JSON.parse(fields[1]); // fields = ['data', '{...}']
            const { job_id, step, result, reason } = data;
            console.log(`[Scheduler] Done event: job=${job_id} step=${step} result=${result}`);

            if (result === 'ok') {
              await advanceJob(job_id, step);
            } else {
              await failJob(job_id, step, reason || 'Unknown error');
            }

            // ACK the message
            await redisSub.xack(DONE_QUEUE, DONE_GROUP, messageId);
          } catch (err) {
            console.error('[Scheduler] Error processing done event:', err);
          }
        }
      }
    } catch (err) {
      if (err.message.includes('Connection is closed')) {
        console.error('[Scheduler] Redis connection lost, retrying in 3s...');
        await new Promise((r) => setTimeout(r, 3000));
      } else {
        console.error('[Scheduler] Done listener error:', err);
        await new Promise((r) => setTimeout(r, 1000));
      }
    }
  }
}

// ---------------------------------------------------------------------------
// Ensure worker queue consumer groups exist on startup
// ---------------------------------------------------------------------------
async function ensureWorkerGroups() {
  const groups = {
    'queue:onnx': 'onnx-workers',
    'queue:bie': 'bie-workers',
    'queue:nef': 'nef-workers',
  };
  for (const [queue, group] of Object.entries(groups)) {
    await ensureConsumerGroup(queue, group);
  }
}
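/*
 * Worker contract (a sketch inferred from the listener above, not the actual
 * worker code): a worker reads its stage queue through its consumer group,
 * runs the conversion, then reports on queue:done using the field names this
 * listener destructures (job_id, step, result, reason). The consumer name
 * 'worker-1' below is hypothetical:
 *
 *   const res = await redis.xreadgroup('GROUP', 'onnx-workers', 'worker-1',
 *     'COUNT', 1, 'BLOCK', 5000, 'STREAMS', 'queue:onnx', '>');
 *   if (res) for (const [id, fields] of res[0][1]) {
 *     const task = JSON.parse(fields[1]);
 *     // ... run the onnx conversion here ...
 *     await redis.xadd('queue:done', '*', 'data', JSON.stringify(
 *       { job_id: task.job_id, step: 'onnx', result: 'ok' }));
 *     await redis.xack('queue:onnx', 'onnx-workers', id);
 *   }
 */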
// ---------------------------------------------------------------------------
// API Routes
// ---------------------------------------------------------------------------

// Health check
app.get('/health', async (req, res) => {
  try {
    await redis.ping();
    res.json({
      service: 'task-scheduler',
      status: 'healthy',
      timestamp: new Date().toISOString(),
      redis: 'connected',
    });
  } catch {
    res.status(503).json({
      service: 'task-scheduler',
      status: 'unhealthy',
      redis: 'disconnected',
    });
  }
});

// POST /jobs — Create a new job
app.post('/jobs', upload.fields([
  { name: 'model', maxCount: 1 },
  { name: 'ref_images', maxCount: 100 },
]), async (req, res) => {
  try {
    // Validate required fields
    const { model_id, version, platform } = req.body;
    if (!model_id || !version || !platform) {
      return res.status(400).json({ error: 'model_id, version, platform are required' });
    }
    if (!req.files || !req.files.model || req.files.model.length === 0) {
      return res.status(400).json({ error: 'model file is required' });
    }

    const jobId = uuidv4();

    if (minio) {
      // S3 mode: upload files to MinIO
      const modelFile = req.files.model[0];
      const s3Prefix = `jobs/${jobId}`;
      await uploadToMinIO(
        `${s3Prefix}/input/${path.basename(modelFile.originalname)}`,
        modelFile.buffer,
        modelFile.mimetype || 'application/octet-stream',
      );
      if (req.files.ref_images) {
        for (const img of req.files.ref_images) {
          await uploadToMinIO(
            `${s3Prefix}/input/ref_images/${path.basename(img.originalname)}`,
            img.buffer,
            img.mimetype || 'image/jpeg',
          );
        }
      }
      console.log(`[Scheduler] Uploaded job ${jobId} files to MinIO`);
    } else {
      // Local mode: write to shared volume
      const jobDir = path.join(JOB_DATA_DIR, jobId);
      const inputDir = path.join(jobDir, 'input');
      const refImagesDir = path.join(inputDir, 'ref_images');
      const logsDir = path.join(jobDir, 'logs');
      fs.mkdirSync(inputDir, { recursive: true });
      fs.mkdirSync(refImagesDir, { recursive: true });
      fs.mkdirSync(logsDir, { recursive: true });

      const modelFile = req.files.model[0];
      // basename() strips any client-supplied directory components from upload names
      const modelPath = path.join(inputDir, path.basename(modelFile.originalname));
      fs.writeFileSync(modelPath, modelFile.buffer);

      if (req.files.ref_images) {
        for (const img of req.files.ref_images) {
          const imgPath = path.join(refImagesDir, path.basename(img.originalname));
          fs.writeFileSync(imgPath, img.buffer);
        }
      }
    }

    // Assemble parameters (required fields plus optional flags)
    const parameters = {
      model_id: parseInt(model_id, 10),
      version,
      platform,
      enable_evaluate: req.body.enable_evaluate === 'true',
      enable_sim_fp: req.body.enable_sim_fp === 'true',
      enable_sim_fixed: req.body.enable_sim_fixed === 'true',
      enable_sim_hw: req.body.enable_sim_hw === 'true',
    };

    // Create job record
    const job = {
      job_id: jobId,
      created_at: new Date().toISOString(),
      status: 'ONNX',
      stage: 'onnx',
      progress: 0,
      updated_at: new Date().toISOString(),
      parameters,
      output: { bie_path: null, nef_path: null },
      error: null,
    };
    await setJob(jobId, job);

    // Enqueue to first stage
    await enqueueStage('onnx', job);

    res.status(201).json({
      job_id: jobId,
      status: 'ONNX',
      message: 'Job created and queued',
    });
  } catch (err) {
    console.error('[Scheduler] POST /jobs error:', err);
    res.status(500).json({ error: err.message });
  }
});

// GET /jobs/:jobId — Query job status
app.get('/jobs/:jobId', async (req, res) => {
  const job = await getJob(req.params.jobId);
  if (!job) {
    return res.status(404).json({ error: 'JOB_NOT_FOUND' });
  }
  res.json(job);
});

// GET /jobs — List all jobs
app.get('/jobs', async (req, res) => {
  try {
    // NOTE: KEYS is O(N) and blocks Redis; fine for small job counts,
    // prefer SCAN if the job set grows large.
    const keys = await redis.keys('job:*');
    const jobs = [];
    for (const key of keys) {
      const raw = await redis.get(key);
      if (raw) jobs.push(JSON.parse(raw));
    }
    // Sort by created_at descending
    jobs.sort((a, b) => new Date(b.created_at) - new Date(a.created_at));
    res.json(jobs);
  } catch (err) {
    res.status(500).json({ error: err.message });
  }
});

// GET /jobs/:jobId/events — SSE stream
app.get('/jobs/:jobId/events', async (req, res) => {
  const jobId = req.params.jobId;
  const job = await getJob(jobId);
  if (!job) {
    return res.status(404).json({ error: 'JOB_NOT_FOUND' });
  }

  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  });

  // Send current state immediately (flush past the compression buffer)
  res.write(`data: ${JSON.stringify(job)}\n\n`);
  if (typeof res.flush === 'function') res.flush();

  // Register client
  if (!sseClients.has(jobId)) {
    sseClients.set(jobId, new Set());
  }
  sseClients.get(jobId).add(res);

  // Heartbeat to keep connection alive
  const heartbeat = setInterval(() => {
    res.write(': heartbeat\n\n');
    if (typeof res.flush === 'function') res.flush();
  }, 15000);

  // Cleanup on disconnect
  req.on('close', () => {
    clearInterval(heartbeat);
    const clients = sseClients.get(jobId);
    if (clients) {
      clients.delete(res);
      if (clients.size === 0) sseClients.delete(jobId);
    }
  });
});
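/*
 * Frontend consumption sketch (browser-side; the dev URL is an assumption
 * based on the default PORT above):
 *
 *   const es = new EventSource(`http://localhost:4000/jobs/${jobId}/events`,
 *     { withCredentials: true });
 *   es.onmessage = (e) => {
 *     const job = JSON.parse(e.data);
 *     if (job.status === 'COMPLETED' || job.status === 'FAILED') es.close();
 *   };
 *
 * Heartbeat comments (lines starting with ':') are ignored by EventSource.
 */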
// GET /jobs/:jobId/download/:filename — Download result file
app.get('/jobs/:jobId/download/:filename', async (req, res) => {
  const { jobId, filename } = req.params;
  const job = await getJob(jobId);
  if (!job) {
    return res.status(404).json({ error: 'JOB_NOT_FOUND' });
  }

  if (minio) {
    // MinIO mode: fetch from MinIO and send
    const minioKey = `jobs/${jobId}/${filename}`;
    try {
      const result = await getFromMinIO(minioKey);
      if (!result) {
        return res.status(404).json({ error: 'FILE_NOT_FOUND' });
      }
      res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
      res.setHeader('Content-Length', result.body.length);
      res.send(result.body);
    } catch (err) {
      if (err.name === 'NoSuchKey') {
        return res.status(404).json({ error: 'FILE_NOT_FOUND' });
      }
      console.error('[Scheduler] Download error:', err);
      res.status(500).json({ error: 'Download failed' });
    }
  } else {
    // Local mode: serve from filesystem. Route params arrive URL-decoded, so
    // an encoded "..%2F" could otherwise escape the job directory — resolve
    // the path and verify it stays inside it.
    const jobDir = path.resolve(JOB_DATA_DIR, jobId);
    const filePath = path.resolve(jobDir, filename);
    if (!filePath.startsWith(jobDir + path.sep)) {
      return res.status(400).json({ error: 'INVALID_FILENAME' });
    }
    if (!fs.existsSync(filePath)) {
      return res.status(404).json({ error: 'FILE_NOT_FOUND' });
    }
    res.download(filePath);
  }
});

// GET /queues/stats — Queue monitoring stats
app.get('/queues/stats', async (req, res) => {
  try {
    const queues = ['queue:onnx', 'queue:bie', 'queue:nef', 'queue:done'];
    const groupNames = {
      'queue:onnx': 'onnx-workers',
      'queue:bie': 'bie-workers',
      'queue:nef': 'nef-workers',
      'queue:done': 'scheduler',
    };

    const stats = {};
    for (const queue of queues) {
      const length = await redis.xlen(queue);
      let consumers = [];
      let pending = 0;
      let lag = 0;

      const group = groupNames[queue];
      if (group) {
        try {
          // XINFO GROUPS returns one flat [key, value, ...] array per group
          const groups = await redis.xinfo('GROUPS', queue);
          for (let i = 0; i < groups.length; i++) {
            const g = groups[i];
            const info = {};
            for (let j = 0; j < g.length; j += 2) {
              info[g[j]] = g[j + 1];
            }
            if (info.name === group) {
              pending = parseInt(info.pending || '0', 10);
              lag = parseInt(info.lag || '0', 10);
              // Get consumers in this group
              try {
                const consumerList = await redis.xinfo('CONSUMERS', queue, group);
                consumers = consumerList.map((c) => {
                  const ci = {};
                  for (let j = 0; j < c.length; j += 2) {
                    ci[c[j]] = c[j + 1];
                  }
                  return {
                    name: ci.name,
                    pending: parseInt(ci.pending || '0', 10),
                    idle: parseInt(ci.idle || '0', 10),
                  };
                });
              } catch { /* no consumers yet */ }
              break;
            }
          }
        } catch { /* group may not exist yet */ }
      }

      stats[queue] = { length, pending, lag, consumers };
    }

    // Also get job summary
    const keys = await redis.keys('job:*');
    const jobSummary = { total: keys.length, ONNX: 0, BIE: 0, NEF: 0, COMPLETED: 0, FAILED: 0 };
    for (const key of keys) {
      const raw = await redis.get(key);
      if (raw) {
        const job = JSON.parse(raw);
        if (jobSummary[job.status] !== undefined) {
          jobSummary[job.status]++;
        }
      }
    }

    res.json({
      timestamp: new Date().toISOString(),
      queues: stats,
      jobs: jobSummary,
    });
  } catch (err) {
    console.error('[Scheduler] GET /queues/stats error:', err);
    res.status(500).json({ error: err.message });
  }
});

// Error handling
app.use((err, req, res, next) => {
  console.error('[Scheduler] Server error:', err);
  res.status(500).json({ error: 'Internal server error' });
});

// 404 — no path argument, so this catches anything not handled above
app.use((req, res) => {
  res.status(404).json({ error: 'Endpoint not found' });
});
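/*
 * Example /queues/stats response shape (values illustrative, structure taken
 * from the handler above):
 *
 *   {
 *     "timestamp": "2024-01-01T00:00:00.000Z",
 *     "queues": {
 *       "queue:onnx": { "length": 3, "pending": 1, "lag": 2,
 *         "consumers": [{ "name": "onnx-1", "pending": 1, "idle": 420 }] },
 *       ...
 *     },
 *     "jobs": { "total": 10, "ONNX": 1, "BIE": 0, "NEF": 2, "COMPLETED": 6, "FAILED": 1 }
 *   }
 */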
// ---------------------------------------------------------------------------
// Start
// ---------------------------------------------------------------------------
async function start() {
  // Ensure all consumer groups exist
  await ensureWorkerGroups();

  // Start listening for done events (background)
  listenDoneQueue().catch((err) => {
    console.error('[Scheduler] Done listener fatal error:', err);
    process.exit(1);
  });

  app.listen(PORT, () => {
    console.log(`[Scheduler] Running on port ${PORT}`);
    console.log(`[Scheduler] Redis: ${REDIS_URL}`);
    console.log(`[Scheduler] Job data dir: ${JOB_DATA_DIR}`);
    console.log(`[Scheduler] Storage: ${STORAGE_BACKEND}${minio ? ` (${MINIO_ENDPOINT}/${MINIO_BUCKET})` : ''}`);
    console.log(`[Scheduler] Stages: ${STAGES.join(' -> ')}`);
  });
}

start().catch((err) => {
  console.error('[Scheduler] Failed to start:', err);
  process.exit(1);
});

module.exports = app;
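/*
 * End-to-end usage sketch (multipart field names match the POST /jobs handler;
 * file names and values are examples):
 *
 *   curl -X POST http://localhost:4000/jobs \
 *     -F model=@model.onnx -F model_id=1 -F version=1.0 -F platform=720 \
 *     -F ref_images=@img0.jpg -F enable_evaluate=true
 *   # -> { "job_id": "<uuid>", "status": "ONNX", "message": "Job created and queued" }
 *
 *   curl http://localhost:4000/jobs/<uuid>              # poll status
 *   curl -OJ http://localhost:4000/jobs/<uuid>/download/<filename>
 */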