using System; using Svrnty.CQRS.Events.Abstractions.EventStore; using Svrnty.CQRS.Events.Abstractions.Streaming; using Svrnty.CQRS.Events.Subscriptions; using Svrnty.CQRS.Events.Abstractions.Configuration; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Routing; using Svrnty.CQRS.Events.Abstractions; namespace Svrnty.CQRS.Events.Management; /// /// Extension methods for mapping event streaming management API endpoints. /// /// /// /// Phase 6 Feature: /// Provides REST API for managing event streams, subscriptions, and consumers. /// Useful for operational tasks, monitoring, and troubleshooting. /// /// /// Security Note: /// These endpoints expose operational data and allow modification of consumer state. /// In production, use authorization policies to restrict access to administrators only. /// /// public static class ManagementApiExtensions { /// /// Maps event streaming management API endpoints. /// /// The endpoint route builder. /// Optional route prefix (default: "api/event-streams"). /// The endpoint route builder for chaining. /// /// /// Mapped Endpoints: /// - GET /api/event-streams - List all streams /// - GET /api/event-streams/{name} - Get stream details /// - GET /api/event-streams/{name}/subscriptions - List subscriptions /// - GET /api/event-streams/subscriptions/{id} - Get subscription details /// - GET /api/event-streams/subscriptions/{id}/consumers - List consumers (if supported) /// - GET /api/event-streams/subscriptions/{id}/consumers/{consumerId} - Get consumer info /// - POST /api/event-streams/subscriptions/{id}/consumers/{consumerId}/reset-offset - Reset offset /// - DELETE /api/event-streams/subscriptions/{id}/consumers/{consumerId} - Remove consumer /// /// /// Authorization: /// Consider adding .RequireAuthorization() to these endpoints in production. /// /// public static IEndpointRouteBuilder MapEventStreamManagementApi( this IEndpointRouteBuilder endpoints, string routePrefix = "api/event-streams") { var group = endpoints.MapGroup(routePrefix) .WithTags("Event Stream Management"); // GET /api/event-streams - List all streams group.MapGet("/", GetAllStreams) .WithName("GetAllStreams") .WithSummary("List all event streams") .WithDescription("Returns information about all configured event streams including length and subscription count."); // GET /api/event-streams/{name} - Get stream details group.MapGet("/{name}", GetStream) .WithName("GetStream") .WithSummary("Get stream details") .WithDescription("Returns detailed information about a specific stream including its configuration and subscriptions."); // GET /api/event-streams/{name}/subscriptions - List subscriptions for a stream group.MapGet("/{name}/subscriptions", GetStreamSubscriptions) .WithName("GetStreamSubscriptions") .WithSummary("List subscriptions for a stream") .WithDescription("Returns all subscriptions consuming from the specified stream."); // GET /api/event-streams/subscriptions/{id} - Get subscription details group.MapGet("/subscriptions/{id}", GetSubscription) .WithName("GetSubscription") .WithSummary("Get subscription details") .WithDescription("Returns detailed information about a specific subscription."); // GET /api/event-streams/subscriptions/{id}/consumers/{consumerId} - Get consumer info group.MapGet("/subscriptions/{id}/consumers/{consumerId}", GetConsumerInfo) .WithName("GetConsumerInfo") .WithSummary("Get consumer position and lag") .WithDescription("Returns the current offset, lag, and status of a specific consumer."); // POST /api/event-streams/subscriptions/{id}/consumers/{consumerId}/reset-offset - Reset offset group.MapPost("/subscriptions/{id}/consumers/{consumerId}/reset-offset", ResetConsumerOffset) .WithName("ResetConsumerOffset") .WithSummary("Reset consumer offset") .WithDescription("Resets a consumer's offset to a specific position. Use 0 for beginning, -1 for latest."); return endpoints; } private static async Task GetAllStreams( IEnumerable streamConfigurations, IEnumerable subscriptions, IEventStreamStore streamStore, CancellationToken cancellationToken) { var streamInfos = new List(); foreach (var config in streamConfigurations) { var streamSubs = subscriptions.Where(s => s.StreamName == config.StreamName).ToList(); long length = 0; try { length = await streamStore.GetStreamLengthAsync(config.StreamName, cancellationToken); } catch { // Stream might not exist yet } streamInfos.Add(new StreamInfo { Name = config.StreamName, Type = config.Type.ToString(), DeliverySemantics = config.DeliverySemantics.ToString(), Scope = config.Scope.ToString(), Length = length, SubscriptionCount = streamSubs.Count, Subscriptions = streamSubs.Select(s => s.SubscriptionId).ToList() }); } return Results.Ok(streamInfos); } private static async Task GetStream( string name, IEnumerable streamConfigurations, IEnumerable subscriptions, IEventStreamStore streamStore, CancellationToken cancellationToken) { var config = streamConfigurations.FirstOrDefault(s => s.StreamName == name); if (config == null) return Results.NotFound(new { error = $"Stream '{name}' not found" }); var streamSubs = subscriptions.Where(s => s.StreamName == name).ToList(); long length = 0; try { length = await streamStore.GetStreamLengthAsync(name, cancellationToken); } catch { // Stream might not exist yet } var info = new StreamInfo { Name = config.StreamName, Type = config.Type.ToString(), DeliverySemantics = config.DeliverySemantics.ToString(), Scope = config.Scope.ToString(), Length = length, SubscriptionCount = streamSubs.Count, Subscriptions = streamSubs.Select(s => s.SubscriptionId).ToList() }; return Results.Ok(info); } private static IResult GetStreamSubscriptions( string name, IEnumerable streamConfigurations, IEnumerable subscriptions) { var config = streamConfigurations.FirstOrDefault(s => s.StreamName == name); if (config == null) return Results.NotFound(new { error = $"Stream '{name}' not found" }); var streamSubs = subscriptions .Where(s => s.StreamName == name) .Select(s => new SubscriptionInfo { SubscriptionId = s.SubscriptionId, StreamName = s.StreamName, Mode = s.Mode.ToString(), IsActive = s.IsActive, CreatedAt = s.CreatedAt, VisibilityTimeout = s.VisibilityTimeout, EnableUpcasting = s.EnableUpcasting, TargetEventVersion = s.TargetEventVersion, Description = s.Description }) .ToList(); return Results.Ok(streamSubs); } private static IResult GetSubscription( string id, IEnumerable subscriptions) { var subscription = subscriptions.FirstOrDefault(s => s.SubscriptionId == id); if (subscription == null) return Results.NotFound(new { error = $"Subscription '{id}' not found" }); var info = new SubscriptionInfo { SubscriptionId = subscription.SubscriptionId, StreamName = subscription.StreamName, Mode = subscription.Mode.ToString(), IsActive = subscription.IsActive, CreatedAt = subscription.CreatedAt, VisibilityTimeout = subscription.VisibilityTimeout, EnableUpcasting = subscription.EnableUpcasting, TargetEventVersion = subscription.TargetEventVersion, Description = subscription.Description }; return Results.Ok(info); } private static async Task GetConsumerInfo( string id, string consumerId, IEnumerable subscriptions, IEventStreamStore streamStore, CancellationToken cancellationToken) { var subscription = subscriptions.FirstOrDefault(s => s.SubscriptionId == id); if (subscription == null) return Results.NotFound(new { error = $"Subscription '{id}' not found" }); try { var streamLength = await streamStore.GetStreamLengthAsync(subscription.StreamName, cancellationToken); var offset = await streamStore.GetConsumerOffsetAsync(subscription.StreamName, id, cancellationToken); var lastUpdated = await streamStore.GetConsumerLastUpdateTimeAsync(subscription.StreamName, id, cancellationToken); var lag = streamLength - offset; var timeSinceUpdate = DateTimeOffset.UtcNow - lastUpdated; var isStalled = timeSinceUpdate.TotalMinutes > 5 && lag > 0; var info = new ConsumerInfo { ConsumerId = consumerId, Offset = offset, Lag = lag, LastUpdated = lastUpdated, IsStalled = isStalled }; return Results.Ok(info); } catch (Exception ex) { return Results.Problem( detail: ex.Message, statusCode: 500, title: "Error retrieving consumer information"); } } private static async Task ResetConsumerOffset( string id, string consumerId, ResetOffsetRequest request, IEnumerable subscriptions, IEventStreamStore streamStore, CancellationToken cancellationToken) { var subscription = subscriptions.FirstOrDefault(s => s.SubscriptionId == id); if (subscription == null) return Results.NotFound(new { error = $"Subscription '{id}' not found" }); try { long newOffset = request.NewOffset; // Handle special values if (newOffset == -1) { // Set to latest (end of stream) newOffset = await streamStore.GetStreamLengthAsync(subscription.StreamName, cancellationToken); } else if (newOffset < 0) { return Results.BadRequest(new { error = "Offset must be >= 0 or -1 for latest" }); } // Update the consumer offset await streamStore.UpdateConsumerOffsetAsync(subscription.StreamName, id, newOffset, cancellationToken); var streamLength = await streamStore.GetStreamLengthAsync(subscription.StreamName, cancellationToken); var lag = streamLength - newOffset; return Results.Ok(new { message = "Consumer offset successfully reset", subscriptionId = id, consumerId = consumerId, newOffset = newOffset, streamLength = streamLength, lag = lag }); } catch (Exception ex) { return Results.Problem( detail: ex.Message, statusCode: 500, title: "Error resetting consumer offset"); } } }