From a7752c8ce03a7fa6edba82f37fbccbfe4d78e70b Mon Sep 17 00:00:00 2001 From: warrenchen Date: Wed, 18 Feb 2026 12:56:54 +0900 Subject: [PATCH] feat: Enhance SES webhook handling and add support for soft bounce thresholds --- .env.example | 10 ++ AGENTS.md | 2 + README.md | 1 + docs/DESIGN.md | 38 +++++ docs/FLOWS.md | 37 +++-- docs/INSTALL.md | 9 ++ docs/OPENAPI.md | 70 ++++++++- docs/openapi.yaml | 119 +++++++++++++++- src/SendEngine.Api/Program.cs | 261 +++++++++++++++++++++++++++++++++- 9 files changed, 532 insertions(+), 15 deletions(-) diff --git a/.env.example b/.env.example index 6507a14..dd3770c 100644 --- a/.env.example +++ b/.env.example @@ -10,3 +10,13 @@ Webhook__Secrets__member_center=change_me_webhook_secret Webhook__TimestampSkewSeconds=300 Webhook__AllowNullTenantClient=false Ses__SkipSignatureValidation=true +Bounce__SoftBounceThreshold=5 +MemberCenter__BaseUrl= +MemberCenter__DisableSubscriptionPath=/subscriptions/disable +MemberCenter__TokenPath=/connect/token +MemberCenter__DisableSubscriptionUrl= +MemberCenter__TokenUrl= +MemberCenter__ClientId= +MemberCenter__ClientSecret= +MemberCenter__Scope=newsletter:events.write +MemberCenter__ApiToken= diff --git a/AGENTS.md b/AGENTS.md index b0f7ba6..53957d1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -6,3 +6,5 @@ - If the command fails or hangs due to sandbox limits (for example restore/build stalls), rerun with `sandbox_permissions: "require_escalated"`. - The escalation request must include a short justification explaining that sandbox restrictions are blocking normal .NET execution. - Do not change project paths or command intent when escalating; rerun the same command with elevated permissions. +- If a sandbox command appears hung, run it with `tty=true` and explicitly terminate the sandbox session first (send `Ctrl+C`) before escalating. +- Never leave a hung sandbox process running in the background while starting the escalated rerun. diff --git a/README.md b/README.md index 37d7d1d..62063a6 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,7 @@ - Campaign / Send Job 基本流程 - Sender Adapter(至少一種 ESP) - 投遞與退信記錄 +- SES 回流建議採 `Configuration Set -> SNS -> SQS -> Worker` ## 非目標(暫不處理) - 自建 SMTP server diff --git a/docs/DESIGN.md b/docs/DESIGN.md index cee4568..a82b9a9 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -11,6 +11,44 @@ ESP 介接暫定為 Amazon SES。 - Sender Adapter - Delivery & Bounce Handling +## Sending Proxy 規格整合 +### 目標與邊界 +- 接收內容網站或會員平台送來的發送工作 +- 呼叫 Amazon SES API(優先 SES v2) +- 必帶 Configuration Set + Message Tags +- 消費 SES 回流事件(Bounce / Complaint / Delivery) +- 必要時回寫 Member Center + +不負責: +- List-Unsubscribe one-click endpoint 本身的服務實作 +- 會員最終名單權威資料庫 + +### 狀態機(規劃) +Job 狀態: +- queued +- sending +- sent +- partially_failed +- failed +- completed + +Recipient 狀態: +- pending +- sent +- delivered +- soft_bounced +- hard_bounced +- complained +- suppressed + +### SES 事件回流架構(建議) +- `SES Configuration Set -> SNS Topic -> SQS Queue -> ECS Worker` +- Worker 職責: +- Poll SQS +- 解析 SNS envelope 與 SES payload +- 更新 DB 狀態 +- 必要時呼叫 Member Center 停用/註記 API + ## 信任邊界與 Auth 模型 ### 外部角色 - Member Center:事件來源與名單權威來源(authority) diff --git a/docs/FLOWS.md b/docs/FLOWS.md index 484fe93..3e1de86 100644 --- a/docs/FLOWS.md +++ b/docs/FLOWS.md @@ -54,8 +54,12 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入 - 依規則過濾(已退訂、bounced、黑名單) 4. 切分成可控批次(batch),寫入 Outbox。 5. Sender Worker 取出 batch,轉成 SES API 請求。 -6. SES 回應 message_id → 記錄 delivery log。 -7. 更新 Send Job 進度(成功/失敗/重試)。 +6. 發送時必帶: + - SES Configuration Set + - Message Tags(至少含 campaign_id / site_id / list_id) + - Newsletter 類型需帶 `List-Unsubscribe` 與 `List-Unsubscribe-Post` headers +7. SES 回應 message_id → 記錄 delivery log。 +8. 更新 Send Job 進度(成功/失敗/重試)。 控速策略(範例): - 全域 TPS 上限 + tenant TPS 上限 @@ -70,19 +74,36 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入 目的:處理 ESP 回報的 bounce/complaint,並回寫本地名單狀態。 流程: -1. SES 透過 SNS/Webhook 回報事件(bounce/complaint/delivery/open/click)。 -2. Webhook 驗證簽章與來源(SES/SNS 驗證)。 +1. SES 事件由 Configuration Set 發送至 SNS,再落到 SQS。 +2. ECS Worker 輪詢 SQS,解析 SNS envelope 與 SES payload。 3. 將事件寫入 Inbox(append-only)。 4. Consumer 解析事件: - - hard bounce → 標記 bounced + 停用 - - soft bounce → 記錄次數,超過門檻停用 - - complaint → 立即停用並列入黑名單 + - hard bounce → 立即標記 blacklisted(同義於 `suppressed`) + - soft bounce → 累計次數,達門檻(預設 5)才標記 blacklisted(`suppressed`) + - complaint → 立即取消訂閱並標記 blacklisted(`suppressed`) + - suppression 事件 → 直接對應為 `suppressed`(即黑名單) 5. 更新 List Store 快照與投遞記錄。 -6. 回寫 Member Center 以停用訂閱(例如 hard bounce / complaint)。 +6. 回寫 Member Center(僅在以下條件): + - hard bounce:已設黑名單 + - soft bounce:達門檻後設黑名單 + - complaint:取消訂閱 + - suppression:設黑名單 + +補充: +- Unknown event 不應使 worker crash,應記錄後送入 DLQ +- Throttle/暫時性網路錯誤使用指數退避重試 回寫規則: - Send Engine 僅回寫「停用原因」與必要欄位 - Member Center 需提供可標註來源的欄位(例如 `disabled_by=send_engine`) +- 回寫原因碼固定為: +- `hard_bounce` +- `soft_bounce_threshold` +- `complaint` +- `suppression` + +名詞定義: +- `blacklisted` 與 `suppressed` 同義,表示此收件者不可再發送 資料一致性: - 任何狀態改變需保留歷史(append-only events + current snapshot) diff --git a/docs/INSTALL.md b/docs/INSTALL.md index 5ead951..11ea88a 100644 --- a/docs/INSTALL.md +++ b/docs/INSTALL.md @@ -14,3 +14,12 @@ - Webhook 驗證規則為 tenant 綁定:`auth_clients.tenant_id` 必須等於 payload `tenant_id` - 不支援 `X-Client-Id` fallback - 預設拒絕 `tenant_id = NULL` 的通用 client(`Webhook__AllowNullTenantClient=false`) +- Member Center 回寫授權(建議): + - `MemberCenter__BaseUrl`(建議) + - `MemberCenter__DisableSubscriptionPath`(預設 `/subscriptions/disable`) + - `MemberCenter__TokenPath`(預設 `/connect/token`) + - `MemberCenter__ClientId` + - `MemberCenter__ClientSecret` + - `MemberCenter__Scope=newsletter:events.write` + - `MemberCenter__DisableSubscriptionUrl` 與 `MemberCenter__TokenUrl` 可用完整 URL 覆蓋(fallback) + - `MemberCenter__ApiToken` 僅作暫時 fallback(非首選) diff --git a/docs/OPENAPI.md b/docs/OPENAPI.md index 68e86d2..fd53618 100644 --- a/docs/OPENAPI.md +++ b/docs/OPENAPI.md @@ -40,6 +40,10 @@ scope 最小化: - `newsletter:events.write`(停用回寫) - `newsletter:list.read`(若未來仍需查詢) +實作約定: +- 優先走 token endpoint(client credentials) +- `ApiToken` 僅作暫時 fallback(建議逐步移除) + ## 通用欄位 ### Timestamp - 欄位:`occurred_at` @@ -166,6 +170,35 @@ Response: } ``` +### C-1. Sending Proxy Submit Job(整合規格) +用途:對齊內容網站/會員平台呼叫發信代理的標準接口。 + +Endpoint: +- `POST /v1/send-jobs` + +Request Body(欄位): +- `message_type`:`newsletter` | `transactional` +- `from`:發件人 +- `to`:收件人陣列 +- `subject`:主旨 +- `html`:HTML 內容 +- `text`:純文字內容 +- `headers`:自定義 header(白名單) +- `list_unsubscribe.url`:退訂 URL +- `list_unsubscribe.mailto`:可選 +- `tags.campaign_id` / `tags.site_id` / `tags.list_id` / `tags.segment` +- `idempotency_key`:冪等鍵 + +Response: +- `job_id` +- `status=queued` + +規則: +- 必須帶 Configuration Set + Message Tags 後才能呼叫 SES +- `newsletter` 類型需帶: +- `List-Unsubscribe` +- `List-Unsubscribe-Post: List-Unsubscribe=One-Click` + ### D. 查詢 Send Job Endpoint: - `GET /api/send-jobs/{id}` @@ -204,9 +237,13 @@ Response: ## Webhook:SES → Send Engine ### F. SES 事件回報 -用途:接收 bounce/complaint/delivery/open/click 等事件。 +用途:接收 bounce/hard_bounced/soft_bounced/complaint/suppression/delivery/open/click 等事件。 -Endpoint: +推薦架構(正式): +- `SES Configuration Set -> SNS -> SQS -> ECS Worker` +- 由 Worker 消費事件,不要求對外公開 webhook + +相容模式(可選): - `POST /webhooks/ses` 驗證: @@ -227,6 +264,24 @@ Request Body(示意): Response: - `200 OK` +事件對應規則(固定): +- `hard_bounced`:立即設為黑名單(`suppressed`) +- `soft_bounced`:累計達門檻後設為黑名單(`suppressed`) +- `complaint`:取消訂閱並回寫 Member Center +- `suppression`:設為黑名單(`suppressed`) + +回寫 Member Center 條件: +- `hard_bounced`:設黑名單後回寫 +- `soft_bounced`:達門檻設黑名單後回寫 +- `complaint`:立即回寫 +- `suppression`:設黑名單後回寫 + +回寫原因碼(固定): +- `hard_bounce` +- `soft_bounce_threshold` +- `complaint` +- `suppression` + ## 外部 API:Send Engine → Member Center 以下為 Member Center 端提供的 API,非 Send Engine 的 OpenAPI 規格範圍。 @@ -234,7 +289,7 @@ Response: 用途:因 hard bounce / complaint 停用訂閱,並在 Member Center 註記來源。 Endpoint(Member Center 提供): -- `POST /api/subscriptions/disable` +- `POST /subscriptions/disable` Scope: - `newsletter:events.write` @@ -257,3 +312,12 @@ Request Body(示意): - `409`:重放或事件重複(nonce / event_id) - `422`:資料格式錯誤 - `500`:伺服器內部錯誤 + +## Retry 策略(整合規格) +- Throttle:指數退避重試 +- Temporary network error:重試 +- Hard failure:不重試 +- Retry 上限可設定(例如 5 次) + +## 相關環境參數 +- `Bounce__SoftBounceThreshold`:soft bounce 轉黑名單門檻(預設 `5`) diff --git a/docs/openapi.yaml b/docs/openapi.yaml index 0033ded..a82cefb 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -11,9 +11,52 @@ security: - bearerAuth: [] paths: + /v1/send-jobs: + post: + summary: Submit send job (sending proxy) + security: + - bearerAuth: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SubmitSendJobRequest' + responses: + '200': + description: Queued + content: + application/json: + schema: + $ref: '#/components/schemas/SubmitSendJobResponse' + '401': + description: Unauthorized + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '403': + description: Forbidden + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '409': + description: Idempotency conflict + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '422': + description: Validation error + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + /api/send-jobs: post: - summary: Create send job + summary: Create send job (legacy/internal) security: - bearerAuth: [] requestBody: @@ -356,6 +399,76 @@ components: type: string enum: [pending, running, completed, failed, cancelled] + SubmitSendJobRequest: + type: object + required: [message_type, from, to, subject, html, text, tags, idempotency_key] + properties: + message_type: + type: string + enum: [newsletter, transactional] + from: + type: string + format: email + to: + type: array + items: + type: string + format: email + minItems: 1 + subject: + type: string + minLength: 1 + html: + type: string + text: + type: string + headers: + type: object + additionalProperties: + type: string + list_unsubscribe: + $ref: '#/components/schemas/ListUnsubscribe' + tags: + $ref: '#/components/schemas/MessageTags' + idempotency_key: + type: string + minLength: 1 + + SubmitSendJobResponse: + type: object + required: [job_id, status] + properties: + job_id: + type: string + format: uuid + status: + type: string + enum: [queued] + + ListUnsubscribe: + type: object + required: [url] + properties: + url: + type: string + format: uri + mailto: + type: string + format: email + + MessageTags: + type: object + required: [campaign_id, site_id, list_id, segment] + properties: + campaign_id: + type: string + site_id: + type: string + list_id: + type: string + segment: + type: string + TrackingOptions: type: object properties: @@ -398,7 +511,7 @@ components: format: email status: type: string - enum: [active, unsubscribed, bounced, complaint] + enum: [active, unsubscribed, bounced, complaint, suppressed] preferences: type: object additionalProperties: true @@ -436,7 +549,7 @@ components: properties: event_type: type: string - enum: [bounce, complaint, delivery, open, click] + enum: [bounce, hard_bounced, soft_bounced, complaint, suppression, delivery, open, click] message_id: type: string tenant_id: diff --git a/src/SendEngine.Api/Program.cs b/src/SendEngine.Api/Program.cs index 9797cf7..4be52c7 100644 --- a/src/SendEngine.Api/Program.cs +++ b/src/SendEngine.Api/Program.cs @@ -1,6 +1,7 @@ using System.Security.Claims; using System.Text; using System.Text.Json; +using System.Net.Http.Json; using Microsoft.AspNetCore.Authentication.JwtBearer; using Microsoft.EntityFrameworkCore; using Microsoft.IdentityModel.Tokens; @@ -314,6 +315,11 @@ app.MapPost("/webhooks/lists/full-sync", async (HttpContext httpContext, FullSyn app.MapPost("/webhooks/ses", async (HttpContext httpContext, SesEventRequest request, SendEngineDbContext db) => { + if (request.TenantId == Guid.Empty || string.IsNullOrWhiteSpace(request.Email)) + { + return Results.UnprocessableEntity(new { error = "tenant_id_email_required" }); + } + var skipValidation = builder.Configuration.GetValue("Ses:SkipSignatureValidation", true); var sesSignature = httpContext.Request.Headers["X-Amz-Sns-Signature"].ToString(); if (!skipValidation && string.IsNullOrWhiteSpace(sesSignature)) @@ -321,13 +327,18 @@ app.MapPost("/webhooks/ses", async (HttpContext httpContext, SesEventRequest req return Results.Unauthorized(); } + var normalizedEmail = request.Email.Trim().ToLowerInvariant(); + var normalizedEventType = NormalizeSesEventType(request.EventType, request.BounceType); + request.Email = normalizedEmail; + request.EventType = normalizedEventType; + var payload = JsonSerializer.Serialize(request); var inbox = new EventInbox { Id = Guid.NewGuid(), TenantId = request.TenantId, - EventType = $"ses.{request.EventType}", + EventType = $"ses.{normalizedEventType}", Source = "ses", Payload = payload, ReceivedAt = DateTimeOffset.UtcNow, @@ -337,6 +348,11 @@ app.MapPost("/webhooks/ses", async (HttpContext httpContext, SesEventRequest req db.EventsInbox.Add(inbox); await db.SaveChangesAsync(); + await ApplySesEventAsync(db, builder.Configuration, request, normalizedEventType); + inbox.Status = "processed"; + inbox.ProcessedAt = DateTimeOffset.UtcNow; + await db.SaveChangesAsync(); + return Results.Ok(); }).WithName("SesWebhook").WithOpenApi(); @@ -476,6 +492,249 @@ static string NormalizeStatus(string? status, string fallback) "unsubscribed" => "unsubscribed", "bounced" => "bounced", "complaint" => "complaint", + "suppressed" => "suppressed", _ => fallback }; } + +static string NormalizeSesEventType(string? eventType, string? bounceType) +{ + var normalized = eventType?.Trim().ToLowerInvariant() ?? string.Empty; + if (normalized == "bounce") + { + var bounce = bounceType?.Trim().ToLowerInvariant() ?? string.Empty; + return bounce switch + { + "hard" => "hard_bounced", + "soft" => "soft_bounced", + _ => "bounce" + }; + } + + return normalized; +} + +static async Task ApplySesEventAsync( + SendEngineDbContext db, + IConfiguration configuration, + SesEventRequest request, + string normalizedEventType) +{ + var subscriber = await db.Subscribers + .FirstOrDefaultAsync(x => x.TenantId == request.TenantId && x.Email == request.Email); + if (subscriber is null) + { + return; + } + + switch (normalizedEventType) + { + case "hard_bounced": + await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "hard_bounce", request.OccurredAt); + return; + case "soft_bounced": + var threshold = configuration.GetValue("Bounce:SoftBounceThreshold", 5); + var reached = await IsSoftBounceThresholdReachedAsync(db, request.TenantId, request.Email, threshold); + if (reached) + { + await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "soft_bounce_threshold", request.OccurredAt); + } + return; + case "complaint": + await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "complaint", request.OccurredAt); + return; + case "suppression": + await SuppressAndNotifyAsync(db, configuration, request.TenantId, subscriber.Id, "suppression", request.OccurredAt); + return; + default: + return; + } +} + +static async Task IsSoftBounceThresholdReachedAsync( + SendEngineDbContext db, + Guid tenantId, + string normalizedEmail, + int threshold) +{ + if (threshold < 1) + { + threshold = 1; + } + + var marker = $"\"Email\":\"{normalizedEmail}\""; + var count = await db.EventsInbox.AsNoTracking() + .CountAsync(x => + x.TenantId == tenantId && + x.Source == "ses" && + x.EventType == "ses.soft_bounced" && + x.Payload.Contains(marker)); + + return count >= threshold; +} + +static async Task SuppressAndNotifyAsync( + SendEngineDbContext db, + IConfiguration configuration, + Guid tenantId, + Guid subscriberId, + string reason, + DateTimeOffset occurredAt) +{ + var subscriber = await db.Subscribers + .FirstOrDefaultAsync(x => x.TenantId == tenantId && x.Id == subscriberId); + if (subscriber is null) + { + return; + } + + var now = DateTimeOffset.UtcNow; + subscriber.Status = "suppressed"; + subscriber.UpdatedAt = now; + + var memberships = await db.ListMembers + .Where(x => x.TenantId == tenantId && x.SubscriberId == subscriberId) + .ToListAsync(); + foreach (var member in memberships) + { + member.Status = "suppressed"; + member.UpdatedAt = now; + } + + await db.SaveChangesAsync(); + + var listIds = memberships.Select(x => x.ListId).Distinct().ToArray(); + await NotifyMemberCenterDisableAsync(configuration, tenantId, subscriberId, listIds, reason, occurredAt); +} + +static async Task NotifyMemberCenterDisableAsync( + IConfiguration configuration, + Guid tenantId, + Guid subscriberId, + IReadOnlyCollection listIds, + string reason, + DateTimeOffset occurredAt) +{ + var url = ResolveMemberCenterUrl( + configuration, + "MemberCenter:DisableSubscriptionUrl", + "MemberCenter:BaseUrl", + "MemberCenter:DisableSubscriptionPath", + "/subscriptions/disable"); + if (string.IsNullOrWhiteSpace(url)) + { + return; + } + + using var client = new HttpClient(); + var token = await ResolveMemberCenterAccessTokenAsync(configuration, client); + if (string.IsNullOrWhiteSpace(token)) + { + return; + } + client.DefaultRequestHeaders.Authorization = new("Bearer", token); + + foreach (var listId in listIds) + { + var payload = new + { + tenant_id = tenantId, + subscriber_id = subscriberId, + list_id = listId, + reason, + disabled_by = "send_engine", + occurred_at = occurredAt + }; + + try + { + await client.PostAsJsonAsync(url, payload); + } + catch + { + // Best-effort callback: errors are intentionally swallowed for now. + } + } +} + +static async Task ResolveMemberCenterAccessTokenAsync(IConfiguration configuration, HttpClient client) +{ + var tokenUrl = ResolveMemberCenterUrl( + configuration, + "MemberCenter:TokenUrl", + "MemberCenter:BaseUrl", + "MemberCenter:TokenPath", + "/connect/token"); + var clientId = configuration["MemberCenter:ClientId"]; + var clientSecret = configuration["MemberCenter:ClientSecret"]; + var scope = configuration["MemberCenter:Scope"]; + + if (!string.IsNullOrWhiteSpace(tokenUrl) && + !string.IsNullOrWhiteSpace(clientId) && + !string.IsNullOrWhiteSpace(clientSecret)) + { + var form = new Dictionary + { + ["grant_type"] = "client_credentials", + ["client_id"] = clientId, + ["client_secret"] = clientSecret + }; + + if (!string.IsNullOrWhiteSpace(scope)) + { + form["scope"] = scope; + } + + try + { + using var response = await client.PostAsync(tokenUrl, new FormUrlEncodedContent(form)); + if (response.IsSuccessStatusCode) + { + await using var stream = await response.Content.ReadAsStreamAsync(); + using var json = await JsonDocument.ParseAsync(stream); + if (json.RootElement.TryGetProperty("access_token", out var tokenElement)) + { + var accessToken = tokenElement.GetString(); + if (!string.IsNullOrWhiteSpace(accessToken)) + { + return accessToken; + } + } + } + } + catch + { + // Fallback to static token below. + } + } + + return configuration["MemberCenter:ApiToken"]; +} + +static string? ResolveMemberCenterUrl( + IConfiguration configuration, + string fullUrlKey, + string baseUrlKey, + string pathKey, + string defaultPath) +{ + var fullUrl = configuration[fullUrlKey]; + if (!string.IsNullOrWhiteSpace(fullUrl)) + { + return fullUrl; + } + + var baseUrl = configuration[baseUrlKey]; + if (string.IsNullOrWhiteSpace(baseUrl)) + { + return null; + } + + var path = configuration[pathKey]; + if (string.IsNullOrWhiteSpace(path)) + { + path = defaultPath; + } + + return $"{baseUrl.TrimEnd('/')}/{path.TrimStart('/')}"; +}