dotnet-cqrs/docs/event-streaming/stream-configuration/access-control.md

12 KiB

Access Control

Stream-level permissions and rate limiting for secure event streaming.

Overview

Access control configuration provides fine-grained security per stream:

  • Read/Write Permissions - Control who can read/write events
  • Consumer Group Limits - Prevent resource exhaustion
  • Rate Limiting - Throttle read and write operations
  • Public/Private Streams - Configure visibility

Quick Start

using Svrnty.CQRS.Events.Abstractions;

// Resolve the stream configuration store from the service provider.
var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>();

// Neither reads nor writes are public; only the listed principals get access.
AccessControlConfiguration ordersAccess = new()
{
    PublicRead = false,
    PublicWrite = false,
    AllowedReaders = new List<string> { "admin", "order-service" },
    AllowedWriters = new List<string> { "order-service" }
};

StreamConfiguration ordersStream = new()
{
    StreamName = "orders",
    AccessControl = ordersAccess
};

await configStore.SetConfigurationAsync(ordersStream);

Access Control Properties

// Per-stream access-control settings; assigned to StreamConfiguration.AccessControl.
// NOTE(review): no property in this listing has an initializer, so the List<string>
// members are presumably null until assigned - confirm against the actual type
// before calling Contains on them.
public class AccessControlConfiguration
{
    public bool PublicRead { get; set; }                    // Allow anonymous reads
    public bool PublicWrite { get; set; }                   // Allow anonymous writes
    public List<string> AllowedReaders { get; set; }        // Principals authorized to read
    public List<string> AllowedWriters { get; set; }        // Principals authorized to write
    public List<string> DeniedReaders { get; set; }         // Explicitly denied readers (deny takes precedence over allow)
    public List<string> DeniedWriters { get; set; }         // Explicitly denied writers (deny takes precedence over allow)
    public int MaxConsumerGroups { get; set; }              // Consumer group limit for the stream
    public int MaxEventsPerSecond { get; set; }             // Write rate limit (events per second)
    public int MaxEventsPerMinute { get; set; }             // Read rate limit (events per minute)
    public bool RequireAuthentication { get; set; }         // Require callers to be authenticated
}

Public vs Private Streams

Public Stream

// Public read-only stream: reads are open, writes are limited to "admin"
await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "public-announcements",
    Tags = new List<string> { "public" },
    AccessControl = new AccessControlConfiguration
    {
        PublicRead = true,
        PublicWrite = false,
        AllowedWriters = new List<string> { "admin" }
    }
});

Private Stream

// Private stream: nothing is public, access is limited to the listed
// principals, and authentication is required
AccessControlConfiguration restrictedAccess = new()
{
    RequireAuthentication = true,
    PublicRead = false,
    PublicWrite = false,
    AllowedReaders = new List<string> { "admin", "finance-service" },
    AllowedWriters = new List<string> { "finance-service" }
};

StreamConfiguration financialStream = new()
{
    StreamName = "financial-transactions",
    AccessControl = restrictedAccess,
    Tags = new List<string> { "private", "sensitive" }
};

await configStore.SetConfigurationAsync(financialStream);

Reader Permissions

// Every principal allowed to read the stream is listed explicitly
var authorizedReaders = new List<string>
{
    "admin", "order-service", "analytics-service", "reporting-service"
};

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "orders",
    AccessControl = new AccessControlConfiguration
    {
        PublicRead = false,
        AllowedReaders = authorizedReaders
    }
});

Writer Permissions

// Single writer (recommended)
// The two alternatives use distinct variable names so the snippet compiles
// when pasted as a whole (declaring the same local twice is error CS0128).
var singleWriterAccessControl = new AccessControlConfiguration
{
    PublicWrite = false,
    AllowedWriters = new List<string> { "order-service" }
};

// Multiple writers (use with caution)
var multiWriterAccessControl = new AccessControlConfiguration
{
    PublicWrite = false,
    AllowedWriters = new List<string>
    {
        "order-service",
        "admin-service"
    }
};

Explicit Deny

// Reads are public and "admin" may write; the Denied* lists carve out exceptions
AccessControlConfiguration accessControl = new()
{
    PublicRead = true,
    AllowedWriters = new List<string> { "admin" },
    DeniedReaders = new List<string> { "untrusted-service" },
    DeniedWriters = new List<string> { "legacy-service" }
};

// Deny takes precedence over allow

Consumer Group Limits

// Cap the "orders" stream at 10 consumer groups
await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "orders",
    AccessControl = new AccessControlConfiguration { MaxConsumerGroups = 10 }
});

// Creating an 11th consumer group will fail

Rate Limiting

Write Rate Limiting

// Cap write throughput on "orders" at 1000 events per second
AccessControlConfiguration writeLimited = new() { MaxEventsPerSecond = 1000 };

StreamConfiguration ordersStream = new()
{
    StreamName = "orders",
    AccessControl = writeLimited
};

await configStore.SetConfigurationAsync(ordersStream);

// Writes exceeding limit will be throttled or rejected

Read Rate Limiting

// Cap read throughput per consumer at 60k events/min (1000/sec)
await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "analytics",
    AccessControl = new AccessControlConfiguration { MaxEventsPerMinute = 60000 }
});

Combined Rate Limiting

// Apply a write limit (per second) and a read limit (per minute) together
AccessControlConfiguration accessControl = new()
{
    MaxEventsPerMinute = 100000,  // Read limit
    MaxEventsPerSecond = 500      // Write limit
};

Domain-Specific Examples

Financial Transactions

// Strict access control: authenticated, explicitly listed principals only
var financeReaders = new List<string> { "admin", "finance-service", "audit-service" };
var financeWriters = new List<string> { "finance-service" };

AccessControlConfiguration financialAccessControl = new()
{
    RequireAuthentication = true,
    PublicRead = false,
    PublicWrite = false,
    AllowedReaders = financeReaders,
    AllowedWriters = financeWriters,
    MaxConsumerGroups = 5,           // Limited consumers
    MaxEventsPerSecond = 100         // Moderate throughput
};

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "financial-transactions",
    Tags = new List<string> { "financial", "sensitive", "compliance" },
    AccessControl = financialAccessControl
});

Public Announcements

// Anyone can read; only "admin" and "announcement-service" can write
await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "public-announcements",
    Tags = new List<string> { "public" },
    AccessControl = new AccessControlConfiguration
    {
        PublicRead = true,
        PublicWrite = false,
        AllowedWriters = new List<string> { "admin", "announcement-service" },
        MaxConsumerGroups = 100,     // Many consumers allowed
        MaxEventsPerSecond = 10      // Low write volume
    }
});

User Activity Logs

// Per-user isolation: users read only their own activity, one service writes
// NOTE(review): "user:{userId}" is a plain (non-interpolated) string here, unlike
// the $"tenant:{tenantId}" entries in the multi-tenant example - presumably a
// placeholder pattern; confirm how reader entries are matched.
var activityReaders = new List<string> { "user:{userId}", "admin" };

AccessControlConfiguration activityAccessControl = new()
{
    RequireAuthentication = true,
    PublicRead = false,
    PublicWrite = false,
    AllowedReaders = activityReaders,
    AllowedWriters = new List<string> { "activity-tracking-service" },
    MaxEventsPerSecond = 10000       // High volume
};

StreamConfiguration activityStream = new()
{
    StreamName = "user-activity",
    AccessControl = activityAccessControl
};

await configStore.SetConfigurationAsync(activityStream);

Multi-Tenant Events

// Tenant isolation: each tenant gets its own stream, readable by the tenant
// and "admin", writable by the tenant only
var tenantPrincipal = $"tenant:{tenantId}";
var tenantTag = $"tenant-{tenantId}";

AccessControlConfiguration tenantAccessControl = new()
{
    RequireAuthentication = true,
    PublicRead = false,
    PublicWrite = false,
    AllowedReaders = new List<string> { tenantPrincipal, "admin" },
    AllowedWriters = new List<string> { tenantPrincipal },
    MaxConsumerGroups = 20,
    MaxEventsPerSecond = 1000
};

await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = $"tenant-{tenantId}-events",
    AccessControl = tenantAccessControl,
    Tags = new List<string> { "multi-tenant", tenantTag }
});

Authorization Integration

ASP.NET Core Integration

// Middleware to enforce read access control on stream routes.
// Evaluation order: explicit deny first, then public read, then the allow list,
// so "deny takes precedence over allow" also holds for public streams.
app.Use(async (context, next) =>
{
    var streamName = context.Request.RouteValues["streamName"]?.ToString();

    // Requests that do not target a stream have nothing to enforce here.
    if (string.IsNullOrEmpty(streamName))
    {
        await next();
        return;
    }

    var config = await configStore.GetConfigurationAsync(streamName);
    var accessControl = config?.AccessControl;

    if (accessControl != null)
    {
        // Null for anonymous callers; they can only pass via PublicRead.
        var user = context.User.Identity?.Name;

        // The allow/deny lists may be unset, so treat null as "empty".
        var denied = user != null
            && accessControl.DeniedReaders?.Contains(user) == true;
        var allowed = accessControl.PublicRead
            || (user != null && accessControl.AllowedReaders?.Contains(user) == true);

        if (denied || !allowed)
        {
            context.Response.StatusCode = 403;
            await context.Response.WriteAsync("Access denied");
            return;
        }
    }

    await next();
});

Custom Authorization Service

/// <summary>
/// Answers whether a given user may read from or write to a named stream.
/// </summary>
public interface IStreamAuthorizationService
{
    /// <summary>Checks whether <paramref name="userId"/> may read <paramref name="streamName"/>.</summary>
    /// <param name="streamName">Name of the stream being accessed.</param>
    /// <param name="userId">Identifier of the caller requesting access.</param>
    /// <returns>A task producing <c>true</c> when the read is permitted.</returns>
    Task<bool> CanReadAsync(string streamName, string userId);

    /// <summary>Checks whether <paramref name="userId"/> may write to <paramref name="streamName"/>.</summary>
    /// <param name="streamName">Name of the stream being accessed.</param>
    /// <param name="userId">Identifier of the caller requesting access.</param>
    /// <returns>A task producing <c>true</c> when the write is permitted.</returns>
    Task<bool> CanWriteAsync(string streamName, string userId);
}

/// <summary>
/// Authorizes stream reads and writes from the stream's stored
/// <c>AccessControlConfiguration</c>.
/// </summary>
/// <remarks>
/// Rules are evaluated in this order: explicit deny, public flag, allow list.
/// A stream with no access-control configuration is unrestricted.
/// </remarks>
public class StreamAuthorizationService : IStreamAuthorizationService
{
    private readonly IStreamConfigurationStore _configStore;

    /// <summary>Creates the service; the store is supplied by dependency injection.</summary>
    /// <param name="configStore">Store used to look up per-stream configuration.</param>
    /// <exception cref="ArgumentNullException"><paramref name="configStore"/> is null.</exception>
    public StreamAuthorizationService(IStreamConfigurationStore configStore)
    {
        _configStore = configStore ?? throw new ArgumentNullException(nameof(configStore));
    }

    /// <summary>Checks whether <paramref name="userId"/> may read <paramref name="streamName"/>.</summary>
    /// <returns><c>true</c> when the read is permitted.</returns>
    public async Task<bool> CanReadAsync(string streamName, string userId)
    {
        var config = await _configStore.GetConfigurationAsync(streamName);
        var accessControl = config?.AccessControl;

        if (accessControl == null)
            return true;  // No restrictions

        return IsAuthorized(
            userId,
            accessControl.PublicRead,
            accessControl.AllowedReaders,
            accessControl.DeniedReaders);
    }

    /// <summary>Checks whether <paramref name="userId"/> may write to <paramref name="streamName"/>.</summary>
    /// <returns><c>true</c> when the write is permitted.</returns>
    public async Task<bool> CanWriteAsync(string streamName, string userId)
    {
        var config = await _configStore.GetConfigurationAsync(streamName);
        var accessControl = config?.AccessControl;

        if (accessControl == null)
            return true;  // No restrictions

        return IsAuthorized(
            userId,
            accessControl.PublicWrite,
            accessControl.AllowedWriters,
            accessControl.DeniedWriters);
    }

    // Shared read/write rule. The lists may be unset, so null is treated as empty.
    private static bool IsAuthorized(
        string userId,
        bool isPublic,
        ICollection<string> allowed,
        ICollection<string> denied)
    {
        // Deny takes precedence over allow, including on public streams.
        if (denied != null && denied.Contains(userId))
            return false;

        if (isPublic)
            return true;

        return allowed != null && allowed.Contains(userId);
    }
}

// Register service (the configuration store is resolved by constructor injection)
builder.Services.AddSingleton<IStreamAuthorizationService, StreamAuthorizationService>();

Rate Limiting Implementation

/// <summary>
/// Per-stream write rate limiter driven by each stream's
/// <c>MaxEventsPerSecond</c> setting.
/// </summary>
public class StreamRateLimiter
{
    private readonly IStreamConfigurationStore _configStore;

    // One bucket per stream, stored with the limit it was built for so that a
    // changed limit is detected. Access is guarded by _gate because the limiter
    // is shared across concurrent requests.
    private readonly Dictionary<string, (int Limit, TokenBucket Bucket)> _buckets = new();
    private readonly object _gate = new();

    /// <summary>Creates the limiter.</summary>
    /// <param name="configStore">Store used to look up per-stream configuration.</param>
    /// <exception cref="ArgumentNullException"><paramref name="configStore"/> is null.</exception>
    public StreamRateLimiter(IStreamConfigurationStore configStore)
    {
        _configStore = configStore ?? throw new ArgumentNullException(nameof(configStore));
    }

    /// <summary>Decides whether <paramref name="eventCount"/> events may be written now.</summary>
    /// <param name="streamName">Stream the events are being written to.</param>
    /// <param name="eventCount">Number of events in this write.</param>
    /// <returns><c>true</c> when the write fits within the stream's limit, or no limit applies.</returns>
    public async Task<bool> AllowWriteAsync(string streamName, int eventCount)
    {
        var config = await _configStore.GetConfigurationAsync(streamName);
        var limit = config?.AccessControl?.MaxEventsPerSecond ?? 0;

        // An int property left unset is 0; a zero-capacity bucket would block
        // every write, so non-positive values are treated as "no limit".
        // NOTE(review): confirm that 0 is meant to mean "unlimited".
        if (limit <= 0)
            return true;

        // NOTE(review): assumes TokenBucket.TryConsume is safe to call concurrently - confirm.
        return GetOrCreateBucket(streamName, limit).TryConsume(eventCount);
    }

    // Returns the stream's bucket, rebuilding it when the configured limit changed.
    private TokenBucket GetOrCreateBucket(string streamName, int capacity)
    {
        lock (_gate)
        {
            if (!_buckets.TryGetValue(streamName, out var entry) || entry.Limit != capacity)
            {
                entry = (capacity, new TokenBucket(capacity, TimeSpan.FromSeconds(1)));
                _buckets[streamName] = entry;
            }

            return entry.Bucket;
        }
    }
}

Monitoring Access Control

// Example snapshot of authorization outcomes for one stream
var authStats = new
{
    StreamName = "orders",
    TotalRequests = 1000,
    AllowedRequests = 950,
    DeniedRequests = 50,
    DenialRate = 5.0  // 5%
};

// Warn once the denial rate climbs above 1%
var denialRateTooHigh = authStats.DenialRate > 1.0;
if (denialRateTooHigh)
{
    _logger.LogWarning(
        "High denial rate for {Stream}: {Rate:F1}%",
        authStats.StreamName,
        authStats.DenialRate);
}

// Record each individual denial with the user and stream involved
_logger.LogWarning("Access denied for user {User} to stream {Stream}", userId, streamName);

Best Practices

DO

  • Use principle of least privilege
  • Require authentication for sensitive streams
  • Limit consumer groups to prevent resource exhaustion
  • Use rate limiting to prevent abuse
  • Use explicit deny for untrusted services
  • Monitor authorization failures
  • Audit access regularly
  • Use role-based access (admin, service, user)
  • Isolate multi-tenant streams
  • Document access requirements

DON'T

  • Don't make sensitive streams public
  • Don't allow unlimited consumer groups
  • Don't skip rate limiting on public streams
  • Don't forget to log denied access
  • Don't use same permissions for all streams
  • Don't hard-code user/service names
  • Don't ignore authorization failures
  • Don't forget authentication requirements

See Also