feat: Enhance SES webhook handling and add support for soft bounce thresholds

This commit is contained in:
warrenchen 2026-02-18 12:56:54 +09:00
parent 620a1ae237
commit a7752c8ce0
9 changed files with 532 additions and 15 deletions

View File

@ -10,3 +10,13 @@ Webhook__Secrets__member_center=change_me_webhook_secret
Webhook__TimestampSkewSeconds=300
Webhook__AllowNullTenantClient=false
Ses__SkipSignatureValidation=true
Bounce__SoftBounceThreshold=5
MemberCenter__BaseUrl=
MemberCenter__DisableSubscriptionPath=/subscriptions/disable
MemberCenter__TokenPath=/connect/token
MemberCenter__DisableSubscriptionUrl=
MemberCenter__TokenUrl=
MemberCenter__ClientId=
MemberCenter__ClientSecret=
MemberCenter__Scope=newsletter:events.write
MemberCenter__ApiToken=

View File

@ -6,3 +6,5 @@
- If the command fails or hangs due to sandbox limits (for example restore/build stalls), rerun with `sandbox_permissions: "require_escalated"`.
- The escalation request must include a short justification explaining that sandbox restrictions are blocking normal .NET execution.
- Do not change project paths or command intent when escalating; rerun the same command with elevated permissions.
- If a sandbox command appears hung, run it with `tty=true` and explicitly terminate the sandbox session first (send `Ctrl+C`) before escalating.
- Never leave a hung sandbox process running in the background while starting the escalated rerun.

View File

@ -20,6 +20,7 @@
- Campaign / Send Job 基本流程
- Sender Adapter至少一種 ESP
- 投遞與退信記錄
- SES 回流建議採 `Configuration Set -> SNS -> SQS -> Worker`
## 非目標(暫不處理)
- 自建 SMTP server

View File

@ -11,6 +11,44 @@ ESP 介接暫定為 Amazon SES。
- Sender Adapter
- Delivery & Bounce Handling
## Sending Proxy 規格整合
### 目標與邊界
- 接收內容網站或會員平台送來的發送工作
- 呼叫 Amazon SES API優先 SES v2
- 必帶 Configuration Set + Message Tags
- 消費 SES 回流事件Bounce / Complaint / Delivery
- 必要時回寫 Member Center
不負責:
- List-Unsubscribe one-click endpoint 本身的服務實作
- 會員最終名單權威資料庫
### 狀態機(規劃)
Job 狀態:
- queued
- sending
- sent
- partially_failed
- failed
- completed
Recipient 狀態:
- pending
- sent
- delivered
- soft_bounced
- hard_bounced
- complained
- suppressed
### SES 事件回流架構(建議)
- `SES Configuration Set -> SNS Topic -> SQS Queue -> ECS Worker`
- Worker 職責:
- Poll SQS
- 解析 SNS envelope 與 SES payload
- 更新 DB 狀態
- 必要時呼叫 Member Center 停用/註記 API
## 信任邊界與 Auth 模型
### 外部角色
- Member Center事件來源與名單權威來源authority

View File

@ -54,8 +54,12 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入
- 依規則過濾已退訂、bounced、黑名單
4. 切分成可控批次batch寫入 Outbox。
5. Sender Worker 取出 batch轉成 SES API 請求。
6. SES 回應 message_id → 記錄 delivery log。
7. 更新 Send Job 進度(成功/失敗/重試)。
6. 發送時必帶:
- SES Configuration Set
- Message Tags至少含 campaign_id / site_id / list_id
- Newsletter 類型需帶 `List-Unsubscribe``List-Unsubscribe-Post` headers
7. SES 回應 message_id → 記錄 delivery log。
8. 更新 Send Job 進度(成功/失敗/重試)。
控速策略(範例):
- 全域 TPS 上限 + tenant TPS 上限
@ -70,19 +74,36 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入
目的:處理 ESP 回報的 bounce/complaint並回寫本地名單狀態。
流程:
1. SES 透過 SNS/Webhook 回報事件bounce/complaint/delivery/open/click
2. Webhook 驗證簽章與來源SES/SNS 驗證)
1. SES 事件由 Configuration Set 發送至 SNS再落到 SQS
2. ECS Worker 輪詢 SQS解析 SNS envelope 與 SES payload
3. 將事件寫入 Inboxappend-only
4. Consumer 解析事件:
- hard bounce → 標記 bounced + 停用
- soft bounce → 記錄次數,超過門檻停用
- complaint → 立即停用並列入黑名單
- hard bounce → 立即標記 blacklisted同義於 `suppressed`
- soft bounce → 累計次數,達門檻(預設 5才標記 blacklisted`suppressed`
- complaint → 立即取消訂閱並標記 blacklisted`suppressed`
- suppression 事件 → 直接對應為 `suppressed`(即黑名單)
5. 更新 List Store 快照與投遞記錄。
6. 回寫 Member Center 以停用訂閱(例如 hard bounce / complaint
6. 回寫 Member Center僅在以下條件
- hard bounce已設黑名單
- soft bounce達門檻後設黑名單
- complaint取消訂閱
- suppression設黑名單
補充:
- Unknown event 不應使 worker crash應記錄後送入 DLQ
- Throttle/暫時性網路錯誤使用指數退避重試
回寫規則:
- Send Engine 僅回寫「停用原因」與必要欄位
- Member Center 需提供可標註來源的欄位(例如 `disabled_by=send_engine`
- 回寫原因碼固定為:
- `hard_bounce`
- `soft_bounce_threshold`
- `complaint`
- `suppression`
名詞定義:
- `blacklisted``suppressed` 同義,表示此收件者不可再發送
資料一致性:
- 任何狀態改變需保留歷史append-only events + current snapshot

View File

@ -14,3 +14,12 @@
- Webhook 驗證規則為 tenant 綁定:`auth_clients.tenant_id` 必須等於 payload `tenant_id`
- 不支援 `X-Client-Id` fallback
- 預設拒絕 `tenant_id = NULL` 的通用 client`Webhook__AllowNullTenantClient=false`
- Member Center 回寫授權(建議):
- `MemberCenter__BaseUrl`(建議)
- `MemberCenter__DisableSubscriptionPath`(預設 `/subscriptions/disable`
- `MemberCenter__TokenPath`(預設 `/connect/token`
- `MemberCenter__ClientId`
- `MemberCenter__ClientSecret`
- `MemberCenter__Scope=newsletter:events.write`
- `MemberCenter__DisableSubscriptionUrl``MemberCenter__TokenUrl` 可用完整 URL 覆蓋fallback
- `MemberCenter__ApiToken` 僅作暫時 fallback非首選

View File

@ -40,6 +40,10 @@ scope 最小化:
- `newsletter:events.write`(停用回寫)
- `newsletter:list.read`(若未來仍需查詢)
實作約定:
- 優先走 token endpointclient credentials
- `ApiToken` 僅作暫時 fallback建議逐步移除
## 通用欄位
### Timestamp
- 欄位:`occurred_at`
@ -166,6 +170,35 @@ Response
}
```
### C-1. Sending Proxy Submit Job整合規格
用途:對齊內容網站/會員平台呼叫發信代理的標準接口。
Endpoint
- `POST /v1/send-jobs`
Request Body欄位
- `message_type``newsletter` | `transactional`
- `from`:發件人
- `to`:收件人陣列
- `subject`:主旨
- `html`HTML 內容
- `text`:純文字內容
- `headers`:自定義 header白名單
- `list_unsubscribe.url`:退訂 URL
- `list_unsubscribe.mailto`:可選
- `tags.campaign_id` / `tags.site_id` / `tags.list_id` / `tags.segment`
- `idempotency_key`:冪等鍵
Response
- `job_id`
- `status=queued`
規則:
- 必須帶 Configuration Set + Message Tags 後才能呼叫 SES
- `newsletter` 類型需帶:
- `List-Unsubscribe`
- `List-Unsubscribe-Post: List-Unsubscribe=One-Click`
### D. 查詢 Send Job
Endpoint
- `GET /api/send-jobs/{id}`
@ -204,9 +237,13 @@ Response
## WebhookSES → Send Engine
### F. SES 事件回報
用途:接收 bounce/complaint/delivery/open/click 等事件。
用途:接收 bounce/hard_bounced/soft_bounced/complaint/suppression/delivery/open/click 等事件。
Endpoint
推薦架構(正式):
- `SES Configuration Set -> SNS -> SQS -> ECS Worker`
- 由 Worker 消費事件,不要求對外公開 webhook
相容模式(可選):
- `POST /webhooks/ses`
驗證:
@ -227,6 +264,24 @@ Request Body示意
Response
- `200 OK`
事件對應規則(固定):
- `hard_bounced`:立即設為黑名單(`suppressed`
- `soft_bounced`:累計達門檻後設為黑名單(`suppressed`
- `complaint`:取消訂閱並回寫 Member Center
- `suppression`:設為黑名單(`suppressed`
回寫 Member Center 條件:
- `hard_bounced`:設黑名單後回寫
- `soft_bounced`:達門檻設黑名單後回寫
- `complaint`:立即回寫
- `suppression`:設黑名單後回寫
回寫原因碼(固定):
- `hard_bounce`
- `soft_bounce_threshold`
- `complaint`
- `suppression`
## 外部 APISend Engine → Member Center
以下為 Member Center 端提供的 API非 Send Engine 的 OpenAPI 規格範圍。
@ -234,7 +289,7 @@ Response
用途:因 hard bounce / complaint 停用訂閱,並在 Member Center 註記來源。
EndpointMember Center 提供):
- `POST /api/subscriptions/disable`
- `POST /subscriptions/disable`
Scope
- `newsletter:events.write`
@ -257,3 +312,12 @@ Request Body示意
- `409`重放或事件重複nonce / event_id
- `422`:資料格式錯誤
- `500`:伺服器內部錯誤
## Retry 策略(整合規格)
- Throttle指數退避重試
- Temporary network error重試
- Hard failure不重試
- Retry 上限可設定(例如 5 次)
## 相關環境參數
- `Bounce__SoftBounceThreshold`soft bounce 轉黑名單門檻(預設 `5`

View File

@ -11,9 +11,52 @@ security:
- bearerAuth: []
paths:
/v1/send-jobs:
post:
summary: Submit send job (sending proxy)
security:
- bearerAuth: []
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/SubmitSendJobRequest'
responses:
'200':
description: Queued
content:
application/json:
schema:
$ref: '#/components/schemas/SubmitSendJobResponse'
'401':
description: Unauthorized
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'403':
description: Forbidden
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'409':
description: Idempotency conflict
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'422':
description: Validation error
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
/api/send-jobs:
post:
summary: Create send job
summary: Create send job (legacy/internal)
security:
- bearerAuth: []
requestBody:
@ -356,6 +399,76 @@ components:
type: string
enum: [pending, running, completed, failed, cancelled]
SubmitSendJobRequest:
type: object
required: [message_type, from, to, subject, html, text, tags, idempotency_key]
properties:
message_type:
type: string
enum: [newsletter, transactional]
from:
type: string
format: email
to:
type: array
items:
type: string
format: email
minItems: 1
subject:
type: string
minLength: 1
html:
type: string
text:
type: string
headers:
type: object
additionalProperties:
type: string
list_unsubscribe:
$ref: '#/components/schemas/ListUnsubscribe'
tags:
$ref: '#/components/schemas/MessageTags'
idempotency_key:
type: string
minLength: 1
SubmitSendJobResponse:
type: object
required: [job_id, status]
properties:
job_id:
type: string
format: uuid
status:
type: string
enum: [queued]
ListUnsubscribe:
type: object
required: [url]
properties:
url:
type: string
format: uri
mailto:
type: string
format: email
MessageTags:
type: object
required: [campaign_id, site_id, list_id, segment]
properties:
campaign_id:
type: string
site_id:
type: string
list_id:
type: string
segment:
type: string
TrackingOptions:
type: object
properties:
@ -398,7 +511,7 @@ components:
format: email
status:
type: string
enum: [active, unsubscribed, bounced, complaint]
enum: [active, unsubscribed, bounced, complaint, suppressed]
preferences:
type: object
additionalProperties: true
@ -436,7 +549,7 @@ components:
properties:
event_type:
type: string
enum: [bounce, complaint, delivery, open, click]
enum: [bounce, hard_bounced, soft_bounced, complaint, suppression, delivery, open, click]
message_id:
type: string
tenant_id:

View File

@ -1,6 +1,7 @@
using System.Security.Claims;
using System.Text;
using System.Text.Json;
using System.Net.Http.Json;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.EntityFrameworkCore;
using Microsoft.IdentityModel.Tokens;
@ -314,6 +315,11 @@ app.MapPost("/webhooks/lists/full-sync", async (HttpContext httpContext, FullSyn
app.MapPost("/webhooks/ses", async (HttpContext httpContext, SesEventRequest request, SendEngineDbContext db) =>
{
if (request.TenantId == Guid.Empty || string.IsNullOrWhiteSpace(request.Email))
{
return Results.UnprocessableEntity(new { error = "tenant_id_email_required" });
}
var skipValidation = builder.Configuration.GetValue("Ses:SkipSignatureValidation", true);
var sesSignature = httpContext.Request.Headers["X-Amz-Sns-Signature"].ToString();
if (!skipValidation && string.IsNullOrWhiteSpace(sesSignature))
@ -321,13 +327,18 @@ app.MapPost("/webhooks/ses", async (HttpContext httpContext, SesEventRequest req
return Results.Unauthorized();
}
var normalizedEmail = request.Email.Trim().ToLowerInvariant();
var normalizedEventType = NormalizeSesEventType(request.EventType, request.BounceType);
request.Email = normalizedEmail;
request.EventType = normalizedEventType;
var payload = JsonSerializer.Serialize(request);
var inbox = new EventInbox
{
Id = Guid.NewGuid(),
TenantId = request.TenantId,
EventType = $"ses.{request.EventType}",
EventType = $"ses.{normalizedEventType}",
Source = "ses",
Payload = payload,
ReceivedAt = DateTimeOffset.UtcNow,
@ -337,6 +348,11 @@ app.MapPost("/webhooks/ses", async (HttpContext httpContext, SesEventRequest req
db.EventsInbox.Add(inbox);
await db.SaveChangesAsync();
await ApplySesEventAsync(db, builder.Configuration, request, normalizedEventType);
inbox.Status = "processed";
inbox.ProcessedAt = DateTimeOffset.UtcNow;
await db.SaveChangesAsync();
return Results.Ok();
}).WithName("SesWebhook").WithOpenApi();
@ -476,6 +492,249 @@ static string NormalizeStatus(string? status, string fallback)
"unsubscribed" => "unsubscribed",
"bounced" => "bounced",
"complaint" => "complaint",
"suppressed" => "suppressed",
_ => fallback
};
}
static string NormalizeSesEventType(string? eventType, string? bounceType)
{
var normalized = eventType?.Trim().ToLowerInvariant() ?? string.Empty;
if (normalized == "bounce")
{
var bounce = bounceType?.Trim().ToLowerInvariant() ?? string.Empty;
return bounce switch
{
"hard" => "hard_bounced",
"soft" => "soft_bounced",
_ => "bounce"
};
}
return normalized;
}
static async Task ApplySesEventAsync(
SendEngineDbContext db,
IConfiguration configuration,
SesEventRequest request,
string normalizedEventType)
{
var subscriber = await db.Subscribers
.FirstOrDefaultAsync(x => x.TenantId == request.TenantId && x.Email == request.Email);
if (subscriber is null)
{
return;
}
switch (normalizedEventType)
{
case "hard_bounced":
await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "hard_bounce", request.OccurredAt);
return;
case "soft_bounced":
var threshold = configuration.GetValue("Bounce:SoftBounceThreshold", 5);
var reached = await IsSoftBounceThresholdReachedAsync(db, request.TenantId, request.Email, threshold);
if (reached)
{
await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "soft_bounce_threshold", request.OccurredAt);
}
return;
case "complaint":
await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "complaint", request.OccurredAt);
return;
case "suppression":
await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "suppression", request.OccurredAt);
return;
default:
return;
}
}
static async Task<bool> IsSoftBounceThresholdReachedAsync(
SendEngineDbContext db,
Guid tenantId,
string normalizedEmail,
int threshold)
{
if (threshold < 1)
{
threshold = 1;
}
var marker = $"\"Email\":\"{normalizedEmail}\"";
var count = await db.EventsInbox.AsNoTracking()
.CountAsync(x =>
x.TenantId == tenantId &&
x.Source == "ses" &&
x.EventType == "ses.soft_bounced" &&
x.Payload.Contains(marker));
return count >= threshold;
}
static async Task SuppressAndNotifyAsync(
SendEngineDbContext db,
IConfiguration configuration,
Guid tenantId,
Guid subscriberId,
string reason,
DateTimeOffset occurredAt)
{
var subscriber = await db.Subscribers
.FirstOrDefaultAsync(x => x.TenantId == tenantId && x.Id == subscriberId);
if (subscriber is null)
{
return;
}
var now = DateTimeOffset.UtcNow;
subscriber.Status = "suppressed";
subscriber.UpdatedAt = now;
var memberships = await db.ListMembers
.Where(x => x.TenantId == tenantId && x.SubscriberId == subscriberId)
.ToListAsync();
foreach (var member in memberships)
{
member.Status = "suppressed";
member.UpdatedAt = now;
}
await db.SaveChangesAsync();
var listIds = memberships.Select(x => x.ListId).Distinct().ToArray();
await NotifyMemberCenterDisableAsync(configuration, tenantId, subscriberId, listIds, reason, occurredAt);
}
static async Task NotifyMemberCenterDisableAsync(
IConfiguration configuration,
Guid tenantId,
Guid subscriberId,
IReadOnlyCollection<Guid> listIds,
string reason,
DateTimeOffset occurredAt)
{
var url = ResolveMemberCenterUrl(
configuration,
"MemberCenter:DisableSubscriptionUrl",
"MemberCenter:BaseUrl",
"MemberCenter:DisableSubscriptionPath",
"/subscriptions/disable");
if (string.IsNullOrWhiteSpace(url))
{
return;
}
using var client = new HttpClient();
var token = await ResolveMemberCenterAccessTokenAsync(configuration, client);
if (string.IsNullOrWhiteSpace(token))
{
return;
}
client.DefaultRequestHeaders.Authorization = new("Bearer", token);
foreach (var listId in listIds)
{
var payload = new
{
tenant_id = tenantId,
subscriber_id = subscriberId,
list_id = listId,
reason,
disabled_by = "send_engine",
occurred_at = occurredAt
};
try
{
await client.PostAsJsonAsync(url, payload);
}
catch
{
// Best-effort callback: errors are intentionally swallowed for now.
}
}
}
static async Task<string?> ResolveMemberCenterAccessTokenAsync(IConfiguration configuration, HttpClient client)
{
var tokenUrl = ResolveMemberCenterUrl(
configuration,
"MemberCenter:TokenUrl",
"MemberCenter:BaseUrl",
"MemberCenter:TokenPath",
"/connect/token");
var clientId = configuration["MemberCenter:ClientId"];
var clientSecret = configuration["MemberCenter:ClientSecret"];
var scope = configuration["MemberCenter:Scope"];
if (!string.IsNullOrWhiteSpace(tokenUrl) &&
!string.IsNullOrWhiteSpace(clientId) &&
!string.IsNullOrWhiteSpace(clientSecret))
{
var form = new Dictionary<string, string>
{
["grant_type"] = "client_credentials",
["client_id"] = clientId,
["client_secret"] = clientSecret
};
if (!string.IsNullOrWhiteSpace(scope))
{
form["scope"] = scope;
}
try
{
using var response = await client.PostAsync(tokenUrl, new FormUrlEncodedContent(form));
if (response.IsSuccessStatusCode)
{
await using var stream = await response.Content.ReadAsStreamAsync();
using var json = await JsonDocument.ParseAsync(stream);
if (json.RootElement.TryGetProperty("access_token", out var tokenElement))
{
var accessToken = tokenElement.GetString();
if (!string.IsNullOrWhiteSpace(accessToken))
{
return accessToken;
}
}
}
}
catch
{
// Fallback to static token below.
}
}
return configuration["MemberCenter:ApiToken"];
}
static string? ResolveMemberCenterUrl(
IConfiguration configuration,
string fullUrlKey,
string baseUrlKey,
string pathKey,
string defaultPath)
{
var fullUrl = configuration[fullUrlKey];
if (!string.IsNullOrWhiteSpace(fullUrl))
{
return fullUrl;
}
var baseUrl = configuration[baseUrlKey];
if (string.IsNullOrWhiteSpace(baseUrl))
{
return null;
}
var path = configuration[pathKey];
if (string.IsNullOrWhiteSpace(path))
{
path = defaultPath;
}
return $"{baseUrl.TrimEnd('/')}/{path.TrimStart('/')}";
}