warrenchen d49c30b447 feat: Add SQS integration for SES event processing
- Introduced SqsSesPollerWorker to poll messages from SQS and process SES events.
- Implemented SesEventProcessingService to handle SES event payloads and store them in the database.
- Updated DevMockSenderWorker to support new SES sending methods and improved logging for unsubscribe headers.
- Added AWS SDK for SQS to project dependencies.
2026-02-26 17:10:25 +09:00

5.9 KiB
Raw Permalink Blame History

Flows

本文件描述 Send Engine 的三條核心流程:訂閱事件同步、發送排程、退信處理。 本引擎只負責事件控制與執行,不提供 UI。 ESP 介接暫定為 Amazon SES。 Member Center 為多租戶架構,信件內容由各租戶網站產生後送入 Send Engine。

1. 訂閱事件同步流程

目的:以事件為主,建立/更新本地名單快照tenant/list scope不做跨租戶查詢。

流程:

  1. Member Center 發出事件(subscription.activated / subscription.unsubscribed / preferences.updated)。
  2. Send Engine 的 Event Ingest 接收事件Webhook 或 Queue 擇一)。
  3. 進行驗證tenant scope、簽章/Token、重放保護
  4. 事件落地為 Inboxappend-only標記 received_at
  5. Consumer 依 event_id 去重並處理:
    • activated → upsert 訂閱者、掛到 tenant/list
    • unsubscribed → 標記狀態為 unsubscribed保留歷史
    • preferences.updated → 更新偏好欄位與訂閱狀態
  6. 寫入 List Store 快照(只增量更新,不拉全量)。
  7. 記錄處理結果與版本號(供重播與對帳)。

錯誤與重試(目前實作):

  • 驗證失敗 → 直接拒絕401/403不寫入 Inbox。
  • DB/執行錯誤 → 由請求端或佇列重送;尚未提供獨立 DLQ worker 與統一退避策略配置。
  • 事件格式錯誤 → 回 422

1b. 全量名單同步流程(由 Member Center 主動推送)

目的:避免 Send Engine 透過 API 拉取名單,降低名單外流風險。

流程:

  1. Member Center 內部管理介面或排程觸發「名單重建」。
  2. Member Center 以 webhook 推送全量名單或分批名單事件(推薦分批)。
  3. Send Engine 驗證來源與 tenant scope。
  4. 事件寫入 InboxConsumer 以批次方式重建 List Store。
  5. 重建完成後標記同步版本sync_version

注意事項:

  • 建議以批次/游標推送,避免單筆 payload 過大
  • 可於 Send Engine 端提供 sync_received 回應與進度回報

2. 發送排程流程

目的:從租戶網站送入的內容建立 Send Job並由背景 worker 執行發送。

目前實作流程:

  1. 租戶網站以 Member Center 簽發的 token 呼叫 Send Engine API 建立 Campaign/Send Job
    • 必填tenant_id、list_id、內容subject/body/template 其一或組合)
    • 選填排程時間、發送窗口、追蹤設定open/click
  2. Send Engine 驗證 tenant scope 與內容完整性,建立 Send Job。
    • tenant_id 以 token 為準body 的 tenant_id 僅作一致性檢查
    • tenant 必須預先存在(建議由 Installer 建立)
    • list_id 若不存在Send Engine 會在該 tenant 下自動建立 listplaceholder
  3. DevMockSenderWorker 輪詢 send_jobs(status=pending),執行時先改為 running
  4. worker 讀取 subscriptions(status=active),並過濾不可發送對象:
    • unsubscribed / suppressed 不發送
    • 若啟用 one-click token endpoint僅發送 status=issued 的收件者
  5. 發送執行:
    • ESP__Provider=mock:僅模擬發送,寫入預覽事件並輸出 console log
    • ESP__Provider=ses
    • Ses__SendMode=raw_bulk:使用 SES v2 SendEmail,依內容分組每次最多 50 位收件者
    • Ses__SendMode=bulk_template:使用 SES v2 SendBulkEmail(每批最多 50
  6. 更新 Send Job 狀態:
    • 全部成功:completed
    • 例外或部分失敗:failed

控速策略(範例):

  • 全域 TPS 上限 + tenant TPS 上限
  • SES provider-specific rate limit
  • 超限則延後批次並回寫排程

內容管理注意事項:

  • Send Engine 不負責內容產生與編輯,僅接收已渲染或可渲染內容
  • 若採模板渲染,模板需由租戶網站提供並由 Send Engine 以 tenant scope 渲染

3. 退信處理流程

目的:處理 ESP 回報的 bounce/complaint並回寫本地名單狀態。

目前實作流程Webhook + SQS Poller

  1. SqsSesPollerWorker 輪詢 SQS 取得 SNS envelope直接呼叫內部 SES processing service。
  2. POST /webhooks/ses 也會呼叫同一套 processing service相容直接呼叫模式
  3. 驗證(可透過 Ses__SkipSignatureValidation 控制是否要求簽章)。
  4. 將事件寫入 Inboxappend-only
  5. Consumer 解析事件:
    • hard bounce → 立即標記 blacklisted同義於 suppressed
    • soft bounce → 累計次數,達門檻(預設 5才標記 blacklistedsuppressed
    • complaint → 立即標記 blacklistedsuppressed
    • suppression 事件 → 直接對應為 suppressed(即黑名單)
  6. 更新 List Store 快照與投遞記錄。
  7. 回寫 Member Center僅在以下條件
    • hard bounce已設黑名單
    • soft bounce達門檻後設黑名單
    • complaint設黑名單
    • suppression設黑名單

補充:

  • Unknown event 目前會略過或記錄,不會讓 worker 中止(不進獨立 DLQ
  • 已落地 SQS -> Worker -> 內部 SES processing service/webhooks/ses 仍保留直接呼叫相容模式
  • Bounce 事件會依 bounceType 正規化為 hard_bouncedPermanentsoft_bouncedTransient
  • 亂序事件以優先序處理:complaint > hard_bounced/suppression > soft_bounced > delivery

回寫規則:

  • Send Engine 僅回寫「停用原因」與必要欄位
  • Member Center 需提供可標註來源的欄位(例如 disabled_by=send_engine
  • 回寫原因碼固定為:
  • hard_bounce
  • soft_bounce_threshold
  • complaint
  • suppression

名詞定義:

  • blacklistedsuppressed 同義,表示此收件者不可再發送

資料一致性:

  • 任何狀態改變需保留歷史append-only events + current snapshot
  • 避免以 ESP 回報反向新增不存在的訂閱者(僅更新已存在者)

多租戶安全性補充:

  • 所有事件與 Send Job 必須攜帶 tenant_id且不可跨租戶讀寫
  • API 僅允許 tenant scope 的操作list/read/write