Warren Chen eb8307cb3b feat: Implement one-click unsubscribe feature and newsletter campaign management
- Added one-click unsubscribe functionality with token generation and verification.
- Introduced a new model for tracking one-click unsubscribe audits.
- Enhanced newsletter campaign management with the ability to send campaigns immediately.
- Implemented a scheduler for dispatching due newsletter campaigns.
- Updated views and templates to support one-click unsubscribe and campaign previews.
- Added management commands for running the newsletter scheduler.
- Removed obsolete SSL certificate file.
- Updated entrypoint script to handle different application roles.
2026-04-02 02:51:39 +09:00

502 lines
19 KiB
Python

import json
import logging
import hmac
import re
import secrets
import time
from dataclasses import dataclass
from email.utils import formataddr
from hashlib import sha256
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen
from django.conf import settings
from django.core.mail import EmailMultiAlternatives, get_connection
from wagtail.rich_text import expand_db_html
from .models import NewsletterSystemSettings
from .security import decrypt_text
logger = logging.getLogger(__name__)
# Names of the template placeholders understood by ``render_placeholders``:
# each ``{{name}}`` occurrence in a template is replaced with the matching
# entry of the ``values`` dict passed to that function.
PLACEHOLDER_KEYS = (
"token",
"email",
"list_id",
"tenant_id",
"confirm_url",
"unsubscribe_url",
)
@dataclass
class APIResult:
    """Outcome of one HTTP call made by the API clients in this module."""

    # True when the response status was in the 2xx range.
    ok: bool
    # HTTP status code; the clients use 0 when no HTTP response was obtained.
    status: int
    # Decoded JSON body of the response (empty when there was none).
    data: dict
    # Human-readable failure description; empty on success.
    error: str = ""
class MemberCenterClient:
    """JSON-over-HTTP client for the member center endpoints.

    Request methods return an ``APIResult`` rather than raising for HTTP,
    transport, timeout or decoding failures: HTTP error responses keep their
    status code and decoded JSON body, every other failure is reported with
    ``status=0`` and the exception text in ``error``.
    """

    def __init__(self, config: NewsletterSystemSettings):
        self.config = config
        # Maps OAuth scope -> (access token, expiry as a Unix timestamp).
        self._oauth_token_cache: dict[str, tuple[str, int]] = {}

    def subscribe(self, payload: dict) -> APIResult:
        """POST ``payload`` to the configured subscribe path."""
        return self._post(self.config.member_center_subscribe_path, payload)

    def confirm(self, token: str) -> APIResult:
        """GET the configured confirm path with ``token`` as a query parameter."""
        return self._get(self.config.member_center_confirm_path, {"token": token})

    def request_unsubscribe_token(self, payload: dict) -> APIResult:
        """POST ``payload`` to the configured unsubscribe-token path."""
        return self._post(self.config.member_center_unsubscribe_token_path, payload)

    def unsubscribe(self, payload: dict) -> APIResult:
        """POST ``payload`` to the configured unsubscribe path."""
        return self._post(self.config.member_center_unsubscribe_path, payload)

    def one_click_unsubscribe(self, payload: dict) -> APIResult:
        """POST ``payload`` to the configured one-click unsubscribe path."""
        return self._post(self.config.member_center_one_click_unsubscribe_path, payload)

    def list_subscriptions(self, list_id: str) -> APIResult:
        """GET the subscriptions of ``list_id`` using an OAuth bearer token."""
        auth_headers, auth_error = self._auth_headers()
        if auth_error:
            return APIResult(ok=False, status=0, data={}, error=auth_error)
        return self._get(
            self.config.member_center_subscriptions_path,
            {"list_id": list_id},
            headers=auth_headers,
        )

    def _timeout(self) -> int:
        """Return the configured timeout in seconds (default 10, minimum 1)."""
        return max(1, int(self.config.member_center_timeout_seconds or 10))

    def _resolve_endpoint(self, path: str) -> str:
        """Join ``path`` onto the configured base URL; "" when no base URL is set."""
        base_url = (self.config.member_center_base_url or "").strip()
        if not base_url:
            return ""
        return urljoin(f"{base_url.rstrip('/')}/", path.lstrip("/"))

    def _get(self, path: str, query: dict, headers: dict | None = None) -> APIResult:
        """Send a GET request to ``path`` with ``query`` URL-encoded."""
        endpoint = self._resolve_endpoint(path)
        if not endpoint:
            return APIResult(ok=False, status=0, data={}, error="member_center_base_url is empty")
        if query:
            endpoint = f"{endpoint}?{urlencode(query)}"
        request = Request(
            endpoint,
            headers={"Accept": "application/json", **(headers or {})},
            method="GET",
        )
        return self._send(request, timeout=self._timeout())

    def _post(self, path: str, payload: dict, headers: dict | None = None) -> APIResult:
        """Send ``payload`` as a JSON body in a POST request to ``path``."""
        endpoint = self._resolve_endpoint(path)
        if not endpoint:
            return APIResult(ok=False, status=0, data={}, error="member_center_base_url is empty")
        request = Request(
            endpoint,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json", **(headers or {})},
            method="POST",
        )
        return self._send(request, timeout=self._timeout())

    def _auth_headers(self) -> tuple[dict, str]:
        """Return ``(headers, error)``; ``headers`` carries the bearer token."""
        scope = (self.config.member_center_oauth_scope or "").strip() or "newsletter:list.read"
        token, error = self._get_oauth_access_token(required_scope=scope)
        if not token:
            return {}, error or "member center oauth token is empty"
        return {"Authorization": f"Bearer {token}"}, ""

    def _get_oauth_access_token(self, required_scope: str) -> tuple[str, str]:
        """Return ``(access_token, error)`` for ``required_scope``.

        Uses the client-credentials grant against the configured token path
        and caches the token per scope until 30 seconds before it expires.
        """
        scope = (required_scope or "").strip()
        now = int(time.time())
        cached = self._oauth_token_cache.get(scope)
        # Refresh slightly early so a token does not expire while in flight.
        if cached and now < cached[1] - 30:
            return cached[0], ""

        base_url = (self.config.member_center_base_url or "").strip()
        if not base_url:
            return "", "member_center_base_url is empty (cannot resolve oauth token url)"
        token_path = (self.config.member_center_oauth_token_path or "/oauth/token").strip()
        token_url = urljoin(f"{base_url.rstrip('/')}/", token_path.lstrip("/"))

        client_id = (self.config.member_center_oauth_client_id or "").strip()
        encrypted_secret = (self.config.member_center_oauth_client_secret or "").strip()
        client_secret = ""
        if encrypted_secret:
            try:
                client_secret = decrypt_text(encrypted_secret)
            except Exception:
                # Best-effort fallback kept from the original code: the stored
                # value is used as-is when it cannot be decrypted (presumably
                # to support secrets saved unencrypted; confirm).
                client_secret = encrypted_secret
        if not client_id or not client_secret:
            return "", "member center oauth client_id/client_secret is empty"

        form = {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": scope,
            "audience": (self.config.member_center_oauth_audience or "").strip(),
        }
        # Empty fields (e.g. no audience configured) are omitted from the form.
        body = urlencode({k: v for k, v in form.items() if v}).encode("utf-8")
        request = Request(
            token_url,
            data=body,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            method="POST",
        )
        result = self._send(request, timeout=self._timeout())
        if not result.ok:
            return "", f"oauth token request failed: HTTP {result.status} {result.error}".strip()
        if not isinstance(result.data, dict):
            # A JSON array/scalar cannot carry an access_token field.
            return "", "oauth token response missing access_token"
        access_token = str(result.data.get("access_token") or "").strip()
        if not access_token:
            return "", "oauth token response missing access_token"
        try:
            expires_in = int(result.data.get("expires_in") or 300)
        except (TypeError, ValueError):
            # Malformed lifetime: fall back to the same default as a missing one.
            expires_in = 300
        self._oauth_token_cache[scope] = (access_token, int(time.time()) + max(60, expires_in))
        return access_token, ""

    def _send(self, request: Request, timeout: int) -> APIResult:
        """Execute ``request`` and convert the outcome into an ``APIResult``."""
        from http.client import HTTPException

        try:
            with urlopen(request, timeout=timeout) as response:
                raw = response.read().decode("utf-8")
                data = json.loads(raw) if raw else {}
                status = getattr(response, "status", 200)
                return APIResult(ok=200 <= status < 300, status=status, data=data)
        except HTTPError as exc:
            # The error body is optional context; never let it mask the status.
            try:
                raw = exc.read().decode("utf-8") if exc.fp else ""
                data = json.loads(raw) if raw else {}
            except (OSError, ValueError, HTTPException):
                data = {}
            return APIResult(ok=False, status=exc.code, data=data, error=str(exc))
        except (OSError, ValueError, HTTPException) as exc:
            # OSError covers URLError, timeouts and dropped connections;
            # ValueError covers invalid URLs and JSON/UTF-8 decoding errors;
            # HTTPException covers malformed or truncated HTTP responses.
            return APIResult(ok=False, status=0, data={}, error=str(exc))
def render_placeholders(template: str, values: dict) -> str:
    """Replace every ``{{key}}`` placeholder in ``template`` with its value.

    Only the names listed in ``PLACEHOLDER_KEYS`` are substituted; a key that
    is missing from ``values`` is replaced with an empty string and any other
    ``{{...}}`` text is left untouched. ``None`` or an empty template yields
    an empty string.

    The substitution is done in a single pass, so placeholder-looking text
    inside a substituted value (for example an email address containing
    ``{{confirm_url}}``) is emitted literally instead of being expanded again.
    """
    if not PLACEHOLDER_KEYS:
        return template or ""
    pattern = re.compile(
        "|".join(re.escape(f"{{{{{key}}}}}") for key in PLACEHOLDER_KEYS)
    )

    def _substitute(match):
        # Strip the surrounding "{{" and "}}" to recover the placeholder name.
        return str(values.get(match.group(0)[2:-2], ""))

    return pattern.sub(_substitute, template or "")
def _absolutize_links(html: str, site_base_url: str) -> str:
base = (site_base_url or "").strip().rstrip("/")
if not base:
return html
def _replace(match):
prefix = match.group("prefix")
url = match.group("url")
return f"{prefix}{urljoin(f'{base}/', url.lstrip('/'))}"
pattern = re.compile(r"(?P<prefix>\b(?:src|href)=['\"])(?P<url>/[^'\"]*)")
return pattern.sub(_replace, html)
def render_newsletter_html(template: str, values: dict, site_base_url: str = "") -> str:
    """Render a newsletter HTML template for delivery.

    Steps, in order: substitute placeholders with ``render_placeholders``,
    pass the result through Wagtail's ``expand_db_html``, then make
    root-relative links absolute against ``site_base_url``.

    The ``expand_db_html`` step is best effort: if it raises, the failure is
    logged and the placeholder-rendered HTML is used unexpanded.
    """
    rendered = render_placeholders(template, values)
    try:
        rendered = expand_db_html(rendered)
    except Exception:
        # Keep sending with unexpanded HTML, but leave a trace of the failure.
        logger.warning("newsletter html expand_db_html failed", exc_info=True)
    return _absolutize_links(rendered, site_base_url)
def render_newsletter_html_for_send_job(template: str, site_base_url: str = "") -> str:
    """Render newsletter HTML for a send job, leaving placeholders in place.

    Unlike ``render_newsletter_html`` no placeholder substitution happens
    here: the template is only passed through Wagtail's ``expand_db_html``
    and then has its root-relative links made absolute against
    ``site_base_url``. ``None`` is treated as an empty template.

    The ``expand_db_html`` step is best effort: if it raises, the failure is
    logged and the template is used unexpanded.
    """
    rendered = template or ""
    try:
        rendered = expand_db_html(rendered)
    except Exception:
        # Keep going with unexpanded HTML, but leave a trace of the failure.
        logger.warning("newsletter send-job html expand_db_html failed", exc_info=True)
    return _absolutize_links(rendered, site_base_url)
def extract_token(payload: dict) -> str:
    """Return the first non-blank token string found in ``payload``.

    The keys ``token``, ``confirm_token`` and ``unsubscribe_token`` are
    checked in that order, first on ``payload`` itself and then on the nested
    ``payload["data"]`` dict when present. The token is returned stripped of
    surrounding whitespace; "" is returned when nothing is found or when
    ``payload`` is empty or not a mapping (for example a JSON list).
    """
    if not payload or not hasattr(payload, "get"):
        return ""
    token_keys = ("token", "confirm_token", "unsubscribe_token")
    nested = payload.get("data")
    sources = (payload, nested) if isinstance(nested, dict) else (payload,)
    for source in sources:
        for key in token_keys:
            value = source.get(key)
            if isinstance(value, str) and value.strip():
                return value.strip()
    return ""
def build_from_email(sender_name: str, sender_email: str) -> str:
    """Build the ``From`` value for an outgoing message.

    Returns ``sender_email`` unchanged when either the name or the address is
    empty; otherwise returns the pair formatted by ``email.utils.formataddr``.
    """
    if not (sender_name and sender_email):
        return sender_email
    return formataddr((sender_name, sender_email))
def resolve_one_click_secret(config: NewsletterSystemSettings) -> str:
    """Return the secret used to sign one-click unsubscribe tokens.

    The stripped ``config.one_click_token_secret`` is used when it is
    non-empty; otherwise Django's ``settings.SECRET_KEY`` is returned.
    """
    configured = (config.one_click_token_secret or "").strip()
    if configured:
        return configured
    return settings.SECRET_KEY
def _b64url_encode(data: bytes) -> str:
import base64
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _b64url_decode(data: str) -> bytes:
import base64
padding = "=" * ((4 - len(data) % 4) % 4)
return base64.urlsafe_b64decode((data + padding).encode("ascii"))
def generate_one_click_token(
    *,
    subscriber_id: str,
    list_id: str,
    site_id: str,
    campaign_id: str,
    secret: str,
    ttl_seconds: int,
) -> str:
    """Create a signed one-click unsubscribe token.

    The token has the form ``<payload>.<signature>``: the payload is the
    compact JSON of the identifiers plus an ``exp`` Unix timestamp (now plus
    ``ttl_seconds``, at least one second) and a random ``nonce``, encoded with
    ``_b64url_encode``; the signature is the HMAC-SHA256 of the encoded
    payload keyed with ``secret``, encoded the same way.
    """
    lifetime = max(1, int(ttl_seconds))
    claims = {
        "subscriber_id": subscriber_id,
        "list_id": list_id,
        "site_id": site_id,
        "campaign_id": campaign_id,
        "exp": int(time.time()) + lifetime,
        "nonce": secrets.token_urlsafe(8),
    }
    serialized = json.dumps(claims, separators=(",", ":"), ensure_ascii=True)
    encoded_claims = _b64url_encode(serialized.encode("utf-8"))
    # The signature covers the encoded payload text, not the raw JSON.
    mac = hmac.new(secret.encode("utf-8"), encoded_claims.encode("ascii"), sha256)
    return ".".join((encoded_claims, _b64url_encode(mac.digest())))
def verify_one_click_token(token: str, secret: str) -> tuple[dict | None, str | None]:
if not token or "." not in token:
return None, "invalid"
payload_part, sig_part = token.split(".", 1)
try:
received_sig = _b64url_decode(sig_part)
except Exception:
return None, "invalid"
expected_sig = hmac.new(secret.encode("utf-8"), payload_part.encode("ascii"), sha256).digest()
if not hmac.compare_digest(received_sig, expected_sig):
return None, "invalid"
try:
payload = json.loads(_b64url_decode(payload_part).decode("utf-8"))
except Exception:
return None, "invalid"
required_keys = ("subscriber_id", "list_id", "site_id", "exp")
if any(not payload.get(key) for key in required_keys):
return None, "invalid"
if int(payload.get("exp", 0)) < int(time.time()):
return None, "expired"
return payload, None
def build_one_click_unsubscribe_url(*, site_base_url: str, endpoint_path: str, token: str) -> str:
    """Return the absolute one-click unsubscribe URL carrying ``token``.

    Trailing slashes are removed from ``site_base_url``, ``endpoint_path`` is
    given a leading slash when it lacks one, and ``token`` is URL-encoded as
    the ``token`` query parameter.
    """
    path = endpoint_path
    if not path.startswith("/"):
        path = f"/{path}"
    query = urlencode({"token": token})
    return "".join((site_base_url.rstrip("/"), path, "?", query))
def build_list_unsubscribe_headers(*, one_click_url: str) -> dict:
    """Return the ``List-Unsubscribe`` header pair for one-click unsubscribe.

    ``List-Unsubscribe`` holds ``one_click_url`` wrapped in angle brackets and
    ``List-Unsubscribe-Post`` holds the fixed ``List-Unsubscribe=One-Click``
    value.
    """
    headers = {}
    headers["List-Unsubscribe"] = "<{}>".format(one_click_url)
    headers["List-Unsubscribe-Post"] = "List-Unsubscribe=One-Click"
    return headers
class SendEngineClient:
    """JSON-over-HTTP client for the send engine's send-job endpoints.

    Requests are authorised with an OAuth bearer token obtained from the
    member center token endpoint. Request methods return an ``APIResult``
    rather than raising for HTTP, transport, timeout or decoding failures:
    HTTP error responses keep their status code and decoded JSON body, every
    other failure is reported with ``status=0`` and the exception text in
    ``error``.
    """

    def __init__(self, config: NewsletterSystemSettings):
        self.config = config
        # Maps OAuth scope -> (access token, expiry as a Unix timestamp).
        self._oauth_token_cache: dict[str, tuple[str, int]] = {}

    def create_send_job(self, payload: dict) -> APIResult:
        """POST ``payload`` as JSON to the configured send-jobs path."""
        base_url = (self.config.send_engine_base_url or "").strip()
        if not base_url:
            return APIResult(ok=False, status=0, data={}, error="send_engine_base_url is empty")
        endpoint = urljoin(f"{base_url.rstrip('/')}/", self.config.send_engine_send_jobs_path.lstrip("/"))
        body = json.dumps(payload).encode("utf-8")
        auth_headers, auth_error = self._auth_headers(required_scope="newsletter:send.write")
        if auth_error:
            return APIResult(ok=False, status=0, data={}, error=auth_error)
        headers = {"Content-Type": "application/json", **auth_headers}
        request = Request(endpoint, data=body, headers=headers, method="POST")
        return self._send(request, timeout=self._timeout())

    def get_send_job(self, send_job_id: str) -> APIResult:
        """GET the send job identified by ``send_job_id``."""
        from urllib.parse import quote

        base_url = (self.config.send_engine_base_url or "").strip()
        if not base_url:
            return APIResult(ok=False, status=0, data={}, error="send_engine_base_url is empty")
        if not send_job_id:
            return APIResult(ok=False, status=0, data={}, error="send_job_id is empty")
        # Percent-encode the id so characters such as "/", "?" or "#" cannot
        # alter the request path or query.
        job_segment = quote(str(send_job_id), safe="")
        endpoint = urljoin(
            f"{base_url.rstrip('/')}/",
            f"{self.config.send_engine_send_jobs_path.lstrip('/')}/{job_segment}",
        )
        auth_headers, auth_error = self._auth_headers(required_scope="newsletter:send.read")
        if auth_error:
            return APIResult(ok=False, status=0, data={}, error=auth_error)
        headers = {"Accept": "application/json", **auth_headers}
        request = Request(endpoint, headers=headers, method="GET")
        return self._send(request, timeout=self._timeout())

    def _timeout(self) -> int:
        """Return the configured timeout in seconds (default 10, minimum 1)."""
        return max(1, int(self.config.send_engine_timeout_seconds or 10))

    def _auth_headers(self, required_scope: str = "") -> tuple[dict, str]:
        """Return ``(headers, error)``; ``headers`` carries the bearer token."""
        token, error = self._get_member_center_access_token(required_scope=required_scope)
        if error or not token:
            return {}, error or "member center oauth token is empty"
        return {"Authorization": f"Bearer {token}"}, ""

    def _get_member_center_access_token(self, required_scope: str = "") -> tuple[str, str]:
        """Return ``(access_token, error)`` for the requested scope.

        The scope is ``required_scope``, else the configured send engine
        scope, else ``newsletter:send.write``. Uses the client-credentials
        grant against the member center token path and caches the token per
        scope until 30 seconds before it expires.
        """
        scope = (
            (required_scope or "").strip()
            or (self.config.send_engine_oauth_scope or "").strip()
            or "newsletter:send.write"
        )
        now = int(time.time())
        cached = self._oauth_token_cache.get(scope)
        # Refresh slightly early so a token does not expire while in flight.
        if cached and now < cached[1] - 30:
            return cached[0], ""

        member_center_base_url = (self.config.member_center_base_url or "").strip()
        if not member_center_base_url:
            return "", "member_center_base_url is empty (cannot resolve oauth token url)"
        token_path = (self.config.member_center_oauth_token_path or "/oauth/token").strip()
        token_url = urljoin(f"{member_center_base_url.rstrip('/')}/", token_path.lstrip("/"))

        client_id = (self.config.member_center_oauth_client_id or "").strip()
        encrypted_secret = (self.config.member_center_oauth_client_secret or "").strip()
        client_secret = ""
        if encrypted_secret:
            try:
                client_secret = decrypt_text(encrypted_secret)
            except Exception:
                # Best-effort fallback kept from the original code: the stored
                # value is used as-is when it cannot be decrypted (presumably
                # to support secrets saved unencrypted; confirm).
                client_secret = encrypted_secret
        if not client_id or not client_secret:
            return ("", "member center oauth client_id/client_secret is empty")

        form = {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "scope": scope,
            "audience": (self.config.member_center_oauth_audience or "").strip(),
        }
        # Empty fields (e.g. no audience configured) are omitted from the form.
        body = urlencode({k: v for k, v in form.items() if v}).encode("utf-8")
        request = Request(
            token_url,
            data=body,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            method="POST",
        )
        result = self._send(request, timeout=self._timeout())
        if not result.ok:
            return "", f"oauth token request failed: HTTP {result.status} {result.error}".strip()
        if not isinstance(result.data, dict):
            # A JSON array/scalar cannot carry an access_token field.
            return "", "oauth token response missing access_token"
        access_token = str(result.data.get("access_token") or "").strip()
        if not access_token:
            return "", "oauth token response missing access_token"
        try:
            expires_in = int(result.data.get("expires_in") or 300)
        except (TypeError, ValueError):
            # Malformed lifetime: fall back to the same default as a missing one.
            expires_in = 300
        self._oauth_token_cache[scope] = (access_token, int(time.time()) + max(60, expires_in))
        return access_token, ""

    def _send(self, request: Request, timeout: int) -> APIResult:
        """Execute ``request`` and convert the outcome into an ``APIResult``."""
        from http.client import HTTPException

        try:
            with urlopen(request, timeout=timeout) as response:
                raw = response.read().decode("utf-8")
                data = json.loads(raw) if raw else {}
                status = getattr(response, "status", 200)
                return APIResult(ok=200 <= status < 300, status=status, data=data)
        except HTTPError as exc:
            # The error body is optional context; never let it mask the status.
            try:
                raw = exc.read().decode("utf-8") if exc.fp else ""
                data = json.loads(raw) if raw else {}
            except (OSError, ValueError, HTTPException):
                data = {}
            return APIResult(ok=False, status=exc.code, data=data, error=str(exc))
        except (OSError, ValueError, HTTPException) as exc:
            # OSError covers URLError, timeouts and dropped connections;
            # ValueError covers invalid URLs and JSON/UTF-8 decoding errors;
            # HTTPException covers malformed or truncated HTTP responses.
            return APIResult(ok=False, status=0, data={}, error=str(exc))
def send_subscribe_email(
    *,
    to_email: str,
    subject: str,
    text_body: str,
    html_body: str,
    config: NewsletterSystemSettings,
) -> int:
    """Send one newsletter email through the SMTP relay described by ``config``.

    ``text_body`` is the plain-text body; ``html_body`` is attached as a
    ``text/html`` alternative when non-empty. The sender falls back to
    ``settings.DEFAULT_FROM_EMAIL`` when ``config`` yields no from address.

    Returns the value of ``EmailMultiAlternatives.send``.

    Raises:
        ValueError: if no SMTP relay host is configured, or if TLS and SSL
            are both enabled.

    Errors raised while sending are not suppressed (``fail_silently=False``).
    """
    from_email = build_from_email(config.sender_name, config.sender_email)
    reply_to = [config.reply_to_email] if config.reply_to_email else None
    if not config.smtp_relay_host:
        raise ValueError("SMTP relay host is empty. Please save SMTP settings first.")
    if config.smtp_use_tls and config.smtp_use_ssl:
        raise ValueError("SMTP TLS and SSL cannot both be enabled.")

    password = ""
    encrypted_password = (config.smtp_password or "").strip()
    if encrypted_password:
        try:
            password = decrypt_text(encrypted_password)
        except Exception:
            # Still attempt delivery without a password, but record why: a
            # later SMTP authentication failure is otherwise hard to trace.
            logger.warning(
                "newsletter smtp password could not be decrypted; sending without password",
                exc_info=True,
            )
            password = ""

    sender = from_email or settings.DEFAULT_FROM_EMAIL
    use_tls = bool(config.smtp_use_tls)
    use_ssl = bool(config.smtp_use_ssl)
    timeout = max(1, int(config.smtp_timeout_seconds or 15))

    connection = get_connection(
        backend="django.core.mail.backends.smtp.EmailBackend",
        host=config.smtp_relay_host,
        port=config.smtp_relay_port,
        username=config.smtp_username or None,
        password=password or None,
        use_tls=use_tls,
        use_ssl=use_ssl,
        timeout=timeout,
        fail_silently=False,
    )
    message = EmailMultiAlternatives(
        subject=subject,
        body=text_body,
        from_email=sender,
        to=[to_email],
        reply_to=reply_to,
        connection=connection,
    )
    if html_body:
        message.attach_alternative(html_body, "text/html")
    charset = (config.default_charset or "utf-8").strip() or "utf-8"
    message.encoding = charset
    sent_count = message.send(fail_silently=False)
    logger.info(
        "newsletter email send result sent_count=%s smtp_host=%s smtp_port=%s smtp_tls=%s smtp_ssl=%s smtp_timeout=%s from_email=%s to_email=%s",
        sent_count,
        config.smtp_relay_host,
        config.smtp_relay_port,
        use_tls,
        use_ssl,
        timeout,
        sender,
        to_email,
    )
    return sent_count