feat: Add SQS integration for SES event processing
- Introduced SqsSesPollerWorker to poll messages from SQS and process SES events. - Implemented SesEventProcessingService to handle SES event payloads and store them in the database. - Updated DevMockSenderWorker to support new SES sending methods and improved logging for unsubscribe headers. - Added AWS SDK for SQS to project dependencies.
This commit is contained in:
parent
60a24ee7c0
commit
d49c30b447
14
.env.example
14
.env.example
@ -1,7 +1,6 @@
|
||||
ASPNETCORE_ENVIRONMENT=Development
|
||||
ConnectionStrings__Default=Host=localhost;Database=send_engine;Username=postgres;Password=postgres
|
||||
ESP__Provider=ses
|
||||
ESP__ApiKey=change_me
|
||||
Db__AutoMigrate=true
|
||||
Jwt__Issuer=http://localhost:7850/
|
||||
Jwt__Audience=send_engine_api
|
||||
@ -28,4 +27,17 @@ DevSender__PollIntervalSeconds=5
|
||||
Ses__Region=us-east-1
|
||||
Ses__FromEmail=
|
||||
Ses__ConfigurationSet=
|
||||
Ses__SendMode=raw_bulk
|
||||
Ses__TemplateName=
|
||||
Sqs__Enabled=false
|
||||
Sqs__QueueUrl=
|
||||
Sqs__Region=
|
||||
Sqs__PollWaitSeconds=20
|
||||
Sqs__MaxMessages=10
|
||||
Sqs__VisibilityTimeoutSeconds=30
|
||||
AWS__Region=ap-northeast-1
|
||||
AWS_EC2_METADATA_DISABLED=true
|
||||
# Local only (do NOT use in production):
|
||||
# AWS_ACCESS_KEY_ID=
|
||||
# AWS_SECRET_ACCESS_KEY=
|
||||
# AWS_SESSION_TOKEN=
|
||||
|
||||
11
README.md
11
README.md
@ -6,7 +6,7 @@
|
||||
- 接收 Member Center 的訂閱事件(activated / unsubscribed / preferences.updated)
|
||||
- 多租戶名單快照(依 tenant/list)且僅增量更新
|
||||
- 管理 Campaign / Send Job 與排程
|
||||
- 對接 ESP(SES / SendGrid / Mailgun)
|
||||
- 對接 ESP(Amazon SES + Mock)
|
||||
- 記錄投遞結果與退信(必要時回寫 Member Center)
|
||||
|
||||
## 既定條件
|
||||
@ -54,8 +54,7 @@ mass_mail_engine/
|
||||
## Build
|
||||
使用 VS Code `Run Build Task`(預設執行 `dotnet build SendEngine.sln`)。
|
||||
|
||||
## 待確認事項
|
||||
- 事件系統選擇(Kafka/RabbitMQ/SNS+SQS / Webhook)
|
||||
- ESP 優先順序(SES / SendGrid / Mailgun)
|
||||
- 退信回寫的規則(hard bounce / soft bounce)
|
||||
- 追蹤事件範圍(open / click / unsubscribe)
|
||||
## 目前待辦
|
||||
- SES/SNS 簽章完整驗證(目前 `Ses__SkipSignatureValidation=false` 僅檢查 header 存在)
|
||||
- 事件重試/DLQ 策略補強(目前主要依 SQS redrive policy)
|
||||
- recipient 狀態機擴充(delivery/open/click 的完整優先序與狀態轉換)
|
||||
|
||||
@ -55,8 +55,9 @@ Recipient 狀態:
|
||||
- 必要時呼叫 Member Center 停用/註記 API
|
||||
|
||||
目前實作:
|
||||
- 先以 `POST /webhooks/ses` 接收事件並更新資料
|
||||
- `SNS -> SQS -> Worker` 尚未落地
|
||||
- 已提供 `SqsSesPollerWorker`(SQS 輪詢)
|
||||
- worker 直接呼叫內部 SES processing service(不走 HTTP self-call)
|
||||
- 仍保留 `POST /webhooks/ses` 直接接收模式(相容/測試)
|
||||
|
||||
## 信任邊界與 Auth 模型
|
||||
### 外部角色
|
||||
@ -70,8 +71,8 @@ Recipient 狀態:
|
||||
|
||||
### 驗證方式(建議)
|
||||
1. **Member Center → Send Engine**
|
||||
- 使用簽名 Webhook(主推),或 OAuth2 Client Credentials
|
||||
- token 內含 `tenant_id` 與 scopes(例如 `newsletter:events.write`)
|
||||
- 使用簽名 Webhook(HMAC,已實作)
|
||||
- header 內含 `X-Client-Id`,對應 `auth_clients.id` 並受 tenant 綁定與 scope 驗證
|
||||
2. **租戶網站 → Send Engine**
|
||||
- 使用 OAuth2 Client Credentials 或 JWT(由 Member Center 簽發)
|
||||
- token 內含 `tenant_id` 與 scopes(例如 `newsletter:send.write`)
|
||||
|
||||
@ -20,10 +20,10 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入
|
||||
6. 寫入 List Store 快照(只增量更新,不拉全量)。
|
||||
7. 記錄處理結果與版本號(供重播與對帳)。
|
||||
|
||||
錯誤與重試:
|
||||
錯誤與重試(目前實作):
|
||||
- 驗證失敗 → 直接拒絕(401/403),不寫入 Inbox。
|
||||
- DB 暫時性錯誤 → 重試(含指數退避),仍失敗則進 DLQ。
|
||||
- 事件格式錯誤 → 標記為 invalid,記錄原因。
|
||||
- DB/執行錯誤 → 由請求端或佇列重送;尚未提供獨立 DLQ worker 與統一退避策略配置。
|
||||
- 事件格式錯誤 → 回 `422`。
|
||||
|
||||
## 1b. 全量名單同步流程(由 Member Center 主動推送)
|
||||
目的:避免 Send Engine 透過 API 拉取名單,降低名單外流風險。
|
||||
@ -56,7 +56,9 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入
|
||||
- 若啟用 one-click token endpoint,僅發送 `status=issued` 的收件者
|
||||
5. 發送執行:
|
||||
- `ESP__Provider=mock`:僅模擬發送,寫入預覽事件並輸出 console log
|
||||
- `ESP__Provider=ses`:使用 SES v2 `SendBulkEmail`(每批最多 50)
|
||||
- `ESP__Provider=ses`:
|
||||
- `Ses__SendMode=raw_bulk`:使用 SES v2 `SendEmail`,依內容分組每次最多 50 位收件者
|
||||
- `Ses__SendMode=bulk_template`:使用 SES v2 `SendBulkEmail`(每批最多 50)
|
||||
6. 更新 Send Job 狀態:
|
||||
- 全部成功:`completed`
|
||||
- 例外或部分失敗:`failed`
|
||||
@ -73,26 +75,28 @@ Member Center 為多租戶架構,信件內容由各租戶網站產生後送入
|
||||
## 3. 退信處理流程
|
||||
目的:處理 ESP 回報的 bounce/complaint,並回寫本地名單狀態。
|
||||
|
||||
目前實作流程(Webhook):
|
||||
1. 由 `POST /webhooks/ses` 接收 SES 事件 payload。
|
||||
2. 驗證(可透過 `Ses__SkipSignatureValidation` 控制是否要求簽章)。
|
||||
3. 將事件寫入 Inbox(append-only)。
|
||||
4. Consumer 解析事件:
|
||||
目前實作流程(Webhook + SQS Poller):
|
||||
1. 由 `SqsSesPollerWorker` 輪詢 SQS 取得 SNS envelope,直接呼叫內部 SES processing service。
|
||||
2. `POST /webhooks/ses` 也會呼叫同一套 processing service(相容直接呼叫模式)。
|
||||
3. 驗證(可透過 `Ses__SkipSignatureValidation` 控制是否要求簽章)。
|
||||
4. 將事件寫入 Inbox(append-only)。
|
||||
5. Consumer 解析事件:
|
||||
- hard bounce → 立即標記 blacklisted(同義於 `suppressed`)
|
||||
- soft bounce → 累計次數,達門檻(預設 5)才標記 blacklisted(`suppressed`)
|
||||
- complaint → 立即取消訂閱並標記 blacklisted(`suppressed`)
|
||||
- complaint → 立即標記 blacklisted(`suppressed`)
|
||||
- suppression 事件 → 直接對應為 `suppressed`(即黑名單)
|
||||
5. 更新 List Store 快照與投遞記錄。
|
||||
6. 回寫 Member Center(僅在以下條件):
|
||||
6. 更新 List Store 快照與投遞記錄。
|
||||
7. 回寫 Member Center(僅在以下條件):
|
||||
- hard bounce:已設黑名單
|
||||
- soft bounce:達門檻後設黑名單
|
||||
- complaint:取消訂閱
|
||||
- complaint:設黑名單
|
||||
- suppression:設黑名單
|
||||
|
||||
補充:
|
||||
- Unknown event 不應使 worker crash,應記錄後送入 DLQ
|
||||
- Throttle/暫時性網路錯誤使用指數退避重試
|
||||
- `SNS -> SQS -> Worker` 架構為正式環境建議,尚未在目前程式碼中落地
|
||||
- Unknown event 目前會略過或記錄,不會讓 worker 中止(不進獨立 DLQ)
|
||||
- 已落地 `SQS -> Worker -> 內部 SES processing service`;`/webhooks/ses` 仍保留直接呼叫相容模式
|
||||
- `Bounce` 事件會依 `bounceType` 正規化為 `hard_bounced`(Permanent)或 `soft_bounced`(Transient)
|
||||
- 亂序事件以優先序處理:`complaint` > `hard_bounced`/`suppression` > `soft_bounced` > `delivery`
|
||||
|
||||
回寫規則:
|
||||
- Send Engine 僅回寫「停用原因」與必要欄位
|
||||
|
||||
@ -2,6 +2,21 @@
|
||||
|
||||
- 需求:.NET SDK 8.x, PostgreSQL
|
||||
- 設定:複製 `.env.example` → `.env`
|
||||
- AWS Credential(SQS/SES):
|
||||
- Local / Docker 開發:
|
||||
- 設定 `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`(若為臨時憑證再加 `AWS_SESSION_TOKEN`)
|
||||
- 設定 `AWS__Region`(或 `Sqs__Region` / `Ses__Region`)
|
||||
- 建議設 `AWS_EC2_METADATA_DISABLED=true`,避免本機誤打 EC2 metadata
|
||||
- ECS 正式環境(建議):
|
||||
- 不要放 Access Key,改用 ECS Task Role(`taskRoleArn`)
|
||||
- Task Role 至少授權:
|
||||
- `sqs:ReceiveMessage`
|
||||
- `sqs:DeleteMessage`
|
||||
- `sqs:GetQueueAttributes`
|
||||
- `ses:SendEmail`(raw_bulk)
|
||||
- `ses:SendBulkEmail`(bulk_template,若有使用)
|
||||
- ECS Task Definition 可設定環境變數:
|
||||
- `AWS_REGION=ap-northeast-1`(或直接用 `.env` 的 `AWS__Region`)
|
||||
- Migration:
|
||||
- 預設由 API 啟動時自動執行(`Db__AutoMigrate=true`)
|
||||
- 需要關閉時請設定 `Db__AutoMigrate=false`
|
||||
@ -48,14 +63,29 @@
|
||||
- Dev Sender(Mock 發信):
|
||||
- `DevSender__Enabled=true`:背景 worker 會處理 `pending` send jobs,並將每位收件人的預計發送內容寫入 `events_inbox`(`source=dev_sender`, `event_type=send.preview`)
|
||||
- `DevSender__PollIntervalSeconds`:輪詢間隔秒數(預設 5)
|
||||
- `ESP__Provider=ses` 時,背景 worker 會改用 SES `SendBulkEmail` 發送
|
||||
- SES 相關參數:`Ses__Region`、`Ses__FromEmail`、`Ses__ConfigurationSet`(可選)、`Ses__TemplateName`
|
||||
- `ESP__Provider=ses` 時,即使 `DevSender__Enabled=false`,背景 sender 仍會啟動並改用 SES 發送(模式由 `Ses__SendMode` 決定)
|
||||
- SES 相關參數:`Ses__Region`、`Ses__FromEmail`、`Ses__ConfigurationSet`(可選)、`Ses__SendMode`、`Ses__TemplateName`
|
||||
- `Ses__SendMode=raw_bulk`(預設):使用 SES `SendEmail`,依內容分組後每次最多 50 位收件者(不依賴 SES Template)
|
||||
- `Ses__SendMode=bulk_template`:使用 SES `SendBulkEmail` + Template(需提供 `template.ses_template_name` 或 `Ses__TemplateName`)
|
||||
- SES 發送時會附帶 message tags:`tenant_id`、`list_id`、`campaign_id`、`send_job_id`
|
||||
- SQS Poller(SES 事件回流):
|
||||
- `Sqs__Enabled=true`
|
||||
- `Sqs__QueueUrl=<aws_sqs_queue_url>`
|
||||
- `Sqs__Region`(未設定時回退 `AWS__Region` 再回退 `Ses__Region`)
|
||||
- `Sqs__PollWaitSeconds`(預設 20)
|
||||
- `Sqs__MaxMessages`(預設 10)
|
||||
- `Sqs__VisibilityTimeoutSeconds`(預設 30)
|
||||
- SQS poller 目前未提供獨立 DLQ worker;可先用 AWS SQS redrive policy 管理失敗訊息
|
||||
- `SendBulkEmail` 會使用 SES 模板名稱:
|
||||
- 先讀 `campaign.template.ses_template_name`
|
||||
- 若未提供則回退 `Ses__TemplateName`
|
||||
- 若設定了 Member Center one-click token endpoint,sender 會在發送前批次呼叫 `/newsletter/one-click-unsubscribe-tokens`,僅發送 `status=issued` 的收件者
|
||||
- 內容替換合約(Mock 與 SES 共用):
|
||||
- `{{email}}`
|
||||
- `{{unsubscribe_url}}`(可在 `template` JSON 中提供 `unsubscribe_url` 模板,例如 `https://member.example/unsub?email={{email}}`)
|
||||
- `{{unsubscribe_token}}`
|
||||
- `{{tenant_id}}` / `{{list_id}}` / `{{campaign_id}}` / `{{send_job_id}}`
|
||||
- 若在 `campaign.template` 提供:
|
||||
- `list_unsubscribe_url_template`
|
||||
- `list_unsubscribe_mailto`
|
||||
- sender 會自動加上 `List-Unsubscribe` 與 `List-Unsubscribe-Post` headers
|
||||
- 正式環境建議維持 `false`
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
- `list_id` 必須屬於該 tenant
|
||||
|
||||
### 2. Member Center → Send Engine Webhook
|
||||
使用簽名 Webhook(HMAC)或 OAuth2 Client Credentials(建議簽名)。
|
||||
目前實作為簽名 Webhook(HMAC)。
|
||||
|
||||
Header 建議:
|
||||
- `X-Signature`: `hex(hmac_sha256(secret, body))`
|
||||
@ -130,7 +130,6 @@ Request Body(分批示意):
|
||||
Response:
|
||||
- `200 OK`:accepted
|
||||
- `401/403`:驗證失敗
|
||||
- `409`:sync_id + batch_no 重複
|
||||
- `422`:格式錯誤
|
||||
|
||||
## API:租戶網站 → Send Engine
|
||||
@ -166,11 +165,10 @@ Request Body:
|
||||
- `window_start` 必須小於 `window_end`(若有提供)
|
||||
- `tenant_id` 必須已存在(不存在回 `422 tenant_not_found`)
|
||||
- `list_id` 若不存在,會在該 tenant 下自動建立 placeholder list 後建立 send job
|
||||
- `template` 可攜帶替換參數(例如:`{"unsubscribe_url":"https://member.example/unsub?email={{email}}","ses_template_name":"newsletter_default"}`)
|
||||
- `template` 可攜帶發信元資料(例如:`{"ses_template_name":"newsletter_default","list_unsubscribe_url_template":"https://member.example/unsubscribe?token={{unsubscribe_token}}","list_unsubscribe_mailto":"mailto:unsubscribe@member.example"}`)
|
||||
|
||||
替換合約(Mock/SES 一致):
|
||||
- `{{email}}`
|
||||
- `{{unsubscribe_url}}`
|
||||
- `{{unsubscribe_token}}`
|
||||
- `{{tenant_id}}`
|
||||
- `{{list_id}}`
|
||||
@ -209,10 +207,9 @@ curl -X POST "http://localhost:6060/api/send-jobs" \
|
||||
"list_id": "22222222-2222-2222-2222-222222222222",
|
||||
"name": "Weekly Update",
|
||||
"subject": "Hi {{email}}",
|
||||
"body_html": "<p>Hello {{email}}</p><p><a href=\"{{unsubscribe_url}}\">unsubscribe</a></p>",
|
||||
"body_text": "Hello {{email}} | unsubscribe: {{unsubscribe_url}}",
|
||||
"body_html": "<p>Hello {{email}}</p><p><a href=\"https://member.example/unsubscribe?token={{unsubscribe_token}}\">unsubscribe</a></p>",
|
||||
"body_text": "Hello {{email}} | unsubscribe: https://member.example/unsubscribe?token={{unsubscribe_token}}",
|
||||
"template": {
|
||||
"unsubscribe_url": "https://member.example/unsubscribe?email={{email}}",
|
||||
"ses_template_name": "newsletter_default"
|
||||
}
|
||||
}'
|
||||
@ -230,7 +227,12 @@ curl -X POST "http://localhost:6060/api/send-jobs" \
|
||||
說明:
|
||||
- `ESP__Provider=mock`(非 ses)時,會由 Dev Sender 產生 `send.preview` 事件供你檢查替換結果
|
||||
- `ESP__Provider=mock` 時,也會把每位收件人的模擬發送內容輸出到 console log(`MOCK send preview`)
|
||||
- `ESP__Provider=ses` 時,背景 sender 會用 SES `SendBulkEmail`(每批最多 50)
|
||||
- 若 `template.list_unsubscribe_url_template`(或 `template.list_unsubscribe_mailto`)有提供,sender 會加上:
|
||||
- `List-Unsubscribe`
|
||||
- `List-Unsubscribe-Post: List-Unsubscribe=One-Click`
|
||||
- `ESP__Provider=ses` 時,背景 sender 依 `Ses__SendMode` 發送:
|
||||
- `raw_bulk`(預設):SES `SendEmail`,依內容分組每次最多 50 位收件者
|
||||
- `bulk_template`:SES `SendBulkEmail`(每批最多 50,需要 SES template)
|
||||
- 若已設定 Member Center one-click token endpoint,發送前會批次呼叫 `POST /newsletter/one-click-unsubscribe-tokens`
|
||||
- 僅 `status=issued` 的收件者會被送出,並把 `unsubscribe_token` 注入替換內容
|
||||
|
||||
@ -283,6 +285,7 @@ Response:
|
||||
推薦架構(正式):
|
||||
- `SES Configuration Set -> SNS -> SQS -> ECS Worker`
|
||||
- 由 Worker 消費事件,不要求對外公開 webhook
|
||||
- 目前實作:`SqsSesPollerWorker` 會從 SQS 取訊息並直接呼叫內部 SES processing service
|
||||
|
||||
相容模式(可選):
|
||||
- `POST /webhooks/ses`
|
||||
@ -294,28 +297,37 @@ Response:
|
||||
Request Body(示意):
|
||||
```json
|
||||
{
|
||||
"event_type": "bounce",
|
||||
"message_id": "ses-id",
|
||||
"tenant_id": "uuid",
|
||||
"email": "user@example.com",
|
||||
"bounce_type": "hard",
|
||||
"occurred_at": "2026-02-10T09:45:00Z"
|
||||
"Type": "Notification",
|
||||
"MessageId": "sns-message-id",
|
||||
"Message": "{\"eventType\":\"Bounce\",\"mail\":{\"messageId\":\"ses-message-id\",\"tags\":{\"tenant_id\":[\"...\"],\"list_id\":[\"...\"]}},\"bounce\":{\"bounceType\":\"Permanent\",\"bouncedRecipients\":[{\"emailAddress\":\"user@example.com\"}],\"timestamp\":\"2026-02-10T09:45:00Z\"}}",
|
||||
"Timestamp": "2026-02-10T09:45:01Z"
|
||||
}
|
||||
```
|
||||
相容:仍接受舊版扁平 payload(`event_type`/`tenant_id`/`email`)。
|
||||
|
||||
Response:
|
||||
- `200 OK`
|
||||
- `422`:payload 解析錯誤或缺少必要欄位
|
||||
- `500`:處理時發生未預期錯誤(SQS poller 會視為 transient,保留訊息重試)
|
||||
|
||||
事件對應規則(固定):
|
||||
- `hard_bounced`:立即設為黑名單(`suppressed`)
|
||||
- `soft_bounced`:累計達門檻後設為黑名單(`suppressed`)
|
||||
- `complaint`:取消訂閱並回寫 Member Center
|
||||
- `complaint`:設為黑名單(`suppressed`)並回寫 Member Center(reason=`complaint`)
|
||||
- `suppression`:設為黑名單(`suppressed`)
|
||||
|
||||
SES `Bounce` 對應:
|
||||
- `bounce.bounceType=Permanent` → `hard_bounced`
|
||||
- `bounce.bounceType=Transient` → `soft_bounced`
|
||||
|
||||
事件優先序(規劃):
|
||||
- `complaint` > `hard_bounced`/`suppression` > `soft_bounced` > `delivery` > `open`/`click`
|
||||
- 目前實作尚未建立完整 recipient 狀態機覆蓋所有事件;實際會以 `suppressed` 作為最終不可發狀態
|
||||
|
||||
回寫 Member Center 條件:
|
||||
- `hard_bounced`:設黑名單後回寫
|
||||
- `soft_bounced`:達門檻設黑名單後回寫
|
||||
- `complaint`:立即回寫
|
||||
- `complaint`:設黑名單後立即回寫
|
||||
- `suppression`:設黑名單後回寫
|
||||
|
||||
回寫原因碼(固定):
|
||||
|
||||
@ -3,3 +3,4 @@
|
||||
- C# .NET Core
|
||||
- PostgreSQL
|
||||
- ESP: Amazon SES(實作)+ Mock Sender(開發測試)
|
||||
- Event Ingest: Amazon SNS + SQS(SES event 回流)
|
||||
|
||||
@ -199,12 +199,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ErrorResponse'
|
||||
'409':
|
||||
description: Duplicate batch
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ErrorResponse'
|
||||
'422':
|
||||
description: Validation error
|
||||
content:
|
||||
@ -222,7 +216,9 @@ paths:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/SesEvent'
|
||||
oneOf:
|
||||
- $ref: '#/components/schemas/SnsEnvelope'
|
||||
- $ref: '#/components/schemas/SesEvent'
|
||||
responses:
|
||||
'200':
|
||||
description: OK
|
||||
@ -238,6 +234,18 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ErrorResponse'
|
||||
'422':
|
||||
description: Invalid payload
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ErrorResponse'
|
||||
'500':
|
||||
description: Internal processing error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/ErrorResponse'
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
@ -305,8 +313,9 @@ components:
|
||||
description: |
|
||||
Optional template metadata used by sender runtime.
|
||||
Supported keys:
|
||||
- unsubscribe_url: URL template, e.g. https://member.example/unsubscribe?token={{unsubscribe_token}}
|
||||
- ses_template_name: SES template name override
|
||||
- list_unsubscribe_url_template: URL template, e.g. https://member.example/unsubscribe?token={{unsubscribe_token}}
|
||||
- list_unsubscribe_mailto: mailto endpoint, e.g. mailto:unsubscribe@member.example
|
||||
scheduled_at:
|
||||
type: string
|
||||
format: date-time
|
||||
@ -506,6 +515,26 @@ components:
|
||||
type: string
|
||||
format: date-time
|
||||
|
||||
SnsEnvelope:
|
||||
type: object
|
||||
required: [Type, MessageId, Message, Timestamp]
|
||||
properties:
|
||||
Type:
|
||||
type: string
|
||||
enum: [Notification, SubscriptionConfirmation, UnsubscribeConfirmation]
|
||||
MessageId:
|
||||
type: string
|
||||
TopicArn:
|
||||
type: string
|
||||
Subject:
|
||||
type: string
|
||||
Message:
|
||||
type: string
|
||||
description: JSON string containing SES event payload
|
||||
Timestamp:
|
||||
type: string
|
||||
format: date-time
|
||||
|
||||
ErrorResponse:
|
||||
type: object
|
||||
required: [error]
|
||||
|
||||
@ -18,6 +18,8 @@ builder.Services.AddEndpointsApiExplorer();
|
||||
builder.Services.AddSwaggerGen();
|
||||
builder.Services.AddInfrastructure(builder.Configuration);
|
||||
builder.Services.AddHostedService<DevMockSenderWorker>();
|
||||
builder.Services.AddHostedService<SqsSesPollerWorker>();
|
||||
builder.Services.AddScoped<SesEventProcessingService>();
|
||||
|
||||
var jwtAuthority = builder.Configuration["Jwt:Authority"];
|
||||
var jwtMetadataAddress = builder.Configuration["Jwt:MetadataAddress"];
|
||||
@ -543,122 +545,24 @@ app.MapPost("/webhooks/lists/full-sync", async (
|
||||
|
||||
app.MapPost("/webhooks/ses", async (
|
||||
HttpContext httpContext,
|
||||
SesEventRequest request,
|
||||
SendEngineDbContext db,
|
||||
ILoggerFactory loggerFactory) =>
|
||||
JsonElement body,
|
||||
SesEventProcessingService sesProcessor) =>
|
||||
{
|
||||
var logger = loggerFactory.CreateLogger("SendEngine.Webhooks.Ses");
|
||||
|
||||
if (request.TenantId == Guid.Empty || string.IsNullOrWhiteSpace(request.Email))
|
||||
var signature = httpContext.Request.Headers["X-Amz-Sns-Signature"].ToString();
|
||||
var result = await sesProcessor.ProcessBodyAsync(body, signature, httpContext.RequestAborted);
|
||||
if (result.Success)
|
||||
{
|
||||
logger.LogWarning("SES webhook rejected: tenant_id or email missing.");
|
||||
return Results.UnprocessableEntity(new { error = "tenant_id_email_required" });
|
||||
return Results.StatusCode(result.StatusCode);
|
||||
}
|
||||
|
||||
var skipValidation = builder.Configuration.GetValue("Ses:SkipSignatureValidation", true);
|
||||
logger.LogInformation(
|
||||
"SES webhook received. Ses__SkipSignatureValidation={SkipValidation}",
|
||||
skipValidation);
|
||||
|
||||
var sesSignature = httpContext.Request.Headers["X-Amz-Sns-Signature"].ToString();
|
||||
if (!skipValidation && string.IsNullOrWhiteSpace(sesSignature))
|
||||
if (result.TransientFailure)
|
||||
{
|
||||
logger.LogWarning("SES webhook rejected: missing X-Amz-Sns-Signature while signature validation is enabled.");
|
||||
return Results.Unauthorized();
|
||||
return Results.StatusCode(result.StatusCode);
|
||||
}
|
||||
|
||||
var normalizedEventType = NormalizeSesEventType(request.EventType, request.BounceType);
|
||||
request.Email = request.Email.Trim().ToLowerInvariant();
|
||||
request.EventType = normalizedEventType;
|
||||
|
||||
// TEST-FRIENDLY TEMPORARY LOGIC:
|
||||
// In local integration testing, skip DB operations but keep callback flow to Member Center.
|
||||
// TODO(remove-test-friendly): Remove this branch once end-to-end DB flow is stable.
|
||||
var testFriendlyEnabled = builder.Configuration.GetValue("TestFriendly:Enabled", false);
|
||||
if (testFriendlyEnabled)
|
||||
{
|
||||
Console.WriteLine(
|
||||
"[TEST-FRIENDLY][SES] received event_type={0} tenant_id={1} email={2} skip_signature_validation={3}",
|
||||
normalizedEventType,
|
||||
request.TenantId,
|
||||
request.Email,
|
||||
skipValidation);
|
||||
logger.LogWarning("TEST-FRIENDLY enabled: skip DB operations for /webhooks/ses, keep callback flow.");
|
||||
|
||||
if (TryMapDisableReason(normalizedEventType, out var reason))
|
||||
{
|
||||
if (!TryExtractGuidFromTags(request.Tags, "subscriber_id", out var subscriberId) ||
|
||||
!TryExtractGuidFromTags(request.Tags, "list_id", out var listId))
|
||||
{
|
||||
return Results.UnprocessableEntity(new
|
||||
{
|
||||
error = "test_friendly_tags_required",
|
||||
message = "tags.subscriber_id and tags.list_id must be UUID strings when TestFriendly is enabled."
|
||||
});
|
||||
}
|
||||
|
||||
var shouldNotify = true;
|
||||
if (normalizedEventType == "soft_bounced")
|
||||
{
|
||||
var threshold = builder.Configuration.GetValue("Bounce:SoftBounceThreshold", 5);
|
||||
shouldNotify = IsSoftBounceThresholdReachedFromTags(request.Tags, threshold);
|
||||
}
|
||||
|
||||
if (shouldNotify)
|
||||
{
|
||||
await NotifyMemberCenterDisableAsync(
|
||||
builder.Configuration,
|
||||
logger,
|
||||
request.TenantId,
|
||||
new[] { (SubscriberId: subscriberId, ListId: listId) },
|
||||
reason,
|
||||
request.OccurredAt);
|
||||
}
|
||||
}
|
||||
|
||||
return Results.Ok(new { status = "ok", mode = "test-friendly-no-db" });
|
||||
}
|
||||
|
||||
if (!await EnsureTenantForWebhookAsync(db, request.TenantId, testFriendlyEnabled))
|
||||
{
|
||||
logger.LogWarning("SES webhook rejected: tenant_not_found. tenant_id={TenantId}", request.TenantId);
|
||||
return Results.UnprocessableEntity(new { error = "tenant_not_found" });
|
||||
}
|
||||
|
||||
logger.LogInformation(
|
||||
"SES webhook processing started. mode=normal event_type={EventType} tenant_id={TenantId} email={Email}",
|
||||
normalizedEventType,
|
||||
request.TenantId,
|
||||
request.Email);
|
||||
|
||||
var payload = JsonSerializer.Serialize(request);
|
||||
|
||||
var inbox = new EventInbox
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
TenantId = request.TenantId,
|
||||
EventType = $"ses.{normalizedEventType}",
|
||||
Source = "ses",
|
||||
Payload = payload,
|
||||
ReceivedAt = DateTimeOffset.UtcNow,
|
||||
Status = "received"
|
||||
};
|
||||
|
||||
db.EventsInbox.Add(inbox);
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
await ApplySesEventAsync(db, builder.Configuration, logger, request, normalizedEventType);
|
||||
inbox.Status = "processed";
|
||||
inbox.ProcessedAt = DateTimeOffset.UtcNow;
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
logger.LogInformation(
|
||||
"SES webhook processed. event_type={EventType} tenant_id={TenantId} email={Email}",
|
||||
normalizedEventType,
|
||||
request.TenantId,
|
||||
request.Email);
|
||||
|
||||
return Results.Ok();
|
||||
return Results.Json(
|
||||
new { error = result.Error, reason = result.Reason },
|
||||
statusCode: result.StatusCode);
|
||||
}).WithName("SesWebhook").WithOpenApi();
|
||||
|
||||
app.Run();
|
||||
@ -806,427 +710,3 @@ static string NormalizeStatus(string? status, string fallback)
|
||||
_ => fallback
|
||||
};
|
||||
}
|
||||
|
||||
static string NormalizeSesEventType(string? eventType, string? bounceType)
|
||||
{
|
||||
var normalized = eventType?.Trim().ToLowerInvariant() ?? string.Empty;
|
||||
if (normalized == "bounce")
|
||||
{
|
||||
var bounce = bounceType?.Trim().ToLowerInvariant() ?? string.Empty;
|
||||
return bounce switch
|
||||
{
|
||||
"hard" => "hard_bounced",
|
||||
"soft" => "soft_bounced",
|
||||
_ => "bounce"
|
||||
};
|
||||
}
|
||||
|
||||
return normalized;
|
||||
}
|
||||
|
||||
static bool TryMapDisableReason(string normalizedEventType, out string reason)
|
||||
{
|
||||
switch (normalizedEventType)
|
||||
{
|
||||
case "hard_bounced":
|
||||
reason = "hard_bounce";
|
||||
return true;
|
||||
case "soft_bounced":
|
||||
reason = "soft_bounce_threshold";
|
||||
return true;
|
||||
case "complaint":
|
||||
reason = "complaint";
|
||||
return true;
|
||||
case "suppression":
|
||||
reason = "suppression";
|
||||
return true;
|
||||
default:
|
||||
reason = string.Empty;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static bool TryExtractGuidFromTags(Dictionary<string, string>? tags, string key, out Guid value)
|
||||
{
|
||||
value = Guid.Empty;
|
||||
if (tags is null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
var keyCandidates = new[]
|
||||
{
|
||||
key,
|
||||
key.ToLowerInvariant(),
|
||||
key.ToUpperInvariant(),
|
||||
key.Replace("_", string.Empty),
|
||||
ToCamelCase(key)
|
||||
};
|
||||
|
||||
foreach (var candidate in keyCandidates)
|
||||
{
|
||||
if (tags.TryGetValue(candidate, out var raw) && Guid.TryParse(raw, out value))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static string ToCamelCase(string value)
|
||||
{
|
||||
var parts = value.Split('_', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
|
||||
if (parts.Length == 0)
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
return parts[0] + string.Concat(parts.Skip(1).Select(p => char.ToUpperInvariant(p[0]) + p[1..]));
|
||||
}
|
||||
|
||||
static bool IsSoftBounceThresholdReachedFromTags(Dictionary<string, string>? tags, int threshold)
|
||||
{
|
||||
if (threshold < 1)
|
||||
{
|
||||
threshold = 1;
|
||||
}
|
||||
|
||||
if (tags is null)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (tags.TryGetValue("soft_bounce_count", out var raw) && int.TryParse(raw, out var count))
|
||||
{
|
||||
return count >= threshold;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static async Task ApplySesEventAsync(
|
||||
SendEngineDbContext db,
|
||||
IConfiguration configuration,
|
||||
ILogger logger,
|
||||
SesEventRequest request,
|
||||
string normalizedEventType)
|
||||
{
|
||||
var subscriptions = await db.Subscriptions
|
||||
.Join(
|
||||
db.Lists.AsNoTracking(),
|
||||
s => s.ListId,
|
||||
l => l.Id,
|
||||
(s, l) => new { Subscription = s, ListTenantId = l.TenantId })
|
||||
.Where(x => x.ListTenantId == request.TenantId && x.Subscription.Email == request.Email)
|
||||
.Select(x => x.Subscription)
|
||||
.ToListAsync();
|
||||
if (subscriptions.Count == 0)
|
||||
{
|
||||
logger.LogWarning(
|
||||
"SES event ignored: subscription not found. tenant_id={TenantId} email={Email} event_type={EventType}",
|
||||
request.TenantId,
|
||||
request.Email,
|
||||
normalizedEventType);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.LogInformation(
|
||||
"SES event matched subscriptions. tenant_id={TenantId} email={Email} matched_count={MatchedCount} event_type={EventType}",
|
||||
request.TenantId,
|
||||
request.Email,
|
||||
subscriptions.Count,
|
||||
normalizedEventType);
|
||||
|
||||
switch (normalizedEventType)
|
||||
{
|
||||
case "hard_bounced":
|
||||
await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "hard_bounce", request.OccurredAt);
|
||||
return;
|
||||
case "soft_bounced":
|
||||
var threshold = configuration.GetValue("Bounce:SoftBounceThreshold", 5);
|
||||
var reached = await IsSoftBounceThresholdReachedAsync(db, request.TenantId, request.Email, threshold);
|
||||
if (reached)
|
||||
{
|
||||
await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "soft_bounce_threshold", request.OccurredAt);
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogInformation(
|
||||
"Soft bounce threshold not reached yet. tenant_id={TenantId} email={Email} threshold={Threshold}",
|
||||
request.TenantId,
|
||||
request.Email,
|
||||
threshold);
|
||||
}
|
||||
return;
|
||||
case "complaint":
|
||||
await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "complaint", request.OccurredAt);
|
||||
return;
|
||||
case "suppression":
|
||||
await SuppressAndNotifyAsync(db, configuration, logger, request.TenantId, subscriptions, "suppression", request.OccurredAt);
|
||||
return;
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
static async Task<bool> IsSoftBounceThresholdReachedAsync(
|
||||
SendEngineDbContext db,
|
||||
Guid tenantId,
|
||||
string normalizedEmail,
|
||||
int threshold)
|
||||
{
|
||||
if (threshold < 1)
|
||||
{
|
||||
threshold = 1;
|
||||
}
|
||||
|
||||
var count = await db.Database.SqlQuery<int>($"""
|
||||
SELECT count(*)::int AS "Value"
|
||||
FROM events_inbox
|
||||
WHERE tenant_id = {tenantId}
|
||||
AND source = 'ses'
|
||||
AND event_type = 'ses.soft_bounced'
|
||||
AND payload->>'email' = {normalizedEmail}
|
||||
""").SingleAsync();
|
||||
|
||||
return count >= threshold;
|
||||
}
|
||||
|
||||
static async Task SuppressAndNotifyAsync(
|
||||
SendEngineDbContext db,
|
||||
IConfiguration configuration,
|
||||
ILogger logger,
|
||||
Guid tenantId,
|
||||
IReadOnlyCollection<Subscription> subscriptions,
|
||||
string reason,
|
||||
DateTimeOffset occurredAt)
|
||||
{
|
||||
if (subscriptions.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var now = DateTimeOffset.UtcNow;
|
||||
foreach (var subscription in subscriptions)
|
||||
{
|
||||
subscription.Status = "suppressed";
|
||||
subscription.UpdatedAt = now;
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync();
|
||||
|
||||
var notifyTargets = subscriptions
|
||||
.Where(x => x.ExternalSubscriberId.HasValue)
|
||||
.Select(x => (SubscriberId: x.ExternalSubscriberId!.Value, ListId: x.ListId))
|
||||
.Distinct()
|
||||
.ToArray();
|
||||
|
||||
logger.LogInformation(
|
||||
"Subscriptions suppressed. tenant_id={TenantId} reason={Reason} matched_count={MatchedCount} notify_target_count={NotifyCount}",
|
||||
tenantId,
|
||||
reason,
|
||||
subscriptions.Count,
|
||||
notifyTargets.Length);
|
||||
await NotifyMemberCenterDisableAsync(configuration, logger, tenantId, notifyTargets, reason, occurredAt);
|
||||
}
|
||||
|
||||
static async Task NotifyMemberCenterDisableAsync(
|
||||
IConfiguration configuration,
|
||||
ILogger logger,
|
||||
Guid tenantId,
|
||||
IReadOnlyCollection<(Guid SubscriberId, Guid ListId)> targets,
|
||||
string reason,
|
||||
DateTimeOffset occurredAt)
|
||||
{
|
||||
if (targets.Count == 0)
|
||||
{
|
||||
logger.LogWarning("MemberCenter callback skipped: no disable targets.");
|
||||
return;
|
||||
}
|
||||
|
||||
var url = ResolveMemberCenterUrl(
|
||||
configuration,
|
||||
null,
|
||||
"MemberCenter:BaseUrl",
|
||||
"MemberCenter:DisableSubscriptionPath",
|
||||
"/subscriptions/disable");
|
||||
if (string.IsNullOrWhiteSpace(url))
|
||||
{
|
||||
logger.LogWarning("MemberCenter callback skipped: URL is empty.");
|
||||
return;
|
||||
}
|
||||
|
||||
using var client = new HttpClient();
|
||||
var token = await ResolveMemberCenterAccessTokenAsync(configuration, client, logger);
|
||||
if (string.IsNullOrWhiteSpace(token))
|
||||
{
|
||||
logger.LogWarning("MemberCenter callback skipped: access token is empty.");
|
||||
return;
|
||||
}
|
||||
client.DefaultRequestHeaders.Authorization = new("Bearer", token);
|
||||
logger.LogInformation(
|
||||
"MemberCenter callback prepared. url={Url} auth_header={AuthHeader}",
|
||||
url,
|
||||
BuildMaskedAuthHeader(token));
|
||||
|
||||
foreach (var target in targets)
|
||||
{
|
||||
var payload = new
|
||||
{
|
||||
tenant_id = tenantId,
|
||||
subscriber_id = target.SubscriberId,
|
||||
list_id = target.ListId,
|
||||
reason,
|
||||
disabled_by = "send_engine",
|
||||
occurred_at = occurredAt
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
var payloadJson = JsonSerializer.Serialize(payload);
|
||||
logger.LogInformation(
|
||||
"MemberCenter callback request. method=POST url={Url} headers={Headers} body={Body}",
|
||||
url,
|
||||
$"Authorization:{BuildMaskedAuthHeader(token)}; Content-Type:application/json",
|
||||
payloadJson);
|
||||
|
||||
using var response = await client.PostAsJsonAsync(url, payload);
|
||||
var responseBody = await response.Content.ReadAsStringAsync();
|
||||
logger.LogInformation(
|
||||
"MemberCenter callback response. status={StatusCode} body={Body}",
|
||||
(int)response.StatusCode,
|
||||
Truncate(responseBody, 1000));
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "MemberCenter callback failed. url={Url} list_id={ListId}", url, target.ListId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static async Task<string?> ResolveMemberCenterAccessTokenAsync(IConfiguration configuration, HttpClient client, ILogger logger)
|
||||
{
|
||||
var tokenUrl = ResolveMemberCenterUrl(
|
||||
configuration,
|
||||
null,
|
||||
"MemberCenter:BaseUrl",
|
||||
"MemberCenter:TokenPath",
|
||||
"/oauth/token");
|
||||
var clientId = configuration["MemberCenter:ClientId"];
|
||||
var clientSecret = configuration["MemberCenter:ClientSecret"];
|
||||
var scope = configuration["MemberCenter:Scope"];
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(tokenUrl) &&
|
||||
!string.IsNullOrWhiteSpace(clientId) &&
|
||||
!string.IsNullOrWhiteSpace(clientSecret))
|
||||
{
|
||||
var form = new Dictionary<string, string>
|
||||
{
|
||||
["grant_type"] = "client_credentials",
|
||||
["client_id"] = clientId,
|
||||
["client_secret"] = clientSecret
|
||||
};
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(scope))
|
||||
{
|
||||
form["scope"] = scope;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
logger.LogInformation(
|
||||
"MemberCenter token request. url={TokenUrl} client_id={ClientId} scope={Scope}",
|
||||
tokenUrl,
|
||||
clientId,
|
||||
scope ?? string.Empty);
|
||||
using var response = await client.PostAsync(tokenUrl, new FormUrlEncodedContent(form));
|
||||
if (response.IsSuccessStatusCode)
|
||||
{
|
||||
await using var stream = await response.Content.ReadAsStreamAsync();
|
||||
using var json = await JsonDocument.ParseAsync(stream);
|
||||
if (json.RootElement.TryGetProperty("access_token", out var tokenElement))
|
||||
{
|
||||
var accessToken = tokenElement.GetString();
|
||||
if (!string.IsNullOrWhiteSpace(accessToken))
|
||||
{
|
||||
logger.LogInformation("MemberCenter token request succeeded.");
|
||||
return accessToken;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
var body = await response.Content.ReadAsStringAsync();
|
||||
logger.LogWarning(
|
||||
"MemberCenter token request failed. status={StatusCode} body={Body}",
|
||||
(int)response.StatusCode,
|
||||
Truncate(body, 1000));
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "MemberCenter token request error, fallback to static token.");
|
||||
}
|
||||
}
|
||||
|
||||
var fallbackToken = configuration["MemberCenter:ApiToken"];
|
||||
if (!string.IsNullOrWhiteSpace(fallbackToken))
|
||||
{
|
||||
logger.LogWarning("Using MemberCenter__ApiToken fallback.");
|
||||
}
|
||||
|
||||
return fallbackToken;
|
||||
}
|
||||
|
||||
static string? ResolveMemberCenterUrl(
|
||||
IConfiguration configuration,
|
||||
string? fullUrlKey,
|
||||
string baseUrlKey,
|
||||
string pathKey,
|
||||
string defaultPath)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(fullUrlKey))
|
||||
{
|
||||
var fullUrl = configuration[fullUrlKey];
|
||||
if (!string.IsNullOrWhiteSpace(fullUrl))
|
||||
{
|
||||
return fullUrl;
|
||||
}
|
||||
}
|
||||
|
||||
var baseUrl = configuration[baseUrlKey];
|
||||
if (string.IsNullOrWhiteSpace(baseUrl))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var path = configuration[pathKey];
|
||||
if (string.IsNullOrWhiteSpace(path))
|
||||
{
|
||||
path = defaultPath;
|
||||
}
|
||||
|
||||
return $"{baseUrl.TrimEnd('/')}/{path.TrimStart('/')}";
|
||||
}
|
||||
|
||||
static string BuildMaskedAuthHeader(string token)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(token))
|
||||
{
|
||||
return "Bearer <empty>";
|
||||
}
|
||||
|
||||
var visible = token.Length <= 8 ? token : token[^8..];
|
||||
return $"Bearer ***{visible}";
|
||||
}
|
||||
|
||||
static string Truncate(string? input, int maxLen)
|
||||
{
|
||||
if (string.IsNullOrEmpty(input))
|
||||
{
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
return input.Length <= maxLen ? input : $"{input[..maxLen]}...(truncated)";
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.0" />
|
||||
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="8.0.23" />
|
||||
<PackageReference Include="AWSSDK.SimpleEmailV2" Version="3.7.401.2" />
|
||||
<PackageReference Include="AWSSDK.SQS" Version="3.7.500" />
|
||||
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="8.0.0">
|
||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
|
||||
<PrivateAssets>all</PrivateAssets>
|
||||
|
||||
@ -149,20 +149,20 @@ public sealed class DevMockSenderWorker : BackgroundService
|
||||
var deliveredCount = 0;
|
||||
if (provider == "ses")
|
||||
{
|
||||
deliveredCount = await SendViaSesBulkAsync(campaign, replacements, cancellationToken);
|
||||
deliveredCount = await SendViaSesAsync(job, campaign, replacements, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
deliveredCount = replacements.Count;
|
||||
foreach (var preview in replacements)
|
||||
{
|
||||
preview.Placeholders.TryGetValue("unsubscribe_url", out var unsubscribeUrl);
|
||||
var unsubscribeHeaders = BuildListUnsubscribeHeaders(campaign.Template, preview.Placeholders);
|
||||
_logger.LogInformation(
|
||||
"MOCK send preview. send_job_id={SendJobId} email={Email} subject={Subject} unsubscribe_url={UnsubscribeUrl} body_html={BodyHtml} body_text={BodyText}",
|
||||
"MOCK send preview. send_job_id={SendJobId} email={Email} subject={Subject} list_unsubscribe={ListUnsubscribe} body_html={BodyHtml} body_text={BodyText}",
|
||||
job.Id,
|
||||
preview.Email,
|
||||
preview.Subject,
|
||||
unsubscribeUrl ?? string.Empty,
|
||||
unsubscribeHeaders.ListUnsubscribe ?? string.Empty,
|
||||
preview.BodyHtml,
|
||||
preview.BodyText);
|
||||
}
|
||||
@ -172,7 +172,6 @@ public sealed class DevMockSenderWorker : BackgroundService
|
||||
{
|
||||
var previewEvents = replacements.Select(preview =>
|
||||
{
|
||||
var unsubscribeUrl = preview.Placeholders.GetValueOrDefault("unsubscribe_url", string.Empty);
|
||||
var payload = JsonSerializer.Serialize(new
|
||||
{
|
||||
send_job_id = job.Id,
|
||||
@ -183,7 +182,8 @@ public sealed class DevMockSenderWorker : BackgroundService
|
||||
subject = preview.Subject,
|
||||
body_html = preview.BodyHtml,
|
||||
body_text = preview.BodyText,
|
||||
unsubscribe_url = unsubscribeUrl,
|
||||
list_unsubscribe = BuildListUnsubscribeHeaders(campaign.Template, preview.Placeholders).ListUnsubscribe,
|
||||
list_unsubscribe_post = BuildListUnsubscribeHeaders(campaign.Template, preview.Placeholders).ListUnsubscribePost,
|
||||
placeholders = preview.Placeholders,
|
||||
generated_at = DateTimeOffset.UtcNow
|
||||
});
|
||||
@ -245,7 +245,26 @@ public sealed class DevMockSenderWorker : BackgroundService
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<int> SendViaSesBulkAsync(Campaign campaign, IReadOnlyCollection<RecipientPreview> recipients, CancellationToken cancellationToken)
|
||||
private async Task<int> SendViaSesAsync(
|
||||
SendJob job,
|
||||
Campaign campaign,
|
||||
IReadOnlyCollection<RecipientPreview> recipients,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var mode = (_configuration["Ses:SendMode"] ?? "raw_bulk").Trim().ToLowerInvariant();
|
||||
if (mode == "bulk_template")
|
||||
{
|
||||
return await SendViaSesBulkTemplateAsync(job, campaign, recipients, cancellationToken);
|
||||
}
|
||||
|
||||
return await SendViaSesRawBulkAsync(job, campaign, recipients, cancellationToken);
|
||||
}
|
||||
|
||||
private async Task<int> SendViaSesBulkTemplateAsync(
|
||||
SendJob job,
|
||||
Campaign campaign,
|
||||
IReadOnlyCollection<RecipientPreview> recipients,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var region = _configuration["Ses:Region"] ?? "us-east-1";
|
||||
var fromEmail = _configuration["Ses:FromEmail"] ?? string.Empty;
|
||||
@ -286,6 +305,7 @@ public sealed class DevMockSenderWorker : BackgroundService
|
||||
{
|
||||
ToAddresses = new List<string> { preview.Email }
|
||||
},
|
||||
ReplacementTags = BuildSesMessageTags(preview.Placeholders),
|
||||
ReplacementEmailContent = new ReplacementEmailContent
|
||||
{
|
||||
ReplacementTemplate = new ReplacementTemplate
|
||||
@ -317,6 +337,96 @@ public sealed class DevMockSenderWorker : BackgroundService
|
||||
return delivered;
|
||||
}
|
||||
|
||||
private async Task<int> SendViaSesRawBulkAsync(
|
||||
SendJob job,
|
||||
Campaign campaign,
|
||||
IReadOnlyCollection<RecipientPreview> recipients,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var region = _configuration["Ses:Region"] ?? "us-east-1";
|
||||
var fromEmail = _configuration["Ses:FromEmail"] ?? string.Empty;
|
||||
if (string.IsNullOrWhiteSpace(fromEmail))
|
||||
{
|
||||
_logger.LogWarning("SES send skipped: Ses__FromEmail is empty.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
using var ses = new AmazonSimpleEmailServiceV2Client(RegionEndpoint.GetBySystemName(region));
|
||||
|
||||
var delivered = 0;
|
||||
var grouped = recipients
|
||||
.GroupBy(x =>
|
||||
{
|
||||
var headers = BuildListUnsubscribeHeaders(campaign.Template, x.Placeholders);
|
||||
return $"{x.Subject}\n---\n{x.BodyHtml}\n---\n{x.BodyText}\n---\n{headers.ListUnsubscribe}\n---\n{headers.ListUnsubscribePost}";
|
||||
}, StringComparer.Ordinal)
|
||||
.ToList();
|
||||
|
||||
foreach (var group in grouped)
|
||||
{
|
||||
foreach (var batch in group.Chunk(50))
|
||||
{
|
||||
var subject = batch[0].Subject ?? string.Empty;
|
||||
var bodyHtml = batch[0].BodyHtml;
|
||||
var bodyText = batch[0].BodyText;
|
||||
var listUnsubscribeHeaders = BuildListUnsubscribeHeaders(campaign.Template, batch[0].Placeholders);
|
||||
var request = new SendEmailRequest
|
||||
{
|
||||
FromEmailAddress = fromEmail,
|
||||
ConfigurationSetName = _configuration["Ses:ConfigurationSet"],
|
||||
Destination = new Destination
|
||||
{
|
||||
ToAddresses = batch.Select(x => x.Email).ToList()
|
||||
},
|
||||
EmailTags = BuildSesMessageTags(new Dictionary<string, string>
|
||||
{
|
||||
["tenant_id"] = job.TenantId.ToString("D"),
|
||||
["list_id"] = job.ListId.ToString("D"),
|
||||
["campaign_id"] = campaign.Id.ToString("D"),
|
||||
["send_job_id"] = job.Id.ToString("D")
|
||||
}),
|
||||
Content = new EmailContent
|
||||
{
|
||||
Simple = new Message
|
||||
{
|
||||
Subject = new Content
|
||||
{
|
||||
Data = subject,
|
||||
Charset = "UTF-8"
|
||||
},
|
||||
Body = new Body
|
||||
{
|
||||
Html = string.IsNullOrWhiteSpace(bodyHtml)
|
||||
? null
|
||||
: new Content { Data = bodyHtml, Charset = "UTF-8" },
|
||||
Text = string.IsNullOrWhiteSpace(bodyText)
|
||||
? null
|
||||
: new Content { Data = bodyText, Charset = "UTF-8" }
|
||||
},
|
||||
Headers = BuildSesMessageHeaders(listUnsubscribeHeaders)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
var response = await ses.SendEmailAsync(request, cancellationToken);
|
||||
delivered += batch.Length;
|
||||
_logger.LogInformation(
|
||||
"SES raw bulk send succeeded. message_id={MessageId} recipients={RecipientCount}",
|
||||
response.MessageId,
|
||||
batch.Length);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "SES raw bulk send failed. recipients={RecipientCount}", batch.Length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return delivered;
|
||||
}
|
||||
|
||||
private async Task<Dictionary<Guid, string>> FetchOneClickUnsubscribeTokensAsync(
|
||||
Guid tenantId,
|
||||
Guid listId,
|
||||
@ -429,13 +539,93 @@ public sealed class DevMockSenderWorker : BackgroundService
|
||||
["unsubscribe_token"] = unsubscribeToken ?? string.Empty
|
||||
};
|
||||
|
||||
var unsubscribeTemplate = ExtractTemplateString(campaign.Template, "unsubscribe_url");
|
||||
var unsubscribeUrl = Personalize(unsubscribeTemplate, values);
|
||||
values["unsubscribe_url"] = unsubscribeUrl;
|
||||
if (subscription.ExternalSubscriberId.HasValue)
|
||||
{
|
||||
values["subscriber_id"] = subscription.ExternalSubscriberId.Value.ToString("D");
|
||||
}
|
||||
|
||||
return values;
|
||||
}
|
||||
|
||||
private static List<MessageTag> BuildSesMessageTags(Dictionary<string, string> placeholders)
|
||||
{
|
||||
var tags = new List<MessageTag>();
|
||||
AddTag("tenant_id");
|
||||
AddTag("list_id");
|
||||
AddTag("campaign_id");
|
||||
AddTag("send_job_id");
|
||||
|
||||
return tags;
|
||||
|
||||
void AddTag(string key)
|
||||
{
|
||||
if (!placeholders.TryGetValue(key, out var value) || string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
tags.Add(new MessageTag
|
||||
{
|
||||
Name = key,
|
||||
Value = value
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static (string? ListUnsubscribe, string? ListUnsubscribePost) BuildListUnsubscribeHeaders(
|
||||
string? campaignTemplate,
|
||||
Dictionary<string, string> placeholders)
|
||||
{
|
||||
var urlTemplate = ExtractTemplateString(campaignTemplate, "list_unsubscribe_url_template");
|
||||
var mailto = ExtractTemplateString(campaignTemplate, "list_unsubscribe_mailto");
|
||||
var url = Personalize(urlTemplate, placeholders);
|
||||
|
||||
var values = new List<string>();
|
||||
if (!string.IsNullOrWhiteSpace(url))
|
||||
{
|
||||
values.Add($"<{url}>");
|
||||
}
|
||||
if (!string.IsNullOrWhiteSpace(mailto))
|
||||
{
|
||||
values.Add($"<{mailto}>");
|
||||
}
|
||||
|
||||
if (values.Count == 0)
|
||||
{
|
||||
return (null, null);
|
||||
}
|
||||
|
||||
return (string.Join(", ", values), "List-Unsubscribe=One-Click");
|
||||
}
|
||||
|
||||
private static List<MessageHeader>? BuildSesMessageHeaders((string? ListUnsubscribe, string? ListUnsubscribePost) headers)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(headers.ListUnsubscribe))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var result = new List<MessageHeader>
|
||||
{
|
||||
new()
|
||||
{
|
||||
Name = "List-Unsubscribe",
|
||||
Value = headers.ListUnsubscribe
|
||||
}
|
||||
};
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(headers.ListUnsubscribePost))
|
||||
{
|
||||
result.Add(new MessageHeader
|
||||
{
|
||||
Name = "List-Unsubscribe-Post",
|
||||
Value = headers.ListUnsubscribePost
|
||||
});
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static async Task<string?> ResolveMemberCenterAccessTokenAsync(
|
||||
IConfiguration configuration,
|
||||
HttpClient client,
|
||||
|
||||
808
src/SendEngine.Api/Services/SesEventProcessingService.cs
Normal file
808
src/SendEngine.Api/Services/SesEventProcessingService.cs
Normal file
@ -0,0 +1,808 @@
|
||||
using System.Net.Http.Headers;
|
||||
using System.Text.Json;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using SendEngine.Api.Models;
|
||||
using SendEngine.Domain.Entities;
|
||||
using SendEngine.Infrastructure.Data;
|
||||
|
||||
namespace SendEngine.Api.Services;
|
||||
|
||||
public sealed class SesEventProcessingService
|
||||
{
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly IConfiguration _configuration;
|
||||
private readonly ILogger<SesEventProcessingService> _logger;
|
||||
|
||||
public SesEventProcessingService(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
IConfiguration configuration,
|
||||
ILogger<SesEventProcessingService> logger)
|
||||
{
|
||||
_scopeFactory = scopeFactory;
|
||||
_configuration = configuration;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
public async Task<SesProcessResult> ProcessRawJsonAsync(
|
||||
string rawJson,
|
||||
string? snsSignature,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
JsonDocument bodyDoc;
|
||||
try
|
||||
{
|
||||
bodyDoc = JsonDocument.Parse(rawJson);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "SES payload parse failed.");
|
||||
return SesProcessResult.Permanent(422, "invalid_payload", "json_parse_failed");
|
||||
}
|
||||
|
||||
using (bodyDoc)
|
||||
{
|
||||
return await ProcessBodyAsync(bodyDoc.RootElement, snsSignature, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task<SesProcessResult> ProcessBodyAsync(
|
||||
JsonElement body,
|
||||
string? snsSignature,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (!TryNormalizeSesEventRequest(body, out var request, out var parseError))
|
||||
{
|
||||
if (parseError.StartsWith("unsupported_sns_type:", StringComparison.Ordinal))
|
||||
{
|
||||
_logger.LogInformation("SES webhook ignored non-notification SNS message. reason={Reason}", parseError);
|
||||
return SesProcessResult.Ok(200, "ignored", parseError);
|
||||
}
|
||||
|
||||
_logger.LogWarning("SES webhook rejected: invalid_payload. reason={Reason}", parseError);
|
||||
return SesProcessResult.Permanent(422, "invalid_payload", parseError);
|
||||
}
|
||||
|
||||
if (request.TenantId == Guid.Empty || string.IsNullOrWhiteSpace(request.Email))
|
||||
{
|
||||
_logger.LogWarning("SES webhook rejected: tenant_id or email missing.");
|
||||
return SesProcessResult.Permanent(422, "tenant_id_email_required", null);
|
||||
}
|
||||
|
||||
var skipValidation = _configuration.GetValue("Ses:SkipSignatureValidation", true);
|
||||
_logger.LogInformation("SES webhook received. Ses__SkipSignatureValidation={SkipValidation}", skipValidation);
|
||||
if (!skipValidation && string.IsNullOrWhiteSpace(snsSignature))
|
||||
{
|
||||
_logger.LogWarning("SES webhook rejected: missing X-Amz-Sns-Signature while signature validation is enabled.");
|
||||
return SesProcessResult.Permanent(401, "unauthorized", "missing_signature");
|
||||
}
|
||||
|
||||
var normalizedEventType = NormalizeSesEventType(request.EventType, request.BounceType);
|
||||
request.Email = request.Email.Trim().ToLowerInvariant();
|
||||
request.EventType = normalizedEventType;
|
||||
|
||||
var testFriendlyEnabled = _configuration.GetValue("TestFriendly:Enabled", false);
|
||||
if (testFriendlyEnabled)
|
||||
{
|
||||
Console.WriteLine(
|
||||
"[TEST-FRIENDLY][SES] received event_type={0} tenant_id={1} email={2} skip_signature_validation={3}",
|
||||
normalizedEventType, request.TenantId, request.Email, skipValidation);
|
||||
_logger.LogWarning("TEST-FRIENDLY enabled: skip DB operations for /webhooks/ses, keep callback flow.");
|
||||
|
||||
if (TryMapDisableReason(normalizedEventType, out var reason))
|
||||
{
|
||||
if (!TryExtractGuidFromTags(request.Tags, "subscriber_id", out var subscriberId) ||
|
||||
!TryExtractGuidFromTags(request.Tags, "list_id", out var listId))
|
||||
{
|
||||
return SesProcessResult.Permanent(
|
||||
422,
|
||||
"test_friendly_tags_required",
|
||||
"tags.subscriber_id and tags.list_id must be UUID strings when TestFriendly is enabled.");
|
||||
}
|
||||
|
||||
var shouldNotify = true;
|
||||
if (normalizedEventType == "soft_bounced")
|
||||
{
|
||||
var threshold = _configuration.GetValue("Bounce:SoftBounceThreshold", 5);
|
||||
shouldNotify = IsSoftBounceThresholdReachedFromTags(request.Tags, threshold);
|
||||
}
|
||||
|
||||
if (shouldNotify)
|
||||
{
|
||||
await NotifyMemberCenterDisableAsync(
|
||||
_logger,
|
||||
request.TenantId,
|
||||
new[] { (SubscriberId: subscriberId, ListId: listId) },
|
||||
reason,
|
||||
request.OccurredAt,
|
||||
cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
return SesProcessResult.Ok(200, "ok", "test-friendly-no-db");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<SendEngineDbContext>();
|
||||
|
||||
if (!await EnsureTenantForWebhookAsync(db, request.TenantId, false, cancellationToken))
|
||||
{
|
||||
_logger.LogWarning("SES webhook rejected: tenant_not_found. tenant_id={TenantId}", request.TenantId);
|
||||
return SesProcessResult.Permanent(422, "tenant_not_found", null);
|
||||
}
|
||||
|
||||
_logger.LogInformation(
|
||||
"SES webhook processing started. mode=normal event_type={EventType} tenant_id={TenantId} email={Email}",
|
||||
normalizedEventType, request.TenantId, request.Email);
|
||||
|
||||
var payload = JsonSerializer.Serialize(request);
|
||||
var inbox = new EventInbox
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
TenantId = request.TenantId,
|
||||
EventType = $"ses.{normalizedEventType}",
|
||||
Source = "ses",
|
||||
Payload = payload,
|
||||
ReceivedAt = DateTimeOffset.UtcNow,
|
||||
Status = "received"
|
||||
};
|
||||
|
||||
db.EventsInbox.Add(inbox);
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
|
||||
await ApplySesEventAsync(db, request, normalizedEventType, cancellationToken);
|
||||
inbox.Status = "processed";
|
||||
inbox.ProcessedAt = DateTimeOffset.UtcNow;
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
|
||||
_logger.LogInformation(
|
||||
"SES webhook processed. event_type={EventType} tenant_id={TenantId} email={Email}",
|
||||
normalizedEventType, request.TenantId, request.Email);
|
||||
|
||||
return SesProcessResult.Ok(200, "ok", null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "SES processing failed unexpectedly.");
|
||||
return SesProcessResult.Transient(500, "internal_error", "exception");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ApplySesEventAsync(
|
||||
SendEngineDbContext db,
|
||||
SesEventRequest request,
|
||||
string normalizedEventType,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var subscriptions = await db.Subscriptions
|
||||
.Join(
|
||||
db.Lists.AsNoTracking(),
|
||||
s => s.ListId,
|
||||
l => l.Id,
|
||||
(s, l) => new { Subscription = s, ListTenantId = l.TenantId })
|
||||
.Where(x => x.ListTenantId == request.TenantId && x.Subscription.Email == request.Email)
|
||||
.Select(x => x.Subscription)
|
||||
.ToListAsync(cancellationToken);
|
||||
if (subscriptions.Count == 0)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"SES event ignored: subscription not found. tenant_id={TenantId} email={Email} event_type={EventType}",
|
||||
request.TenantId, request.Email, normalizedEventType);
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation(
|
||||
"SES event matched subscriptions. tenant_id={TenantId} email={Email} matched_count={MatchedCount} event_type={EventType}",
|
||||
request.TenantId, request.Email, subscriptions.Count, normalizedEventType);
|
||||
|
||||
switch (normalizedEventType)
|
||||
{
|
||||
case "hard_bounced":
|
||||
await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "hard_bounce", request.OccurredAt, cancellationToken);
|
||||
return;
|
||||
case "soft_bounced":
|
||||
var threshold = _configuration.GetValue("Bounce:SoftBounceThreshold", 5);
|
||||
var reached = await IsSoftBounceThresholdReachedAsync(db, request.TenantId, request.Email, threshold, cancellationToken);
|
||||
if (reached)
|
||||
{
|
||||
await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "soft_bounce_threshold", request.OccurredAt, cancellationToken);
|
||||
}
|
||||
return;
|
||||
case "complaint":
|
||||
await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "complaint", request.OccurredAt, cancellationToken);
|
||||
return;
|
||||
case "suppression":
|
||||
await SuppressAndNotifyAsync(db, request.TenantId, subscriptions, "suppression", request.OccurredAt, cancellationToken);
|
||||
return;
|
||||
default:
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<bool> IsSoftBounceThresholdReachedAsync(
|
||||
SendEngineDbContext db,
|
||||
Guid tenantId,
|
||||
string normalizedEmail,
|
||||
int threshold,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
if (threshold < 1)
|
||||
{
|
||||
threshold = 1;
|
||||
}
|
||||
|
||||
var count = await db.Database.SqlQueryRaw<int>(
|
||||
"""
|
||||
SELECT count(*)::int AS "Value"
|
||||
FROM events_inbox
|
||||
WHERE tenant_id = {0}
|
||||
AND source = 'ses'
|
||||
AND event_type = 'ses.soft_bounced'
|
||||
AND payload->>'email' = {1}
|
||||
""",
|
||||
tenantId,
|
||||
normalizedEmail)
|
||||
.SingleAsync(cancellationToken);
|
||||
|
||||
return count >= threshold;
|
||||
}
|
||||
|
||||
private async Task SuppressAndNotifyAsync(
|
||||
SendEngineDbContext db,
|
||||
Guid tenantId,
|
||||
IReadOnlyCollection<Subscription> subscriptions,
|
||||
string reason,
|
||||
DateTimeOffset occurredAt,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var targets = subscriptions
|
||||
.Where(x => x.ExternalSubscriberId.HasValue)
|
||||
.Select(x => (SubscriberId: x.ExternalSubscriberId!.Value, ListId: x.ListId))
|
||||
.Distinct()
|
||||
.ToArray();
|
||||
|
||||
foreach (var subscription in subscriptions)
|
||||
{
|
||||
if (string.Equals(subscription.Status, "suppressed", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
subscription.Status = "suppressed";
|
||||
subscription.UpdatedAt = DateTimeOffset.UtcNow;
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
|
||||
if (targets.Length == 0)
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"MemberCenter callback skipped: no external_subscriber_id found. tenant_id={TenantId} reason={Reason}",
|
||||
tenantId, reason);
|
||||
return;
|
||||
}
|
||||
|
||||
await NotifyMemberCenterDisableAsync(_logger, tenantId, targets, reason, occurredAt, cancellationToken);
|
||||
}
|
||||
|
||||
private async Task NotifyMemberCenterDisableAsync(
|
||||
ILogger logger,
|
||||
Guid tenantId,
|
||||
IEnumerable<(Guid SubscriberId, Guid ListId)> targets,
|
||||
string reason,
|
||||
DateTimeOffset occurredAt,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var url = ResolveMemberCenterUrl(
|
||||
null,
|
||||
"MemberCenter:BaseUrl",
|
||||
"MemberCenter:DisableSubscriptionPath",
|
||||
"/subscriptions/disable");
|
||||
|
||||
if (string.IsNullOrWhiteSpace(url))
|
||||
{
|
||||
logger.LogWarning("MemberCenter callback skipped: disable URL is empty.");
|
||||
return;
|
||||
}
|
||||
|
||||
using var client = new HttpClient();
|
||||
var token = await ResolveMemberCenterAccessTokenAsync(client, logger, cancellationToken);
|
||||
if (string.IsNullOrWhiteSpace(token))
|
||||
{
|
||||
logger.LogWarning("MemberCenter callback skipped: access token is empty.");
|
||||
return;
|
||||
}
|
||||
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", token);
|
||||
|
||||
foreach (var target in targets)
|
||||
{
|
||||
var body = new
|
||||
{
|
||||
tenant_id = tenantId,
|
||||
subscriber_id = target.SubscriberId,
|
||||
list_id = target.ListId,
|
||||
reason,
|
||||
disabled_by = "send_engine",
|
||||
occurred_at = occurredAt
|
||||
};
|
||||
|
||||
var response = await client.PostAsJsonAsync(url, body, cancellationToken);
|
||||
if (!response.IsSuccessStatusCode)
|
||||
{
|
||||
var responseBody = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
logger.LogWarning(
|
||||
"MemberCenter disable callback failed. status={StatusCode} tenant_id={TenantId} subscriber_id={SubscriberId} list_id={ListId} reason={Reason} body={Body}",
|
||||
(int)response.StatusCode,
|
||||
tenantId,
|
||||
target.SubscriberId,
|
||||
target.ListId,
|
||||
reason,
|
||||
responseBody);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<string?> ResolveMemberCenterAccessTokenAsync(HttpClient client, ILogger logger, CancellationToken cancellationToken)
|
||||
{
|
||||
var tokenUrl = ResolveMemberCenterUrl(
|
||||
null,
|
||||
"MemberCenter:BaseUrl",
|
||||
"MemberCenter:TokenPath",
|
||||
"/oauth/token");
|
||||
var clientId = _configuration["MemberCenter:ClientId"];
|
||||
var clientSecret = _configuration["MemberCenter:ClientSecret"];
|
||||
var scope = _configuration["MemberCenter:Scope"];
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(tokenUrl) &&
|
||||
!string.IsNullOrWhiteSpace(clientId) &&
|
||||
!string.IsNullOrWhiteSpace(clientSecret))
|
||||
{
|
||||
var form = new Dictionary<string, string>
|
||||
{
|
||||
["grant_type"] = "client_credentials",
|
||||
["client_id"] = clientId,
|
||||
["client_secret"] = clientSecret
|
||||
};
|
||||
if (!string.IsNullOrWhiteSpace(scope))
|
||||
{
|
||||
form["scope"] = scope;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var response = await client.PostAsync(tokenUrl, new FormUrlEncodedContent(form), cancellationToken);
|
||||
var body = await response.Content.ReadAsStringAsync(cancellationToken);
|
||||
if (response.IsSuccessStatusCode)
|
||||
{
|
||||
using var stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(body));
|
||||
using var json = await JsonDocument.ParseAsync(stream, cancellationToken: cancellationToken);
|
||||
if (json.RootElement.TryGetProperty("access_token", out var tokenElement))
|
||||
{
|
||||
var accessToken = tokenElement.GetString();
|
||||
if (!string.IsNullOrWhiteSpace(accessToken))
|
||||
{
|
||||
return accessToken;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "MemberCenter token request failed by exception.");
|
||||
}
|
||||
}
|
||||
|
||||
var fallback = _configuration["MemberCenter:ApiToken"];
|
||||
return string.IsNullOrWhiteSpace(fallback) ? null : fallback;
|
||||
}
|
||||
|
||||
private string? ResolveMemberCenterUrl(string? directUrl, string baseUrlKey, string pathKey, string defaultPath)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(directUrl))
|
||||
{
|
||||
return directUrl;
|
||||
}
|
||||
|
||||
var baseUrl = _configuration[baseUrlKey];
|
||||
if (string.IsNullOrWhiteSpace(baseUrl))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var path = _configuration[pathKey] ?? defaultPath;
|
||||
return new Uri(new Uri(baseUrl), path).ToString();
|
||||
}
|
||||
|
||||
private static string NormalizeSesEventType(string? eventType, string? bounceType)
|
||||
{
|
||||
var normalized = eventType?.Trim().ToLowerInvariant() ?? string.Empty;
|
||||
if (normalized == "bounce")
|
||||
{
|
||||
var bounce = bounceType?.Trim().ToLowerInvariant() ?? string.Empty;
|
||||
return bounce switch
|
||||
{
|
||||
"hard" => "hard_bounced",
|
||||
"soft" => "soft_bounced",
|
||||
_ => "bounce"
|
||||
};
|
||||
}
|
||||
|
||||
return normalized;
|
||||
}
|
||||
|
||||
private static bool TryMapDisableReason(string normalizedEventType, out string reason)
|
||||
{
|
||||
switch (normalizedEventType)
|
||||
{
|
||||
case "hard_bounced":
|
||||
reason = "hard_bounce";
|
||||
return true;
|
||||
case "soft_bounced":
|
||||
reason = "soft_bounce_threshold";
|
||||
return true;
|
||||
case "complaint":
|
||||
reason = "complaint";
|
||||
return true;
|
||||
case "suppression":
|
||||
reason = "suppression";
|
||||
return true;
|
||||
default:
|
||||
reason = string.Empty;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static bool TryExtractGuidFromTags(Dictionary<string, string>? tags, string key, out Guid value)
|
||||
{
|
||||
value = Guid.Empty;
|
||||
if (tags is null)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
var keyCandidates = new[]
|
||||
{
|
||||
key,
|
||||
key.ToLowerInvariant(),
|
||||
key.ToUpperInvariant(),
|
||||
key.Replace("_", string.Empty),
|
||||
ToCamelCase(key)
|
||||
};
|
||||
|
||||
foreach (var candidate in keyCandidates)
|
||||
{
|
||||
if (tags.TryGetValue(candidate, out var raw) && Guid.TryParse(raw, out value))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
private static string ToCamelCase(string value)
|
||||
{
|
||||
var parts = value.Split('_', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
|
||||
if (parts.Length == 0)
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
return parts[0] + string.Concat(parts.Skip(1).Select(p => char.ToUpperInvariant(p[0]) + p[1..]));
|
||||
}
|
||||
|
||||
private static bool IsSoftBounceThresholdReachedFromTags(Dictionary<string, string>? tags, int threshold)
|
||||
{
|
||||
if (threshold < 1)
|
||||
{
|
||||
threshold = 1;
|
||||
}
|
||||
|
||||
if (tags is null)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return tags.TryGetValue("soft_bounce_count", out var raw) && int.TryParse(raw, out var count)
|
||||
? count >= threshold
|
||||
: true;
|
||||
}
|
||||
|
||||
private static bool TryNormalizeSesEventRequest(JsonElement body, out SesEventRequest request, out string error)
|
||||
{
|
||||
request = new SesEventRequest();
|
||||
error = string.Empty;
|
||||
|
||||
if (body.TryGetProperty("tenant_id", out _))
|
||||
{
|
||||
try
|
||||
{
|
||||
request = JsonSerializer.Deserialize<SesEventRequest>(body.GetRawText()) ?? new SesEventRequest();
|
||||
return true;
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
error = $"flat_payload_invalid_json:{ex.Message}";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!body.TryGetProperty("Type", out var snsTypeElement))
|
||||
{
|
||||
error = "missing_type";
|
||||
return false;
|
||||
}
|
||||
|
||||
var snsType = snsTypeElement.GetString() ?? string.Empty;
|
||||
if (!string.Equals(snsType, "Notification", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
error = $"unsupported_sns_type:{snsType}";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!body.TryGetProperty("Message", out var messageElement) || messageElement.ValueKind != JsonValueKind.String)
|
||||
{
|
||||
error = "missing_message";
|
||||
return false;
|
||||
}
|
||||
|
||||
var messageJson = messageElement.GetString() ?? string.Empty;
|
||||
if (string.IsNullOrWhiteSpace(messageJson))
|
||||
{
|
||||
error = "empty_message";
|
||||
return false;
|
||||
}
|
||||
|
||||
JsonDocument sesDoc;
|
||||
try
|
||||
{
|
||||
sesDoc = JsonDocument.Parse(messageJson);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
error = $"ses_message_invalid_json:{ex.Message}";
|
||||
return false;
|
||||
}
|
||||
|
||||
using (sesDoc)
|
||||
{
|
||||
var root = sesDoc.RootElement;
|
||||
var eventType = TryGetString(root, "eventType");
|
||||
if (string.IsNullOrWhiteSpace(eventType))
|
||||
{
|
||||
error = "missing_eventType";
|
||||
return false;
|
||||
}
|
||||
|
||||
var tags = ExtractSesTags(root);
|
||||
var email = ResolveSesRecipientEmail(root);
|
||||
if (string.IsNullOrWhiteSpace(email))
|
||||
{
|
||||
error = "missing_recipient_email";
|
||||
return false;
|
||||
}
|
||||
|
||||
var occurredAt =
|
||||
ResolveSesOccurredAt(root) ??
|
||||
ParseDateTimeOffset(TryGetString(body, "Timestamp")) ??
|
||||
DateTimeOffset.UtcNow;
|
||||
|
||||
var mailElement = TryGetProperty(root, "mail");
|
||||
var messageId =
|
||||
TryGetString(mailElement, "messageId") ??
|
||||
TryGetString(body, "MessageId") ??
|
||||
string.Empty;
|
||||
|
||||
var bounceType = ResolveBounceType(root, eventType);
|
||||
var tenantId = Guid.Empty;
|
||||
if (tags.TryGetValue("tenant_id", out var tenantRaw) && Guid.TryParse(tenantRaw, out var parsedTenant))
|
||||
{
|
||||
tenantId = parsedTenant;
|
||||
}
|
||||
|
||||
request = new SesEventRequest
|
||||
{
|
||||
EventType = eventType,
|
||||
MessageId = messageId,
|
||||
TenantId = tenantId,
|
||||
Email = email,
|
||||
BounceType = bounceType,
|
||||
OccurredAt = occurredAt,
|
||||
Tags = tags
|
||||
};
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private static string? ResolveBounceType(JsonElement root, string eventType)
|
||||
{
|
||||
if (!string.Equals(eventType, "Bounce", StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var bounceType = TryGetString(TryGetProperty(root, "bounce"), "bounceType")?.Trim().ToLowerInvariant();
|
||||
return bounceType switch
|
||||
{
|
||||
"permanent" => "hard",
|
||||
"transient" => "soft",
|
||||
_ => null
|
||||
};
|
||||
}
|
||||
|
||||
private static DateTimeOffset? ResolveSesOccurredAt(JsonElement root)
|
||||
{
|
||||
var eventType = TryGetString(root, "eventType")?.Trim().ToLowerInvariant();
|
||||
var timestamp = eventType switch
|
||||
{
|
||||
"bounce" => TryGetString(TryGetProperty(root, "bounce"), "timestamp"),
|
||||
"complaint" => TryGetString(TryGetProperty(root, "complaint"), "timestamp"),
|
||||
"delivery" => TryGetString(TryGetProperty(root, "delivery"), "timestamp"),
|
||||
_ => null
|
||||
};
|
||||
|
||||
return ParseDateTimeOffset(timestamp)
|
||||
?? ParseDateTimeOffset(TryGetString(TryGetProperty(root, "mail"), "timestamp"));
|
||||
}
|
||||
|
||||
private static string? ResolveSesRecipientEmail(JsonElement root)
|
||||
{
|
||||
var eventType = TryGetString(root, "eventType")?.Trim().ToLowerInvariant();
|
||||
switch (eventType)
|
||||
{
|
||||
case "bounce":
|
||||
var bounced = TryGetFirstArrayItem(TryGetProperty(TryGetProperty(root, "bounce"), "bouncedRecipients"));
|
||||
var bouncedEmail = TryGetString(bounced, "emailAddress");
|
||||
if (!string.IsNullOrWhiteSpace(bouncedEmail))
|
||||
{
|
||||
return bouncedEmail;
|
||||
}
|
||||
break;
|
||||
case "complaint":
|
||||
var complained = TryGetFirstArrayItem(TryGetProperty(TryGetProperty(root, "complaint"), "complainedRecipients"));
|
||||
var complaintEmail = TryGetString(complained, "emailAddress");
|
||||
if (!string.IsNullOrWhiteSpace(complaintEmail))
|
||||
{
|
||||
return complaintEmail;
|
||||
}
|
||||
break;
|
||||
case "delivery":
|
||||
var recipient = TryGetFirstArrayString(TryGetProperty(TryGetProperty(root, "delivery"), "recipients"));
|
||||
if (!string.IsNullOrWhiteSpace(recipient))
|
||||
{
|
||||
return recipient;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return TryGetFirstArrayString(TryGetProperty(TryGetProperty(root, "mail"), "destination"));
|
||||
}
|
||||
|
||||
private static Dictionary<string, string> ExtractSesTags(JsonElement root)
|
||||
{
|
||||
var result = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
|
||||
var tagsElement = TryGetProperty(TryGetProperty(root, "mail"), "tags");
|
||||
if (tagsElement.ValueKind != JsonValueKind.Object)
|
||||
{
|
||||
return result;
|
||||
}
|
||||
|
||||
foreach (var property in tagsElement.EnumerateObject())
|
||||
{
|
||||
string? value = property.Value.ValueKind switch
|
||||
{
|
||||
JsonValueKind.Array => property.Value.EnumerateArray()
|
||||
.Where(x => x.ValueKind == JsonValueKind.String)
|
||||
.Select(x => x.GetString())
|
||||
.FirstOrDefault(x => !string.IsNullOrWhiteSpace(x)),
|
||||
JsonValueKind.String => property.Value.GetString(),
|
||||
_ => null
|
||||
};
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(value))
|
||||
{
|
||||
result[property.Name] = value!;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static JsonElement TryGetProperty(JsonElement element, string propertyName)
|
||||
{
|
||||
if (element.ValueKind == JsonValueKind.Object && element.TryGetProperty(propertyName, out var property))
|
||||
{
|
||||
return property;
|
||||
}
|
||||
|
||||
return default;
|
||||
}
|
||||
|
||||
private static string? TryGetString(JsonElement element, string propertyName)
|
||||
{
|
||||
var property = TryGetProperty(element, propertyName);
|
||||
return property.ValueKind == JsonValueKind.String ? property.GetString() : null;
|
||||
}
|
||||
|
||||
private static DateTimeOffset? ParseDateTimeOffset(string? value)
|
||||
{
|
||||
return !string.IsNullOrWhiteSpace(value) && DateTimeOffset.TryParse(value, out var parsed) ? parsed : null;
|
||||
}
|
||||
|
||||
private static JsonElement TryGetFirstArrayItem(JsonElement arrayElement)
|
||||
{
|
||||
if (arrayElement.ValueKind != JsonValueKind.Array)
|
||||
{
|
||||
return default;
|
||||
}
|
||||
|
||||
foreach (var item in arrayElement.EnumerateArray())
|
||||
{
|
||||
return item;
|
||||
}
|
||||
|
||||
return default;
|
||||
}
|
||||
|
||||
private static string? TryGetFirstArrayString(JsonElement arrayElement)
|
||||
{
|
||||
if (arrayElement.ValueKind != JsonValueKind.Array)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
foreach (var item in arrayElement.EnumerateArray())
|
||||
{
|
||||
if (item.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
return item.GetString();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static async Task<bool> EnsureTenantForWebhookAsync(
|
||||
SendEngineDbContext db,
|
||||
Guid tenantId,
|
||||
bool autoCreateTenant,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var tenantExists = await db.Tenants.AsNoTracking().AnyAsync(x => x.Id == tenantId, cancellationToken);
|
||||
if (tenantExists)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!autoCreateTenant)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
db.Tenants.Add(new Tenant
|
||||
{
|
||||
Id = tenantId,
|
||||
Name = $"tenant-{tenantId:N}",
|
||||
CreatedAt = DateTimeOffset.UtcNow
|
||||
});
|
||||
await db.SaveChangesAsync(cancellationToken);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public sealed record SesProcessResult(
|
||||
bool Success,
|
||||
bool TransientFailure,
|
||||
int StatusCode,
|
||||
string Error,
|
||||
string? Reason)
|
||||
{
|
||||
public static SesProcessResult Ok(int statusCode, string error, string? reason) =>
|
||||
new(true, false, statusCode, error, reason);
|
||||
public static SesProcessResult Permanent(int statusCode, string error, string? reason) =>
|
||||
new(false, false, statusCode, error, reason);
|
||||
public static SesProcessResult Transient(int statusCode, string error, string? reason) =>
|
||||
new(false, true, statusCode, error, reason);
|
||||
}
|
||||
158
src/SendEngine.Api/Services/SqsSesPollerWorker.cs
Normal file
158
src/SendEngine.Api/Services/SqsSesPollerWorker.cs
Normal file
@ -0,0 +1,158 @@
|
||||
using System.Text.Json;
|
||||
using Amazon;
|
||||
using Amazon.SQS;
|
||||
using Amazon.SQS.Model;
|
||||
|
||||
namespace SendEngine.Api.Services;
|
||||
|
||||
public sealed class SqsSesPollerWorker : BackgroundService
|
||||
{
|
||||
private readonly IConfiguration _configuration;
|
||||
private readonly ILogger<SqsSesPollerWorker> _logger;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
|
||||
public SqsSesPollerWorker(
|
||||
IConfiguration configuration,
|
||||
ILogger<SqsSesPollerWorker> logger,
|
||||
IServiceScopeFactory scopeFactory)
|
||||
{
|
||||
_configuration = configuration;
|
||||
_logger = logger;
|
||||
_scopeFactory = scopeFactory;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var enabled = _configuration.GetValue("Sqs:Enabled", false);
|
||||
if (!enabled)
|
||||
{
|
||||
_logger.LogInformation("SQS SES poller disabled.");
|
||||
return;
|
||||
}
|
||||
|
||||
var queueUrl = _configuration["Sqs:QueueUrl"];
|
||||
if (string.IsNullOrWhiteSpace(queueUrl))
|
||||
{
|
||||
_logger.LogWarning("SQS SES poller enabled but Sqs:QueueUrl is empty.");
|
||||
return;
|
||||
}
|
||||
|
||||
var region = _configuration["Sqs:Region"]
|
||||
?? _configuration["AWS:Region"]
|
||||
?? _configuration["Ses:Region"]
|
||||
?? "ap-northeast-1";
|
||||
var waitSeconds = Math.Clamp(_configuration.GetValue("Sqs:PollWaitSeconds", 20), 1, 20);
|
||||
var maxMessages = Math.Clamp(_configuration.GetValue("Sqs:MaxMessages", 10), 1, 10);
|
||||
var visibilityTimeout = Math.Clamp(_configuration.GetValue("Sqs:VisibilityTimeoutSeconds", 30), 0, 43200);
|
||||
|
||||
_logger.LogInformation(
|
||||
"SQS SES poller started. queue_url={QueueUrl} region={Region} wait_seconds={WaitSeconds} max_messages={MaxMessages}",
|
||||
queueUrl, region, waitSeconds, maxMessages);
|
||||
|
||||
using var sqs = new AmazonSQSClient(RegionEndpoint.GetBySystemName(region));
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
ReceiveMessageResponse received;
|
||||
try
|
||||
{
|
||||
received = await sqs.ReceiveMessageAsync(new ReceiveMessageRequest
|
||||
{
|
||||
QueueUrl = queueUrl,
|
||||
MaxNumberOfMessages = maxMessages,
|
||||
WaitTimeSeconds = waitSeconds,
|
||||
VisibilityTimeout = visibilityTimeout,
|
||||
MessageSystemAttributeNames = new List<string>
|
||||
{
|
||||
"ApproximateReceiveCount",
|
||||
"SentTimestamp"
|
||||
}
|
||||
}, stoppingToken);
|
||||
}
|
||||
catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogInformation("SQS poller stopping: receive canceled by shutdown signal.");
|
||||
break;
|
||||
}
|
||||
catch (TaskCanceledException)
|
||||
{
|
||||
_logger.LogWarning("SQS receive canceled (likely timeout/interruption).");
|
||||
continue;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "SQS receive failed.");
|
||||
await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (received.Messages.Count == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
foreach (var message in received.Messages)
|
||||
{
|
||||
var receiveCount = message.Attributes.TryGetValue("ApproximateReceiveCount", out var countRaw)
|
||||
? countRaw
|
||||
: "1";
|
||||
try
|
||||
{
|
||||
var signature = TryExtractSnsSignature(message.Body) ?? "sqs-relay";
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var processor = scope.ServiceProvider.GetRequiredService<SesEventProcessingService>();
|
||||
var result = await processor.ProcessRawJsonAsync(message.Body, signature, stoppingToken);
|
||||
if (!result.Success)
|
||||
{
|
||||
_logger.LogWarning("SQS message processing not-success. status={StatusCode} transient={Transient} error={Error} reason={Reason} receive_count={ReceiveCount} message_id={MessageId}",
|
||||
result.StatusCode, result.TransientFailure, result.Error, result.Reason, receiveCount, message.MessageId);
|
||||
if (result.TransientFailure)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (result.Success || !result.TransientFailure)
|
||||
{
|
||||
await sqs.DeleteMessageAsync(queueUrl, message.ReceiptHandle, stoppingToken);
|
||||
_logger.LogInformation(
|
||||
"SQS message processed and deleted. message_id={MessageId} receive_count={ReceiveCount}",
|
||||
message.MessageId, receiveCount);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(
|
||||
ex,
|
||||
"SQS message processing failed. message_id={MessageId} receive_count={ReceiveCount}",
|
||||
message.MessageId, receiveCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static string? TryExtractSnsSignature(string? body)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(body))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var doc = JsonDocument.Parse(body);
|
||||
if (doc.RootElement.TryGetProperty("Signature", out var signature)
|
||||
&& signature.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
return signature.GetString();
|
||||
}
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
// ignore: body may not be SNS envelope
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user