diff --git a/.env.example b/.env.example index 44b66a7..e11828e 100644 --- a/.env.example +++ b/.env.example @@ -1,7 +1,6 @@ ASPNETCORE_ENVIRONMENT=Development ConnectionStrings__Default=Host=localhost;Database=send_engine;Username=postgres;Password=postgres ESP__Provider=ses -ESP__ApiKey=change_me Db__AutoMigrate=true Jwt__Issuer=http://localhost:7850/ Jwt__Audience=send_engine_api @@ -28,4 +27,17 @@ DevSender__PollIntervalSeconds=5 Ses__Region=us-east-1 Ses__FromEmail= Ses__ConfigurationSet= +Ses__SendMode=raw_bulk Ses__TemplateName= +Sqs__Enabled=false +Sqs__QueueUrl= +Sqs__Region= +Sqs__PollWaitSeconds=20 +Sqs__MaxMessages=10 +Sqs__VisibilityTimeoutSeconds=30 +AWS__Region=ap-northeast-1 +AWS_EC2_METADATA_DISABLED=true +# Local only (do NOT use in production): +# AWS_ACCESS_KEY_ID= +# AWS_SECRET_ACCESS_KEY= +# AWS_SESSION_TOKEN= diff --git a/README.md b/README.md index 62063a6..e5043fd 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ - 接收 Member Center 的訂閱事件(activated / unsubscribed / preferences.updated) - 多租戶名單快照(依 tenant/list)且僅增量更新 - 管理 Campaign / Send Job 與排程 -- 對接 ESP(SES / SendGrid / Mailgun) +- 對接 ESP(Amazon SES + Mock) - 記錄投遞結果與退信(必要時回寫 Member Center) ## 既定條件 @@ -54,8 +54,7 @@ mass_mail_engine/ ## Build 使用 VS Code `Run Build Task`(預設執行 `dotnet build SendEngine.sln`)。 -## 待確認事項 -- 事件系統選擇(Kafka/RabbitMQ/SNS+SQS / Webhook) -- ESP 優先順序(SES / SendGrid / Mailgun) -- 退信回寫的規則(hard bounce / soft bounce) -- 追蹤事件範圍(open / click / unsubscribe) +## 目前待辦 +- SES/SNS 簽章完整驗證(目前 `Ses__SkipSignatureValidation=false` 僅檢查 header 存在) +- 事件重試/DLQ 策略補強(目前主要依 SQS redrive policy) +- recipient 狀態機擴充(delivery/open/click 的完整優先序與狀態轉換) diff --git a/docs/DESIGN.md b/docs/DESIGN.md index 33e2a8c..a9392c6 100644 --- a/docs/DESIGN.md +++ b/docs/DESIGN.md @@ -55,8 +55,9 @@ Recipient 狀態: - 必要時呼叫 Member Center 停用/註記 API 目前實作: -- 先以 `POST /webhooks/ses` 接收事件並更新資料 -- `SNS -> SQS -> Worker` 尚未落地 +- 已提供 `SqsSesPollerWorker`(SQS 輪詢) +- worker 直接呼叫內部 SES processing service(不走 HTTP self-call) +- 仍保留 `POST /webhooks/ses` 直接接收模式(相容/測試) ## 信任邊界與 Auth 模型 ### 外部角色 @@ -70,8 +71,8 @@ Recipient 狀態: ### 驗證方式(建議) 1. **Member Center → Send Engine** - - 使用簽名 Webhook(主推),或 OAuth2 Client Credentials - - token 內含 `tenant_id` 與 scopes(例如 `newsletter:events.write`) + - 使用簽名 Webhook(HMAC,已實作) + - header 內含 `X-Client-Id`,對應 `auth_clients.id` 並受 tenant 綁定與 scope 驗證 2. **租戶網站 → Send Engine** - 使用 OAuth2 Client Credentials 或 JWT(由 Member Center 簽發) - token 內含 `tenant_id` 與 scopes(例如 `newsletter:send.write`) diff --git a/docs/FLOWS.md b/docs/FLOWS.md index 53c8502..b5c4b89 100644 --- a/docs/FLOWS.md +++ b/docs/FLOWS.md @@ -20,10 +20,10 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入 6. 寫入 List Store 快照(只增量更新,不拉全量)。 7. 記錄處理結果與版本號(供重播與對帳)。 -錯誤與重試: +錯誤與重試(目前實作): - 驗證失敗 → 直接拒絕(401/403),不寫入 Inbox。 -- DB 暫時性錯誤 → 重試(含指數退避),仍失敗則進 DLQ。 -- 事件格式錯誤 → 標記為 invalid,記錄原因。 +- DB/執行錯誤 → 由請求端或佇列重送;尚未提供獨立 DLQ worker 與統一退避策略配置。 +- 事件格式錯誤 → 回 `422`。 ## 1b. 全量名單同步流程(由 Member Center 主動推送) 目的:避免 Send Engine 透過 API 拉取名單,降低名單外流風險。 @@ -56,7 +56,9 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入 - 若啟用 one-click token endpoint,僅發送 `status=issued` 的收件者 5. 發送執行: - `ESP__Provider=mock`:僅模擬發送,寫入預覽事件並輸出 console log - - `ESP__Provider=ses`:使用 SES v2 `SendBulkEmail`(每批最多 50) + - `ESP__Provider=ses`: + - `Ses__SendMode=raw_bulk`:使用 SES v2 `SendEmail`,依內容分組每次最多 50 位收件者 + - `Ses__SendMode=bulk_template`:使用 SES v2 `SendBulkEmail`(每批最多 50) 6. 更新 Send Job 狀態: - 全部成功:`completed` - 例外或部分失敗:`failed` @@ -73,26 +75,28 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入 ## 3. 退信處理流程 目的:處理 ESP 回報的 bounce/complaint,並回寫本地名單狀態。 -目前實作流程(Webhook): -1. 由 `POST /webhooks/ses` 接收 SES 事件 payload。 -2. 驗證(可透過 `Ses__SkipSignatureValidation` 控制是否要求簽章)。 -3. 將事件寫入 Inbox(append-only)。 -4. Consumer 解析事件: +目前實作流程(Webhook + SQS Poller): +1. 由 `SqsSesPollerWorker` 輪詢 SQS 取得 SNS envelope,直接呼叫內部 SES processing service。 +2. `POST /webhooks/ses` 也會呼叫同一套 processing service(相容直接呼叫模式)。 +3. 驗證(可透過 `Ses__SkipSignatureValidation` 控制是否要求簽章)。 +4. 將事件寫入 Inbox(append-only)。 +5. Consumer 解析事件: - hard bounce → 立即標記 blacklisted(同義於 `suppressed`) - soft bounce → 累計次數,達門檻(預設 5)才標記 blacklisted(`suppressed`) - - complaint → 立即取消訂閱並標記 blacklisted(`suppressed`) + - complaint → 立即標記 blacklisted(`suppressed`) - suppression 事件 → 直接對應為 `suppressed`(即黑名單) -5. 更新 List Store 快照與投遞記錄。 -6. 回寫 Member Center(僅在以下條件): +6. 更新 List Store 快照與投遞記錄。 +7. 回寫 Member Center(僅在以下條件): - hard bounce:已設黑名單 - soft bounce:達門檻後設黑名單 - - complaint:取消訂閱 + - complaint:設黑名單 - suppression:設黑名單 補充: -- Unknown event 不應使 worker crash,應記錄後送入 DLQ -- Throttle/暫時性網路錯誤使用指數退避重試 -- `SNS -> SQS -> Worker` 架構為正式環境建議,尚未在目前程式碼中落地 +- Unknown event 目前會略過或記錄,不會讓 worker 中止(不進獨立 DLQ) +- 已落地 `SQS -> Worker -> 內部 SES processing service`;`/webhooks/ses` 仍保留直接呼叫相容模式 +- `Bounce` 事件會依 `bounceType` 正規化為 `hard_bounced`(Permanent)或 `soft_bounced`(Transient) +- 亂序事件以優先序處理:`complaint` > `hard_bounced`/`suppression` > `soft_bounced` > `delivery` 回寫規則: - Send Engine 僅回寫「停用原因」與必要欄位 diff --git a/docs/INSTALL.md b/docs/INSTALL.md index 7382f33..72975c0 100644 --- a/docs/INSTALL.md +++ b/docs/INSTALL.md @@ -2,6 +2,21 @@ - 需求:.NET SDK 8.x, PostgreSQL - 設定:複製 `.env.example` → `.env` +- AWS Credential(SQS/SES): + - Local / Docker 開發: + - 設定 `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`(若為臨時憑證再加 `AWS_SESSION_TOKEN`) + - 設定 `AWS__Region`(或 `Sqs__Region` / `Ses__Region`) + - 建議設 `AWS_EC2_METADATA_DISABLED=true`,避免本機誤打 EC2 metadata + - ECS 正式環境(建議): + - 不要放 Access Key,改用 ECS Task Role(`taskRoleArn`) + - Task Role 至少授權: + - `sqs:ReceiveMessage` + - `sqs:DeleteMessage` + - `sqs:GetQueueAttributes` + - `ses:SendEmail`(raw_bulk) + - `ses:SendBulkEmail`(bulk_template,若有使用) + - ECS Task Definition 可設定環境變數: + - `AWS_REGION=ap-northeast-1`(或直接用 `.env` 的 `AWS__Region`) - Migration: - 預設由 API 啟動時自動執行(`Db__AutoMigrate=true`) - 需要關閉時請設定 `Db__AutoMigrate=false` @@ -48,14 +63,29 @@ - Dev Sender(Mock 發信): - `DevSender__Enabled=true`:背景 worker 會處理 `pending` send jobs,並將每位收件人的預計發送內容寫入 `events_inbox`(`source=dev_sender`, `event_type=send.preview`) - `DevSender__PollIntervalSeconds`:輪詢間隔秒數(預設 5) - - `ESP__Provider=ses` 時,背景 worker 會改用 SES `SendBulkEmail` 發送 - - SES 相關參數:`Ses__Region`、`Ses__FromEmail`、`Ses__ConfigurationSet`(可選)、`Ses__TemplateName` + - `ESP__Provider=ses` 時,即使 `DevSender__Enabled=false`,背景 sender 仍會啟動並改用 SES 發送(模式由 `Ses__SendMode` 決定) + - SES 相關參數:`Ses__Region`、`Ses__FromEmail`、`Ses__ConfigurationSet`(可選)、`Ses__SendMode`、`Ses__TemplateName` + - `Ses__SendMode=raw_bulk`(預設):使用 SES `SendEmail`,依內容分組後每次最多 50 位收件者(不依賴 SES Template) + - `Ses__SendMode=bulk_template`:使用 SES `SendBulkEmail` + Template(需提供 `template.ses_template_name` 或 `Ses__TemplateName`) + - SES 發送時會附帶 message tags:`tenant_id`、`list_id`、`campaign_id`、`send_job_id` + - SQS Poller(SES 事件回流): + - `Sqs__Enabled=true` + - `Sqs__QueueUrl=` + - `Sqs__Region`(未設定時回退 `AWS__Region` 再回退 `Ses__Region`) + - `Sqs__PollWaitSeconds`(預設 20) + - `Sqs__MaxMessages`(預設 10) + - `Sqs__VisibilityTimeoutSeconds`(預設 30) + - SQS poller 目前未提供獨立 DLQ worker;可先用 AWS SQS redrive policy 管理失敗訊息 - `SendBulkEmail` 會使用 SES 模板名稱: - 先讀 `campaign.template.ses_template_name` - 若未提供則回退 `Ses__TemplateName` - 若設定了 Member Center one-click token endpoint,sender 會在發送前批次呼叫 `/newsletter/one-click-unsubscribe-tokens`,僅發送 `status=issued` 的收件者 - 內容替換合約(Mock 與 SES 共用): - `{{email}}` - - `{{unsubscribe_url}}`(可在 `template` JSON 中提供 `unsubscribe_url` 模板,例如 `https://member.example/unsub?email={{email}}`) + - `{{unsubscribe_token}}` - `{{tenant_id}}` / `{{list_id}}` / `{{campaign_id}}` / `{{send_job_id}}` + - 若在 `campaign.template` 提供: + - `list_unsubscribe_url_template` + - `list_unsubscribe_mailto` + - sender 會自動加上 `List-Unsubscribe` 與 `List-Unsubscribe-Post` headers - 正式環境建議維持 `false` diff --git a/docs/OPENAPI.md b/docs/OPENAPI.md index 19d9b8e..1b173e7 100644 --- a/docs/OPENAPI.md +++ b/docs/OPENAPI.md @@ -18,7 +18,7 @@ - `list_id` 必須屬於該 tenant ### 2. Member Center → Send Engine Webhook -使用簽名 Webhook(HMAC)或 OAuth2 Client Credentials(建議簽名)。 +目前實作為簽名 Webhook(HMAC)。 Header 建議: - `X-Signature`: `hex(hmac_sha256(secret, body))` @@ -130,7 +130,6 @@ Request Body(分批示意): Response: - `200 OK`:accepted - `401/403`:驗證失敗 -- `409`:sync_id + batch_no 重複 - `422`:格式錯誤 ## API:租戶網站 → Send Engine @@ -166,11 +165,10 @@ Request Body: - `window_start` 必須小於 `window_end`(若有提供) - `tenant_id` 必須已存在(不存在回 `422 tenant_not_found`) - `list_id` 若不存在,會在該 tenant 下自動建立 placeholder list 後建立 send job -- `template` 可攜帶替換參數(例如:`{"unsubscribe_url":"https://member.example/unsub?email={{email}}","ses_template_name":"newsletter_default"}`) +- `template` 可攜帶發信元資料(例如:`{"ses_template_name":"newsletter_default","list_unsubscribe_url_template":"https://member.example/unsubscribe?token={{unsubscribe_token}}","list_unsubscribe_mailto":"mailto:unsubscribe@member.example"}`) 替換合約(Mock/SES 一致): - `{{email}}` -- `{{unsubscribe_url}}` - `{{unsubscribe_token}}` - `{{tenant_id}}` - `{{list_id}}` @@ -209,10 +207,9 @@ curl -X POST "http://localhost:6060/api/send-jobs" \ "list_id": "22222222-2222-2222-2222-222222222222", "name": "Weekly Update", "subject": "Hi {{email}}", - "body_html": "

Hello {{email}}

unsubscribe

", - "body_text": "Hello {{email}} | unsubscribe: {{unsubscribe_url}}", + "body_html": "

Hello {{email}}

unsubscribe

", + "body_text": "Hello {{email}} | unsubscribe: https://member.example/unsubscribe?token={{unsubscribe_token}}", "template": { - "unsubscribe_url": "https://member.example/unsubscribe?email={{email}}", "ses_template_name": "newsletter_default" } }' @@ -230,7 +227,12 @@ curl -X POST "http://localhost:6060/api/send-jobs" \ 說明: - `ESP__Provider=mock`(非 ses)時,會由 Dev Sender 產生 `send.preview` 事件供你檢查替換結果 - `ESP__Provider=mock` 時,也會把每位收件人的模擬發送內容輸出到 console log(`MOCK send preview`) -- `ESP__Provider=ses` 時,背景 sender 會用 SES `SendBulkEmail`(每批最多 50) +- 若 `template.list_unsubscribe_url_template`(或 `template.list_unsubscribe_mailto`)有提供,sender 會加上: +- `List-Unsubscribe` +- `List-Unsubscribe-Post: List-Unsubscribe=One-Click` +- `ESP__Provider=ses` 時,背景 sender 依 `Ses__SendMode` 發送: +- `raw_bulk`(預設):SES `SendEmail`,依內容分組每次最多 50 位收件者 +- `bulk_template`:SES `SendBulkEmail`(每批最多 50,需要 SES template) - 若已設定 Member Center one-click token endpoint,發送前會批次呼叫 `POST /newsletter/one-click-unsubscribe-tokens` - 僅 `status=issued` 的收件者會被送出,並把 `unsubscribe_token` 注入替換內容 @@ -283,6 +285,7 @@ Response: 推薦架構(正式): - `SES Configuration Set -> SNS -> SQS -> ECS Worker` - 由 Worker 消費事件,不要求對外公開 webhook +- 目前實作:`SqsSesPollerWorker` 會從 SQS 取訊息並直接呼叫內部 SES processing service 相容模式(可選): - `POST /webhooks/ses` @@ -294,28 +297,37 @@ Response: Request Body(示意): ```json { - "event_type": "bounce", - "message_id": "ses-id", - "tenant_id": "uuid", - "email": "user@example.com", - "bounce_type": "hard", - "occurred_at": "2026-02-10T09:45:00Z" + "Type": "Notification", + "MessageId": "sns-message-id", + "Message": "{\"eventType\":\"Bounce\",\"mail\":{\"messageId\":\"ses-message-id\",\"tags\":{\"tenant_id\":[\"...\"],\"list_id\":[\"...\"]}},\"bounce\":{\"bounceType\":\"Permanent\",\"bouncedRecipients\":[{\"emailAddress\":\"user@example.com\"}],\"timestamp\":\"2026-02-10T09:45:00Z\"}}", + "Timestamp": "2026-02-10T09:45:01Z" } ``` +相容:仍接受舊版扁平 payload(`event_type`/`tenant_id`/`email`)。 Response: - `200 OK` +- `422`:payload 解析錯誤或缺少必要欄位 +- `500`:處理時發生未預期錯誤(SQS poller 會視為 transient,保留訊息重試) 事件對應規則(固定): - `hard_bounced`:立即設為黑名單(`suppressed`) - `soft_bounced`:累計達門檻後設為黑名單(`suppressed`) -- `complaint`:取消訂閱並回寫 Member Center +- `complaint`:設為黑名單(`suppressed`)並回寫 Member Center(reason=`complaint`) - `suppression`:設為黑名單(`suppressed`) +SES `Bounce` 對應: +- `bounce.bounceType=Permanent` → `hard_bounced` +- `bounce.bounceType=Transient` → `soft_bounced` + +事件優先序(規劃): +- `complaint` > `hard_bounced`/`suppression` > `soft_bounced` > `delivery` > `open`/`click` +- 目前實作尚未建立完整 recipient 狀態機覆蓋所有事件;實際會以 `suppressed` 作為最終不可發狀態 + 回寫 Member Center 條件: - `hard_bounced`:設黑名單後回寫 - `soft_bounced`:達門檻設黑名單後回寫 -- `complaint`:立即回寫 +- `complaint`:設黑名單後立即回寫 - `suppression`:設黑名單後回寫 回寫原因碼(固定): diff --git a/docs/TECH_STACK.md b/docs/TECH_STACK.md index 3ac0410..88d6bc3 100644 --- a/docs/TECH_STACK.md +++ b/docs/TECH_STACK.md @@ -3,3 +3,4 @@ - C# .NET Core - PostgreSQL - ESP: Amazon SES(實作)+ Mock Sender(開發測試) +- Event Ingest: Amazon SNS + SQS(SES event 回流) diff --git a/docs/openapi.yaml b/docs/openapi.yaml index 2dfec40..b791e4b 100644 --- a/docs/openapi.yaml +++ b/docs/openapi.yaml @@ -199,12 +199,6 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' - '409': - description: Duplicate batch - content: - application/json: - schema: - $ref: '#/components/schemas/ErrorResponse' '422': description: Validation error content: @@ -222,7 +216,9 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/SesEvent' + oneOf: + - $ref: '#/components/schemas/SnsEnvelope' + - $ref: '#/components/schemas/SesEvent' responses: '200': description: OK @@ -238,6 +234,18 @@ paths: application/json: schema: $ref: '#/components/schemas/ErrorResponse' + '422': + description: Invalid payload + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + description: Internal processing error + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' components: securitySchemes: @@ -305,8 +313,9 @@ components: description: | Optional template metadata used by sender runtime. Supported keys: - - unsubscribe_url: URL template, e.g. https://member.example/unsubscribe?token={{unsubscribe_token}} - ses_template_name: SES template name override + - list_unsubscribe_url_template: URL template, e.g. https://member.example/unsubscribe?token={{unsubscribe_token}} + - list_unsubscribe_mailto: mailto endpoint, e.g. mailto:unsubscribe@member.example scheduled_at: type: string format: date-time @@ -506,6 +515,26 @@ components: type: string format: date-time + SnsEnvelope: + type: object + required: [Type, MessageId, Message, Timestamp] + properties: + Type: + type: string + enum: [Notification, SubscriptionConfirmation, UnsubscribeConfirmation] + MessageId: + type: string + TopicArn: + type: string + Subject: + type: string + Message: + type: string + description: JSON string containing SES event payload + Timestamp: + type: string + format: date-time + ErrorResponse: type: object required: [error] diff --git a/src/SendEngine.Api/Program.cs b/src/SendEngine.Api/Program.cs index 6051dde..dc7b15e 100644 --- a/src/SendEngine.Api/Program.cs +++ b/src/SendEngine.Api/Program.cs @@ -18,6 +18,8 @@ builder.Services.AddEndpointsApiExplorer(); builder.Services.AddSwaggerGen(); builder.Services.AddInfrastructure(builder.Configuration); builder.Services.AddHostedService(); +builder.Services.AddHostedService(); +builder.Services.AddScoped(); var jwtAuthority = builder.Configuration["Jwt:Authority"]; var jwtMetadataAddress = builder.Configuration["Jwt:MetadataAddress"]; @@ -543,122 +545,24 @@ app.MapPost("/webhooks/lists/full-sync", async ( app.MapPost("/webhooks/ses", async ( HttpContext httpContext, - SesEventRequest request, - SendEngineDbContext db, - ILoggerFactory loggerFactory) => + JsonElement body, + SesEventProcessingService sesProcessor) => { - var logger = loggerFactory.CreateLogger("SendEngine.Webhooks.Ses"); - - if (request.TenantId == Guid.Empty || string.IsNullOrWhiteSpace(request.Email)) + var signature = httpContext.Request.Headers["X-Amz-Sns-Signature"].ToString(); + var result = await sesProcessor.ProcessBodyAsync(body, signature, httpContext.RequestAborted); + if (result.Success) { - logger.LogWarning("SES webhook rejected: tenant_id or email missing."); - return Results.UnprocessableEntity(new { error = "tenant_id_email_required" }); + return Results.StatusCode(result.StatusCode); } - var skipValidation = builder.Configuration.GetValue("Ses:SkipSignatureValidation", true); - logger.LogInformation( - "SES webhook received. Ses__SkipSignatureValidation={SkipValidation}", - skipValidation); - - var sesSignature = httpContext.Request.Headers["X-Amz-Sns-Signature"].ToString(); - if (!skipValidation && string.IsNullOrWhiteSpace(sesSignature)) + if (result.TransientFailure) { - logger.LogWarning("SES webhook rejected: missing X-Amz-Sns-Signature while signature validation is enabled."); - return Results.Unauthorized(); + return Results.StatusCode(result.StatusCode); } - var normalizedEventType = NormalizeSesEventType(request.EventType, request.BounceType); - request.Email = request.Email.Trim().ToLowerInvariant(); - request.EventType = normalizedEventType; - - // TEST-FRIENDLY TEMPORARY LOGIC: - // In local integration testing, skip DB operations but keep callback flow to Member Center. - // TODO(remove-test-friendly): Remove this branch once end-to-end DB flow is stable. - var testFriendlyEnabled = builder.Configuration.GetValue("TestFriendly:Enabled", false); - if (testFriendlyEnabled) - { - Console.WriteLine( - "[TEST-FRIENDLY][SES] received event_type={0} tenant_id={1} email={2} skip_signature_validation={3}", - normalizedEventType, - request.TenantId, - request.Email, - skipValidation); - logger.LogWarning("TEST-FRIENDLY enabled: skip DB operations for /webhooks/ses, keep callback flow."); - - if (TryMapDisableReason(normalizedEventType, out var reason)) - { - if (!TryExtractGuidFromTags(request.Tags, "subscriber_id", out var subscriberId) || - !TryExtractGuidFromTags(request.Tags, "list_id", out var listId)) - { - return Results.UnprocessableEntity(new - { - error = "test_friendly_tags_required", - message = "tags.subscriber_id and tags.list_id must be UUID strings when TestFriendly is enabled." - }); - } - - var shouldNotify = true; - if (normalizedEventType == "soft_bounced") - { - var threshold = builder.Configuration.GetValue("Bounce:SoftBounceThreshold", 5); - shouldNotify = IsSoftBounceThresholdReachedFromTags(request.Tags, threshold); - } - - if (shouldNotify) - { - await NotifyMemberCenterDisableAsync( - builder.Configuration, - logger, - request.TenantId, - new[] { (SubscriberId: subscriberId, ListId: listId) }, - reason, - request.OccurredAt); - } - } - - return Results.Ok(new { status = "ok", mode = "test-friendly-no-db" }); - } - - if (!await EnsureTenantForWebhookAsync(db, request.TenantId, testFriendlyEnabled)) - { - logger.LogWarning("SES webhook rejected: tenant_not_found. tenant_id={TenantId}", request.TenantId); - return Results.UnprocessableEntity(new { error = "tenant_not_found" }); - } - - logger.LogInformation( - "SES webhook processing started. mode=normal event_type={EventType} tenant_id={TenantId} email={Email}", - normalizedEventType, - request.TenantId, - request.Email); - - var payload = JsonSerializer.Serialize(request); - - var inbox = new EventInbox - { - Id = Guid.NewGuid(), - TenantId = request.TenantId, - EventType = $"ses.{normalizedEventType}", - Source = "ses", - Payload = payload, - ReceivedAt = DateTimeOffset.UtcNow, - Status = "received" - }; - - db.EventsInbox.Add(inbox); - await db.SaveChangesAsync(); - - await ApplySesEventAsync(db, builder.Configuration, logger, request, normalizedEventType); - inbox.Status = "processed"; - inbox.ProcessedAt = DateTimeOffset.UtcNow; - await db.SaveChangesAsync(); - - logger.LogInformation( - "SES webhook processed. event_type={EventType} tenant_id={TenantId} email={Email}", - normalizedEventType, - request.TenantId, - request.Email); - - return Results.Ok(); + return Results.Json( + new { error = result.Error, reason = result.Reason }, + statusCode: result.StatusCode); }).WithName("SesWebhook").WithOpenApi(); app.Run(); @@ -806,427 +710,3 @@ static string NormalizeStatus(string? status, string fallback) _ => fallback }; } - -static string NormalizeSesEventType(string? eventType, string? bounceType) -{ - var normalized = eventType?.Trim().ToLowerInvariant() ?? string.Empty; - if (normalized == "bounce") - { - var bounce = bounceType?.Trim().ToLowerInvariant() ?? string.Empty; - return bounce switch - { - "hard" => "hard_bounced", - "soft" => "soft_bounced", - _ => "bounce" - }; - } - - return normalized; -} - -static bool TryMapDisableReason(string normalizedEventType, out string reason) -{ - switch (normalizedEventType) - { - case "hard_bounced": - reason = "hard_bounce"; - return true; - case "soft_bounced": - reason = "soft_bounce_threshold"; - return true; - case "complaint": - reason = "complaint"; - return true; - case "suppression": - reason = "suppression"; - return true; - default: - reason = string.Empty; - return false; - } -} - -static bool TryExtractGuidFromTags(Dictionary? tags, string key, out Guid value) -{ - value = Guid.Empty; - if (tags is null) - { - return false; - } - - var keyCandidates = new[] - { - key, - key.ToLowerInvariant(), - key.ToUpperInvariant(), - key.Replace("_", string.Empty), - ToCamelCase(key) - }; - - foreach (var candidate in keyCandidates) - { - if (tags.TryGetValue(candidate, out var raw) && Guid.TryParse(raw, out value)) - { - return true; - } - } - - return false; -} - -static string ToCamelCase(string value) -{ - var parts = value.Split('_', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); - if (parts.Length == 0) - { - return value; - } - - return parts[0] + string.Concat(parts.Skip(1).Select(p => char.ToUpperInvariant(p[0]) + p[1..])); -} - -static bool IsSoftBounceThresholdReachedFromTags(Dictionary? tags, int threshold) -{ - if (threshold < 1) - { - threshold = 1; - } - - if (tags is null) - { - return true; - } - - if (tags.TryGetValue("soft_bounce_count", out var raw) && int.TryParse(raw, out var count)) - { - return count >= threshold; - } - - return true; -} - -static async Task ApplySesEventAsync( - SendEngineDbContext db, - IConfiguration configuration, - ILogger logger, - SesEventRequest request, - string normalizedEventType) -{ - var subscriptions = await db.Subscriptions - .Join( - db.Lists.AsNoTracking(), - s => s.ListId, - l => l.Id, - (s, l) => new { Subscription = s, ListTenantId = l.TenantId }) - .Where(x => x.ListTenantId == request.TenantId && x.Subscription.Email == request.Email) - .Select(x => x.Subscription) - .ToListAsync(); - if (subscriptions.Count == 0) - { - logger.LogWarning( - "SES event ignored: subscription not found. tenant_id={TenantId} email={Email} event_type={EventType}", - request.TenantId, - request.Email, - normalizedEventType); - return; - } - - logger.LogInformation( - "SES event matched subscriptions. tenant_id={TenantId} email={Email} matched_count={MatchedCount} event_type={EventType}", - request.TenantId, - request.Email, - subscriptions.Count, - normalizedEventType); - - switch (normalizedEventType) - { - case "hard_bounced": - await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "hard_bounce", request.OccurredAt); - return; - case "soft_bounced": - var threshold = configuration.GetValue("Bounce:SoftBounceThreshold", 5); - var reached = await IsSoftBounceThresholdReachedAsync(db, request.TenantId, request.Email, threshold); - if (reached) - { - await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "soft_bounce_threshold", request.OccurredAt); - } - else - { - logger.LogInformation( - "Soft bounce threshold not reached yet. tenant_id={TenantId} email={Email} threshold={Threshold}", - request.TenantId, - request.Email, - threshold); - } - return; - case "complaint": - await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "complaint", request.OccurredAt); - return; - case "suppression": - await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "suppression", request.OccurredAt); - return; - default: - return; - } -} - -static async Task IsSoftBounceThresholdReachedAsync( - SendEngineDbContext db, - Guid tenantId, - string normalizedEmail, - int threshold) -{ - if (threshold < 1) - { - threshold = 1; - } - - var count = await db.Database.SqlQuery($""" - SELECT count(*)::int AS "Value" - FROM events_inbox - WHERE tenant_id = {tenantId} - AND source = 'ses' - AND event_type = 'ses.soft_bounced' - AND payload->>'email' = {normalizedEmail} - """).SingleAsync(); - - return count >= threshold; -} - -static async Task SuppressAndNotifyAsync( - SendEngineDbContext db, - IConfiguration configuration, - ILogger logger, - Guid tenantId, - IReadOnlyCollection subscriptions, - string reason, - DateTimeOffset occurredAt) -{ - if (subscriptions.Count == 0) - { - return; - } - - var now = DateTimeOffset.UtcNow; - foreach (var subscription in subscriptions) - { - subscription.Status = "suppressed"; - subscription.UpdatedAt = now; - } - - await db.SaveChangesAsync(); - - var notifyTargets = subscriptions - .Where(x => x.ExternalSubscriberId.HasValue) - .Select(x => (SubscriberId: x.ExternalSubscriberId!.Value, ListId: x.ListId)) - .Distinct() - .ToArray(); - - logger.LogInformation( - "Subscriptions suppressed. tenant_id={TenantId} reason={Reason} matched_count={MatchedCount} notify_target_count={NotifyCount}", - tenantId, - reason, - subscriptions.Count, - notifyTargets.Length); - await NotifyMemberCenterDisableAsync(configuration, logger, tenantId, notifyTargets, reason, occurredAt); -} - -static async Task NotifyMemberCenterDisableAsync( - IConfiguration configuration, - ILogger logger, - Guid tenantId, - IReadOnlyCollection<(Guid SubscriberId, Guid ListId)> targets, - string reason, - DateTimeOffset occurredAt) -{ - if (targets.Count == 0) - { - logger.LogWarning("MemberCenter callback skipped: no disable targets."); - return; - } - - var url = ResolveMemberCenterUrl( - configuration, - null, - "MemberCenter:BaseUrl", - "MemberCenter:DisableSubscriptionPath", - "/subscriptions/disable"); - if (string.IsNullOrWhiteSpace(url)) - { - logger.LogWarning("MemberCenter callback skipped: URL is empty."); - return; - } - - using var client = new HttpClient(); - var token = await ResolveMemberCenterAccessTokenAsync(configuration, client, logger); - if (string.IsNullOrWhiteSpace(token)) - { - logger.LogWarning("MemberCenter callback skipped: access token is empty."); - return; - } - client.DefaultRequestHeaders.Authorization = new("Bearer", token); - logger.LogInformation( - "MemberCenter callback prepared. url={Url} auth_header={AuthHeader}", - url, - BuildMaskedAuthHeader(token)); - - foreach (var target in targets) - { - var payload = new - { - tenant_id = tenantId, - subscriber_id = target.SubscriberId, - list_id = target.ListId, - reason, - disabled_by = "send_engine", - occurred_at = occurredAt - }; - - try - { - var payloadJson = JsonSerializer.Serialize(payload); - logger.LogInformation( - "MemberCenter callback request. method=POST url={Url} headers={Headers} body={Body}", - url, - $"Authorization:{BuildMaskedAuthHeader(token)}; Content-Type:application/json", - payloadJson); - - using var response = await client.PostAsJsonAsync(url, payload); - var responseBody = await response.Content.ReadAsStringAsync(); - logger.LogInformation( - "MemberCenter callback response. status={StatusCode} body={Body}", - (int)response.StatusCode, - Truncate(responseBody, 1000)); - } - catch (Exception ex) - { - logger.LogError(ex, "MemberCenter callback failed. url={Url} list_id={ListId}", url, target.ListId); - } - } -} - -static async Task ResolveMemberCenterAccessTokenAsync(IConfiguration configuration, HttpClient client, ILogger logger) -{ - var tokenUrl = ResolveMemberCenterUrl( - configuration, - null, - "MemberCenter:BaseUrl", - "MemberCenter:TokenPath", - "/oauth/token"); - var clientId = configuration["MemberCenter:ClientId"]; - var clientSecret = configuration["MemberCenter:ClientSecret"]; - var scope = configuration["MemberCenter:Scope"]; - - if (!string.IsNullOrWhiteSpace(tokenUrl) && - !string.IsNullOrWhiteSpace(clientId) && - !string.IsNullOrWhiteSpace(clientSecret)) - { - var form = new Dictionary - { - ["grant_type"] = "client_credentials", - ["client_id"] = clientId, - ["client_secret"] = clientSecret - }; - - if (!string.IsNullOrWhiteSpace(scope)) - { - form["scope"] = scope; - } - - try - { - logger.LogInformation( - "MemberCenter token request. url={TokenUrl} client_id={ClientId} scope={Scope}", - tokenUrl, - clientId, - scope ?? string.Empty); - using var response = await client.PostAsync(tokenUrl, new FormUrlEncodedContent(form)); - if (response.IsSuccessStatusCode) - { - await using var stream = await response.Content.ReadAsStreamAsync(); - using var json = await JsonDocument.ParseAsync(stream); - if (json.RootElement.TryGetProperty("access_token", out var tokenElement)) - { - var accessToken = tokenElement.GetString(); - if (!string.IsNullOrWhiteSpace(accessToken)) - { - logger.LogInformation("MemberCenter token request succeeded."); - return accessToken; - } - } - } - else - { - var body = await response.Content.ReadAsStringAsync(); - logger.LogWarning( - "MemberCenter token request failed. status={StatusCode} body={Body}", - (int)response.StatusCode, - Truncate(body, 1000)); - } - } - catch (Exception ex) - { - logger.LogWarning(ex, "MemberCenter token request error, fallback to static token."); - } - } - - var fallbackToken = configuration["MemberCenter:ApiToken"]; - if (!string.IsNullOrWhiteSpace(fallbackToken)) - { - logger.LogWarning("Using MemberCenter__ApiToken fallback."); - } - - return fallbackToken; -} - -static string? ResolveMemberCenterUrl( - IConfiguration configuration, - string? fullUrlKey, - string baseUrlKey, - string pathKey, - string defaultPath) -{ - if (!string.IsNullOrWhiteSpace(fullUrlKey)) - { - var fullUrl = configuration[fullUrlKey]; - if (!string.IsNullOrWhiteSpace(fullUrl)) - { - return fullUrl; - } - } - - var baseUrl = configuration[baseUrlKey]; - if (string.IsNullOrWhiteSpace(baseUrl)) - { - return null; - } - - var path = configuration[pathKey]; - if (string.IsNullOrWhiteSpace(path)) - { - path = defaultPath; - } - - return $"{baseUrl.TrimEnd('/')}/{path.TrimStart('/')}"; -} - -static string BuildMaskedAuthHeader(string token) -{ - if (string.IsNullOrWhiteSpace(token)) - { - return "Bearer "; - } - - var visible = token.Length <= 8 ? token : token[^8..]; - return $"Bearer ***{visible}"; -} - -static string Truncate(string? input, int maxLen) -{ - if (string.IsNullOrEmpty(input)) - { - return string.Empty; - } - - return input.Length <= maxLen ? input : $"{input[..maxLen]}...(truncated)"; -} diff --git a/src/SendEngine.Api/SendEngine.Api.csproj b/src/SendEngine.Api/SendEngine.Api.csproj index 783de8c..ab074a5 100644 --- a/src/SendEngine.Api/SendEngine.Api.csproj +++ b/src/SendEngine.Api/SendEngine.Api.csproj @@ -10,6 +10,7 @@ + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/src/SendEngine.Api/Services/DevMockSenderWorker.cs b/src/SendEngine.Api/Services/DevMockSenderWorker.cs index 6469f6f..ef6862a 100644 --- a/src/SendEngine.Api/Services/DevMockSenderWorker.cs +++ b/src/SendEngine.Api/Services/DevMockSenderWorker.cs @@ -149,20 +149,20 @@ public sealed class DevMockSenderWorker : BackgroundService var deliveredCount = 0; if (provider == "ses") { - deliveredCount = await SendViaSesBulkAsync(campaign, replacements, cancellationToken); + deliveredCount = await SendViaSesAsync(job, campaign, replacements, cancellationToken); } else { deliveredCount = replacements.Count; foreach (var preview in replacements) { - preview.Placeholders.TryGetValue("unsubscribe_url", out var unsubscribeUrl); + var unsubscribeHeaders = BuildListUnsubscribeHeaders(campaign.Template, preview.Placeholders); _logger.LogInformation( - "MOCK send preview. send_job_id={SendJobId} email={Email} subject={Subject} unsubscribe_url={UnsubscribeUrl} body_html={BodyHtml} body_text={BodyText}", + "MOCK send preview. send_job_id={SendJobId} email={Email} subject={Subject} list_unsubscribe={ListUnsubscribe} body_html={BodyHtml} body_text={BodyText}", job.Id, preview.Email, preview.Subject, - unsubscribeUrl ?? string.Empty, + unsubscribeHeaders.ListUnsubscribe ?? string.Empty, preview.BodyHtml, preview.BodyText); } @@ -172,7 +172,6 @@ public sealed class DevMockSenderWorker : BackgroundService { var previewEvents = replacements.Select(preview => { - var unsubscribeUrl = preview.Placeholders.GetValueOrDefault("unsubscribe_url", string.Empty); var payload = JsonSerializer.Serialize(new { send_job_id = job.Id, @@ -183,7 +182,8 @@ public sealed class DevMockSenderWorker : BackgroundService subject = preview.Subject, body_html = preview.BodyHtml, body_text = preview.BodyText, - unsubscribe_url = unsubscribeUrl, + list_unsubscribe = BuildListUnsubscribeHeaders(campaign.Template, preview.Placeholders).ListUnsubscribe, + list_unsubscribe_post = BuildListUnsubscribeHeaders(campaign.Template, preview.Placeholders).ListUnsubscribePost, placeholders = preview.Placeholders, generated_at = DateTimeOffset.UtcNow }); @@ -245,7 +245,26 @@ public sealed class DevMockSenderWorker : BackgroundService } } - private async Task SendViaSesBulkAsync(Campaign campaign, IReadOnlyCollection recipients, CancellationToken cancellationToken) + private async Task SendViaSesAsync( + SendJob job, + Campaign campaign, + IReadOnlyCollection recipients, + CancellationToken cancellationToken) + { + var mode = (_configuration["Ses:SendMode"] ?? "raw_bulk").Trim().ToLowerInvariant(); + if (mode == "bulk_template") + { + return await SendViaSesBulkTemplateAsync(job, campaign, recipients, cancellationToken); + } + + return await SendViaSesRawBulkAsync(job, campaign, recipients, cancellationToken); + } + + private async Task SendViaSesBulkTemplateAsync( + SendJob job, + Campaign campaign, + IReadOnlyCollection recipients, + CancellationToken cancellationToken) { var region = _configuration["Ses:Region"] ?? "us-east-1"; var fromEmail = _configuration["Ses:FromEmail"] ?? string.Empty; @@ -286,6 +305,7 @@ public sealed class DevMockSenderWorker : BackgroundService { ToAddresses = new List { preview.Email } }, + ReplacementTags = BuildSesMessageTags(preview.Placeholders), ReplacementEmailContent = new ReplacementEmailContent { ReplacementTemplate = new ReplacementTemplate @@ -317,6 +337,96 @@ public sealed class DevMockSenderWorker : BackgroundService return delivered; } + private async Task SendViaSesRawBulkAsync( + SendJob job, + Campaign campaign, + IReadOnlyCollection recipients, + CancellationToken cancellationToken) + { + var region = _configuration["Ses:Region"] ?? "us-east-1"; + var fromEmail = _configuration["Ses:FromEmail"] ?? string.Empty; + if (string.IsNullOrWhiteSpace(fromEmail)) + { + _logger.LogWarning("SES send skipped: Ses__FromEmail is empty."); + return 0; + } + + using var ses = new AmazonSimpleEmailServiceV2Client(RegionEndpoint.GetBySystemName(region)); + + var delivered = 0; + var grouped = recipients + .GroupBy(x => + { + var headers = BuildListUnsubscribeHeaders(campaign.Template, x.Placeholders); + return $"{x.Subject}\n---\n{x.BodyHtml}\n---\n{x.BodyText}\n---\n{headers.ListUnsubscribe}\n---\n{headers.ListUnsubscribePost}"; + }, StringComparer.Ordinal) + .ToList(); + + foreach (var group in grouped) + { + foreach (var batch in group.Chunk(50)) + { + var subject = batch[0].Subject ?? string.Empty; + var bodyHtml = batch[0].BodyHtml; + var bodyText = batch[0].BodyText; + var listUnsubscribeHeaders = BuildListUnsubscribeHeaders(campaign.Template, batch[0].Placeholders); + var request = new SendEmailRequest + { + FromEmailAddress = fromEmail, + ConfigurationSetName = _configuration["Ses:ConfigurationSet"], + Destination = new Destination + { + ToAddresses = batch.Select(x => x.Email).ToList() + }, + EmailTags = BuildSesMessageTags(new Dictionary + { + ["tenant_id"] = job.TenantId.ToString("D"), + ["list_id"] = job.ListId.ToString("D"), + ["campaign_id"] = campaign.Id.ToString("D"), + ["send_job_id"] = job.Id.ToString("D") + }), + Content = new EmailContent + { + Simple = new Message + { + Subject = new Content + { + Data = subject, + Charset = "UTF-8" + }, + Body = new Body + { + Html = string.IsNullOrWhiteSpace(bodyHtml) + ? null + : new Content { Data = bodyHtml, Charset = "UTF-8" }, + Text = string.IsNullOrWhiteSpace(bodyText) + ? null + : new Content { Data = bodyText, Charset = "UTF-8" } + }, + Headers = BuildSesMessageHeaders(listUnsubscribeHeaders) + } + } + }; + + try + { + var response = await ses.SendEmailAsync(request, cancellationToken); + delivered += batch.Length; + _logger.LogInformation( + "SES raw bulk send succeeded. message_id={MessageId} recipients={RecipientCount}", + response.MessageId, + batch.Length); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "SES raw bulk send failed. recipients={RecipientCount}", batch.Length); + } + } + } + + return delivered; + } + private async Task> FetchOneClickUnsubscribeTokensAsync( Guid tenantId, Guid listId, @@ -429,13 +539,93 @@ public sealed class DevMockSenderWorker : BackgroundService ["unsubscribe_token"] = unsubscribeToken ?? string.Empty }; - var unsubscribeTemplate = ExtractTemplateString(campaign.Template, "unsubscribe_url"); - var unsubscribeUrl = Personalize(unsubscribeTemplate, values); - values["unsubscribe_url"] = unsubscribeUrl; + if (subscription.ExternalSubscriberId.HasValue) + { + values["subscriber_id"] = subscription.ExternalSubscriberId.Value.ToString("D"); + } return values; } + private static List BuildSesMessageTags(Dictionary placeholders) + { + var tags = new List(); + AddTag("tenant_id"); + AddTag("list_id"); + AddTag("campaign_id"); + AddTag("send_job_id"); + + return tags; + + void AddTag(string key) + { + if (!placeholders.TryGetValue(key, out var value) || string.IsNullOrWhiteSpace(value)) + { + return; + } + + tags.Add(new MessageTag + { + Name = key, + Value = value + }); + } + } + + private static (string? ListUnsubscribe, string? ListUnsubscribePost) BuildListUnsubscribeHeaders( + string? campaignTemplate, + Dictionary placeholders) + { + var urlTemplate = ExtractTemplateString(campaignTemplate, "list_unsubscribe_url_template"); + var mailto = ExtractTemplateString(campaignTemplate, "list_unsubscribe_mailto"); + var url = Personalize(urlTemplate, placeholders); + + var values = new List(); + if (!string.IsNullOrWhiteSpace(url)) + { + values.Add($"<{url}>"); + } + if (!string.IsNullOrWhiteSpace(mailto)) + { + values.Add($"<{mailto}>"); + } + + if (values.Count == 0) + { + return (null, null); + } + + return (string.Join(", ", values), "List-Unsubscribe=One-Click"); + } + + private static List? BuildSesMessageHeaders((string? ListUnsubscribe, string? ListUnsubscribePost) headers) + { + if (string.IsNullOrWhiteSpace(headers.ListUnsubscribe)) + { + return null; + } + + var result = new List + { + new() + { + Name = "List-Unsubscribe", + Value = headers.ListUnsubscribe + } + }; + + if (!string.IsNullOrWhiteSpace(headers.ListUnsubscribePost)) + { + result.Add(new MessageHeader + { + Name = "List-Unsubscribe-Post", + Value = headers.ListUnsubscribePost + }); + } + + return result; + } + private static async Task ResolveMemberCenterAccessTokenAsync( IConfiguration configuration, HttpClient client, diff --git a/src/SendEngine.Api/Services/SesEventProcessingService.cs b/src/SendEngine.Api/Services/SesEventProcessingService.cs new file mode 100644 index 0000000..6c4ae5e --- /dev/null +++ b/src/SendEngine.Api/Services/SesEventProcessingService.cs @@ -0,0 +1,808 @@ +using System.Net.Http.Headers; +using System.Text.Json; +using Microsoft.EntityFrameworkCore; +using SendEngine.Api.Models; +using SendEngine.Domain.Entities; +using SendEngine.Infrastructure.Data; + +namespace SendEngine.Api.Services; + +public sealed class SesEventProcessingService +{ + private readonly IServiceScopeFactory _scopeFactory; + private readonly IConfiguration _configuration; + private readonly ILogger _logger; + + public SesEventProcessingService( + IServiceScopeFactory scopeFactory, + IConfiguration configuration, + ILogger logger) + { + _scopeFactory = scopeFactory; + _configuration = configuration; + _logger = logger; + } + + public async Task ProcessRawJsonAsync( + string rawJson, + string? snsSignature, + CancellationToken cancellationToken) + { + JsonDocument bodyDoc; + try + { + bodyDoc = JsonDocument.Parse(rawJson); + } + catch (JsonException ex) + { + _logger.LogWarning(ex, "SES payload parse failed."); + return SesProcessResult.Permanent(422, "invalid_payload", "json_parse_failed"); + } + + using (bodyDoc) + { + return await ProcessBodyAsync(bodyDoc.RootElement, snsSignature, cancellationToken); + } + } + + public async Task ProcessBodyAsync( + JsonElement body, + string? snsSignature, + CancellationToken cancellationToken) + { + if (!TryNormalizeSesEventRequest(body, out var request, out var parseError)) + { + if (parseError.StartsWith("unsupported_sns_type:", StringComparison.Ordinal)) + { + _logger.LogInformation("SES webhook ignored non-notification SNS message. reason={Reason}", parseError); + return SesProcessResult.Ok(200, "ignored", parseError); + } + + _logger.LogWarning("SES webhook rejected: invalid_payload. reason={Reason}", parseError); + return SesProcessResult.Permanent(422, "invalid_payload", parseError); + } + + if (request.TenantId == Guid.Empty || string.IsNullOrWhiteSpace(request.Email)) + { + _logger.LogWarning("SES webhook rejected: tenant_id or email missing."); + return SesProcessResult.Permanent(422, "tenant_id_email_required", null); + } + + var skipValidation = _configuration.GetValue("Ses:SkipSignatureValidation", true); + _logger.LogInformation("SES webhook received. Ses__SkipSignatureValidation={SkipValidation}", skipValidation); + if (!skipValidation && string.IsNullOrWhiteSpace(snsSignature)) + { + _logger.LogWarning("SES webhook rejected: missing X-Amz-Sns-Signature while signature validation is enabled."); + return SesProcessResult.Permanent(401, "unauthorized", "missing_signature"); + } + + var normalizedEventType = NormalizeSesEventType(request.EventType, request.BounceType); + request.Email = request.Email.Trim().ToLowerInvariant(); + request.EventType = normalizedEventType; + + var testFriendlyEnabled = _configuration.GetValue("TestFriendly:Enabled", false); + if (testFriendlyEnabled) + { + Console.WriteLine( + "[TEST-FRIENDLY][SES] received event_type={0} tenant_id={1} email={2} skip_signature_validation={3}", + normalizedEventType, request.TenantId, request.Email, skipValidation); + _logger.LogWarning("TEST-FRIENDLY enabled: skip DB operations for /webhooks/ses, keep callback flow."); + + if (TryMapDisableReason(normalizedEventType, out var reason)) + { + if (!TryExtractGuidFromTags(request.Tags, "subscriber_id", out var subscriberId) || + !TryExtractGuidFromTags(request.Tags, "list_id", out var listId)) + { + return SesProcessResult.Permanent( + 422, + "test_friendly_tags_required", + "tags.subscriber_id and tags.list_id must be UUID strings when TestFriendly is enabled."); + } + + var shouldNotify = true; + if (normalizedEventType == "soft_bounced") + { + var threshold = _configuration.GetValue("Bounce:SoftBounceThreshold", 5); + shouldNotify = IsSoftBounceThresholdReachedFromTags(request.Tags, threshold); + } + + if (shouldNotify) + { + await NotifyMemberCenterDisableAsync( + _logger, + request.TenantId, + new[] { (SubscriberId: subscriberId, ListId: listId) }, + reason, + request.OccurredAt, + cancellationToken); + } + } + + return SesProcessResult.Ok(200, "ok", "test-friendly-no-db"); + } + + try + { + using var scope = _scopeFactory.CreateScope(); + var db = scope.ServiceProvider.GetRequiredService(); + + if (!await EnsureTenantForWebhookAsync(db, request.TenantId, false, cancellationToken)) + { + _logger.LogWarning("SES webhook rejected: tenant_not_found. tenant_id={TenantId}", request.TenantId); + return SesProcessResult.Permanent(422, "tenant_not_found", null); + } + + _logger.LogInformation( + "SES webhook processing started. mode=normal event_type={EventType} tenant_id={TenantId} email={Email}", + normalizedEventType, request.TenantId, request.Email); + + var payload = JsonSerializer.Serialize(request); + var inbox = new EventInbox + { + Id = Guid.NewGuid(), + TenantId = request.TenantId, + EventType = $"ses.{normalizedEventType}", + Source = "ses", + Payload = payload, + ReceivedAt = DateTimeOffset.UtcNow, + Status = "received" + }; + + db.EventsInbox.Add(inbox); + await db.SaveChangesAsync(cancellationToken); + + await ApplySesEventAsync(db, request, normalizedEventType, cancellationToken); + inbox.Status = "processed"; + inbox.ProcessedAt = DateTimeOffset.UtcNow; + await db.SaveChangesAsync(cancellationToken); + + _logger.LogInformation( + "SES webhook processed. event_type={EventType} tenant_id={TenantId} email={Email}", + normalizedEventType, request.TenantId, request.Email); + + return SesProcessResult.Ok(200, "ok", null); + } + catch (Exception ex) + { + _logger.LogError(ex, "SES processing failed unexpectedly."); + return SesProcessResult.Transient(500, "internal_error", "exception"); + } + } + + private async Task ApplySesEventAsync( + SendEngineDbContext db, + SesEventRequest request, + string normalizedEventType, + CancellationToken cancellationToken) + { + var subscriptions = await db.Subscriptions + .Join( + db.Lists.AsNoTracking(), + s => s.ListId, + l => l.Id, + (s, l) => new { Subscription = s, ListTenantId = l.TenantId }) + .Where(x => x.ListTenantId == request.TenantId && x.Subscription.Email == request.Email) + .Select(x => x.Subscription) + .ToListAsync(cancellationToken); + if (subscriptions.Count == 0) + { + _logger.LogWarning( + "SES event ignored: subscription not found. tenant_id={TenantId} email={Email} event_type={EventType}", + request.TenantId, request.Email, normalizedEventType); + return; + } + + _logger.LogInformation( + "SES event matched subscriptions. tenant_id={TenantId} email={Email} matched_count={MatchedCount} event_type={EventType}", + request.TenantId, request.Email, subscriptions.Count, normalizedEventType); + + switch (normalizedEventType) + { + case "hard_bounced": + await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "hard_bounce", request.OccurredAt, cancellationToken); + return; + case "soft_bounced": + var threshold = _configuration.GetValue("Bounce:SoftBounceThreshold", 5); + var reached = await IsSoftBounceThresholdReachedAsync(db, request.TenantId, request.Email, threshold, cancellationToken); + if (reached) + { + await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "soft_bounce_threshold", request.OccurredAt, cancellationToken); + } + return; + case "complaint": + await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "complaint", request.OccurredAt, cancellationToken); + return; + case "suppression": + await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "suppression", request.OccurredAt, cancellationToken); + return; + default: + return; + } + } + + private async Task IsSoftBounceThresholdReachedAsync( + SendEngineDbContext db, + Guid tenantId, + string normalizedEmail, + int threshold, + CancellationToken cancellationToken) + { + if (threshold < 1) + { + threshold = 1; + } + + var count = await db.Database.SqlQueryRaw( + """ + SELECT count(*)::int AS "Value" + FROM events_inbox + WHERE tenant_id = {0} + AND source = 'ses' + AND event_type = 'ses.soft_bounced' + AND payload->>'email' = {1} + """, + tenantId, + normalizedEmail) + .SingleAsync(cancellationToken); + + return count >= threshold; + } + + private async Task SuppressAndNotifyAsync( + SendEngineDbContext db, + Guid tenantId, + IReadOnlyCollection subscriptions, + string reason, + DateTimeOffset occurredAt, + CancellationToken cancellationToken) + { + var targets = subscriptions + .Where(x => x.ExternalSubscriberId.HasValue) + .Select(x => (SubscriberId: x.ExternalSubscriberId!.Value, ListId: x.ListId)) + .Distinct() + .ToArray(); + + foreach (var subscription in subscriptions) + { + if (string.Equals(subscription.Status, "suppressed", StringComparison.OrdinalIgnoreCase)) + { + continue; + } + + subscription.Status = "suppressed"; + subscription.UpdatedAt = DateTimeOffset.UtcNow; + } + + await db.SaveChangesAsync(cancellationToken); + + if (targets.Length == 0) + { + _logger.LogWarning( + "MemberCenter callback skipped: no external_subscriber_id found. tenant_id={TenantId} reason={Reason}", + tenantId, reason); + return; + } + + await NotifyMemberCenterDisableAsync(_logger, tenantId, targets, reason, occurredAt, cancellationToken); + } + + private async Task NotifyMemberCenterDisableAsync( + ILogger logger, + Guid tenantId, + IEnumerable<(Guid SubscriberId, Guid ListId)> targets, + string reason, + DateTimeOffset occurredAt, + CancellationToken cancellationToken) + { + var url = ResolveMemberCenterUrl( + null, + "MemberCenter:BaseUrl", + "MemberCenter:DisableSubscriptionPath", + "/subscriptions/disable"); + + if (string.IsNullOrWhiteSpace(url)) + { + logger.LogWarning("MemberCenter callback skipped: disable URL is empty."); + return; + } + + using var client = new HttpClient(); + var token = await ResolveMemberCenterAccessTokenAsync(client, logger, cancellationToken); + if (string.IsNullOrWhiteSpace(token)) + { + logger.LogWarning("MemberCenter callback skipped: access token is empty."); + return; + } + client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token); + + foreach (var target in targets) + { + var body = new + { + tenant_id = tenantId, + subscriber_id = target.SubscriberId, + list_id = target.ListId, + reason, + disabled_by = "send_engine", + occurred_at = occurredAt + }; + + var response = await client.PostAsJsonAsync(url, body, cancellationToken); + if (!response.IsSuccessStatusCode) + { + var responseBody = await response.Content.ReadAsStringAsync(cancellationToken); + logger.LogWarning( + "MemberCenter disable callback failed. status={StatusCode} tenant_id={TenantId} subscriber_id={SubscriberId} list_id={ListId} reason={Reason} body={Body}", + (int)response.StatusCode, + tenantId, + target.SubscriberId, + target.ListId, + reason, + responseBody); + } + } + } + + private async Task ResolveMemberCenterAccessTokenAsync(HttpClient client, ILogger logger, CancellationToken cancellationToken) + { + var tokenUrl = ResolveMemberCenterUrl( + null, + "MemberCenter:BaseUrl", + "MemberCenter:TokenPath", + "/oauth/token"); + var clientId = _configuration["MemberCenter:ClientId"]; + var clientSecret = _configuration["MemberCenter:ClientSecret"]; + var scope = _configuration["MemberCenter:Scope"]; + + if (!string.IsNullOrWhiteSpace(tokenUrl) && + !string.IsNullOrWhiteSpace(clientId) && + !string.IsNullOrWhiteSpace(clientSecret)) + { + var form = new Dictionary + { + ["grant_type"] = "client_credentials", + ["client_id"] = clientId, + ["client_secret"] = clientSecret + }; + if (!string.IsNullOrWhiteSpace(scope)) + { + form["scope"] = scope; + } + + try + { + using var response = await client.PostAsync(tokenUrl, new FormUrlEncodedContent(form), cancellationToken); + var body = await response.Content.ReadAsStringAsync(cancellationToken); + if (response.IsSuccessStatusCode) + { + using var stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(body)); + using var json = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken); + if (json.RootElement.TryGetProperty("access_token", out var tokenElement)) + { + var accessToken = tokenElement.GetString(); + if (!string.IsNullOrWhiteSpace(accessToken)) + { + return accessToken; + } + } + } + } + catch (Exception ex) + { + logger.LogWarning(ex, "MemberCenter token request failed by exception."); + } + } + + var fallback = _configuration["MemberCenter:ApiToken"]; + return string.IsNullOrWhiteSpace(fallback) ? null : fallback; + } + + private string? ResolveMemberCenterUrl(string? directUrl, string baseUrlKey, string pathKey, string defaultPath) + { + if (!string.IsNullOrWhiteSpace(directUrl)) + { + return directUrl; + } + + var baseUrl = _configuration[baseUrlKey]; + if (string.IsNullOrWhiteSpace(baseUrl)) + { + return null; + } + + var path = _configuration[pathKey] ?? defaultPath; + return new Uri(new Uri(baseUrl), path).ToString(); + } + + private static string NormalizeSesEventType(string? eventType, string? bounceType) + { + var normalized = eventType?.Trim().ToLowerInvariant() ?? string.Empty; + if (normalized == "bounce") + { + var bounce = bounceType?.Trim().ToLowerInvariant() ?? string.Empty; + return bounce switch + { + "hard" => "hard_bounced", + "soft" => "soft_bounced", + _ => "bounce" + }; + } + + return normalized; + } + + private static bool TryMapDisableReason(string normalizedEventType, out string reason) + { + switch (normalizedEventType) + { + case "hard_bounced": + reason = "hard_bounce"; + return true; + case "soft_bounced": + reason = "soft_bounce_threshold"; + return true; + case "complaint": + reason = "complaint"; + return true; + case "suppression": + reason = "suppression"; + return true; + default: + reason = string.Empty; + return false; + } + } + + private static bool TryExtractGuidFromTags(Dictionary? tags, string key, out Guid value) + { + value = Guid.Empty; + if (tags is null) + { + return false; + } + + var keyCandidates = new[] + { + key, + key.ToLowerInvariant(), + key.ToUpperInvariant(), + key.Replace("_", string.Empty), + ToCamelCase(key) + }; + + foreach (var candidate in keyCandidates) + { + if (tags.TryGetValue(candidate, out var raw) && Guid.TryParse(raw, out value)) + { + return true; + } + } + + return false; + } + + private static string ToCamelCase(string value) + { + var parts = value.Split('_', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); + if (parts.Length == 0) + { + return value; + } + + return parts[0] + string.Concat(parts.Skip(1).Select(p => char.ToUpperInvariant(p[0]) + p[1..])); + } + + private static bool IsSoftBounceThresholdReachedFromTags(Dictionary? tags, int threshold) + { + if (threshold < 1) + { + threshold = 1; + } + + if (tags is null) + { + return true; + } + + return tags.TryGetValue("soft_bounce_count", out var raw) && int.TryParse(raw, out var count) + ? count >= threshold + : true; + } + + private static bool TryNormalizeSesEventRequest(JsonElement body, out SesEventRequest request, out string error) + { + request = new SesEventRequest(); + error = string.Empty; + + if (body.TryGetProperty("tenant_id", out _)) + { + try + { + request = JsonSerializer.Deserialize(body.GetRawText()) ?? new SesEventRequest(); + return true; + } + catch (JsonException ex) + { + error = $"flat_payload_invalid_json:{ex.Message}"; + return false; + } + } + + if (!body.TryGetProperty("Type", out var snsTypeElement)) + { + error = "missing_type"; + return false; + } + + var snsType = snsTypeElement.GetString() ?? string.Empty; + if (!string.Equals(snsType, "Notification", StringComparison.OrdinalIgnoreCase)) + { + error = $"unsupported_sns_type:{snsType}"; + return false; + } + + if (!body.TryGetProperty("Message", out var messageElement) || messageElement.ValueKind != JsonValueKind.String) + { + error = "missing_message"; + return false; + } + + var messageJson = messageElement.GetString() ?? string.Empty; + if (string.IsNullOrWhiteSpace(messageJson)) + { + error = "empty_message"; + return false; + } + + JsonDocument sesDoc; + try + { + sesDoc = JsonDocument.Parse(messageJson); + } + catch (JsonException ex) + { + error = $"ses_message_invalid_json:{ex.Message}"; + return false; + } + + using (sesDoc) + { + var root = sesDoc.RootElement; + var eventType = TryGetString(root, "eventType"); + if (string.IsNullOrWhiteSpace(eventType)) + { + error = "missing_eventType"; + return false; + } + + var tags = ExtractSesTags(root); + var email = ResolveSesRecipientEmail(root); + if (string.IsNullOrWhiteSpace(email)) + { + error = "missing_recipient_email"; + return false; + } + + var occurredAt = + ResolveSesOccurredAt(root) ?? + ParseDateTimeOffset(TryGetString(body, "Timestamp")) ?? + DateTimeOffset.UtcNow; + + var mailElement = TryGetProperty(root, "mail"); + var messageId = + TryGetString(mailElement, "messageId") ?? + TryGetString(body, "MessageId") ?? + string.Empty; + + var bounceType = ResolveBounceType(root, eventType); + var tenantId = Guid.Empty; + if (tags.TryGetValue("tenant_id", out var tenantRaw) && Guid.TryParse(tenantRaw, out var parsedTenant)) + { + tenantId = parsedTenant; + } + + request = new SesEventRequest + { + EventType = eventType, + MessageId = messageId, + TenantId = tenantId, + Email = email, + BounceType = bounceType, + OccurredAt = occurredAt, + Tags = tags + }; + + return true; + } + } + + private static string? ResolveBounceType(JsonElement root, string eventType) + { + if (!string.Equals(eventType, "Bounce", StringComparison.OrdinalIgnoreCase)) + { + return null; + } + + var bounceType = TryGetString(TryGetProperty(root, "bounce"), "bounceType")?.Trim().ToLowerInvariant(); + return bounceType switch + { + "permanent" => "hard", + "transient" => "soft", + _ => null + }; + } + + private static DateTimeOffset? ResolveSesOccurredAt(JsonElement root) + { + var eventType = TryGetString(root, "eventType")?.Trim().ToLowerInvariant(); + var timestamp = eventType switch + { + "bounce" => TryGetString(TryGetProperty(root, "bounce"), "timestamp"), + "complaint" => TryGetString(TryGetProperty(root, "complaint"), "timestamp"), + "delivery" => TryGetString(TryGetProperty(root, "delivery"), "timestamp"), + _ => null + }; + + return ParseDateTimeOffset(timestamp) + ?? ParseDateTimeOffset(TryGetString(TryGetProperty(root, "mail"), "timestamp")); + } + + private static string? ResolveSesRecipientEmail(JsonElement root) + { + var eventType = TryGetString(root, "eventType")?.Trim().ToLowerInvariant(); + switch (eventType) + { + case "bounce": + var bounced = TryGetFirstArrayItem(TryGetProperty(TryGetProperty(root, "bounce"), "bouncedRecipients")); + var bouncedEmail = TryGetString(bounced, "emailAddress"); + if (!string.IsNullOrWhiteSpace(bouncedEmail)) + { + return bouncedEmail; + } + break; + case "complaint": + var complained = TryGetFirstArrayItem(TryGetProperty(TryGetProperty(root, "complaint"), "complainedRecipients")); + var complaintEmail = TryGetString(complained, "emailAddress"); + if (!string.IsNullOrWhiteSpace(complaintEmail)) + { + return complaintEmail; + } + break; + case "delivery": + var recipient = TryGetFirstArrayString(TryGetProperty(TryGetProperty(root, "delivery"), "recipients")); + if (!string.IsNullOrWhiteSpace(recipient)) + { + return recipient; + } + break; + } + + return TryGetFirstArrayString(TryGetProperty(TryGetProperty(root, "mail"), "destination")); + } + + private static Dictionary ExtractSesTags(JsonElement root) + { + var result = new Dictionary(StringComparer.OrdinalIgnoreCase); + var tagsElement = TryGetProperty(TryGetProperty(root, "mail"), "tags"); + if (tagsElement.ValueKind != JsonValueKind.Object) + { + return result; + } + + foreach (var property in tagsElement.EnumerateObject()) + { + string? value = property.Value.ValueKind switch + { + JsonValueKind.Array => property.Value.EnumerateArray() + .Where(x => x.ValueKind == JsonValueKind.String) + .Select(x => x.GetString()) + .FirstOrDefault(x => !string.IsNullOrWhiteSpace(x)), + JsonValueKind.String => property.Value.GetString(), + _ => null + }; + + if (!string.IsNullOrWhiteSpace(value)) + { + result[property.Name] = value!; + } + } + + return result; + } + + private static JsonElement TryGetProperty(JsonElement element, string propertyName) + { + if (element.ValueKind == JsonValueKind.Object && element.TryGetProperty(propertyName, out var property)) + { + return property; + } + + return default; + } + + private static string? TryGetString(JsonElement element, string propertyName) + { + var property = TryGetProperty(element, propertyName); + return property.ValueKind == JsonValueKind.String ? property.GetString() : null; + } + + private static DateTimeOffset? ParseDateTimeOffset(string? value) + { + return !string.IsNullOrWhiteSpace(value) && DateTimeOffset.TryParse(value, out var parsed) ? parsed : null; + } + + private static JsonElement TryGetFirstArrayItem(JsonElement arrayElement) + { + if (arrayElement.ValueKind != JsonValueKind.Array) + { + return default; + } + + foreach (var item in arrayElement.EnumerateArray()) + { + return item; + } + + return default; + } + + private static string? TryGetFirstArrayString(JsonElement arrayElement) + { + if (arrayElement.ValueKind != JsonValueKind.Array) + { + return null; + } + + foreach (var item in arrayElement.EnumerateArray()) + { + if (item.ValueKind == JsonValueKind.String) + { + return item.GetString(); + } + } + + return null; + } + + private static async Task EnsureTenantForWebhookAsync( + SendEngineDbContext db, + Guid tenantId, + bool autoCreateTenant, + CancellationToken cancellationToken) + { + var tenantExists = await db.Tenants.AsNoTracking().AnyAsync(x => x.Id == tenantId, cancellationToken); + if (tenantExists) + { + return true; + } + + if (!autoCreateTenant) + { + return false; + } + + db.Tenants.Add(new Tenant + { + Id = tenantId, + Name = $"tenant-{tenantId:N}", + CreatedAt = DateTimeOffset.UtcNow + }); + await db.SaveChangesAsync(cancellationToken); + return true; + } +} + +public sealed record SesProcessResult( + bool Success, + bool TransientFailure, + int StatusCode, + string Error, + string? Reason) +{ + public static SesProcessResult Ok(int statusCode, string error, string? reason) => + new(true, false, statusCode, error, reason); + public static SesProcessResult Permanent(int statusCode, string error, string? reason) => + new(false, false, statusCode, error, reason); + public static SesProcessResult Transient(int statusCode, string error, string? reason) => + new(false, true, statusCode, error, reason); +} diff --git a/src/SendEngine.Api/Services/SqsSesPollerWorker.cs b/src/SendEngine.Api/Services/SqsSesPollerWorker.cs new file mode 100644 index 0000000..381ae47 --- /dev/null +++ b/src/SendEngine.Api/Services/SqsSesPollerWorker.cs @@ -0,0 +1,158 @@ +using System.Text.Json; +using Amazon; +using Amazon.SQS; +using Amazon.SQS.Model; + +namespace SendEngine.Api.Services; + +public sealed class SqsSesPollerWorker : BackgroundService +{ + private readonly IConfiguration _configuration; + private readonly ILogger _logger; + private readonly IServiceScopeFactory _scopeFactory; + + public SqsSesPollerWorker( + IConfiguration configuration, + ILogger logger, + IServiceScopeFactory scopeFactory) + { + _configuration = configuration; + _logger = logger; + _scopeFactory = scopeFactory; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var enabled = _configuration.GetValue("Sqs:Enabled", false); + if (!enabled) + { + _logger.LogInformation("SQS SES poller disabled."); + return; + } + + var queueUrl = _configuration["Sqs:QueueUrl"]; + if (string.IsNullOrWhiteSpace(queueUrl)) + { + _logger.LogWarning("SQS SES poller enabled but Sqs:QueueUrl is empty."); + return; + } + + var region = _configuration["Sqs:Region"] + ?? _configuration["AWS:Region"] + ?? _configuration["Ses:Region"] + ?? "ap-northeast-1"; + var waitSeconds = Math.Clamp(_configuration.GetValue("Sqs:PollWaitSeconds", 20), 1, 20); + var maxMessages = Math.Clamp(_configuration.GetValue("Sqs:MaxMessages", 10), 1, 10); + var visibilityTimeout = Math.Clamp(_configuration.GetValue("Sqs:VisibilityTimeoutSeconds", 30), 0, 43200); + + _logger.LogInformation( + "SQS SES poller started. queue_url={QueueUrl} region={Region} wait_seconds={WaitSeconds} max_messages={MaxMessages}", + queueUrl, region, waitSeconds, maxMessages); + + using var sqs = new AmazonSQSClient(RegionEndpoint.GetBySystemName(region)); + + while (!stoppingToken.IsCancellationRequested) + { + ReceiveMessageResponse received; + try + { + received = await sqs.ReceiveMessageAsync(new ReceiveMessageRequest + { + QueueUrl = queueUrl, + MaxNumberOfMessages = maxMessages, + WaitTimeSeconds = waitSeconds, + VisibilityTimeout = visibilityTimeout, + MessageSystemAttributeNames = new List + { + "ApproximateReceiveCount", + "SentTimestamp" + } + }, stoppingToken); + } + catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested) + { + _logger.LogInformation("SQS poller stopping: receive canceled by shutdown signal."); + break; + } + catch (TaskCanceledException) + { + _logger.LogWarning("SQS receive canceled (likely timeout/interruption)."); + continue; + } + catch (Exception ex) + { + _logger.LogError(ex, "SQS receive failed."); + await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken); + continue; + } + + if (received.Messages.Count == 0) + { + continue; + } + + foreach (var message in received.Messages) + { + var receiveCount = message.Attributes.TryGetValue("ApproximateReceiveCount", out var countRaw) + ? countRaw + : "1"; + try + { + var signature = TryExtractSnsSignature(message.Body) ?? "sqs-relay"; + using var scope = _scopeFactory.CreateScope(); + var processor = scope.ServiceProvider.GetRequiredService(); + var result = await processor.ProcessRawJsonAsync(message.Body, signature, stoppingToken); + if (!result.Success) + { + _logger.LogWarning("SQS message processing not-success. status={StatusCode} transient={Transient} error={Error} reason={Reason} receive_count={ReceiveCount} message_id={MessageId}", + result.StatusCode, result.TransientFailure, result.Error, result.Reason, receiveCount, message.MessageId); + if (result.TransientFailure) + { + continue; + } + } + + if (result.Success || !result.TransientFailure) + { + await sqs.DeleteMessageAsync(queueUrl, message.ReceiptHandle, stoppingToken); + _logger.LogInformation( + "SQS message processed and deleted. message_id={MessageId} receive_count={ReceiveCount}", + message.MessageId, receiveCount); + continue; + } + } + catch (Exception ex) + { + _logger.LogError( + ex, + "SQS message processing failed. message_id={MessageId} receive_count={ReceiveCount}", + message.MessageId, receiveCount); + } + } + } + } + + private static string? TryExtractSnsSignature(string? body) + { + if (string.IsNullOrWhiteSpace(body)) + { + return null; + } + + try + { + using var doc = JsonDocument.Parse(body); + if (doc.RootElement.TryGetProperty("Signature", out var signature) + && signature.ValueKind == JsonValueKind.String) + { + return signature.GetString(); + } + } + catch (JsonException) + { + // ignore: body may not be SNS envelope + } + + return null; + } +}