Warren Chen f55c766881 Add subscription floating action button with toggle functionality
- Implemented a floating action button (FAB) for newsletter subscription in the template.
- Added JavaScript to handle the toggle state of the FAB and close it on outside clicks or Escape key press.
- Created CSS styles for the FAB, including animations and responsive design.
- Added a Django template tag to return a random default cover image for the FAB.
- Integrated a form for email input and submission within the FAB.
2026-04-02 02:51:39 +09:00

400 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import json
import logging
from urllib.parse import urlencode

from django.contrib import messages
from django.contrib.admin.views.decorators import staff_member_required
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.http import HttpResponseNotAllowed, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils.http import url_has_allowed_host_and_scheme
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_GET, require_http_methods, require_POST

from .forms import NewsletterSubscribeForm, NewsletterUnsubscribeForm
from .models import (
    NewsletterCampaign,
    NewsletterSystemSettings,
    NewsletterTemplateSettings,
    OneClickUnsubscribeAudit,
)
from .newsletter import (
    MemberCenterClient,
    build_from_email,
    extract_token,
    render_placeholders,
    send_subscribe_email,
)
from .newsletter_scheduler import dispatch_campaign
def _load_settings():
    """Return the ``(system_settings, template_settings)`` pair.

    Each object is obtained by calling ``load()`` on its settings model.
    """
    system_settings = NewsletterSystemSettings.load()
    template_settings = NewsletterTemplateSettings.load()
    return system_settings, template_settings
def _build_context(*, title: str, message: str, success: bool):
    """Build the template context used when rendering a newsletter status page.

    All arguments are keyword-only and are stored unchanged under the
    ``"title"``, ``"message"`` and ``"success"`` keys of the returned dict.
    """
    return dict(title=title, message=message, success=success)
def _extract_one_click_token(request):
    """Return the one-click unsubscribe token carried by *request*, or ``""``.

    Lookup order (the first non-blank value wins; surrounding whitespace is
    stripped):

    1. the ``token`` query-string parameter,
    2. for POST requests, the ``token`` form field,
    3. for POST requests, the ``token`` key of a JSON object body.

    A body that is not valid UTF-8 JSON, is not a JSON object, or whose
    ``token`` is not a string is ignored and ``""`` is returned; this helper
    never raises for bad client input.
    """
    token = (request.GET.get("token") or "").strip()
    if token:
        return token
    if request.method != "POST":
        return ""

    token = (request.POST.get("token") or "").strip()
    if token:
        return token

    # NOTE(review): for multipart POSTs Django may refuse access to
    # ``request.body`` once ``request.POST`` has consumed the stream --
    # confirm whether that case needs explicit handling here.
    raw_body = request.body
    if not raw_body:
        return ""
    try:
        payload = json.loads(raw_body.decode("utf-8"))
    except (UnicodeDecodeError, ValueError, RecursionError):
        # Undecodable, malformed or absurdly nested JSON: treat as "no token".
        # Only these expected parse failures are swallowed, so genuine
        # programming errors still surface.
        return ""
    if not isinstance(payload, dict):
        return ""
    candidate = payload.get("token")
    return candidate.strip() if isinstance(candidate, str) else ""
def _handle_one_click_unsubscribe(*, request, token):
    """Process a one-click unsubscribe and return ``(ok, http_status, message)``.

    * Blank token -> ``(False, 400, ...)``.
    * A prior audit row for the same token hash with status ``success`` or
      ``already_unsubscribed`` -> ``(True, 200, ...)`` without calling the
      unsubscribe client again.
    * Otherwise the token is sent to ``MemberCenterClient.unsubscribe`` and
      the outcome is recorded in ``OneClickUnsubscribeAudit`` keyed by the
      SHA-256 hex digest of the token; success yields ``(True, 200, ...)``,
      anything else ``(False, 502, ...)``.

    ``request`` is accepted but not read by this function.
    """
    system_settings, _ = _load_settings()
    if not token:
        return False, 400, "退訂連結無效。"

    token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest()

    already_recorded = OneClickUnsubscribeAudit.objects.filter(
        token_hash=token_hash,
        status__in=["success", "already_unsubscribed"],
    ).first()
    if already_recorded:
        return True, 200, "您已完成退訂。"

    result = MemberCenterClient(system_settings).unsubscribe({"token": token})
    response_data = result.data if isinstance(result.data, dict) else {}

    # Fields common to both the success and the failure audit record.
    audit_defaults = {
        "subscriber_id": "",
        "list_id": "",
        "site_id": "",
        "campaign_id": "",
        "response_status": result.status,
        "response_payload": response_data,
    }

    if result.ok:
        already = bool(response_data.get("already_unsubscribed"))
        # A missing "success" key counts as success.
        if already or bool(response_data.get("success", True)):
            audit_defaults["status"] = "already_unsubscribed" if already else "success"
            OneClickUnsubscribeAudit.objects.update_or_create(
                token_hash=token_hash,
                defaults=audit_defaults,
            )
            return True, 200, "您已完成退訂。"

    audit_defaults["status"] = "failed"
    audit_defaults["error_message"] = result.error
    OneClickUnsubscribeAudit.objects.update_or_create(
        token_hash=token_hash,
        defaults=audit_defaults,
    )
    return False, 502, "退訂服務暫時無法使用。"
@require_http_methods(["POST"])
def newsletter_subscribe(request):
    """Handle the newsletter sign-up form (POST only).

    Steps:

    1. Validate ``NewsletterSubscribeForm``; invalid input renders the status
       page with HTTP 400.
    2. Register the lower-cased address through
       ``MemberCenterClient.subscribe``; a non-ok result renders HTTP 502.
    3. Send the confirmation e-mail built from the subscribe templates, with
       confirm/unsubscribe URLs that carry the returned token (when one was
       returned). Any exception while rendering or sending is logged and
       renders HTTP 502.
    4. On success render the "confirmation sent" status page.
    """
    form = NewsletterSubscribeForm(request.POST)
    system_settings, template_settings = _load_settings()

    def _failure(message, status):
        # Every failure path shares the same template and title.
        return render(
            request,
            "base/newsletter/status.html",
            _build_context(title="訂閱失敗", message=message, success=False),
            status=status,
        )

    if not form.is_valid():
        return _failure("請輸入正確 email。", 400)

    email = form.cleaned_data["email"].strip().lower()
    subscribe_result = MemberCenterClient(system_settings).subscribe(
        {
            "email": email,
            "list_id": system_settings.member_center_list_id,
            "source": "wagtail",
        }
    )
    if not subscribe_result.ok:
        return _failure("訂閱服務暫時無法使用,請稍後再試。", 502)

    token = extract_token(subscribe_result.data)
    confirm_url = request.build_absolute_uri(reverse("newsletter_confirm"))
    unsubscribe_url = request.build_absolute_uri(reverse("newsletter_unsubscribe"))
    if token:
        query = urlencode({"token": token, "email": email})
        confirm_url = f"{confirm_url}?{query}"
        unsubscribe_url = f"{unsubscribe_url}?{query}"

    values = {
        "token": token,
        "email": email,
        "list_id": system_settings.member_center_list_id,
        "tenant_id": system_settings.member_center_tenant_id,
        "confirm_url": confirm_url,
        "unsubscribe_url": unsubscribe_url,
    }
    try:
        send_subscribe_email(
            to_email=email,
            subject=render_placeholders(template_settings.subscribe_subject_template, values),
            text_body=render_placeholders(template_settings.subscribe_text_template, values),
            html_body=render_placeholders(template_settings.subscribe_html_template, values),
            config=system_settings,
        )
    except Exception:
        # Deliberately broad: whatever went wrong, the visitor gets a friendly
        # page. Log the traceback so the failure is diagnosable; the address
        # itself is not logged.
        logging.getLogger(__name__).exception(
            "Failed to send newsletter subscription confirmation e-mail"
        )
        return _failure("確認信寄送失敗,請稍後再試。", 502)

    return render(
        request,
        "base/newsletter/status.html",
        _build_context(
            title="訂閱確認已送出!",
            message="感謝您的訂閱<br>下一步,請前往訂閱的信箱收取確認信函<br><br>在信函中點擊連結",
            success=True,
        ),
    )
@require_GET
def newsletter_confirm(request):
    """Confirm a subscription from the link in the confirmation e-mail.

    Reads the ``token`` query parameter and passes it to
    ``MemberCenterClient.confirm``. Renders the status page with the
    configured success template when the client reports ``ok``; a missing
    token or a non-ok result renders the configured failure template with
    HTTP 400.
    """
    token = (request.GET.get("token") or "").strip()
    system_settings, template_settings = _load_settings()

    # The client is only contacted when a token was actually supplied.
    if token and MemberCenterClient(system_settings).confirm(token).ok:
        return render(
            request,
            "base/newsletter/status.html",
            _build_context(
                title="訂閱認證成功!",
                message=template_settings.confirm_success_template,
                success=True,
            ),
        )

    return render(
        request,
        "base/newsletter/status.html",
        _build_context(
            title="訂閱認證失敗",
            message=template_settings.confirm_failure_template,
            success=False,
        ),
        status=400,
    )
@require_http_methods(["GET", "POST"])
def newsletter_unsubscribe(request):
    """Show (GET) or submit (POST) the unsubscribe form.

    GET: takes ``token`` and ``email`` from the query string. When only an
    e-mail is given, a token is requested via
    ``MemberCenterClient.request_unsubscribe_token``. The form page is
    rendered with HTTP 200 when a token is available, otherwise HTTP 400 with
    ``can_submit`` set to ``False``.

    POST: validates ``NewsletterUnsubscribeForm`` and sends its token to
    ``MemberCenterClient.unsubscribe``. Renders the configured success
    template on an ok result; an invalid form or a non-ok result renders the
    configured failure template with HTTP 400.
    """
    system_settings, template_settings = _load_settings()
    client = MemberCenterClient(system_settings)

    if request.method == "GET":
        params = request.GET
        token = (params.get("token") or "").strip()
        email = (params.get("email") or "").strip().lower()

        if email and not token:
            lookup = client.request_unsubscribe_token(
                {
                    "email": email,
                    "list_id": system_settings.member_center_list_id,
                }
            )
            if lookup.ok:
                token = extract_token(lookup.data)

        can_submit = bool(token)
        page_context = {
            "form": NewsletterUnsubscribeForm(initial={"email": email, "token": token}),
            "can_submit": can_submit,
            "intro_message": template_settings.unsubscribe_intro_template,
        }
        return render(
            request,
            "base/newsletter/unsubscribe.html",
            page_context,
            status=200 if can_submit else 400,
        )

    # Defensive guard; the decorator already limits methods to GET and POST.
    if request.method != "POST":
        return HttpResponseNotAllowed(["GET", "POST"])

    form = NewsletterUnsubscribeForm(request.POST)
    unsubscribed = False
    if form.is_valid():
        unsubscribed = client.unsubscribe({"token": form.cleaned_data["token"]}).ok

    if unsubscribed:
        success_context = _build_context(
            title="退訂成功",
            message=template_settings.unsubscribe_success_template,
            success=True,
        )
        return render(request, "base/newsletter/status.html", success_context)

    failure_context = _build_context(
        title="退訂失敗",
        message=template_settings.unsubscribe_failure_template,
        success=False,
    )
    return render(request, "base/newsletter/status.html", failure_context, status=400)
@csrf_exempt
@require_http_methods(["GET", "POST"])
def one_click_unsubscribe(request):
    """One-click unsubscribe endpoint (CSRF-exempt, GET or POST).

    Extracts the token from the request, runs the unsubscribe handler and
    answers according to the HTTP method:

    * POST -> JSON ``{"success": <bool>}``; an ``"error"`` message is added
      only for failures whose status is 400 or 410.
    * GET  -> the HTML status page with the handler's message wrapped in
      ``<p>``.

    The HTTP status is the one returned by the handler in both cases.
    """
    token = _extract_one_click_token(request)
    ok, status_code, message = _handle_one_click_unsubscribe(request=request, token=token)

    if request.method != "POST":
        page_title = "退訂成功" if ok else "退訂失敗"
        return render(
            request,
            "base/newsletter/status.html",
            _build_context(title=page_title, message=f"<p>{message}</p>", success=ok),
            status=status_code,
        )

    payload = {"success": ok}
    if status_code in (400, 410) and not ok:
        payload["error"] = message
    return JsonResponse(payload, status=status_code)
@staff_member_required
@require_GET
def newsletter_campaign_send_now(request, campaign_id: int):
    """Dispatch a campaign immediately (staff only) and redirect back.

    Responds 404 when no ``NewsletterCampaign`` has ``campaign_id``. If the
    campaign is already in ``STATUS_SENDING`` an error message is flashed and
    nothing is dispatched. Otherwise ``dispatch_campaign`` is called with
    ``schedule_retry_on_failure=False`` and its ``sent`` / ``failed`` counts
    are reported through the messages framework.

    The redirect goes to the ``Referer`` header only when it points at this
    host; otherwise it falls back to the Wagtail admin home, so a crafted
    link cannot bounce staff to an external site.
    """
    # NOTE(review): this view triggers a send over GET, which CSRF protection
    # does not cover. Switching to POST would need the calling UI to change,
    # so it is only flagged here.
    campaign = get_object_or_404(NewsletterCampaign, pk=campaign_id)

    referer = request.META.get("HTTP_REFERER")
    if referer and url_has_allowed_host_and_scheme(
        referer,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        redirect_to = referer
    else:
        redirect_to = reverse("wagtailadmin_home")

    if campaign.status == NewsletterCampaign.STATUS_SENDING:
        messages.error(request, "Campaign is currently sending.")
        return redirect(redirect_to)

    result = dispatch_campaign(campaign, schedule_retry_on_failure=False)
    if result.get("failed", 0):
        messages.error(
            request,
            f"Send now completed with failures. sent={result.get('sent', 0)} failed={result.get('failed', 0)}",
        )
    else:
        messages.success(
            request,
            f"Send now completed. sent={result.get('sent', 0)} failed={result.get('failed', 0)}",
        )
    return redirect(redirect_to)
@staff_member_required
@require_POST
def newsletter_smtp_test(request):
    """Send a test e-mail with the current SMTP settings (staff only, POST).

    Reads the recipient from the ``smtp_test_email`` form field, validates it
    with ``validate_email`` and sends a fixed test message through
    ``send_subscribe_email`` using settings loaded for this request. The
    outcome (missing/invalid address, send failure, or success with the sent
    count and sender) is flashed through the messages framework.

    The redirect goes to the ``Referer`` header only when it points at this
    host; otherwise it falls back to the Wagtail admin home.
    """
    to_email = (request.POST.get("smtp_test_email") or "").strip().lower()

    referer = request.META.get("HTTP_REFERER")
    if referer and url_has_allowed_host_and_scheme(
        referer,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        redirect_to = referer
    else:
        redirect_to = reverse("wagtailadmin_home")

    if not to_email:
        messages.error(request, "請輸入測試收件 Email。")
        return redirect(redirect_to)
    try:
        validate_email(to_email)
    except ValidationError:
        messages.error(request, "測試收件 Email 格式不正確。")
        return redirect(redirect_to)

    settings_obj = NewsletterSystemSettings.load(request_or_site=request)
    try:
        sent_count = send_subscribe_email(
            to_email=to_email,
            subject="[SMTP Test] Newsletter SMTP 設定測試",
            text_body="這是一封測試信,代表 SMTP 設定可正常寄送。",
            html_body="<p>這是一封測試信,代表 SMTP 設定可正常寄送。</p>",
            config=settings_obj,
        )
    except Exception as exc:
        # Broad on purpose: any send failure is shown to the staff user
        # instead of producing a 500 page.
        messages.error(request, f"測試信寄送失敗:{exc}")
        return redirect(redirect_to)

    from_email = build_from_email(settings_obj.sender_name, settings_obj.sender_email)
    # Separators keep the fields from running together
    # (previously rendered as "...sent_count=1from=a@b.cto=x@y.z").
    messages.success(
        request,
        f"SMTP 已接受請求, sent_count={sent_count}, "
        f"from={from_email or 'settings.DEFAULT_FROM_EMAIL'}, to={to_email}",
    )
    return redirect(redirect_to)