jim800121chen 4d381c0b50 feat(task-scheduler): Phase 1 — modularize server + add OAuth/JWKS + /api/v1/* routes
Refactor server.js (647 → 99 lines) into 30+ modules under src/:
- auth/: JWKS validation, JWT middleware, OAuth client_credentials
- routes/v1/: jobs (POST/GET/:id) + promote with input validation
- routes/legacy.js: existing /jobs multipart path (backward compatible)
- services/: jobService, healthService, sseService, statusMapper,
  doneListener
- middleware/: requestId, errorHandler, perClientRateLimit,
  uploadConcurrency, upload (multer + storage)
- redis/: Lua scripts for atomic claim/release_active_job
- storage/: local + minio adapters; fileAccessAgent/: PUT promote client
- config.js: env var validation with fail-fast

Phase 1 features (T1–T11):
- T1 Auth middleware + JWKS (Member Center OAuth2 resource server)
- T2 OAuth client (Member Center client_credentials, Basic auth)
- T3 /api/v1/* router skeleton
- T4 server.js refactor (legacy endpoints fully preserved, real-Redis
  regression verified — existing worker consumer group untouched)
- T5 POST /api/v1/jobs (multipart, OWASP-audited, 2 Critical / 6 Major
  fixed; Risk-A/B documented as accepted)
- T6 GET /api/v1/jobs + GET /:id (cursor pagination, ETag, IDOR-safe)
- T7 POST /jobs/:id/promote (FAA PUT with own service token, 300s
  timeout, fail-fast on missing FAA URL)
- T8 /health upgrade (healthy/degraded/unhealthy + 30s background cache)
- T9 stage_timings (release_active_job in terminal states)
- T10 env + Docker integration (MULTIPART_* + concurrency limiter)
- T11 README (498 lines) + OpenAPI 3.0 spec (1588 lines)

Tests: 630 pass across 29 suites. Updated Dockerfile + .dockerignore +
docker-compose.yml env passthrough (no hardcoded secrets, fail-fast on
missing required vars).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 10:55:05 +08:00

189 lines
6.4 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
 * Lua script loader / runner for ioredis(T5)
 *
 * 職責:
 *   1. 從 disk 讀 `claim_active_job.lua` / `release_active_job.lua`(純文字,方便 Reviewer / Auditor 審)
 *   2. 提供 `claimActiveJob({ userId, jobId, jobJson, ttlSeconds })` 介面
 *   3. 若 Redis 重啟導致 NOSCRIPT,自動 fallback 改用 EVAL 送出完整 script body
 *      (Redis 會順帶重新 cache 該 script)
 *
 * 為什麼把 Lua 放獨立檔再用 readFileSync 載入:
 *   - 把 script 內嵌成 JS 字串會讓 reviewer 看不清楚每行做什麼
 *   - 純文字 .lua 檔可獨立用 redis-cli SCRIPT LOAD 測試 / 檢查
 *   - 啟動時讀一次(cache),效能可接受(< 1KB)
 *
 * 為什麼採 EVALSHA(NOSCRIPT 時 fallback 到 EVAL):
 *   - 每次 EVAL 帶 script body 會占用網路頻寬;EVALSHA 只送 sha → 大幅省頻寬
 *   - Redis 重啟(OOM、reboot)會清掉 script cache → 我們需要 catch NOSCRIPT 後用 EVAL 重新載入
 *
 * 設計取捨 — 不用 ioredis 的 defineCommand:
 *   - defineCommand 雖好用但會把 redis client 物件改造,影響測試 mock 的純度
 *   - 用顯式 `evalsha` + NOSCRIPT fallback 行為跟下游 expectations 吻合
 *
 * 安全:
 *   - jobJson 由呼叫端組裝(已序列化過),Lua 端只當 String 寫入;任何 user 輸入
 *     已在 handler 端做過 sanitize(filename / object_key 等)
 *   - 三個 KEYS 名稱都由 server 端組裝,user 不能控制 Redis key 名
 */
'use strict';
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
// Module-level memo: script file name -> { body, sha1 }.
// Kept as a Map (rather than baked-in constants) so tests can empty it via
// `_internals.resetCache()`.
const _scriptCache = new Map();

/**
 * Read a Lua script from the `luaScripts/` directory next to this module and
 * memoize the result by file name.
 *
 * The file is read synchronously on the first request for a given name; later
 * requests return the very same cached object without touching the disk.
 *
 * @param {string} fileName - File name under `luaScripts/` (no directory part).
 * @returns {{ body: string, sha1: string }} The script text (read as UTF-8)
 *   and the hex-encoded SHA-1 digest of that text.
 * @throws {Error} Whatever `fs.readFileSync` throws if the file cannot be read.
 */
function loadScript(fileName) {
  const cached = _scriptCache.get(fileName);
  if (cached !== undefined) {
    return cached;
  }

  const scriptPath = path.join(__dirname, 'luaScripts', fileName);
  const source = fs.readFileSync(scriptPath, 'utf8');
  // The digest is what gets passed to EVALSHA by the runner.
  const digest = crypto.createHash('sha1').update(source).digest('hex');

  const loaded = { body: source, sha1: digest };
  _scriptCache.set(fileName, loaded);
  return loaded;
}
/**
 * Run a Lua script through EVALSHA, falling back to a single EVAL with the
 * full script body when the server reports NOSCRIPT.
 *
 * Any error whose message does not contain `NOSCRIPT` is rethrown unchanged.
 * The fallback is attempted once; an error from EVAL propagates to the caller.
 *
 * @param {import('ioredis').Redis} redis - Client exposing `evalsha` and `eval`.
 * @param {{ body: string, sha1: string }} script - Script text and its SHA-1.
 * @param {string[]} keys - Values passed as KEYS (their count is sent as numkeys).
 * @param {string[]} args - Values passed as ARGV.
 * @returns {Promise<unknown>} Whatever the script call resolves to.
 */
async function evalScript(redis, script, keys, args) {
  // EVALSHA and EVAL share the same tail: numkeys, keys..., args...
  const commandArgs = [keys.length, ...keys, ...args];

  try {
    return await redis.evalsha(script.sha1, ...commandArgs);
  } catch (err) {
    // Loose substring match on purpose: the exact NOSCRIPT wording varies
    // between drivers.
    const message = (err && err.message) || '';
    if (!message.includes('NOSCRIPT')) {
      throw err;
    }
  }

  // The server has no cached copy of the script: send the whole body once.
  // EVAL also stores the script in the server-side cache as a side effect.
  return redis.eval(script.body, ...commandArgs);
}
/**
 * Claim the user's active-job slot and write the full job record (M5, option A)
 * by running `claim_active_job.lua`.
 *
 * The script receives three KEYS — `user:{userId}:active_job`, `job:{jobId}`,
 * `user:{userId}:jobs` — and three ARGV values: the job id, the serialized job
 * record, and the TTL as a string. What the script does with them lives in the
 * .lua file, not here.
 *
 * @param {import('ioredis').Redis} redis
 * @param {object} args
 * @param {string} args.userId - Non-empty user id; expected to be sanitized by the caller.
 * @param {string} args.jobId - Non-empty id of the new job.
 * @param {string} args.jobJson - The job record, already passed through JSON.stringify.
 * @param {number} args.ttlSeconds - Positive integer TTL in seconds. No default
 *   is applied here; the original notes mention 7 days (604800) as the usual value.
 * @returns {Promise<
 *   | { ok: true }
 *   | { ok: false, conflict: true, activeJobId: string | null }
 * >} `ok: true` when the script replies `OK`; the conflict shape when it
 *   replies `CONFLICT`, with `activeJobId` taken from the second reply element
 *   (or `null` if that element is not a string).
 * @throws {Error} On invalid arguments or on any other script reply.
 */
async function claimActiveJob(redis, { userId, jobId, jobJson, ttlSeconds }) {
  // --- Argument guards (checked in this order) ---
  const userIdIsValid = typeof userId === 'string' && userId.length > 0;
  if (!userIdIsValid) {
    throw new Error('[claimActiveJob] userId is required');
  }
  const jobIdIsValid = typeof jobId === 'string' && jobId.length > 0;
  if (!jobIdIsValid) {
    throw new Error('[claimActiveJob] jobId is required');
  }
  if (typeof jobJson !== 'string') {
    throw new Error('[claimActiveJob] jobJson must be a string');
  }
  const ttlIsValid = Number.isInteger(ttlSeconds) && ttlSeconds > 0;
  if (!ttlIsValid) {
    throw new Error('[claimActiveJob] ttlSeconds must be a positive integer');
  }

  // --- Run the script ---
  // Key names are assembled here, so callers never supply raw Redis keys.
  const activeJobKey = `user:${userId}:active_job`;
  const jobKey = `job:${jobId}`;
  const userJobsKey = `user:${userId}:jobs`;

  const reply = await evalScript(
    redis,
    loadScript('claim_active_job.lua'),
    [activeJobKey, jobKey, userJobsKey],
    [jobId, jobJson, String(ttlSeconds)]
  );

  // --- Interpret the reply: an array whose first element is a status tag ---
  const status = Array.isArray(reply) ? reply[0] : undefined;
  switch (status) {
    case 'OK':
      return { ok: true };
    case 'CONFLICT': {
      const holder = reply[1];
      return {
        ok: false,
        conflict: true,
        activeJobId: typeof holder === 'string' ? holder : null,
      };
    }
    default:
      // Not expected; surface it to the caller as an internal error.
      throw new Error(
        `[claimActiveJob] Unexpected Lua response: ${JSON.stringify(reply)}`
      );
  }
}
/**
 * Release the user's active-job slot by running `release_active_job.lua`
 * (fix for Sec M2 + Reviewer Major-2).
 *
 * Intended as the compensating step when enqueueing fails after a claim.
 * Per the original notes, the Lua script guards the delete atomically so the
 * `user:{userId}:active_job` entry is only removed while it still points at
 * this jobId; that guard lives in the .lua file, not in this function.
 *
 * The script receives three KEYS — `user:{userId}:active_job`, `job:{jobId}`,
 * `user:{userId}:jobs` — and the job id as its single ARGV value.
 *
 * @param {import('ioredis').Redis} redis
 * @param {object} args
 * @param {string} args.userId - Non-empty user id; expected to be sanitized by the caller.
 * @param {string} args.jobId - Non-empty id of the job to release.
 * @returns {Promise<
 *   | { ok: true, released: true }
 *   | { ok: true, released: false }
 * >} `released: true` when the script replies `OK`; `released: false` when it
 *   replies `NOOP`.
 * @throws {Error} On invalid arguments or on any other script reply.
 */
async function releaseActiveJob(redis, { userId, jobId }) {
  const userIdIsValid = typeof userId === 'string' && userId.length > 0;
  if (!userIdIsValid) {
    throw new Error('[releaseActiveJob] userId is required');
  }
  const jobIdIsValid = typeof jobId === 'string' && jobId.length > 0;
  if (!jobIdIsValid) {
    throw new Error('[releaseActiveJob] jobId is required');
  }

  // Key names are assembled here, so callers never supply raw Redis keys.
  const activeJobKey = `user:${userId}:active_job`;
  const jobKey = `job:${jobId}`;
  const userJobsKey = `user:${userId}:jobs`;

  const reply = await evalScript(
    redis,
    loadScript('release_active_job.lua'),
    [activeJobKey, jobKey, userJobsKey],
    [jobId]
  );

  // The reply is an array whose first element is a status tag.
  const status = Array.isArray(reply) ? reply[0] : undefined;
  const released = status === 'OK';
  if (released || status === 'NOOP') {
    return { ok: true, released };
  }

  throw new Error(
    `[releaseActiveJob] Unexpected Lua response: ${JSON.stringify(reply)}`
  );
}
module.exports = {
claimActiveJob,
releaseActiveJob,
// 內部 helper 暴露給單元測試
_internals: {
loadScript,
evalScript,
resetCache: () => _scriptCache.clear(),
},
};