411 lines
10 KiB
Markdown
411 lines
10 KiB
Markdown
# Dead Letter Queues
|
|
|
|
Automatic error handling and retry logic for failed events.
|
|
|
|
## Overview
|
|
|
|
Dead Letter Queues (DLQ) provide reliable error handling for event processing:
|
|
- **Automatic Retry** - Retry failed events up to a configurable number of attempts
|
|
- **Exponential Backoff** - Gradually increase retry delays
|
|
- **Dead Lettering** - Move permanently failed events to DLQ
|
|
- **Error Tracking** - Track failure reasons and attempt counts
|
|
|
|
## Quick Start
|
|
|
|
```csharp
|
|
using Svrnty.CQRS.Events.Abstractions;
|
|
|
|
var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>();
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "orders",
|
|
DeadLetterQueue = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
MaxDeliveryAttempts = 5,
|
|
RetryDelay = TimeSpan.FromMinutes(5)
|
|
}
|
|
});
|
|
```
|
|
|
|
## DLQ Configuration
|
|
|
|
```csharp
|
|
// Per-stream settings that control retries and dead lettering of failed events.
public class DeadLetterQueueConfiguration
|
|
{
|
|
public bool Enabled { get; set; } // Enable DLQ; when false, events fail immediately without retry
|
|
public string? DeadLetterStreamName { get; set; } // Custom DLQ stream; default naming is "{streamName}-dlq"
|
|
public int MaxDeliveryAttempts { get; set; } // Total delivery attempts before dead lettering (the initial delivery counts as attempt 1)
|
|
public TimeSpan RetryDelay { get; set; } // Delay between retries; the starting delay when exponential backoff is enabled
|
|
public bool EnableExponentialBackoff { get; set; } // Grow the delay on each retry instead of keeping it fixed
|
|
public double BackoffMultiplier { get; set; } // Backoff factor applied to the delay on each retry (e.g., 2.0 doubles it)
|
|
public TimeSpan MaxRetryDelay { get; set; } // Max delay cap; backoff delays never exceed this value
|
|
}
|
|
```
|
|
|
|
## Basic Configuration
|
|
|
|
```csharp
|
|
// Simple DLQ - up to 3 delivery attempts (initial delivery + 2 retries), 1 minute between retries
|
|
var dlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
MaxDeliveryAttempts = 3,
|
|
RetryDelay = TimeSpan.FromMinutes(1)
|
|
};
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "orders",
|
|
DeadLetterQueue = dlqConfig
|
|
});
|
|
```
|
|
|
|
## Exponential Backoff
|
|
|
|
Gradually increase retry delays:
|
|
|
|
```csharp
|
|
// Start with 1 minute, double each time, max 1 hour
|
|
var dlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
MaxDeliveryAttempts = 10,
|
|
RetryDelay = TimeSpan.FromMinutes(1),
|
|
EnableExponentialBackoff = true,
|
|
BackoffMultiplier = 2.0,
|
|
MaxRetryDelay = TimeSpan.FromHours(1)
|
|
};
|
|
|
|
// Retry schedule:
|
|
// Attempt 1: Immediate
|
|
// Attempt 2: 1 minute
|
|
// Attempt 3: 2 minutes
|
|
// Attempt 4: 4 minutes
|
|
// Attempt 5: 8 minutes
|
|
// Attempt 6: 16 minutes
|
|
// Attempt 7: 32 minutes
|
|
// Attempt 8: 1 hour (capped)
|
|
// Attempt 9: 1 hour (capped)
|
|
// Attempt 10: 1 hour (capped)
|
|
// Then dead lettered
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "payment-processing",
|
|
DeadLetterQueue = dlqConfig
|
|
});
|
|
```
|
|
|
|
## Custom DLQ Stream
|
|
|
|
Specify a custom dead letter stream name:
|
|
|
|
```csharp
|
|
var dlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
DeadLetterStreamName = "orders-failed", // Custom name
|
|
MaxDeliveryAttempts = 5,
|
|
RetryDelay = TimeSpan.FromMinutes(5)
|
|
};
|
|
|
|
// Default naming: "{streamName}-dlq" (e.g., "orders-dlq")
|
|
```
|
|
|
|
## Domain-Specific Examples
|
|
|
|
### Payment Processing
|
|
|
|
```csharp
|
|
// Critical - many retries with exponential backoff
|
|
var paymentDlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
DeadLetterStreamName = "payment-failures",
|
|
MaxDeliveryAttempts = 10,
|
|
RetryDelay = TimeSpan.FromSeconds(30),
|
|
EnableExponentialBackoff = true,
|
|
BackoffMultiplier = 2.0,
|
|
MaxRetryDelay = TimeSpan.FromHours(2)
|
|
};
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "payment-processing",
|
|
DeadLetterQueue = paymentDlqConfig,
|
|
Tags = new List<string> { "critical", "payments" }
|
|
});
|
|
```
|
|
|
|
### Email Notifications
|
|
|
|
```csharp
|
|
// Non-critical - few retries with short delay
|
|
var emailDlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
MaxDeliveryAttempts = 3,
|
|
RetryDelay = TimeSpan.FromMinutes(1),
|
|
EnableExponentialBackoff = false
|
|
};
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "email-notifications",
|
|
DeadLetterQueue = emailDlqConfig,
|
|
Tags = new List<string> { "notifications" }
|
|
});
|
|
```
|
|
|
|
### Order Fulfillment
|
|
|
|
```csharp
|
|
// Business-critical - moderate retries with backoff
|
|
var orderDlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
DeadLetterStreamName = "order-fulfillment-dlq",
|
|
MaxDeliveryAttempts = 7,
|
|
RetryDelay = TimeSpan.FromMinutes(5),
|
|
EnableExponentialBackoff = true,
|
|
BackoffMultiplier = 1.5,
|
|
MaxRetryDelay = TimeSpan.FromMinutes(30)
|
|
};
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "order-fulfillment",
|
|
DeadLetterQueue = orderDlqConfig
|
|
});
|
|
```
|
|
|
|
### External API Integration
|
|
|
|
```csharp
|
|
// Transient failures expected - many retries
|
|
var apiDlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = true,
|
|
DeadLetterStreamName = "external-api-failures",
|
|
MaxDeliveryAttempts = 15,
|
|
RetryDelay = TimeSpan.FromSeconds(10),
|
|
EnableExponentialBackoff = true,
|
|
BackoffMultiplier = 2.0,
|
|
MaxRetryDelay = TimeSpan.FromMinutes(10)
|
|
};
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "external-api-calls",
|
|
DeadLetterQueue = apiDlqConfig
|
|
});
|
|
```
|
|
|
|
## Processing Dead Lettered Events
|
|
|
|
### Manual Inspection
|
|
|
|
```csharp
|
|
var eventStore = serviceProvider.GetRequiredService<IEventStreamStore>();
|
|
|
|
// Read dead letter queue
|
|
await foreach (var failedEvent in eventStore.ReadStreamAsync(
|
|
"orders-dlq",
|
|
fromOffset: 0))
|
|
{
|
|
Console.WriteLine($"Event: {failedEvent.EventId}");
|
|
Console.WriteLine($"Type: {failedEvent.EventType}");
|
|
Console.WriteLine($"Original Stream: {failedEvent.Metadata["OriginalStream"]}");
|
|
Console.WriteLine($"Failure Reason: {failedEvent.Metadata["FailureReason"]}");
|
|
Console.WriteLine($"Attempt Count: {failedEvent.Metadata["AttemptCount"]}");
|
|
Console.WriteLine($"Last Attempt: {failedEvent.Metadata["LastAttemptTime"]}");
|
|
Console.WriteLine();
|
|
}
|
|
```
|
|
|
|
### Reprocessing DLQ
|
|
|
|
```csharp
|
|
// Fix issue, then reprocess dead lettered events
|
|
var eventStore = serviceProvider.GetRequiredService<IEventStreamStore>();
|
|
|
|
await foreach (var failedEvent in eventStore.ReadStreamAsync("orders-dlq"))
|
|
{
|
|
try
|
|
{
|
|
// Attempt to process again
|
|
await ProcessEventAsync(failedEvent);
|
|
|
|
// If successful, remove from DLQ
|
|
await AcknowledgeDlqEventAsync(failedEvent.EventId);
|
|
|
|
_logger.LogInformation(
|
|
"Successfully reprocessed DLQ event {EventId}",
|
|
failedEvent.EventId);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
_logger.LogError(ex,
|
|
"Failed to reprocess DLQ event {EventId}",
|
|
failedEvent.EventId);
|
|
// Remains in DLQ for manual review
|
|
}
|
|
}
|
|
```
|
|
|
|
### Automated Redriving
|
|
|
|
```csharp
|
|
/// <summary>
/// Background service that periodically re-attempts processing of the events
/// held in the "orders-dlq" dead letter stream.
/// </summary>
/// <remarks>
/// This example relies on members supplied by the host application:
/// <c>_eventStore</c>, <c>_logger</c>, <c>ProcessEventAsync</c> and
/// <c>AcknowledgeDlqEventAsync</c>.
/// </remarks>
public class DlqRedriveService : BackgroundService
{
    /// <summary>
    /// Runs a redrive pass once per hour until the host requests shutdown.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is stopping.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Wait 1 hour between redrive attempts
            await Task.Delay(TimeSpan.FromHours(1), stoppingToken);

            await RedriveDeadLetterQueueAsync("orders-dlq", stoppingToken);
        }
    }

    /// <summary>
    /// Reads every event in the given dead letter stream, tries to process it
    /// again, and acknowledges the ones that succeed.
    /// </summary>
    /// <param name="dlqStreamName">Name of the dead letter stream to redrive.</param>
    /// <param name="ct">Token that aborts the pass when cancellation is requested.</param>
    private async Task RedriveDeadLetterQueueAsync(
        string dlqStreamName,
        CancellationToken ct)
    {
        int successCount = 0;
        int failureCount = 0;

        // Flow the token into the enumeration so a shutdown request stops the pass
        // instead of draining the whole stream first.
        await foreach (var failedEvent in _eventStore.ReadStreamAsync(dlqStreamName).WithCancellation(ct))
        {
            try
            {
                await ProcessEventAsync(failedEvent);
                await AcknowledgeDlqEventAsync(failedEvent.EventId);
                successCount++;
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Shutdown is not a processing failure; let it propagate.
                throw;
            }
            catch (Exception ex)
            {
                failureCount++;

                // Record why the event failed again; it remains in the DLQ for manual review.
                _logger.LogError(ex,
                    "Failed to reprocess DLQ event {EventId}",
                    failedEvent.EventId);
            }
        }

        _logger.LogInformation(
            "DLQ redrive complete: {Success} succeeded, {Failed} failed",
            successCount,
            failureCount);
    }
}
|
|
```
|
|
|
|
## Monitoring DLQ
|
|
|
|
### Metrics
|
|
|
|
```csharp
|
|
// Track DLQ metrics (the Get*Async helpers are application-defined queries over the DLQ stream)
var dlqMetrics = new
{
    TotalDeadLettered = await GetDlqCountAsync("orders-dlq"),
    RecentlyDeadLettered = await GetDlqCountSinceAsync("orders-dlq", TimeSpan.FromHours(1)),
    OldestDeadLetteredAge = await GetOldestDlqAgeAsync("orders-dlq")
};

// A large backlog means failures are not being resolved - surface it in the logs
if (dlqMetrics.TotalDeadLettered > 100)
{
    _logger.LogWarning(
        "DLQ has {Count} events - investigation needed",
        dlqMetrics.TotalDeadLettered);
}
|
|
```
|
|
|
|
### Alerts
|
|
|
|
```csharp
|
|
// Alert on DLQ growth
|
|
var currentDlqCount = await GetDlqCountAsync("orders-dlq");
|
|
|
|
if (currentDlqCount > _threshold)
|
|
{
|
|
await SendAlertAsync(
|
|
"DLQ Threshold Exceeded",
|
|
$"DLQ has {currentDlqCount} events (threshold: {_threshold})");
|
|
}
|
|
```
|
|
|
|
### Health Check
|
|
|
|
```csharp
|
|
/// <summary>
/// Health check that grades the "orders-dlq" dead letter stream by how many
/// events it currently holds.
/// </summary>
public class DlqHealthCheck : IHealthCheck
{
    /// <summary>
    /// Reports Healthy when the DLQ is empty, Degraded when the count is below 10,
    /// and Unhealthy otherwise.
    /// </summary>
    /// <param name="context">Health check context supplied by the framework.</param>
    /// <param name="ct">Cancellation token supplied by the framework.</param>
    /// <returns>The health status together with a short description.</returns>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var deadLettered = await GetDlqCountAsync("orders-dlq");

        // Empty queue: nothing to investigate.
        if (deadLettered == 0)
        {
            return HealthCheckResult.Healthy("No dead lettered events");
        }

        // A handful of events is tolerable; anything more needs attention.
        return deadLettered < 10
            ? HealthCheckResult.Degraded($"{deadLettered} dead lettered events")
            : HealthCheckResult.Unhealthy($"{deadLettered} dead lettered events - investigation required");
    }
}
|
|
|
|
// Register health check
|
|
builder.Services.AddHealthChecks()
|
|
.AddCheck<DlqHealthCheck>("dlq");
|
|
```
|
|
|
|
## Disabling DLQ
|
|
|
|
```csharp
|
|
// Disable DLQ (events fail immediately without retry)
|
|
var dlqConfig = new DeadLetterQueueConfiguration
|
|
{
|
|
Enabled = false
|
|
};
|
|
|
|
await configStore.SetConfigurationAsync(new StreamConfiguration
|
|
{
|
|
StreamName = "non-critical-logs",
|
|
DeadLetterQueue = dlqConfig
|
|
});
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
### ✅ DO
|
|
|
|
- Enable DLQ for critical streams
|
|
- Use exponential backoff for transient failures
|
|
- Set reasonable max retry delays
|
|
- Monitor DLQ size regularly
|
|
- Set up alerts for DLQ growth
|
|
- Investigate and fix root causes
|
|
- Periodically redrive DLQ
|
|
- Document failure patterns
|
|
- Test retry logic thoroughly
|
|
|
|
### ❌ DON'T
|
|
|
|
- Don't disable DLQ for critical streams
|
|
- Don't retry indefinitely
|
|
- Don't use short delays for rate-limited APIs
|
|
- Don't ignore growing DLQs
|
|
- Don't automatically delete DLQ events
|
|
- Don't retry permanent failures (e.g., bad data)
|
|
- Don't use the same retry config for all streams
|
|
- Don't forget to log failure reasons
|
|
|
|
## See Also
|
|
|
|
- [Stream Configuration Overview](README.md)
|
|
- [Retention Configuration](retention-config.md)
|
|
- [Lifecycle Configuration](lifecycle-config.md)
|
|
- [Performance Configuration](performance-config.md)
|
|
- [Error Handling Best Practices](../../best-practices/error-handling.md)
|