using System;
using Svrnty.CQRS.Events.Abstractions.EventStore;
using Svrnty.CQRS.Events.Abstractions.Streaming;
using Svrnty.CQRS.Events.Subscriptions;
using Svrnty.CQRS.Events.Abstractions.Configuration;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
using Svrnty.CQRS.Events.Abstractions;
namespace Svrnty.CQRS.Events.Management;
/// <summary>
/// Extension methods for mapping event streaming management API endpoints.
/// </summary>
/// <remarks>
/// <para>
/// <strong>Phase 6 Feature:</strong>
/// Provides a REST API for managing event streams, subscriptions, and consumers.
/// Useful for operational tasks, monitoring, and troubleshooting.
/// </para>
/// <para>
/// <strong>Security Note:</strong>
/// These endpoints expose operational data and allow modification of consumer state.
/// In production, use authorization policies to restrict access to administrators only.
/// </para>
/// </remarks>
public static class ManagementApiExtensions
{
    /// <summary>
    /// How long a consumer may go without an offset update, while still having lag,
    /// before <c>GetConsumerInfo</c> reports it as stalled.
    /// </summary>
    private static readonly TimeSpan StalledThreshold = TimeSpan.FromMinutes(5);

    /// <summary>
    /// Maps event streaming management API endpoints.
    /// </summary>
    /// <param name="endpoints">The endpoint route builder.</param>
    /// <param name="routePrefix">Optional route prefix (default: "api/event-streams").</param>
    /// <returns>The endpoint route builder for chaining.</returns>
    /// <remarks>
    /// <para>
    /// <strong>Mapped Endpoints</strong> (shown with the default prefix):
    /// - GET /api/event-streams - List all streams
    /// - GET /api/event-streams/{name} - Get stream details
    /// - GET /api/event-streams/{name}/subscriptions - List subscriptions
    /// - GET /api/event-streams/subscriptions/{id} - Get subscription details
    /// - GET /api/event-streams/subscriptions/{id}/consumers/{consumerId} - Get consumer info
    /// - POST /api/event-streams/subscriptions/{id}/consumers/{consumerId}/reset-offset - Reset offset
    /// </para>
    /// <para>
    /// <strong>Authorization:</strong>
    /// Consider requiring authorization for these endpoints in production. This method
    /// returns <paramref name="endpoints"/> itself, not the route group it creates, so
    /// conventions chained onto the return value do not reach the mapped endpoints.
    /// To protect them, call this method on a route group that already carries the
    /// policy, e.g. <c>app.MapGroup("").RequireAuthorization().MapEventStreamManagementApi()</c>.
    /// </para>
    /// </remarks>
    public static IEndpointRouteBuilder MapEventStreamManagementApi(
        this IEndpointRouteBuilder endpoints,
        string routePrefix = "api/event-streams")
    {
        var group = endpoints.MapGroup(routePrefix)
            .WithTags("Event Stream Management");

        // GET /api/event-streams - List all streams
        group.MapGet("/", GetAllStreams)
            .WithName("GetAllStreams")
            .WithSummary("List all event streams")
            .WithDescription("Returns information about all configured event streams including length and subscription count.");

        // GET /api/event-streams/{name} - Get stream details
        group.MapGet("/{name}", GetStream)
            .WithName("GetStream")
            .WithSummary("Get stream details")
            .WithDescription("Returns detailed information about a specific stream including its configuration and subscriptions.");

        // GET /api/event-streams/{name}/subscriptions - List subscriptions for a stream
        group.MapGet("/{name}/subscriptions", GetStreamSubscriptions)
            .WithName("GetStreamSubscriptions")
            .WithSummary("List subscriptions for a stream")
            .WithDescription("Returns all subscriptions consuming from the specified stream.");

        // GET /api/event-streams/subscriptions/{id} - Get subscription details
        group.MapGet("/subscriptions/{id}", GetSubscription)
            .WithName("GetSubscription")
            .WithSummary("Get subscription details")
            .WithDescription("Returns detailed information about a specific subscription.");

        // GET /api/event-streams/subscriptions/{id}/consumers/{consumerId} - Get consumer info
        group.MapGet("/subscriptions/{id}/consumers/{consumerId}", GetConsumerInfo)
            .WithName("GetConsumerInfo")
            .WithSummary("Get consumer position and lag")
            .WithDescription("Returns the current offset, lag, and status of a specific consumer.");

        // POST /api/event-streams/subscriptions/{id}/consumers/{consumerId}/reset-offset - Reset offset
        group.MapPost("/subscriptions/{id}/consumers/{consumerId}/reset-offset", ResetConsumerOffset)
            .WithName("ResetConsumerOffset")
            .WithSummary("Reset consumer offset")
            .WithDescription("Resets a consumer's offset to a specific position. Use 0 for beginning, -1 for latest.");

        // The original builder (not the group) is returned so callers can keep mapping
        // unrelated endpoints without inheriting the management prefix.
        return endpoints;
    }

    // NOTE(review): the IEnumerable parameters of the handlers below carry no element type
    // in this file, yet the handlers use LINQ and typed member access on them. The generic
    // type arguments appear to have been lost and must be restored from the declaring
    // project - TODO confirm the stream-configuration and subscription element types.

    /// <summary>
    /// Reads the length of a stream, falling back to 0 when the store call fails.
    /// </summary>
    /// <remarks>
    /// Store failures are deliberately swallowed (the stream might not exist yet), but once
    /// the request itself has been cancelled the exception is allowed to propagate so that
    /// callers stop issuing further store calls for a client that is gone.
    /// </remarks>
    private static async Task<long> GetStreamLengthOrZeroAsync(
        IEventStreamStore streamStore,
        string streamName,
        CancellationToken cancellationToken)
    {
        try
        {
            return await streamStore.GetStreamLengthAsync(streamName, cancellationToken);
        }
        catch (Exception) when (!cancellationToken.IsCancellationRequested)
        {
            // Stream might not exist yet
            return 0;
        }
    }

    /// <summary>
    /// Handler for <c>GET /</c>: returns one <c>StreamInfo</c> per configured stream,
    /// including its length and the ids of the subscriptions that read from it.
    /// </summary>
    private static async Task<IResult> GetAllStreams(
        IEnumerable streamConfigurations,
        IEnumerable subscriptions,
        IEventStreamStore streamStore,
        CancellationToken cancellationToken)
    {
        // Group once up front instead of re-scanning every subscription per stream.
        // Element order inside each group follows the source order.
        var subscriptionsByStream = subscriptions.ToLookup(s => s.StreamName);

        var streamInfos = new List<StreamInfo>();
        foreach (var config in streamConfigurations)
        {
            var subscriptionIds = subscriptionsByStream[config.StreamName]
                .Select(s => s.SubscriptionId)
                .ToList();

            var length = await GetStreamLengthOrZeroAsync(streamStore, config.StreamName, cancellationToken);

            streamInfos.Add(new StreamInfo
            {
                Name = config.StreamName,
                Type = config.Type.ToString(),
                DeliverySemantics = config.DeliverySemantics.ToString(),
                Scope = config.Scope.ToString(),
                Length = length,
                SubscriptionCount = subscriptionIds.Count,
                Subscriptions = subscriptionIds
            });
        }

        return Results.Ok(streamInfos);
    }

    /// <summary>
    /// Handler for <c>GET /{name}</c>: returns the <c>StreamInfo</c> of one configured
    /// stream, or 404 when no configuration with that name exists.
    /// </summary>
    private static async Task<IResult> GetStream(
        string name,
        IEnumerable streamConfigurations,
        IEnumerable subscriptions,
        IEventStreamStore streamStore,
        CancellationToken cancellationToken)
    {
        var config = streamConfigurations.FirstOrDefault(s => s.StreamName == name);
        if (config is null)
        {
            return Results.NotFound(new { error = $"Stream '{name}' not found" });
        }

        var subscriptionIds = subscriptions
            .Where(s => s.StreamName == name)
            .Select(s => s.SubscriptionId)
            .ToList();

        var length = await GetStreamLengthOrZeroAsync(streamStore, name, cancellationToken);

        var info = new StreamInfo
        {
            Name = config.StreamName,
            Type = config.Type.ToString(),
            DeliverySemantics = config.DeliverySemantics.ToString(),
            Scope = config.Scope.ToString(),
            Length = length,
            SubscriptionCount = subscriptionIds.Count,
            Subscriptions = subscriptionIds
        };

        return Results.Ok(info);
    }

    /// <summary>
    /// Handler for <c>GET /{name}/subscriptions</c>: returns a <c>SubscriptionInfo</c> for
    /// every subscription on the named stream, or 404 when the stream is not configured.
    /// </summary>
    private static IResult GetStreamSubscriptions(
        string name,
        IEnumerable streamConfigurations,
        IEnumerable subscriptions)
    {
        var config = streamConfigurations.FirstOrDefault(s => s.StreamName == name);
        if (config is null)
        {
            return Results.NotFound(new { error = $"Stream '{name}' not found" });
        }

        var streamSubs = subscriptions
            .Where(s => s.StreamName == name)
            .Select(s => new SubscriptionInfo
            {
                SubscriptionId = s.SubscriptionId,
                StreamName = s.StreamName,
                Mode = s.Mode.ToString(),
                IsActive = s.IsActive,
                CreatedAt = s.CreatedAt,
                VisibilityTimeout = s.VisibilityTimeout,
                EnableUpcasting = s.EnableUpcasting,
                TargetEventVersion = s.TargetEventVersion,
                Description = s.Description
            })
            .ToList();

        return Results.Ok(streamSubs);
    }

    /// <summary>
    /// Handler for <c>GET /subscriptions/{id}</c>: returns the <c>SubscriptionInfo</c> of
    /// one subscription, or 404 when no subscription has that id.
    /// </summary>
    private static IResult GetSubscription(
        string id,
        IEnumerable subscriptions)
    {
        var subscription = subscriptions.FirstOrDefault(s => s.SubscriptionId == id);
        if (subscription is null)
        {
            return Results.NotFound(new { error = $"Subscription '{id}' not found" });
        }

        var info = new SubscriptionInfo
        {
            SubscriptionId = subscription.SubscriptionId,
            StreamName = subscription.StreamName,
            Mode = subscription.Mode.ToString(),
            IsActive = subscription.IsActive,
            CreatedAt = subscription.CreatedAt,
            VisibilityTimeout = subscription.VisibilityTimeout,
            EnableUpcasting = subscription.EnableUpcasting,
            TargetEventVersion = subscription.TargetEventVersion,
            Description = subscription.Description
        };

        return Results.Ok(info);
    }

    /// <summary>
    /// Handler for <c>GET /subscriptions/{id}/consumers/{consumerId}</c>: reports offset,
    /// lag, last update time and a stalled flag. Returns 404 when the subscription is
    /// unknown and a 500 problem response when the store calls fail.
    /// </summary>
    /// <remarks>
    /// A consumer is reported as stalled when it has positive lag and its last update is
    /// older than <see cref="StalledThreshold"/>.
    /// </remarks>
    private static async Task<IResult> GetConsumerInfo(
        string id,
        string consumerId,
        IEnumerable subscriptions,
        IEventStreamStore streamStore,
        CancellationToken cancellationToken)
    {
        var subscription = subscriptions.FirstOrDefault(s => s.SubscriptionId == id);
        if (subscription is null)
        {
            return Results.NotFound(new { error = $"Subscription '{id}' not found" });
        }

        try
        {
            // NOTE(review): the store is queried with the subscription id; consumerId is
            // only echoed back in the response. Confirm this matches the store's keying.
            var streamLength = await streamStore.GetStreamLengthAsync(subscription.StreamName, cancellationToken);
            var offset = await streamStore.GetConsumerOffsetAsync(subscription.StreamName, id, cancellationToken);
            var lastUpdated = await streamStore.GetConsumerLastUpdateTimeAsync(subscription.StreamName, id, cancellationToken);

            var lag = streamLength - offset;
            var timeSinceUpdate = DateTimeOffset.UtcNow - lastUpdated;
            var isStalled = timeSinceUpdate > StalledThreshold && lag > 0;

            var info = new ConsumerInfo
            {
                ConsumerId = consumerId,
                Offset = offset,
                Lag = lag,
                LastUpdated = lastUpdated,
                IsStalled = isStalled
            };

            return Results.Ok(info);
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            // An aborted request is not a server error; only genuine failures become a 500.
            return Results.Problem(
                detail: ex.Message,
                statusCode: 500,
                title: "Error retrieving consumer information");
        }
    }

    /// <summary>
    /// Handler for <c>POST /subscriptions/{id}/consumers/{consumerId}/reset-offset</c>:
    /// moves the stored offset to the requested position. An offset of -1 means "latest"
    /// (the current stream length); any other negative value yields 400. Returns 404 when
    /// the subscription is unknown and a 500 problem response when the store calls fail.
    /// </summary>
    private static async Task<IResult> ResetConsumerOffset(
        string id,
        string consumerId,
        ResetOffsetRequest request,
        IEnumerable subscriptions,
        IEventStreamStore streamStore,
        CancellationToken cancellationToken)
    {
        var subscription = subscriptions.FirstOrDefault(s => s.SubscriptionId == id);
        if (subscription is null)
        {
            return Results.NotFound(new { error = $"Subscription '{id}' not found" });
        }

        try
        {
            long newOffset = request.NewOffset;

            // Reject invalid input before touching the store.
            if (newOffset < -1)
            {
                return Results.BadRequest(new { error = "Offset must be >= 0 or -1 for latest" });
            }

            if (newOffset == -1)
            {
                // Set to latest (end of stream)
                newOffset = await streamStore.GetStreamLengthAsync(subscription.StreamName, cancellationToken);
            }

            // NOTE(review): the offset is stored under the subscription id; consumerId is
            // only echoed back in the response. Confirm this matches the store's keying.
            await streamStore.UpdateConsumerOffsetAsync(subscription.StreamName, id, newOffset, cancellationToken);

            // Re-read the length after the update so the reported lag reflects the current stream.
            var streamLength = await streamStore.GetStreamLengthAsync(subscription.StreamName, cancellationToken);
            var lag = streamLength - newOffset;

            return Results.Ok(new
            {
                message = "Consumer offset successfully reset",
                subscriptionId = id,
                consumerId = consumerId,
                newOffset = newOffset,
                streamLength = streamLength,
                lag = lag
            });
        }
        catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
        {
            // An aborted request is not a server error; only genuine failures become a 500.
            return Results.Problem(
                detail: ex.Message,
                statusCode: 500,
                title: "Error resetting consumer offset");
        }
    }
}