feat: Implement one-click unsubscribe feature and newsletter campaign management

- Added one-click unsubscribe functionality with token generation and verification.
- Introduced a new model for tracking one-click unsubscribe audits.
- Enhanced newsletter campaign management with the ability to send campaigns immediately.
- Implemented a scheduler for dispatching due newsletter campaigns.
- Updated views and templates to support one-click unsubscribe and campaign previews.
- Added management commands for running the newsletter scheduler.
- Removed obsolete SSL certificate file.
- Updated entrypoint script to handle different application roles.
This commit is contained in:
Warren Chen 2026-02-25 14:42:46 +09:00
parent 4c78500ec9
commit 6ea501dc62
15 changed files with 1571 additions and 44 deletions

View File

@ -1,6 +1,6 @@
# 電子報介接備忘錄(租戶端 / Wagtail
最後更新2026-02-11
最後更新2026-02-18
## 1. 目標與前提
@ -204,3 +204,204 @@
- `confirm`:已改為 `GET + query token`(修正 HTTP 405 問題)。
- `unsubscribe`:已使用 `POST`,且 body 僅送 `token`
- `unsubscribe-token`:已使用 `POST`,且 body 送 `list_id + email`
## 9. List-Unsubscribe One-Click 規範(發信流程階段導入)
說明:
- 本節為「電子報 app + 排程發信」階段的目標規範。
- 已完成的訂閱/退訂流程維持現況,不回頭重做;待發信流程上線時再整合 one-click。
### 9.1 目標
- 提供電子報編輯與排程。
- 產生 `List-Unsubscribe` one-click header。
- 呼叫發信代理 API。
- 提供 unsubscribe endpointone-click + 人類點擊頁)。
### 9.2 Unsubscribe Token 設計
建議使用 JWT 或 HMAC token。
Token 內容建議:
- `subscriber_id`
- `list_id`
- `site_id`
- `exp`
- `nonce`optional
安全要求:
- 不需登入即可退訂。
- 必須可 idempotent。
- Token 需有過期時間。
- Token 必須簽章驗證。
### 9.3 Unsubscribe Endpoint
1. One-click endpoint
- `POST /u/unsubscribe`
- Request`token`
- 流程:
- 驗證 token
- 呼叫會員平台 API
- 回傳 `200`
- 不可:
- 要求登入
- 要求二次確認
2. 人類點擊頁面
- `GET /u/unsubscribe?token=xxx`
- 流程:
- 驗證 token
- 顯示退訂成功頁面
### 9.4 發信 Header 規則
Newsletter 必須包含:
- `List-Unsubscribe: <https://domain/u/unsubscribe?token=xxx>`
- `List-Unsubscribe-Post: List-Unsubscribe=One-Click`
註記:
- Transactional email 不應包含 List-Unsubscribe。
### 9.5 與會員平台整合one-click 目標介面)
呼叫 API
- `POST /api/subscriptions/unsubscribe`
Request
- `subscriber_id`
- `list_id`
- `source: "one_click"`
- `campaign_id`
回傳:
- `success`
- `already_unsubscribed`
### 9.6 測試案例
1. Token 驗證
- 正常 token -> 成功退訂
- 過期 token -> `400``410`
- 已退訂 -> `200`
- 簽章錯誤 -> `400`
2. Header 驗證
- Newsletter 發送時必定包含 `List-Unsubscribe`
- Transactional email 不包含 `List-Unsubscribe`
- Header 格式正確
### 9.7 安全與 UX
安全:
- Unsubscribe endpoint 不受 CSRF 限制
- Token 驗證必須 server-side
- 記錄 audit log
- 不顯示 `subscriber_id` 在 URL 明文
UX
- 退訂成功頁面簡潔
- 提供重新訂閱入口
- 不強迫填問卷
### 9.8 目前實作狀態Wagtail
- 已提供 one-click endpoint
- `POST /u/unsubscribe`token 可由 query/body 提供)
- `GET /u/unsubscribe?token=...`(人類點擊頁)
- 已實作 token 驗證HMAC + `exp`)與 idempotent 行為(重複退訂維持 200
- 已實作 server-side audit log`OneClickUnsubscribeAudit`)。
- 已提供 one-click URL + Header 產生工具:
- `List-Unsubscribe`
- `List-Unsubscribe-Post`
### 9.9 Send Engine 發送介接(依 `../mass_mail_engine/docs/openapi.yaml` 對齊)
已對齊的 API
1. 建立 send job
- `POST /api/send-jobs`
- request 以 `CreateSendJobRequest` 為主(`list_id``subject``body_html/body_text/template`
- response 讀取:
- `send_job_id`
- `status`
2. 查詢 send job
- `GET /api/send-jobs/{id}`
- 若建立後狀態非最終態,持續輪詢直到最終態或超過輪詢上限。
狀態對應Wagtail campaign
- Send Engine `completed` -> campaign `sent`
- Send Engine `failed` / `cancelled` -> campaign `failed`
- 建立失敗 / 輪詢逾時 / 輪詢錯誤 -> campaign `failed`
Send Engine 最終態terminal
- `completed`
- `failed`
- `cancelled`
備註:
- 目前 dispatch 紀錄改為「每次送 job / 重試一次一筆」,不再是逐收件者一筆。
- 若要做投遞到單一收件者的最終監控delivery/bounce/complaint仍建議接 Send Engine webhook 或事件回寫機制處理。
## 10. 下一階段演進備忘(先記錄,待試用回饋後再排)
說明:
- 本節為「規劃中」項目,先記錄方向,不立即實作。
- 優先等待:電子報排版產出 + 實際 user 試用回饋。
### 10.1 編輯器能力演進
現況:
- 目前使用 Wagtail Draftail 作為 HTML 編輯器,已可插入圖片(圖庫選擇 / 上傳)。
待回饋後評估:
- 若編輯需求提升table、欄位版型、拖拉區塊、進階 email 元件),再評估導入更完整的 newsletter editor。
- 原則:先確認真實編輯痛點,再導入新編輯器,避免過早複雜化。
### 10.2 電子報樣式CSS策略
建議方向:
- 採「固定樣式基底 + 內容編輯」模式。
- CSS 不建議只放 `<head><style>`,實務上需考慮 email client 相容性,通常會在發送前做 CSS inlining將重要樣式轉為 inline style
- 可保留少量 head style例如 media query但核心排版建議以 inline 為主。
### 10.3 外框模板Header / Footer / Shell管理
目標:
- 提供可編輯的 newsletter shell例如 `layout.html`),內含 head、header、footer 與內容插槽。
建議實作概念:
- 內容編輯區僅負責 body 內文。
- 發送時由 backend 組裝:`shell + content + one-click headers/links`
- 預覽也走同一組裝流程,避免「預覽長得跟實際寄出不一致」。
### 10.4 退訂連結與個人化參數責任分工
共識方向:
- 退訂連結需與收件者身分綁定(每位收件者唯一 token / URL
- 內容模板需預留替換位(例如 `{{unsubscribe_url}}`)。
責任分工建議:
- CMS產內容、產 placeholder、組裝信件與 one-click header。
- Send Engine執行發送、接收投遞回報bounce/complaint 等)。
- SES或下游 provider最終投遞與回執來源。
### 10.5 文章區塊直接引入
需求方向:
- 支援從既有文章中挑選區塊或全文片段匯入電子報。
建議:
- 第一階段可先做「選文章 -> 帶入標題/摘要/首圖/連結」。
- 第二階段再考慮「可編輯快照」或「保持動態同步」策略。
### 10.6 預覽能力
需求方向:
- 內文編輯預覽。
- 樣式shell編輯預覽。
- 能注入假資料(假收件者、假 token、假文章看最終結果。
建議:
- 做「最終寄出 HTML 預覽」功能,預覽管線與正式發送使用同一套 renderer。
- 後續可加多裝置寬度與常見 email client 快速檢視模式(若使用者回饋有需要)。

View File

@ -0,0 +1,20 @@
from django.core.management.base import BaseCommand
from base.newsletter_scheduler import dispatch_due_campaigns
class Command(BaseCommand):
help = "Dispatch due newsletter campaigns"
def add_arguments(self, parser):
parser.add_argument("--limit", type=int, default=20)
def handle(self, *args, **options):
result = dispatch_due_campaigns(limit=options["limit"])
self.stdout.write(
self.style.SUCCESS(
"processed_campaigns={processed_campaigns} sent_recipients={sent_recipients} failed_campaigns={failed_campaigns}".format(
**result
)
)
)

View File

@ -0,0 +1,143 @@
# Generated by Django 5.2.7 on 2026-02-19 08:56
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('base', '0005_newslettersystemsettings_newslettertemplatesettings'),
]
operations = [
migrations.CreateModel(
name='OneClickUnsubscribeAudit',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('token_hash', models.CharField(db_index=True, max_length=64, unique=True)),
('subscriber_id', models.CharField(blank=True, max_length=128)),
('list_id', models.CharField(blank=True, max_length=128)),
('site_id', models.CharField(blank=True, max_length=128)),
('campaign_id', models.CharField(blank=True, max_length=128)),
('status', models.CharField(blank=True, max_length=32)),
('response_status', models.PositiveIntegerField(blank=True, null=True)),
('response_payload', models.JSONField(blank=True, default=dict)),
('error_message', models.TextField(blank=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('processed_at', models.DateTimeField(auto_now=True)),
],
options={
'ordering': ['-created_at'],
},
),
migrations.AddField(
model_name='newslettersystemsettings',
name='member_center_one_click_unsubscribe_path',
field=models.CharField(blank=True, default='/api/subscriptions/unsubscribe', max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='one_click_endpoint_path',
field=models.CharField(blank=True, default='/u/unsubscribe', max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='one_click_token_secret',
field=models.CharField(blank=True, help_text='One-click token 簽章 secret留空則使用 Django SECRET_KEY。', max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='one_click_token_ttl_seconds',
field=models.PositiveIntegerField(default=2592000),
),
migrations.CreateModel(
name='NewsletterCampaign',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('title', models.CharField(max_length=255)),
('list_id', models.CharField(blank=True, max_length=128)),
('subject_template', models.CharField(max_length=255)),
('html_template', models.TextField()),
('text_template', models.TextField(blank=True)),
('status', models.CharField(choices=[('draft', 'Draft'), ('scheduled', 'Scheduled'), ('sending', 'Sending'), ('sent', 'Sent'), ('failed', 'Failed')], default='draft', max_length=16)),
('scheduled_at', models.DateTimeField(blank=True, null=True)),
('sent_at', models.DateTimeField(blank=True, null=True)),
('last_error', models.TextField(blank=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'ordering': ['-created_at'],
},
),
migrations.AddField(
model_name='newslettersystemsettings',
name='member_center_subscriptions_path',
field=models.CharField(blank=True, default='/newsletter/subscriptions', max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='member_center_oauth_token_path',
field=models.CharField(blank=True, default='/oauth/token', max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='member_center_oauth_client_id',
field=models.CharField(blank=True, max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='member_center_oauth_client_secret',
field=models.TextField(blank=True),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='member_center_oauth_scope',
field=models.CharField(blank=True, default='newsletter:list.read', max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='member_center_oauth_audience',
field=models.CharField(blank=True, max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='send_engine_send_jobs_path',
field=models.CharField(blank=True, default='/api/send-jobs', max_length=255),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='send_engine_retry_interval_seconds',
field=models.PositiveIntegerField(default=300),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='send_engine_retry_max_attempts',
field=models.PositiveIntegerField(default=3),
),
migrations.AddField(
model_name='newslettersystemsettings',
name='site_base_url',
field=models.URLField(blank=True, help_text='排程發送使用的站台網址(例如 https://news.example.com'),
),
migrations.CreateModel(
name='NewsletterDispatchRecord',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('subscriber_id', models.CharField(blank=True, max_length=128)),
('email', models.EmailField(blank=True, max_length=254)),
('status', models.CharField(blank=True, max_length=32)),
('retry_count', models.PositiveIntegerField(default=0)),
('next_retry_at', models.DateTimeField(blank=True, null=True)),
('response_status', models.PositiveIntegerField(blank=True, null=True)),
('response_payload', models.JSONField(blank=True, default=dict)),
('error_message', models.TextField(blank=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('campaign', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='dispatch_records', to='base.newslettercampaign')),
],
options={
'ordering': ['-created_at'],
},
),
]

View File

@ -138,13 +138,43 @@ class NewsletterSystemSettings(BaseGenericSetting):
blank=True,
default="/newsletter/unsubscribe",
)
member_center_subscriptions_path = models.CharField(
max_length=255,
blank=True,
default="/newsletter/subscriptions",
)
member_center_oauth_token_path = models.CharField(
max_length=255,
blank=True,
default="/oauth/token",
)
member_center_oauth_client_id = models.CharField(max_length=255, blank=True)
member_center_oauth_client_secret = models.TextField(blank=True)
member_center_oauth_scope = models.CharField(
max_length=255,
blank=True,
default="newsletter:list.read",
)
member_center_oauth_audience = models.CharField(max_length=255, blank=True)
member_center_one_click_unsubscribe_path = models.CharField(
max_length=255,
blank=True,
default="/api/subscriptions/unsubscribe",
)
member_center_tenant_id = models.CharField(max_length=128, blank=True)
member_center_list_id = models.CharField(max_length=128, blank=True)
member_center_timeout_seconds = models.PositiveIntegerField(default=10)
send_engine_base_url = models.URLField(blank=True)
send_engine_send_jobs_path = models.CharField(
max_length=255,
blank=True,
default="/api/send-jobs",
)
send_engine_oauth_scope = models.CharField(max_length=255, blank=True)
send_engine_timeout_seconds = models.PositiveIntegerField(default=10)
send_engine_retry_interval_seconds = models.PositiveIntegerField(default=300)
send_engine_retry_max_attempts = models.PositiveIntegerField(default=3)
smtp_relay_host = models.CharField(max_length=255, blank=True)
smtp_relay_port = models.PositiveIntegerField(default=587)
@ -160,6 +190,21 @@ class NewsletterSystemSettings(BaseGenericSetting):
sender_email = models.EmailField(blank=True)
reply_to_email = models.EmailField(blank=True)
default_charset = models.CharField(max_length=50, default="utf-8")
one_click_endpoint_path = models.CharField(
max_length=255,
blank=True,
default="/u/unsubscribe",
)
one_click_token_secret = models.CharField(
max_length=255,
blank=True,
help_text="One-click token 簽章 secret留空則使用 Django SECRET_KEY。",
)
one_click_token_ttl_seconds = models.PositiveIntegerField(default=60 * 60 * 24 * 30)
site_base_url = models.URLField(
blank=True,
help_text="排程發送使用的站台網址(例如 https://news.example.com",
)
panels = [
MultiFieldPanel(
@ -169,6 +214,16 @@ class NewsletterSystemSettings(BaseGenericSetting):
FieldPanel("member_center_confirm_path"),
FieldPanel("member_center_unsubscribe_token_path"),
FieldPanel("member_center_unsubscribe_path"),
FieldPanel("member_center_subscriptions_path"),
FieldPanel("member_center_oauth_token_path"),
FieldPanel("member_center_oauth_client_id"),
FieldPanel(
"member_center_oauth_client_secret",
widget=django_forms.PasswordInput(render_value=False),
),
FieldPanel("member_center_oauth_scope"),
FieldPanel("member_center_oauth_audience"),
FieldPanel("member_center_one_click_unsubscribe_path"),
FieldPanel("member_center_tenant_id"),
FieldPanel("member_center_list_id"),
FieldPanel("member_center_timeout_seconds"),
@ -178,8 +233,11 @@ class NewsletterSystemSettings(BaseGenericSetting):
MultiFieldPanel(
[
FieldPanel("send_engine_base_url"),
FieldPanel("send_engine_send_jobs_path"),
FieldPanel("send_engine_oauth_scope"),
FieldPanel("send_engine_timeout_seconds"),
FieldPanel("send_engine_retry_interval_seconds"),
FieldPanel("send_engine_retry_max_attempts"),
],
heading="Send Engine",
),
@ -202,6 +260,15 @@ class NewsletterSystemSettings(BaseGenericSetting):
],
heading="SMTP / Mail",
),
MultiFieldPanel(
[
FieldPanel("one_click_endpoint_path"),
FieldPanel("one_click_token_secret"),
FieldPanel("one_click_token_ttl_seconds"),
FieldPanel("site_base_url"),
],
heading="List-Unsubscribe One-Click",
),
]
class Meta:
@ -217,9 +284,116 @@ class NewsletterSystemSettings(BaseGenericSetting):
elif self.smtp_password:
self.smtp_password = encrypt_text(self.smtp_password)
if self.pk and not self.member_center_oauth_client_secret:
previous = type(self).objects.filter(pk=self.pk).only("member_center_oauth_client_secret").first()
self.member_center_oauth_client_secret = previous.member_center_oauth_client_secret if previous else ""
elif self.member_center_oauth_client_secret:
self.member_center_oauth_client_secret = encrypt_text(self.member_center_oauth_client_secret)
super().save(*args, **kwargs)
class OneClickUnsubscribeAudit(models.Model):
token_hash = models.CharField(max_length=64, unique=True, db_index=True)
subscriber_id = models.CharField(max_length=128, blank=True)
list_id = models.CharField(max_length=128, blank=True)
site_id = models.CharField(max_length=128, blank=True)
campaign_id = models.CharField(max_length=128, blank=True)
status = models.CharField(max_length=32, blank=True)
response_status = models.PositiveIntegerField(null=True, blank=True)
response_payload = models.JSONField(default=dict, blank=True)
error_message = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
processed_at = models.DateTimeField(auto_now=True)
class Meta:
ordering = ["-created_at"]
class NewsletterCampaign(models.Model):
STATUS_DRAFT = "draft"
STATUS_SCHEDULED = "scheduled"
STATUS_SENDING = "sending"
STATUS_SENT = "sent"
STATUS_FAILED = "failed"
STATUS_CHOICES = [
(STATUS_DRAFT, "Draft"),
(STATUS_SCHEDULED, "Scheduled"),
(STATUS_SENDING, "Sending"),
(STATUS_SENT, "Sent"),
(STATUS_FAILED, "Failed"),
]
title = models.CharField(max_length=255)
list_id = models.CharField(max_length=128, blank=True)
subject_template = models.CharField(max_length=255)
html_template = models.TextField()
text_template = models.TextField(blank=True)
status = models.CharField(max_length=16, choices=STATUS_CHOICES, default=STATUS_DRAFT)
scheduled_at = models.DateTimeField(null=True, blank=True)
sent_at = models.DateTimeField(null=True, blank=True)
last_error = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
panels = [
FieldPanel("title"),
FieldPanel("list_id"),
FieldPanel("subject_template"),
FieldPanel("html_template"),
FieldPanel("text_template"),
FieldPanel("scheduled_at"),
]
class Meta:
ordering = ["-created_at"]
def __str__(self):
return self.title
def save(self, *args, **kwargs):
if self._state.adding and not (self.list_id or "").strip():
default_list_id = (NewsletterSystemSettings.load().member_center_list_id or "").strip()
if default_list_id:
self.list_id = default_list_id
super().save(*args, **kwargs)
class NewsletterDispatchRecord(models.Model):
campaign = models.ForeignKey(
NewsletterCampaign,
on_delete=models.CASCADE,
related_name="dispatch_records",
)
subscriber_id = models.CharField(max_length=128, blank=True)
email = models.EmailField(blank=True)
status = models.CharField(max_length=32, blank=True)
retry_count = models.PositiveIntegerField(default=0)
next_retry_at = models.DateTimeField(null=True, blank=True)
response_status = models.PositiveIntegerField(null=True, blank=True)
response_payload = models.JSONField(default=dict, blank=True)
error_message = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
panels = [
FieldPanel("campaign"),
FieldPanel("subscriber_id"),
FieldPanel("email"),
FieldPanel("status"),
FieldPanel("retry_count"),
FieldPanel("next_retry_at"),
FieldPanel("response_status"),
FieldPanel("response_payload"),
FieldPanel("error_message"),
]
class Meta:
ordering = ["-created_at"]
def __str__(self):
return f"{self.campaign_id}:{self.email or self.subscriber_id}"
@register_setting
class NewsletterTemplateSettings(BaseGenericSetting):
subscribe_subject_template = models.CharField(

View File

@ -1,13 +1,19 @@
import json
import logging
import hmac
import re
import secrets
import time
from dataclasses import dataclass
from email.utils import formataddr
from hashlib import sha256
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen
from django.conf import settings
from django.core.mail import EmailMultiAlternatives, get_connection
from wagtail.rich_text import expand_db_html
from .models import NewsletterSystemSettings
from .security import decrypt_text
@ -36,6 +42,7 @@ class APIResult:
class MemberCenterClient:
def __init__(self, config: NewsletterSystemSettings):
self.config = config
self._oauth_token_cache: dict[str, tuple[str, int]] = {}
def subscribe(self, payload: dict) -> APIResult:
return self._post(self.config.member_center_subscribe_path, payload)
@ -49,7 +56,20 @@ class MemberCenterClient:
def unsubscribe(self, payload: dict) -> APIResult:
return self._post(self.config.member_center_unsubscribe_path, payload)
def _get(self, path: str, query: dict) -> APIResult:
def one_click_unsubscribe(self, payload: dict) -> APIResult:
return self._post(self.config.member_center_one_click_unsubscribe_path, payload)
def list_subscriptions(self, list_id: str) -> APIResult:
auth_headers, auth_error = self._auth_headers()
if auth_error:
return APIResult(ok=False, status=0, data={}, error=auth_error)
return self._get(
self.config.member_center_subscriptions_path,
{"list_id": list_id},
headers=auth_headers,
)
def _get(self, path: str, query: dict, headers: dict | None = None) -> APIResult:
base_url = (self.config.member_center_base_url or "").strip()
if not base_url:
return APIResult(ok=False, status=0, data={}, error="member_center_base_url is empty")
@ -60,13 +80,13 @@ class MemberCenterClient:
request = Request(
endpoint,
headers={"Accept": "application/json"},
headers={"Accept": "application/json", **(headers or {})},
method="GET",
)
timeout = max(1, int(self.config.member_center_timeout_seconds or 10))
return self._send(request, timeout=timeout)
def _post(self, path: str, payload: dict) -> APIResult:
def _post(self, path: str, payload: dict, headers: dict | None = None) -> APIResult:
base_url = (self.config.member_center_base_url or "").strip()
if not base_url:
return APIResult(ok=False, status=0, data={}, error="member_center_base_url is empty")
@ -76,12 +96,74 @@ class MemberCenterClient:
request = Request(
endpoint,
data=body,
headers={"Content-Type": "application/json"},
headers={"Content-Type": "application/json", **(headers or {})},
method="POST",
)
timeout = max(1, int(self.config.member_center_timeout_seconds or 10))
return self._send(request, timeout=timeout)
def _auth_headers(self) -> tuple[dict, str]:
scope = (self.config.member_center_oauth_scope or "").strip() or "newsletter:list.read"
token, error = self._get_oauth_access_token(required_scope=scope)
if not token:
return {}, error or "member center oauth token is empty"
return {"Authorization": f"Bearer {token}"}, ""
def _get_oauth_access_token(self, required_scope: str) -> tuple[str, str]:
scope = (required_scope or "").strip()
now = int(time.time())
cached = self._oauth_token_cache.get(scope)
if cached and now < cached[1] - 30:
return cached[0], ""
base_url = (self.config.member_center_base_url or "").strip()
if not base_url:
return "", "member_center_base_url is empty (cannot resolve oauth token url)"
token_path = (self.config.member_center_oauth_token_path or "/oauth/token").strip()
token_url = urljoin(f"{base_url.rstrip('/')}/", token_path.lstrip("/"))
client_id = (self.config.member_center_oauth_client_id or "").strip()
encrypted_secret = (self.config.member_center_oauth_client_secret or "").strip()
client_secret = ""
if encrypted_secret:
try:
client_secret = decrypt_text(encrypted_secret)
except Exception:
client_secret = encrypted_secret
if not client_id or not client_secret:
return "", "member center oauth client_id/client_secret is empty"
body = urlencode(
{
k: v
for k, v in {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": scope,
"audience": (self.config.member_center_oauth_audience or "").strip(),
}.items()
if v
}
).encode("utf-8")
request = Request(
token_url,
data=body,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
timeout = max(1, int(self.config.member_center_timeout_seconds or 10))
result = self._send(request, timeout=timeout)
if not result.ok:
return "", f"oauth token request failed: HTTP {result.status} {result.error}".strip()
access_token = str(result.data.get("access_token") or "").strip()
if not access_token:
return "", "oauth token response missing access_token"
expires_in = int(result.data.get("expires_in") or 300)
self._oauth_token_cache[scope] = (access_token, int(time.time()) + max(60, expires_in))
return access_token, ""
def _send(self, request: Request, timeout: int) -> APIResult:
try:
with urlopen(request, timeout=timeout) as response:
@ -107,6 +189,38 @@ def render_placeholders(template: str, values: dict) -> str:
return rendered
def _absolutize_links(html: str, site_base_url: str) -> str:
base = (site_base_url or "").strip().rstrip("/")
if not base:
return html
def _replace(match):
prefix = match.group("prefix")
url = match.group("url")
return f"{prefix}{urljoin(f'{base}/', url.lstrip('/'))}"
pattern = re.compile(r"(?P<prefix>\b(?:src|href)=['\"])(?P<url>/[^'\"]*)")
return pattern.sub(_replace, html)
def render_newsletter_html(template: str, values: dict, site_base_url: str = "") -> str:
rendered = render_placeholders(template, values)
try:
rendered = expand_db_html(rendered)
except Exception:
pass
return _absolutize_links(rendered, site_base_url)
def render_newsletter_html_for_send_job(template: str, site_base_url: str = "") -> str:
rendered = template or ""
try:
rendered = expand_db_html(rendered)
except Exception:
pass
return _absolutize_links(rendered, site_base_url)
def extract_token(payload: dict) -> str:
if not payload:
return ""
@ -132,6 +246,204 @@ def build_from_email(sender_name: str, sender_email: str) -> str:
return sender_email
def resolve_one_click_secret(config: NewsletterSystemSettings) -> str:
return (config.one_click_token_secret or "").strip() or settings.SECRET_KEY
def _b64url_encode(data: bytes) -> str:
import base64
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _b64url_decode(data: str) -> bytes:
import base64
padding = "=" * ((4 - len(data) % 4) % 4)
return base64.urlsafe_b64decode((data + padding).encode("ascii"))
def generate_one_click_token(*, subscriber_id: str, list_id: str, site_id: str, campaign_id: str, secret: str, ttl_seconds: int) -> str:
payload = {
"subscriber_id": subscriber_id,
"list_id": list_id,
"site_id": site_id,
"campaign_id": campaign_id,
"exp": int(time.time()) + max(1, int(ttl_seconds)),
"nonce": secrets.token_urlsafe(8),
}
payload_json = json.dumps(payload, separators=(",", ":"), ensure_ascii=True).encode("utf-8")
payload_part = _b64url_encode(payload_json)
sig = hmac.new(secret.encode("utf-8"), payload_part.encode("ascii"), sha256).digest()
sig_part = _b64url_encode(sig)
return f"{payload_part}.{sig_part}"
def verify_one_click_token(token: str, secret: str) -> tuple[dict | None, str | None]:
if not token or "." not in token:
return None, "invalid"
payload_part, sig_part = token.split(".", 1)
try:
received_sig = _b64url_decode(sig_part)
except Exception:
return None, "invalid"
expected_sig = hmac.new(secret.encode("utf-8"), payload_part.encode("ascii"), sha256).digest()
if not hmac.compare_digest(received_sig, expected_sig):
return None, "invalid"
try:
payload = json.loads(_b64url_decode(payload_part).decode("utf-8"))
except Exception:
return None, "invalid"
required_keys = ("subscriber_id", "list_id", "site_id", "exp")
if any(not payload.get(key) for key in required_keys):
return None, "invalid"
if int(payload.get("exp", 0)) < int(time.time()):
return None, "expired"
return payload, None
def build_one_click_unsubscribe_url(*, site_base_url: str, endpoint_path: str, token: str) -> str:
base = site_base_url.rstrip("/")
endpoint = endpoint_path if endpoint_path.startswith("/") else f"/{endpoint_path}"
return f"{base}{endpoint}?{urlencode({'token': token})}"
def build_list_unsubscribe_headers(*, one_click_url: str) -> dict:
return {
"List-Unsubscribe": f"<{one_click_url}>",
"List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
}
class SendEngineClient:
def __init__(self, config: NewsletterSystemSettings):
self.config = config
self._oauth_token_cache: dict[str, tuple[str, int]] = {}
def create_send_job(self, payload: dict) -> APIResult:
base_url = (self.config.send_engine_base_url or "").strip()
if not base_url:
return APIResult(ok=False, status=0, data={}, error="send_engine_base_url is empty")
endpoint = urljoin(f"{base_url.rstrip('/')}/", self.config.send_engine_send_jobs_path.lstrip("/"))
body = json.dumps(payload).encode("utf-8")
auth_headers, auth_error = self._auth_headers(required_scope="newsletter:send.write")
if auth_error:
return APIResult(ok=False, status=0, data={}, error=auth_error)
headers = {"Content-Type": "application/json", **auth_headers}
request = Request(endpoint, data=body, headers=headers, method="POST")
timeout = max(1, int(self.config.send_engine_timeout_seconds or 10))
return self._send(request, timeout=timeout)
def get_send_job(self, send_job_id: str) -> APIResult:
base_url = (self.config.send_engine_base_url or "").strip()
if not base_url:
return APIResult(ok=False, status=0, data={}, error="send_engine_base_url is empty")
if not send_job_id:
return APIResult(ok=False, status=0, data={}, error="send_job_id is empty")
endpoint = urljoin(
f"{base_url.rstrip('/')}/",
f"{self.config.send_engine_send_jobs_path.lstrip('/')}/{send_job_id}",
)
auth_headers, auth_error = self._auth_headers(required_scope="newsletter:send.read")
if auth_error:
return APIResult(ok=False, status=0, data={}, error=auth_error)
headers = {"Accept": "application/json", **auth_headers}
request = Request(endpoint, headers=headers, method="GET")
timeout = max(1, int(self.config.send_engine_timeout_seconds or 10))
return self._send(request, timeout=timeout)
def _auth_headers(self, required_scope: str = "") -> tuple[dict, str]:
token, error = self._get_member_center_access_token(required_scope=required_scope)
if error:
return {}, error
return {"Authorization": f"Bearer {token}"}, ""
def _get_member_center_access_token(self, required_scope: str = "") -> tuple[str, str]:
scope = (
(required_scope or "").strip()
or (self.config.send_engine_oauth_scope or "").strip()
or "newsletter:send.write"
)
now = int(time.time())
cached = self._oauth_token_cache.get(scope)
if cached and now < cached[1] - 30:
return cached[0], ""
member_center_base_url = (self.config.member_center_base_url or "").strip()
if not member_center_base_url:
return "", "member_center_base_url is empty (cannot resolve oauth token url)"
token_path = (self.config.member_center_oauth_token_path or "/oauth/token").strip()
token_url = urljoin(f"{member_center_base_url.rstrip('/')}/", token_path.lstrip("/"))
client_id = (self.config.member_center_oauth_client_id or "").strip()
encrypted_secret = (self.config.member_center_oauth_client_secret or "").strip()
client_secret = ""
if encrypted_secret:
try:
client_secret = decrypt_text(encrypted_secret)
except Exception:
client_secret = encrypted_secret
if not client_id or not client_secret:
return ("", "member center oauth client_id/client_secret is empty")
body = urlencode(
{
k: v
for k, v in {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
"scope": scope,
"audience": (self.config.member_center_oauth_audience or "").strip(),
}.items()
if v
}
).encode("utf-8")
request = Request(
token_url,
data=body,
headers={"Content-Type": "application/x-www-form-urlencoded"},
method="POST",
)
timeout = max(1, int(self.config.send_engine_timeout_seconds or 10))
result = self._send(request, timeout=timeout)
if not result.ok:
return "", f"oauth token request failed: HTTP {result.status} {result.error}".strip()
access_token = str(result.data.get("access_token") or "").strip()
if not access_token:
return "", "oauth token response missing access_token"
expires_in = int(result.data.get("expires_in") or 300)
self._oauth_token_cache[scope] = (access_token, int(time.time()) + max(60, expires_in))
return access_token, ""
def _send(self, request: Request, timeout: int) -> APIResult:
try:
with urlopen(request, timeout=timeout) as response:
raw = response.read().decode("utf-8")
data = json.loads(raw) if raw else {}
status = getattr(response, "status", 200)
return APIResult(ok=200 <= status < 300, status=status, data=data)
except HTTPError as exc:
raw = exc.read().decode("utf-8") if exc.fp else ""
try:
data = json.loads(raw) if raw else {}
except json.JSONDecodeError:
data = {}
return APIResult(ok=False, status=exc.code, data=data, error=str(exc))
except (URLError, TimeoutError, ValueError, json.JSONDecodeError) as exc:
return APIResult(ok=False, status=0, data={}, error=str(exc))
def send_subscribe_email(*, to_email: str, subject: str, text_body: str, html_body: str, config: NewsletterSystemSettings) -> int:
from_email = build_from_email(config.sender_name, config.sender_email)
reply_to = [config.reply_to_email] if config.reply_to_email else None

View File

@ -0,0 +1,345 @@
import time
from django.utils import timezone
from .models import NewsletterCampaign, NewsletterDispatchRecord, NewsletterSystemSettings
from .newsletter import SendEngineClient, render_newsletter_html_for_send_job
TERMINAL_SEND_JOB_STATUSES = {"completed", "failed", "cancelled"}
SUCCESS_SEND_JOB_STATUSES = {"completed"}
def _normalize_status(value: str) -> str:
return (value or "").strip().lower()
def _is_terminal_status(value: str) -> bool:
return _normalize_status(value) in TERMINAL_SEND_JOB_STATUSES
def _is_success_status(value: str) -> bool:
return _normalize_status(value) in SUCCESS_SEND_JOB_STATUSES
def _build_send_job_payload(*, campaign: NewsletterCampaign, settings_obj: NewsletterSystemSettings, list_id: str) -> dict:
site_base_url = (settings_obj.site_base_url or "").strip().rstrip("/")
body_html = render_newsletter_html_for_send_job(campaign.html_template, site_base_url=site_base_url)
body_text = (campaign.text_template or "").strip()
payload = {
"list_id": list_id,
"name": campaign.title,
"subject": campaign.subject_template,
}
tenant_id = (settings_obj.member_center_tenant_id or "").strip()
if tenant_id:
payload["tenant_id"] = tenant_id
if body_html:
payload["body_html"] = body_html
if body_text:
payload["body_text"] = body_text
if site_base_url:
payload["template"] = {
"unsubscribe_url": f"{site_base_url}/newsletter/unsubscribe?email={{{{email}}}}",
}
return payload
def _create_dispatch_record(
*,
campaign: NewsletterCampaign,
status: str,
retry_count: int,
response_status: int | None,
response_payload: dict,
error_message: str = "",
):
NewsletterDispatchRecord.objects.create(
campaign=campaign,
status=status,
retry_count=max(0, int(retry_count)),
response_status=response_status,
response_payload=response_payload or {},
error_message=error_message or "",
)
def _schedule_retry(*, campaign: NewsletterCampaign, attempt: int, response_status: int, response_payload: dict, error_message: str, settings_obj: NewsletterSystemSettings):
max_attempts = max(0, int(settings_obj.send_engine_retry_max_attempts or 0))
interval = max(1, int(settings_obj.send_engine_retry_interval_seconds or 300))
if attempt >= max_attempts:
return
NewsletterDispatchRecord.objects.create(
campaign=campaign,
status="retry_scheduled",
retry_count=attempt + 1,
next_retry_at=timezone.now() + timezone.timedelta(seconds=interval),
response_status=response_status,
response_payload=response_payload or {},
error_message=error_message or "",
)
def _poll_send_job_to_terminal(*, send_engine: SendEngineClient, send_job_id: str, poll_interval_seconds: int = 3, max_polls: int = 40) -> tuple[bool, int, dict, str]:
history = []
last_status = ""
for _ in range(max(1, int(max_polls))):
poll = send_engine.get_send_job(send_job_id)
if not poll.ok:
error = poll.error
if poll.status in (401, 403):
error = (
f"send engine auth failed when polling send job (HTTP {poll.status}); "
"token may miss scope newsletter:send.read"
)
return False, poll.status, {"send_job_id": send_job_id, "poll_history": history, "last_response": poll.data}, error
status = _normalize_status(str(poll.data.get("status", "")))
last_status = status
history.append(
{
"polled_at": timezone.now().isoformat(),
"status": status,
"response_status": poll.status,
}
)
if _is_terminal_status(status):
return True, poll.status, {"send_job_id": send_job_id, "send_job_status": status, "poll_history": history, "send_job": poll.data}, ""
time.sleep(max(1, int(poll_interval_seconds)))
return (
False,
0,
{"send_job_id": send_job_id, "send_job_status": last_status or "unknown", "poll_history": history},
f"send job did not reach terminal status after {max(1, int(max_polls))} polls",
)
def _dispatch_once(*, campaign: NewsletterCampaign, settings_obj: NewsletterSystemSettings) -> dict:
send_engine = SendEngineClient(settings_obj)
list_id = (campaign.list_id or settings_obj.member_center_list_id or "").strip()
if not list_id:
return {
"ok": False,
"response_status": 0,
"response_payload": {},
"error": "list_id is empty",
"send_job_status": "",
}
payload = _build_send_job_payload(campaign=campaign, settings_obj=settings_obj, list_id=list_id)
if not payload.get("body_html") and not payload.get("body_text") and not payload.get("template"):
return {
"ok": False,
"response_status": 0,
"response_payload": {},
"error": "body_html/body_text/template must provide at least one value",
"send_job_status": "",
}
create_result = send_engine.create_send_job(payload)
if not create_result.ok:
create_error = create_result.error
if create_result.status in (401, 403):
create_error = (
f"send engine auth failed when creating send job (HTTP {create_result.status}); "
"verify bearer token and scope newsletter:send.write"
)
return {
"ok": False,
"response_status": create_result.status,
"response_payload": {"create_response": create_result.data, "payload": payload},
"error": create_error,
"send_job_status": "",
}
send_job_id = str(create_result.data.get("send_job_id") or create_result.data.get("id") or "").strip()
create_status = _normalize_status(str(create_result.data.get("status", "")))
response_payload = {
"payload": payload,
"create_response": create_result.data,
"send_job_id": send_job_id,
"send_job_status": create_status,
}
if not send_job_id:
return {
"ok": False,
"response_status": create_result.status,
"response_payload": response_payload,
"error": "send_job_id missing from create response",
"send_job_status": create_status,
}
if _is_terminal_status(create_status):
return {
"ok": _is_success_status(create_status),
"response_status": create_result.status,
"response_payload": response_payload,
"error": "" if _is_success_status(create_status) else f"send job terminal status: {create_status}",
"send_job_status": create_status,
}
poll_ok, poll_status, poll_payload, poll_error = _poll_send_job_to_terminal(
send_engine=send_engine,
send_job_id=send_job_id,
)
merged_payload = {**response_payload, **poll_payload}
final_status = _normalize_status(str(merged_payload.get("send_job_status", "")))
if poll_ok and _is_success_status(final_status):
return {
"ok": True,
"response_status": poll_status,
"response_payload": merged_payload,
"error": "",
"send_job_status": final_status,
}
error_message = poll_error or f"send job terminal status: {final_status or 'unknown'}"
return {
"ok": False,
"response_status": poll_status,
"response_payload": merged_payload,
"error": error_message,
"send_job_status": final_status,
}
def dispatch_campaign(campaign: NewsletterCampaign, schedule_retry_on_failure: bool = True) -> dict:
settings_obj = NewsletterSystemSettings.load()
campaign.status = NewsletterCampaign.STATUS_SENDING
campaign.last_error = ""
campaign.save(update_fields=["status", "last_error", "updated_at"])
result = _dispatch_once(campaign=campaign, settings_obj=settings_obj)
if result["ok"]:
campaign.sent_at = timezone.now()
campaign.status = NewsletterCampaign.STATUS_SENT
campaign.last_error = ""
campaign.save(update_fields=["sent_at", "status", "last_error", "updated_at"])
_create_dispatch_record(
campaign=campaign,
status="sent",
retry_count=0,
response_status=result["response_status"],
response_payload=result["response_payload"],
)
return {"sent": 1, "failed": 0, "error": "", "send_job_status": result["send_job_status"]}
campaign.status = NewsletterCampaign.STATUS_FAILED
campaign.last_error = result["error"]
campaign.save(update_fields=["status", "last_error", "updated_at"])
_create_dispatch_record(
campaign=campaign,
status="failed",
retry_count=0,
response_status=result["response_status"],
response_payload=result["response_payload"],
error_message=result["error"],
)
if schedule_retry_on_failure:
_schedule_retry(
campaign=campaign,
attempt=0,
response_status=result["response_status"],
response_payload=result["response_payload"],
error_message=result["error"],
settings_obj=settings_obj,
)
return {"sent": 0, "failed": 1, "error": result["error"], "send_job_status": result["send_job_status"]}
def process_due_retries(limit: int = 200) -> dict:
settings_obj = NewsletterSystemSettings.load()
now = timezone.now()
due_retries = list(
NewsletterDispatchRecord.objects.filter(
status="retry_scheduled",
next_retry_at__isnull=False,
next_retry_at__lte=now,
)
.select_related("campaign")
.order_by("next_retry_at", "id")[: max(1, int(limit))]
)
sent = 0
failed = 0
processed = 0
for retry in due_retries:
processed += 1
campaign = retry.campaign
retry.status = "retry_consumed"
retry.save(update_fields=["status"])
if campaign.status == NewsletterCampaign.STATUS_SENT:
continue
result = _dispatch_once(campaign=campaign, settings_obj=settings_obj)
if result["ok"]:
sent += 1
campaign.sent_at = timezone.now()
campaign.status = NewsletterCampaign.STATUS_SENT
campaign.last_error = ""
campaign.save(update_fields=["sent_at", "status", "last_error", "updated_at"])
_create_dispatch_record(
campaign=campaign,
status="sent",
retry_count=retry.retry_count,
response_status=result["response_status"],
response_payload=result["response_payload"],
)
else:
failed += 1
campaign.status = NewsletterCampaign.STATUS_FAILED
campaign.last_error = result["error"]
campaign.save(update_fields=["status", "last_error", "updated_at"])
_create_dispatch_record(
campaign=campaign,
status="failed",
retry_count=retry.retry_count,
response_status=result["response_status"],
response_payload=result["response_payload"],
error_message=result["error"],
)
_schedule_retry(
campaign=campaign,
attempt=retry.retry_count,
response_status=result["response_status"],
response_payload=result["response_payload"],
error_message=result["error"],
settings_obj=settings_obj,
)
return {"processed_retries": processed, "sent_retries": sent, "failed_retries": failed}
def dispatch_due_campaigns(limit: int = 20) -> dict:
retry_result = process_due_retries(limit=200)
now = timezone.now()
due_campaigns = list(
NewsletterCampaign.objects.filter(
status=NewsletterCampaign.STATUS_SCHEDULED,
scheduled_at__isnull=False,
scheduled_at__lte=now,
)
.order_by("scheduled_at", "id")[: max(1, int(limit))]
)
total_sent = 0
total_failed = 0
processed = 0
for campaign in due_campaigns:
processed += 1
result = dispatch_campaign(campaign)
total_sent += int(result.get("sent", 0))
if result.get("failed", 0):
total_failed += 1
return {
"processed_campaigns": processed,
"sent_recipients": total_sent,
"failed_campaigns": total_failed,
**retry_result,
}

View File

@ -1,6 +1,12 @@
from django.test import TestCase
from .newsletter import extract_token, render_placeholders
from .newsletter import (
extract_token,
generate_one_click_token,
render_placeholders,
render_newsletter_html,
verify_one_click_token,
)
from .security import decrypt_text, encrypt_text
@ -32,3 +38,39 @@ class NewsletterTemplateTests(TestCase):
self.assertTrue(encrypted.startswith("enc1:"))
self.assertNotEqual(encrypted, secret)
self.assertEqual(decrypt_text(encrypted), secret)
def test_one_click_token_roundtrip(self):
token = generate_one_click_token(
subscriber_id="sub-1",
list_id="list-1",
site_id="site-1",
campaign_id="cmp-1",
secret="test-secret",
ttl_seconds=60,
)
payload, error = verify_one_click_token(token, "test-secret")
self.assertIsNone(error)
self.assertEqual(payload["subscriber_id"], "sub-1")
self.assertEqual(payload["list_id"], "list-1")
def test_one_click_token_invalid_signature(self):
token = generate_one_click_token(
subscriber_id="sub-1",
list_id="list-1",
site_id="site-1",
campaign_id="cmp-1",
secret="test-secret",
ttl_seconds=60,
)
payload, error = verify_one_click_token(token, "other-secret")
self.assertIsNone(payload)
self.assertEqual(error, "invalid")
def test_render_newsletter_html_absolutizes_relative_links(self):
rendered = render_newsletter_html(
'<p><a href="/a">A</a><img src="/media/x.jpg"></p>',
values={},
site_base_url="https://news.example.com",
)
self.assertIn('href="https://news.example.com/a"', rendered)
self.assertIn('src="https://news.example.com/media/x.jpg"', rendered)

View File

@ -1,23 +1,38 @@
from urllib.parse import urlencode
import hashlib
import json
from django.contrib import messages
from django.contrib.admin.views.decorators import staff_member_required
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.http import HttpResponseNotAllowed
from django.shortcuts import redirect, render
from django.http import HttpResponseNotAllowed, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_GET, require_http_methods, require_POST
from wagtail.models import Site
from .forms import NewsletterSubscribeForm, NewsletterUnsubscribeForm
from .models import NewsletterSystemSettings, NewsletterTemplateSettings
from .models import (
NewsletterCampaign,
NewsletterSystemSettings,
NewsletterTemplateSettings,
OneClickUnsubscribeAudit,
)
from .newsletter import (
MemberCenterClient,
build_from_email,
build_list_unsubscribe_headers,
build_one_click_unsubscribe_url,
extract_token,
generate_one_click_token,
render_placeholders,
resolve_one_click_secret,
send_subscribe_email,
verify_one_click_token,
)
from .newsletter_scheduler import dispatch_campaign
def _load_settings():
@ -32,6 +47,88 @@ def _build_context(*, title: str, message: str, success: bool):
}
def _extract_one_click_token(request):
token = (request.GET.get("token") or "").strip()
if token:
return token
if request.method == "POST":
token = (request.POST.get("token") or "").strip()
if token:
return token
if request.body:
try:
body = json.loads(request.body.decode("utf-8"))
token = (body.get("token") or "").strip()
if token:
return token
except Exception:
pass
return ""
def _handle_one_click_unsubscribe(*, request, token):
system_settings, _ = _load_settings()
secret = resolve_one_click_secret(system_settings)
payload, token_error = verify_one_click_token(token, secret)
token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest() if token else ""
if token_error == "expired":
return False, 410, "退訂連結已過期。"
if token_error:
return False, 400, "退訂連結無效。"
existing = OneClickUnsubscribeAudit.objects.filter(
token_hash=token_hash,
status__in=["success", "already_unsubscribed"],
).first()
if existing:
return True, 200, "您已完成退訂。"
request_payload = {
"subscriber_id": payload["subscriber_id"],
"list_id": payload["list_id"],
"source": "one_click",
"campaign_id": payload.get("campaign_id", ""),
}
result = MemberCenterClient(system_settings).one_click_unsubscribe(request_payload)
response_data = result.data if isinstance(result.data, dict) else {}
if result.ok:
already = bool(response_data.get("already_unsubscribed"))
success = bool(response_data.get("success", True)) or already
if success:
OneClickUnsubscribeAudit.objects.update_or_create(
token_hash=token_hash,
defaults={
"subscriber_id": str(payload.get("subscriber_id", "")),
"list_id": str(payload.get("list_id", "")),
"site_id": str(payload.get("site_id", "")),
"campaign_id": str(payload.get("campaign_id", "")),
"status": "already_unsubscribed" if already else "success",
"response_status": result.status,
"response_payload": response_data,
},
)
return True, 200, "您已完成退訂。"
OneClickUnsubscribeAudit.objects.update_or_create(
token_hash=token_hash,
defaults={
"subscriber_id": str(payload.get("subscriber_id", "")),
"list_id": str(payload.get("list_id", "")),
"site_id": str(payload.get("site_id", "")),
"campaign_id": str(payload.get("campaign_id", "")),
"status": "failed",
"response_status": result.status,
"response_payload": response_data,
"error_message": result.error,
},
)
return False, 502, "退訂服務暫時無法使用。"
@require_http_methods(["POST"])
def newsletter_subscribe(request):
form = NewsletterSubscribeForm(request.POST)
@ -236,6 +333,80 @@ def newsletter_unsubscribe(request):
)
@csrf_exempt
@require_http_methods(["GET", "POST"])
def one_click_unsubscribe(request):
token = _extract_one_click_token(request)
ok, status_code, message = _handle_one_click_unsubscribe(request=request, token=token)
if request.method == "POST":
body = {"success": ok}
if not ok and status_code in (400, 410):
body["error"] = message
return JsonResponse(body, status=status_code)
return render(
request,
"base/newsletter/status.html",
_build_context(
title="退訂成功" if ok else "退訂失敗",
message=f"<p>{message}</p>",
success=ok,
),
status=status_code,
)
@staff_member_required
@require_POST
def newsletter_one_click_preview(request):
settings_obj = NewsletterSystemSettings.load(request_or_site=request)
site = Site.find_for_request(request)
site_base_url = f"https://{site.hostname}" if site else request.build_absolute_uri("/").rstrip("/")
secret = resolve_one_click_secret(settings_obj)
token = generate_one_click_token(
subscriber_id=request.POST.get("subscriber_id", "preview-subscriber"),
list_id=request.POST.get("list_id", settings_obj.member_center_list_id or "preview-list"),
site_id=request.POST.get("site_id", str(site.id) if site else "preview-site"),
campaign_id=request.POST.get("campaign_id", "preview-campaign"),
secret=secret,
ttl_seconds=settings_obj.one_click_token_ttl_seconds,
)
one_click_url = build_one_click_unsubscribe_url(
site_base_url=site_base_url,
endpoint_path=settings_obj.one_click_endpoint_path or "/u/unsubscribe",
token=token,
)
headers = build_list_unsubscribe_headers(one_click_url=one_click_url)
messages.success(
request,
f"One-click URL: {one_click_url} | Headers: {headers}",
)
return redirect(request.META.get("HTTP_REFERER") or reverse("wagtailadmin_home"))
@staff_member_required
@require_GET
def newsletter_campaign_send_now(request, campaign_id: int):
campaign = get_object_or_404(NewsletterCampaign, pk=campaign_id)
if campaign.status == NewsletterCampaign.STATUS_SENDING:
messages.error(request, "Campaign is currently sending.")
return redirect(request.META.get("HTTP_REFERER") or reverse("wagtailadmin_home"))
result = dispatch_campaign(campaign, schedule_retry_on_failure=False)
if result.get("failed", 0):
messages.error(
request,
f"Send now completed with failures. sent={result.get('sent', 0)} failed={result.get('failed', 0)}",
)
else:
messages.success(
request,
f"Send now completed. sent={result.get('sent', 0)} failed={result.get('failed', 0)}",
)
return redirect(request.META.get("HTTP_REFERER") or reverse("wagtailadmin_home"))
@staff_member_required
@require_POST
def newsletter_smtp_test(request):

View File

@ -0,0 +1,104 @@
from django.urls import reverse
from django.utils.translation import get_language
from wagtail import hooks
from wagtail.admin.panels import FieldPanel
from wagtail.admin.rich_text import DraftailRichTextArea
from wagtail.admin.widgets import Button
from wagtail.permission_policies import ModelPermissionPolicy
from wagtail.snippets.models import register_snippet
from wagtail.snippets.views.snippets import CreateView, SnippetViewSet
from .models import NewsletterCampaign, NewsletterDispatchRecord, NewsletterSystemSettings
class NewsletterCampaignCreateView(CreateView):
def get_initial(self):
initial = super().get_initial()
if (initial.get("list_id") or "").strip():
return initial
default_list_id = (NewsletterSystemSettings.load().member_center_list_id or "").strip()
if default_list_id:
initial["list_id"] = default_list_id
return initial
class NewsletterCampaignViewSet(SnippetViewSet):
model = NewsletterCampaign
icon = "mail"
menu_label = "Newsletter campaigns"
menu_order = 250
add_to_admin_menu = True
add_view_class = NewsletterCampaignCreateView
list_display = ["title", "list_id", "status", "scheduled_at", "sent_at", "updated_at"]
list_filter = ["status"]
search_fields = ["title", "list_id", "subject_template"]
panels = [
FieldPanel("title"),
FieldPanel(
"list_id",
help_text="留空時會使用 Newsletter System Settings 的 member_center_list_id。",
),
FieldPanel("subject_template"),
FieldPanel(
"html_template",
widget=DraftailRichTextArea(
features=["h2", "h3", "bold", "italic", "link", "image", "ul", "ol"],
),
help_text="可用圖片按鈕從 Wagtail 圖庫選圖或上傳。建議使用圖片 URL不要在 CMS 端轉 Base64。",
),
FieldPanel("text_template"),
FieldPanel("scheduled_at"),
]
class ReadOnlySnippetPermissionPolicy(ModelPermissionPolicy):
def user_has_permission(self, user, action):
if action in {"add", "change", "delete"}:
return False
return super().user_has_permission(user, action)
class NewsletterDispatchRecordViewSet(SnippetViewSet):
model = NewsletterDispatchRecord
icon = "tasks"
menu_label = "Newsletter dispatch records"
menu_order = 251
add_to_admin_menu = True
inspect_view_enabled = True
copy_view_enabled = False
permission_policy = ReadOnlySnippetPermissionPolicy(NewsletterDispatchRecord)
list_display = [
"campaign",
"email",
"subscriber_id",
"status",
"retry_count",
"next_retry_at",
"response_status",
"created_at",
]
list_filter = ["status", "created_at"]
search_fields = ["email", "subscriber_id", "campaign__title"]
register_snippet(NewsletterCampaignViewSet)
register_snippet(NewsletterDispatchRecordViewSet)
@hooks.register("register_snippet_listing_buttons")
def newsletter_campaign_listing_buttons(snippet, user, next_url=None):
if not isinstance(snippet, NewsletterCampaign):
return
if not user.is_staff:
return
if snippet.status == NewsletterCampaign.STATUS_SENDING:
return
language = (get_language() or "").lower()
label = "馬上發送" if language.startswith("zh") else "Send now"
yield Button(
label,
reverse("newsletter_campaign_send_now", args=[snippet.pk]),
icon_name="mail",
priority=15,
)

View File

@ -1,8 +1,18 @@
#!/usr/bin/env bash
set -e
# Run pending migrations and collect static assets before starting the app
python manage.py migrate --noinput
python manage.py collectstatic --noinput
APP_ROLE="${APP_ROLE:-web}"
exec "$@"
# Run pending migrations before starting any role.
python manage.py migrate --noinput
if [ "$APP_ROLE" = "scheduler" ]; then
SCHEDULER_INTERVAL_SECONDS="${SCHEDULER_INTERVAL_SECONDS:-60}"
while true; do
python manage.py run_newsletter_scheduler --limit 20
sleep "$SCHEDULER_INTERVAL_SECONDS"
done
else
python manage.py collectstatic --noinput
exec "$@"
fi

View File

@ -57,4 +57,37 @@
</div>
</section>
{% endif %}
{% if form.one_click_endpoint_path %}
<section class="w-panel w-mt-8">
<div class="w-panel__header">
<h2 class="w-panel__heading">List-Unsubscribe One-Click 預覽</h2>
</div>
<div class="w-panel__content">
<ul class="fields">
<li>
<label class="w-block w-font-semibold w-mb-2" for="preview_subscriber_id">Subscriber ID</label>
<input class="w-input" type="text" id="preview_subscriber_id" name="subscriber_id" value="preview-subscriber">
</li>
<li>
<label class="w-block w-font-semibold w-mb-2" for="preview_list_id">List ID</label>
<input class="w-input" type="text" id="preview_list_id" name="list_id" value="{{ form.member_center_list_id.value|default:'' }}">
</li>
<li>
<label class="w-block w-font-semibold w-mb-2" for="preview_campaign_id">Campaign ID</label>
<input class="w-input" type="text" id="preview_campaign_id" name="campaign_id" value="preview-campaign">
</li>
</ul>
<button
type="submit"
class="button button-secondary"
formaction="{% url 'newsletter_one_click_preview' %}"
formmethod="post"
formnovalidate
>
產生 One-Click 連結與 Header
</button>
</div>
</section>
{% endif %}
{% endblock %}

View File

@ -20,7 +20,10 @@ urlpatterns = [
path("newsletter/subscribe/", base_views.newsletter_subscribe, name="newsletter_subscribe"),
path("newsletter/confirm/", base_views.newsletter_confirm, name="newsletter_confirm"),
path("newsletter/unsubscribe/", base_views.newsletter_unsubscribe, name="newsletter_unsubscribe"),
path("u/unsubscribe", base_views.one_click_unsubscribe, name="one_click_unsubscribe"),
path("newsletter/smtp-test/", base_views.newsletter_smtp_test, name="newsletter_smtp_test"),
path("newsletter/one-click-preview/", base_views.newsletter_one_click_preview, name="newsletter_one_click_preview"),
path("newsletter/campaigns/<int:campaign_id>/send-now/", base_views.newsletter_campaign_send_now, name="newsletter_campaign_send_now"),
]

View File

@ -1,31 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIFazCCA1OgAwIBAgIRAIIQz7DSQONZRGPgu2OCiwAwDQYJKoZIhvcNAQELBQAw
TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh
cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMTUwNjA0MTEwNDM4
WhcNMzUwNjA0MTEwNDM4WjBPMQswCQYDVQQGEwJVUzEpMCcGA1UEChMgSW50ZXJu
ZXQgU2VjdXJpdHkgUmVzZWFyY2ggR3JvdXAxFTATBgNVBAMTDElTUkcgUm9vdCBY
MTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAK3oJHP0FDfzm54rVygc
h77ct984kIxuPOZXoHj3dcKi/vVqbvYATyjb3miGbESTtrFj/RQSa78f0uoxmyF+
0TM8ukj13Xnfs7j/EvEhmkvBioZxaUpmZmyPfjxwv60pIgbz5MDmgK7iS4+3mX6U
A5/TR5d8mUgjU+g4rk8Kb4Mu0UlXjIB0ttov0DiNewNwIRt18jA8+o+u3dpjq+sW
T8KOEUt+zwvo/7V3LvSye0rgTBIlDHCNAymg4VMk7BPZ7hm/ELNKjD+Jo2FR3qyH
B5T0Y3HsLuJvW5iB4YlcNHlsdu87kGJ55tukmi8mxdAQ4Q7e2RCOFvu396j3x+UC
B5iPNgiV5+I3lg02dZ77DnKxHZu8A/lJBdiB3QW0KtZB6awBdpUKD9jf1b0SHzUv
KBds0pjBqAlkd25HN7rOrFleaJ1/ctaJxQZBKT5ZPt0m9STJEadao0xAH0ahmbWn
OlFuhjuefXKnEgV4We0+UXgVCwOPjdAvBbI+e0ocS3MFEvzG6uBQE3xDk3SzynTn
jh8BCNAw1FtxNrQHusEwMFxIt4I7mKZ9YIqioymCzLq9gwQbooMDQaHWBfEbwrbw
qHyGO0aoSCqI3Haadr8faqU9GY/rOPNk3sgrDQoo//fb4hVC1CLQJ13hef4Y53CI
rU7m2Ys6xt0nUW7/vGT1M0NPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNV
HRMBAf8EBTADAQH/MB0GA1UdDgQWBBR5tFnme7bl5AFzgAiIyBpY9umbbjANBgkq
hkiG9w0BAQsFAAOCAgEAVR9YqbyyqFDQDLHYGmkgJykIrGF1XIpu+ILlaS/V9lZL
ubhzEFnTIZd+50xx+7LSYK05qAvqFyFWhfFQDlnrzuBZ6brJFe+GnY+EgPbk6ZGQ
3BebYhtF8GaV0nxvwuo77x/Py9auJ/GpsMiu/X1+mvoiBOv/2X/qkSsisRcOj/KK
NFtY2PwByVS5uCbMiogziUwthDyC3+6WVwW6LLv3xLfHTjuCvjHIInNzktHCgKQ5
ORAzI4JMPJ+GslWYHb4phowim57iaztXOoJwTdwJx4nLCgdNbOhdjsnvzqvHu7Ur
TkXWStAmzOVyyghqpZXjFaH3pO3JLF+l+/+sKAIuvtd7u+Nxe5AW0wdeRlN8NwdC
jNPElpzVmbUq4JUagEiuTDkHzsxHpFKVK7q4+63SM1N95R1NbdWhscdCb+ZAJzVc
oyi3B43njTOQ5yOf+1CceWxG1bQVs5ZufpsMljq4Ui0/1lvh+wjChP4kqKOJ2qxq
4RgqsahDYVvTH9w7jXbyLeiNdd8XM2w9U/t7y0Ff/9yi0GE44Za4rF2LN9d11TPA
mRGunUHBcnWEvgJBQl9nJEiU0Zsnvgc/ubhPgXRR4Xq37Z0j4r7g1SgEEzwxA57d
emyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc=
-----END CERTIFICATE-----