717 lines
24 KiB
C#

using System.Security.Claims;
using System.Text;
using System.Text.Json;
using System.Net.Http.Json;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.EntityFrameworkCore;
using Microsoft.IdentityModel.Tokens;
using SendEngine.Api.Models;
using SendEngine.Api.Services;
using SendEngine.Api.Security;
using SendEngine.Domain.Entities;
using SendEngine.Infrastructure;
using SendEngine.Infrastructure.Data;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddHostedService<DevMockSenderWorker>();
builder.Services.AddHostedService<SqsSesPollerWorker>();
builder.Services.AddScoped<SesEventProcessingService>();
var jwtAuthority = builder.Configuration["Jwt:Authority"];
var jwtMetadataAddress = builder.Configuration["Jwt:MetadataAddress"];
var memberCenterBaseUrl = builder.Configuration["MemberCenter:BaseUrl"];
if (string.IsNullOrWhiteSpace(jwtAuthority) &&
string.IsNullOrWhiteSpace(jwtMetadataAddress) &&
!string.IsNullOrWhiteSpace(memberCenterBaseUrl) &&
Uri.TryCreate(memberCenterBaseUrl, UriKind.Absolute, out var memberCenterBaseUri))
{
jwtMetadataAddress = new Uri(memberCenterBaseUri, "/.well-known/openid-configuration").ToString();
}
var signingKey = builder.Configuration["Jwt:SigningKey"];
var useOidcJwks = !string.IsNullOrWhiteSpace(jwtAuthority) || !string.IsNullOrWhiteSpace(jwtMetadataAddress);
var expectedIssuer = builder.Configuration["Jwt:Issuer"];
var expectedAudience = builder.Configuration["Jwt:Audience"];
var requireHttpsMetadata = builder.Configuration.GetValue("Jwt:RequireHttpsMetadata", false);
if (!useOidcJwks && string.IsNullOrWhiteSpace(signingKey))
{
throw new InvalidOperationException(
"JWT config missing. Set Jwt:Authority or Jwt:MetadataAddress for JWKS validation, " +
"or set Jwt:SigningKey for symmetric key validation.");
}
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
.AddJwtBearer(options =>
{
if (useOidcJwks)
{
if (!string.IsNullOrWhiteSpace(jwtAuthority))
{
options.Authority = jwtAuthority;
}
if (!string.IsNullOrWhiteSpace(jwtMetadataAddress))
{
options.MetadataAddress = jwtMetadataAddress;
}
options.RequireHttpsMetadata = requireHttpsMetadata;
options.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuer = true,
ValidateAudience = true,
ValidAudience = expectedAudience
};
if (!string.IsNullOrWhiteSpace(expectedIssuer))
{
options.TokenValidationParameters.ValidIssuer = expectedIssuer;
}
}
else
{
options.TokenValidationParameters = new TokenValidationParameters
{
ValidateIssuer = true,
ValidateAudience = true,
ValidateIssuerSigningKey = true,
ValidIssuer = expectedIssuer,
ValidAudience = expectedAudience,
IssuerSigningKey = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(signingKey!))
};
}
options.Events = new JwtBearerEvents
{
OnAuthenticationFailed = context =>
{
if (!context.Request.Path.StartsWithSegments("/api"))
{
return Task.CompletedTask;
}
var logger = context.HttpContext.RequestServices
.GetRequiredService<ILoggerFactory>()
.CreateLogger("SendEngine.Api.Auth");
logger.LogWarning(
context.Exception,
"JWT authentication failed. path={Path} expected_issuer={ExpectedIssuer} expected_audience={ExpectedAudience} mode={Mode}",
context.Request.Path.Value,
expectedIssuer ?? string.Empty,
expectedAudience ?? string.Empty,
useOidcJwks ? "jwks" : "symmetric");
return Task.CompletedTask;
}
};
});
builder.Services.AddAuthorization();
var app = builder.Build();
var autoMigrate = builder.Configuration.GetValue("Db:AutoMigrate", true);
if (autoMigrate)
{
using var scope = app.Services.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<SendEngineDbContext>();
db.Database.Migrate();
}
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
var enableHttpsRedirection = builder.Configuration.GetValue("HttpsRedirection:Enabled", true);
if (enableHttpsRedirection)
{
app.UseHttpsRedirection();
}
app.UseAuthentication();
app.UseAuthorization();
app.Use(async (context, next) =>
{
if (context.Request.Path.StartsWithSegments("/webhooks/subscriptions") ||
context.Request.Path.StartsWithSegments("/webhooks/lists/full-sync"))
{
context.Request.EnableBuffering();
using var reader = new StreamReader(context.Request.Body, Encoding.UTF8, leaveOpen: true);
var rawBody = await reader.ReadToEndAsync();
context.Request.Body.Position = 0;
context.Items[WebhookValidator.RawBodyItemKey] = rawBody;
}
await next();
});
app.MapGet("/health", () => Results.Ok(new { status = "ok" }))
.WithName("Health")
.WithOpenApi();
app.MapPost("/api/send-jobs", async (
HttpContext httpContext,
CreateSendJobRequest request,
SendEngineDbContext db,
ILoggerFactory loggerFactory) =>
{
var logger = loggerFactory.CreateLogger("SendEngine.Api.SendJobs");
if (!HasScope(httpContext.User, "newsletter:send.write"))
{
return Results.StatusCode(StatusCodes.Status403Forbidden);
}
var tenantId = GetTenantId(httpContext.User);
if (tenantId is null)
{
return Results.StatusCode(StatusCodes.Status403Forbidden);
}
if (request.TenantId == Guid.Empty)
{
request.TenantId = tenantId.Value;
}
else if (request.TenantId != tenantId.Value)
{
return Results.StatusCode(StatusCodes.Status403Forbidden);
}
if (request.ListId == Guid.Empty)
{
return Results.UnprocessableEntity(new { error = "list_id_required" });
}
if (string.IsNullOrWhiteSpace(request.Subject))
{
return Results.UnprocessableEntity(new { error = "subject_required" });
}
var hasContent = !string.IsNullOrWhiteSpace(request.BodyHtml)
|| !string.IsNullOrWhiteSpace(request.BodyText)
|| request.Template.HasValue;
if (!hasContent)
{
return Results.UnprocessableEntity(new { error = "content_required" });
}
if (request.WindowStart.HasValue && request.WindowEnd.HasValue
&& request.WindowStart.Value >= request.WindowEnd.Value)
{
return Results.UnprocessableEntity(new { error = "window_invalid" });
}
var tenantExists = await db.Tenants.AsNoTracking().AnyAsync(x => x.Id == request.TenantId);
if (!tenantExists)
{
return Results.UnprocessableEntity(new { error = "tenant_not_found" });
}
var listExists = await db.Lists.AsNoTracking()
.AnyAsync(x => x.Id == request.ListId && x.TenantId == request.TenantId);
if (!listExists)
{
logger.LogWarning(
"CreateSendJob auto-created list because list was missing. tenant_id={TenantId} list_id={ListId}",
request.TenantId,
request.ListId);
db.Lists.Add(new MailingList
{
Id = request.ListId,
TenantId = request.TenantId,
Name = $"list-{request.ListId:N}",
CreatedAt = DateTimeOffset.UtcNow
});
}
var campaign = new Campaign
{
Id = Guid.NewGuid(),
TenantId = request.TenantId,
ListId = request.ListId,
Name = request.Name,
Subject = request.Subject,
BodyHtml = request.BodyHtml,
BodyText = request.BodyText,
Template = request.Template.HasValue ? request.Template.Value.GetRawText() : null,
CreatedAt = DateTimeOffset.UtcNow
};
var sendJob = new SendJob
{
Id = Guid.NewGuid(),
TenantId = request.TenantId,
ListId = request.ListId,
CampaignId = campaign.Id,
ScheduledAt = request.ScheduledAt,
WindowStart = request.WindowStart,
WindowEnd = request.WindowEnd,
Status = "pending",
CreatedAt = DateTimeOffset.UtcNow,
UpdatedAt = DateTimeOffset.UtcNow
};
db.Campaigns.Add(campaign);
db.SendJobs.Add(sendJob);
await db.SaveChangesAsync();
return Results.Ok(new CreateSendJobResponse
{
SendJobId = sendJob.Id,
Status = sendJob.Status
});
}).RequireAuthorization().WithName("CreateSendJob").WithOpenApi();
app.MapGet("/api/send-jobs/{id:guid}", async (HttpContext httpContext, Guid id, SendEngineDbContext db) =>
{
if (!HasScope(httpContext.User, "newsletter:send.read"))
{
return Results.StatusCode(StatusCodes.Status403Forbidden);
}
var tenantId = GetTenantId(httpContext.User);
if (tenantId is null)
{
return Results.StatusCode(StatusCodes.Status403Forbidden);
}
var sendJob = await db.SendJobs.AsNoTracking()
.FirstOrDefaultAsync(x => x.Id == id && x.TenantId == tenantId.Value);
if (sendJob is null)
{
return Results.NotFound();
}
return Results.Ok(new SendJobResponse
{
Id = sendJob.Id,
TenantId = sendJob.TenantId,
ListId = sendJob.ListId,
CampaignId = sendJob.CampaignId,
Status = sendJob.Status,
ScheduledAt = sendJob.ScheduledAt,
WindowStart = sendJob.WindowStart,
WindowEnd = sendJob.WindowEnd
});
}).RequireAuthorization().WithName("GetSendJob").WithOpenApi();
app.MapPost("/api/send-jobs/{id:guid}/cancel", async (HttpContext httpContext, Guid id, SendEngineDbContext db) =>
{
if (!HasScope(httpContext.User, "newsletter:send.write"))
{
return Results.StatusCode(StatusCodes.Status403Forbidden);
}
var tenantId = GetTenantId(httpContext.User);
if (tenantId is null)
{
return Results.StatusCode(StatusCodes.Status403Forbidden);
}
var sendJob = await db.SendJobs.FirstOrDefaultAsync(x => x.Id == id && x.TenantId == tenantId.Value);
if (sendJob is null)
{
return Results.NotFound();
}
sendJob.Status = "cancelled";
sendJob.UpdatedAt = DateTimeOffset.UtcNow;
await db.SaveChangesAsync();
return Results.Ok(new SendJobStatusResponse { Id = sendJob.Id, Status = sendJob.Status });
}).RequireAuthorization().WithName("CancelSendJob").WithOpenApi();
app.MapPost("/webhooks/subscriptions", async (
HttpContext httpContext,
SubscriptionEventRequest request,
SendEngineDbContext db,
ILoggerFactory loggerFactory) =>
{
var logger = loggerFactory.CreateLogger("SendEngine.Webhooks.Subscriptions");
if (request.TenantId == Guid.Empty || request.ListId == Guid.Empty || request.Subscriber.Id == Guid.Empty)
{
logger.LogWarning("Subscription webhook rejected: tenant_id/list_id/subscriber.id required.");
return Results.UnprocessableEntity(new { error = "tenant_id_list_id_subscriber_id_required" });
}
if (!IsSupportedSubscriptionEvent(request.EventType))
{
logger.LogWarning("Subscription webhook rejected: unsupported_event_type={EventType}", request.EventType);
return Results.UnprocessableEntity(new { error = "unsupported_event_type" });
}
var secret = builder.Configuration["Webhook:Secrets:member_center"] ?? string.Empty;
var skewSeconds = builder.Configuration.GetValue("Webhook:TimestampSkewSeconds", 300);
var allowNullTenantClient = builder.Configuration.GetValue("Webhook:AllowNullTenantClient", false);
var validation = await WebhookValidator.ValidateAsync(
httpContext,
db,
secret,
skewSeconds,
request.TenantId,
allowNullTenantClient,
"newsletter:events.write");
if (validation is not null)
{
var reason = httpContext.Items.TryGetValue(WebhookValidator.FailureReasonItemKey, out var value)
? value?.ToString()
: "unknown";
var statusCode = httpContext.Items.TryGetValue(WebhookValidator.FailureStatusCodeItemKey, out var statusValue) &&
statusValue is int code
? code
: StatusCodes.Status401Unauthorized;
logger.LogWarning(
"Subscription webhook auth validation failed. tenant_id={TenantId} event_type={EventType} reason={Reason}",
request.TenantId,
request.EventType,
reason);
return Results.Json(
new
{
error = reason == "replay_detected" ? "replay_detected" : "webhook_auth_failed",
reason
},
statusCode: statusCode);
}
var testFriendlyEnabled = builder.Configuration.GetValue("TestFriendly:Enabled", false);
if (!await EnsureTenantForWebhookAsync(db, request.TenantId, testFriendlyEnabled))
{
logger.LogWarning("Subscription webhook rejected: tenant_not_found. tenant_id={TenantId}", request.TenantId);
return Results.UnprocessableEntity(new { error = "tenant_not_found" });
}
logger.LogInformation(
"Subscription webhook processing started. tenant_id={TenantId} list_id={ListId} subscriber_id={SubscriberId} event_type={EventType}",
request.TenantId,
request.ListId,
request.Subscriber.Id,
request.EventType);
var payload = JsonSerializer.Serialize(request);
var inbox = new EventInbox
{
Id = request.EventId == Guid.Empty ? Guid.NewGuid() : request.EventId,
TenantId = request.TenantId,
EventType = request.EventType,
Source = "member_center",
Payload = payload,
ReceivedAt = DateTimeOffset.UtcNow,
Status = "received"
};
db.EventsInbox.Add(inbox);
await EnsureListExistsAsync(db, request.TenantId, request.ListId);
await ApplySubscriptionSnapshotAsync(db, request);
inbox.Status = "processed";
inbox.ProcessedAt = DateTimeOffset.UtcNow;
try
{
await db.SaveChangesAsync();
}
catch (DbUpdateException)
{
logger.LogWarning(
"Subscription webhook conflict. tenant_id={TenantId} list_id={ListId} subscriber_id={SubscriberId} event_type={EventType}",
request.TenantId,
request.ListId,
request.Subscriber.Id,
request.EventType);
return Results.Conflict(new { error = "duplicate_event_or_unique_violation" });
}
logger.LogInformation(
"Subscription webhook processed. tenant_id={TenantId} list_id={ListId} subscriber_id={SubscriberId} event_type={EventType}",
request.TenantId,
request.ListId,
request.Subscriber.Id,
request.EventType);
return Results.Ok();
}).WithName("SubscriptionWebhook").WithOpenApi();
app.MapPost("/webhooks/lists/full-sync", async (
HttpContext httpContext,
FullSyncBatchRequest request,
SendEngineDbContext db,
ILoggerFactory loggerFactory) =>
{
var logger = loggerFactory.CreateLogger("SendEngine.Webhooks.FullSync");
if (request.TenantId == Guid.Empty || request.ListId == Guid.Empty)
{
logger.LogWarning("Full-sync webhook rejected: tenant_id/list_id required.");
return Results.UnprocessableEntity(new { error = "tenant_id_list_id_required" });
}
var secret = builder.Configuration["Webhook:Secrets:member_center"] ?? string.Empty;
var skewSeconds = builder.Configuration.GetValue("Webhook:TimestampSkewSeconds", 300);
var allowNullTenantClient = builder.Configuration.GetValue("Webhook:AllowNullTenantClient", false);
var validation = await WebhookValidator.ValidateAsync(
httpContext,
db,
secret,
skewSeconds,
request.TenantId,
allowNullTenantClient,
"newsletter:events.write");
if (validation is not null)
{
var reason = httpContext.Items.TryGetValue(WebhookValidator.FailureReasonItemKey, out var value)
? value?.ToString()
: "unknown";
var statusCode = httpContext.Items.TryGetValue(WebhookValidator.FailureStatusCodeItemKey, out var statusValue) &&
statusValue is int code
? code
: StatusCodes.Status401Unauthorized;
logger.LogWarning(
"Full-sync webhook auth validation failed. tenant_id={TenantId} list_id={ListId} sync_id={SyncId} reason={Reason}",
request.TenantId,
request.ListId,
request.SyncId,
reason);
return Results.Json(
new
{
error = reason == "replay_detected" ? "replay_detected" : "webhook_auth_failed",
reason
},
statusCode: statusCode);
}
var testFriendlyEnabled = builder.Configuration.GetValue("TestFriendly:Enabled", false);
if (!await EnsureTenantForWebhookAsync(db, request.TenantId, testFriendlyEnabled))
{
logger.LogWarning("Full-sync webhook rejected: tenant_not_found. tenant_id={TenantId}", request.TenantId);
return Results.UnprocessableEntity(new { error = "tenant_not_found" });
}
logger.LogInformation(
"Full-sync webhook processing started. tenant_id={TenantId} list_id={ListId} sync_id={SyncId} batch={BatchNo}/{BatchTotal} subscriber_count={SubscriberCount}",
request.TenantId,
request.ListId,
request.SyncId,
request.BatchNo,
request.BatchTotal,
request.Subscribers.Count);
var payload = JsonSerializer.Serialize(request);
var inbox = new EventInbox
{
Id = Guid.NewGuid(),
TenantId = request.TenantId,
EventType = "list.full_sync",
Source = "member_center",
Payload = payload,
ReceivedAt = DateTimeOffset.UtcNow,
Status = "received"
};
db.EventsInbox.Add(inbox);
await EnsureListExistsAsync(db, request.TenantId, request.ListId);
foreach (var subscriber in request.Subscribers)
{
if (subscriber.Id == Guid.Empty || string.IsNullOrWhiteSpace(subscriber.Email))
{
continue;
}
await UpsertSubscriptionAsync(
db,
request.ListId,
subscriber.Id,
subscriber.Email,
NormalizeStatus(subscriber.Status, "active"),
subscriber.Preferences);
}
inbox.Status = "processed";
inbox.ProcessedAt = DateTimeOffset.UtcNow;
await db.SaveChangesAsync();
logger.LogInformation(
"Full-sync webhook processed. tenant_id={TenantId} list_id={ListId} sync_id={SyncId} batch={BatchNo}/{BatchTotal}",
request.TenantId,
request.ListId,
request.SyncId,
request.BatchNo,
request.BatchTotal);
return Results.Ok();
}).WithName("FullSyncWebhook").WithOpenApi();
app.MapPost("/webhooks/ses", async (
HttpContext httpContext,
JsonElement body,
SesEventProcessingService sesProcessor) =>
{
var signature = httpContext.Request.Headers["X-Amz-Sns-Signature"].ToString();
var result = await sesProcessor.ProcessBodyAsync(body, signature, httpContext.RequestAborted);
if (result.Success)
{
return Results.StatusCode(result.StatusCode);
}
if (result.TransientFailure)
{
return Results.StatusCode(result.StatusCode);
}
return Results.Json(
new { error = result.Error, reason = result.Reason },
statusCode: result.StatusCode);
}).WithName("SesWebhook").WithOpenApi();
app.Run();
static Guid? GetTenantId(ClaimsPrincipal user)
{
var value = user.FindFirst("tenant_id")?.Value;
return Guid.TryParse(value, out var tenantId) ? tenantId : null;
}
static bool HasScope(ClaimsPrincipal user, string scope)
{
var raw = user.FindFirst("scope")?.Value;
if (string.IsNullOrWhiteSpace(raw))
{
return false;
}
return raw.Split(' ', StringSplitOptions.RemoveEmptyEntries)
.Contains(scope, StringComparer.Ordinal);
}
static bool IsSupportedSubscriptionEvent(string eventType)
{
return eventType is "subscription.activated" or "subscription.unsubscribed" or "preferences.updated";
}
// TEST-FRIENDLY TEMPORARY LOGIC:
// When enabled, webhook ingestion can auto-create missing tenants to simplify local end-to-end testing.
// TODO(remove-test-friendly): Remove this helper and always require pre-provisioned tenant records.
static async Task<bool> EnsureTenantForWebhookAsync(SendEngineDbContext db, Guid tenantId, bool autoCreateTenant)
{
var tenantExists = await db.Tenants.AsNoTracking().AnyAsync(x => x.Id == tenantId);
if (tenantExists)
{
return true;
}
if (!autoCreateTenant)
{
return false;
}
db.Tenants.Add(new Tenant
{
Id = tenantId,
Name = $"tenant-{tenantId:N}",
CreatedAt = DateTimeOffset.UtcNow
});
await db.SaveChangesAsync();
return true;
}
static async Task EnsureListExistsAsync(SendEngineDbContext db, Guid tenantId, Guid listId)
{
var listExists = await db.Lists.AsNoTracking()
.AnyAsync(x => x.Id == listId && x.TenantId == tenantId);
if (listExists)
{
return;
}
db.Lists.Add(new MailingList
{
Id = listId,
TenantId = tenantId,
Name = $"list-{listId:N}",
CreatedAt = DateTimeOffset.UtcNow
});
}
static async Task ApplySubscriptionSnapshotAsync(SendEngineDbContext db, SubscriptionEventRequest request)
{
var status = request.EventType switch
{
"subscription.activated" => "active",
"subscription.unsubscribed" => "unsubscribed",
"preferences.updated" => NormalizeStatus(request.Subscriber.Status, "active"),
_ => "active"
};
await UpsertSubscriptionAsync(
db,
request.ListId,
request.Subscriber.Id,
request.Subscriber.Email,
status,
request.Subscriber.Preferences);
}
static async Task UpsertSubscriptionAsync(
SendEngineDbContext db,
Guid listId,
Guid externalSubscriberId,
string email,
string status,
Dictionary<string, object>? preferences)
{
var now = DateTimeOffset.UtcNow;
var normalizedEmail = email.Trim().ToLowerInvariant();
var subscription = await db.Subscriptions
.FirstOrDefaultAsync(x => x.ListId == listId && x.Email == normalizedEmail);
var preferenceJson = preferences is null ? null : JsonSerializer.Serialize(preferences);
if (subscription is null)
{
subscription = new Subscription
{
Id = Guid.NewGuid(),
ListId = listId,
Email = normalizedEmail,
ExternalSubscriberId = externalSubscriberId == Guid.Empty ? null : externalSubscriberId,
Status = status,
Preferences = preferenceJson,
CreatedAt = now,
UpdatedAt = now
};
db.Subscriptions.Add(subscription);
}
else
{
subscription.Email = normalizedEmail;
subscription.ExternalSubscriberId = externalSubscriberId == Guid.Empty ? subscription.ExternalSubscriberId : externalSubscriberId;
subscription.Status = status;
subscription.Preferences = preferenceJson;
subscription.UpdatedAt = now;
}
}
static string NormalizeStatus(string? status, string fallback)
{
if (string.IsNullOrWhiteSpace(status))
{
return fallback;
}
var value = status.Trim().ToLowerInvariant();
return value switch
{
"active" => "active",
"unsubscribed" => "unsubscribed",
"bounced" => "bounced",
"complaint" => "complaint",
"suppressed" => "suppressed",
_ => fallback
};
}