import logging
from urllib.parse import urlencode
import hashlib
import json
from django.contrib import messages
from django.contrib.admin.views.decorators import staff_member_required
from django.core.exceptions import ValidationError
from django.core.validators import validate_email
from django.http import HttpResponseNotAllowed, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils.html import escape
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_GET, require_http_methods, require_POST
from .forms import ContactForm, NewsletterSubscribeForm, NewsletterUnsubscribeForm
from .models import (
MailSmtpSettings,
NewsletterCampaign,
NewsletterSystemSettings,
NewsletterTemplateSettings,
OneClickUnsubscribeAudit,
SystemNotificationMailSettings,
)
from .newsletter import (
MemberCenterClient,
build_from_email,
extract_token,
render_placeholders,
send_contact_notification_email,
send_contact_user_email,
send_subscribe_email,
)
from .newsletter_scheduler import dispatch_campaign
logger = logging.getLogger(__name__)
@require_GET
def health_check(request):
return JsonResponse({"status": "ok"})
def _load_settings(request_or_site=None):
return (
NewsletterSystemSettings.load(request_or_site=request_or_site),
NewsletterTemplateSettings.load(request_or_site=request_or_site),
MailSmtpSettings.load(request_or_site=request_or_site),
)
def _build_context(*, title: str, message: str, success: bool):
return {
"title": title,
"message": message,
"success": success,
}
def _render_contact_template(template: str, values: dict[str, str]) -> str:
rendered = template or ""
for key, value in values.items():
rendered = rendered.replace(f"{{{{{key}}}}}", value)
return rendered
@require_POST
def contact_form_submit(request):
form = ContactForm(request.POST)
if not form.is_valid():
if request.headers.get("x-requested-with") == "XMLHttpRequest":
return JsonResponse({"success": False, "errors": form.errors}, status=400)
messages.error(request, "表單欄位未填完整,請確認後再送出。")
return redirect(request.META.get("HTTP_REFERER") or "/")
submission = form.save(commit=False)
submission.ip_address = request.META.get("REMOTE_ADDR") or ""
submission.user_agent = request.META.get("HTTP_USER_AGENT", "")
submission.save()
notification_settings = SystemNotificationMailSettings.load(request_or_site=request)
smtp_settings = MailSmtpSettings.load(request_or_site=request)
subject_prefix = (notification_settings.contact_form_subject_prefix or "").strip()
subject = f"{subject_prefix} {submission.get_category_display()}".strip()
escaped_message_html = escape(submission.message).replace("\n", "
")
text_body = (
f"Name: {submission.name}\n"
f"Email: {submission.email}\n"
f"Contact: {submission.contact}\n"
f"Category: {submission.get_category_display()}\n"
f"Source Page: {submission.source_page}\n\n"
f"Message:\n{submission.message}\n"
)
html_body = (
"
Name: "
f"{escape(submission.name)}
"
"Email: "
f"{escape(submission.email)}
"
"Contact: "
f"{escape(submission.contact)}
"
"Category: "
f"{escape(submission.get_category_display())}
"
"Source Page: "
f"{escape(submission.source_page or '')}
"
"Message:
"
f"{escaped_message_html}
"
)
try:
send_contact_notification_email(
subject=subject,
text_body=text_body,
html_body=html_body,
notification_config=notification_settings,
smtp_config=smtp_settings,
)
except Exception as exc:
logger.warning("contact form admin notification email failed: %s", exc)
if submission.email:
values_text = {
"name": submission.name,
"email": submission.email,
"contact": submission.contact,
"category": submission.get_category_display(),
"message": submission.message,
"source_page": submission.source_page or "",
}
values_html = {
"name": escape(submission.name),
"email": escape(submission.email),
"contact": escape(submission.contact),
"category": escape(submission.get_category_display()),
"message": escape(submission.message).replace("\n", "
"),
"source_page": escape(submission.source_page or ""),
}
user_subject = _render_contact_template(
notification_settings.contact_form_user_subject_template,
values_text,
)
user_text = _render_contact_template(
notification_settings.contact_form_user_text_template,
values_text,
)
user_html = _render_contact_template(
notification_settings.contact_form_user_html_template,
values_html,
)
try:
send_contact_user_email(
to_email=submission.email,
subject=user_subject,
text_body=user_text,
html_body=user_html,
notification_config=notification_settings,
smtp_config=smtp_settings,
)
except Exception as exc:
logger.warning("contact form user copy email failed: %s", exc)
if request.headers.get("x-requested-with") == "XMLHttpRequest":
return JsonResponse({"success": True})
messages.success(request, "感謝您的來信,我們已收到您的表單。")
return redirect(request.META.get("HTTP_REFERER") or "/")
def _extract_one_click_token(request):
token = (request.GET.get("token") or "").strip()
if token:
return token
if request.method == "POST":
token = (request.POST.get("token") or "").strip()
if token:
return token
if request.body:
try:
body = json.loads(request.body.decode("utf-8"))
token = (body.get("token") or "").strip()
if token:
return token
except Exception:
pass
return ""
def _handle_one_click_unsubscribe(*, request, token):
system_settings, _, _ = _load_settings(request)
token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest() if token else ""
if not token:
return False, 400, "退訂連結無效。"
existing = OneClickUnsubscribeAudit.objects.filter(
token_hash=token_hash,
status__in=["success", "already_unsubscribed"],
).first()
if existing:
return True, 200, "您已完成退訂。"
result = MemberCenterClient(system_settings).unsubscribe({"token": token})
response_data = result.data if isinstance(result.data, dict) else {}
if result.ok:
already = bool(response_data.get("already_unsubscribed"))
success = bool(response_data.get("success", True)) or already
if success:
OneClickUnsubscribeAudit.objects.update_or_create(
token_hash=token_hash,
defaults={
"subscriber_id": "",
"list_id": "",
"site_id": "",
"campaign_id": "",
"status": "already_unsubscribed" if already else "success",
"response_status": result.status,
"response_payload": response_data,
},
)
return True, 200, "您已完成退訂。"
OneClickUnsubscribeAudit.objects.update_or_create(
token_hash=token_hash,
defaults={
"subscriber_id": "",
"list_id": "",
"site_id": "",
"campaign_id": "",
"status": "failed",
"response_status": result.status,
"response_payload": response_data,
"error_message": result.error,
},
)
return False, 502, "退訂服務暫時無法使用。"
@require_http_methods(["POST"])
def newsletter_subscribe(request):
form = NewsletterSubscribeForm(request.POST)
system_settings, template_settings, smtp_settings = _load_settings(request)
if not form.is_valid():
return render(
request,
"base/newsletter/status.html",
_build_context(
title="訂閱失敗",
message="請輸入正確 email。",
success=False,
),
status=400,
)
email = form.cleaned_data["email"].strip().lower()
client = MemberCenterClient(system_settings)
subscribe_payload = {
"email": email,
"list_id": system_settings.member_center_list_id,
"source": "wagtail",
}
subscribe_result = client.subscribe(subscribe_payload)
if not subscribe_result.ok:
return render(
request,
"base/newsletter/status.html",
_build_context(
title="訂閱失敗",
message="訂閱服務暫時無法使用,請稍後再試。",
success=False,
),
status=502,
)
token = extract_token(subscribe_result.data)
confirm_url = request.build_absolute_uri(reverse("newsletter_confirm"))
unsubscribe_url = request.build_absolute_uri(reverse("newsletter_unsubscribe"))
if token:
query = urlencode({"token": token, "email": email})
confirm_url = f"{confirm_url}?{query}"
unsubscribe_url = f"{unsubscribe_url}?{query}"
values = {
"token": token,
"email": email,
"list_id": system_settings.member_center_list_id,
"tenant_id": system_settings.member_center_tenant_id,
"confirm_url": confirm_url,
"unsubscribe_url": unsubscribe_url,
}
try:
send_subscribe_email(
to_email=email,
subject=render_placeholders(template_settings.subscribe_subject_template, values),
text_body=render_placeholders(template_settings.subscribe_text_template, values),
html_body=render_placeholders(template_settings.subscribe_html_template, values),
config=system_settings,
smtp_config=smtp_settings,
)
except Exception:
return render(
request,
"base/newsletter/status.html",
_build_context(
title="訂閱失敗",
message="確認信寄送失敗,請稍後再試。",
success=False,
),
status=502,
)
return render(
request,
"base/newsletter/status.html",
_build_context(
title="訂閱確認已送出!",
message="感謝您的訂閱
下一步,請前往訂閱的信箱收取確認信函
在信函中點擊連結",
success=True,
),
)
@require_GET
def newsletter_confirm(request):
token = (request.GET.get("token") or "").strip()
email = (request.GET.get("email") or "").strip().lower()
system_settings, template_settings, _ = _load_settings(request)
if not token:
return render(
request,
"base/newsletter/status.html",
_build_context(
title="訂閱認證失敗",
message=template_settings.confirm_failure_template,
success=False,
),
status=400,
)
result = MemberCenterClient(system_settings).confirm(token)
if result.ok:
return render(
request,
"base/newsletter/status.html",
_build_context(
title="訂閱認證成功!",
message=template_settings.confirm_success_template,
success=True,
),
)
return render(
request,
"base/newsletter/status.html",
_build_context(
title="訂閱認證失敗",
message=template_settings.confirm_failure_template,
success=False,
),
status=400,
)
@require_http_methods(["GET", "POST"])
def newsletter_unsubscribe(request):
system_settings, template_settings, _ = _load_settings(request)
client = MemberCenterClient(system_settings)
if request.method == "GET":
token = (request.GET.get("token") or "").strip()
email = (request.GET.get("email") or "").strip().lower()
if not token and email:
token_result = client.request_unsubscribe_token(
{
"email": email,
"list_id": system_settings.member_center_list_id,
}
)
if token_result.ok:
token = extract_token(token_result.data)
form = NewsletterUnsubscribeForm(initial={"email": email, "token": token})
can_submit = bool(token)
return render(
request,
"base/newsletter/unsubscribe.html",
{
"form": form,
"can_submit": can_submit,
"intro_message": template_settings.unsubscribe_intro_template,
},
status=200 if can_submit else 400,
)
if request.method != "POST":
return HttpResponseNotAllowed(["GET", "POST"])
form = NewsletterUnsubscribeForm(request.POST)
if not form.is_valid():
return render(
request,
"base/newsletter/status.html",
_build_context(
title="退訂失敗",
message=template_settings.unsubscribe_failure_template,
success=False,
),
status=400,
)
payload = {
"token": form.cleaned_data["token"],
}
result = client.unsubscribe(payload)
if result.ok:
return render(
request,
"base/newsletter/status.html",
_build_context(
title="退訂成功",
message=template_settings.unsubscribe_success_template,
success=True,
),
)
return render(
request,
"base/newsletter/status.html",
_build_context(
title="退訂失敗",
message=template_settings.unsubscribe_failure_template,
success=False,
),
status=400,
)
@csrf_exempt
@require_http_methods(["GET", "POST"])
def one_click_unsubscribe(request):
token = _extract_one_click_token(request)
ok, status_code, message = _handle_one_click_unsubscribe(request=request, token=token)
if request.method == "POST":
body = {"success": ok}
if not ok and status_code in (400, 410):
body["error"] = message
return JsonResponse(body, status=status_code)
return render(
request,
"base/newsletter/status.html",
_build_context(
title="退訂成功" if ok else "退訂失敗",
message=f"{message}
",
success=ok,
),
status=status_code,
)
@staff_member_required
@require_GET
def newsletter_campaign_send_now(request, campaign_id: int):
campaign = get_object_or_404(NewsletterCampaign, pk=campaign_id)
if campaign.status == NewsletterCampaign.STATUS_SENDING:
messages.error(request, "Campaign is currently sending.")
return redirect(request.META.get("HTTP_REFERER") or reverse("wagtailadmin_home"))
result = dispatch_campaign(campaign, schedule_retry_on_failure=False)
if result.get("failed", 0):
messages.error(
request,
f"Send now completed with failures. sent={result.get('sent', 0)} failed={result.get('failed', 0)}",
)
else:
messages.success(
request,
f"Send now completed. sent={result.get('sent', 0)} failed={result.get('failed', 0)}",
)
return redirect(request.META.get("HTTP_REFERER") or reverse("wagtailadmin_home"))
@staff_member_required
@require_POST
def newsletter_smtp_test(request):
to_email = (request.POST.get("smtp_test_email") or "").strip().lower()
redirect_to = request.META.get("HTTP_REFERER") or reverse("wagtailadmin_home")
if not to_email:
messages.error(request, "請輸入測試收件 Email。")
return redirect(redirect_to)
try:
validate_email(to_email)
except ValidationError:
messages.error(request, "測試收件 Email 格式不正確。")
return redirect(redirect_to)
settings_obj = NewsletterSystemSettings.load(request_or_site=request)
smtp_settings = MailSmtpSettings.load(request_or_site=request)
try:
sent_count = send_subscribe_email(
to_email=to_email,
subject="[SMTP Test] Newsletter SMTP 設定測試",
text_body="這是一封測試信,代表 SMTP 設定可正常寄送。",
html_body="這是一封測試信,代表 SMTP 設定可正常寄送。
",
config=settings_obj,
smtp_config=smtp_settings,
)
except Exception as exc:
messages.error(request, f"測試信寄送失敗:{exc}")
return redirect(redirect_to)
from_email = build_from_email(settings_obj.sender_name, settings_obj.sender_email)
messages.success(
request,
f"SMTP 已接受請求(sent_count={sent_count}),from={from_email or 'settings.DEFAULT_FROM_EMAIL'},to={to_email}",
)
return redirect(redirect_to)