10 KiB
10 KiB
Dead Letter Queues
Automatic error handling and retry logic for failed events.
Overview
Dead Letter Queues (DLQ) provide reliable error handling for event processing:
- Automatic Retry - Retry failed events up to a configurable number of attempts
- Exponential Backoff - Gradually increase retry delays
- Dead Lettering - Move permanently failed events to DLQ
- Error Tracking - Track failure reasons and attempt counts
Quick Start
using Svrnty.CQRS.Events.Abstractions;
// Resolve the store that holds per-stream configuration.
var configStore = serviceProvider.GetRequiredService<IStreamConfigurationStore>();

// DLQ settings: up to 5 delivery attempts with a 5 minute retry delay.
var ordersDlq = new DeadLetterQueueConfiguration
{
    Enabled = true,
    MaxDeliveryAttempts = 5,
    RetryDelay = TimeSpan.FromMinutes(5)
};

// Attach the DLQ settings to the "orders" stream and save them.
var ordersStream = new StreamConfiguration
{
    StreamName = "orders",
    DeadLetterQueue = ordersDlq
};

await configStore.SetConfigurationAsync(ordersStream);
DLQ Configuration
/// <summary>
/// Retry and dead-lettering settings attached to a stream through
/// <c>StreamConfiguration.DeadLetterQueue</c>.
/// </summary>
public class DeadLetterQueueConfiguration
{
    /// <summary>
    /// Enables the DLQ for the stream. When false, events fail immediately
    /// without retry.
    /// </summary>
    public bool Enabled { get; set; }

    /// <summary>
    /// Custom name for the dead letter stream. The default naming is
    /// "{streamName}-dlq" (e.g., "orders-dlq").
    /// </summary>
    public string? DeadLetterStreamName { get; set; }

    /// <summary>
    /// Number of delivery attempts made before the event is dead lettered.
    /// </summary>
    public int MaxDeliveryAttempts { get; set; }

    /// <summary>
    /// Delay between retries. With exponential backoff enabled, this is the
    /// starting delay.
    /// </summary>
    public TimeSpan RetryDelay { get; set; }

    /// <summary>
    /// Enables exponential backoff, which gradually increases the retry delay.
    /// </summary>
    public bool EnableExponentialBackoff { get; set; }

    /// <summary>
    /// Backoff factor applied to the retry delay (e.g., 2.0 doubles it each time).
    /// </summary>
    public double BackoffMultiplier { get; set; }

    /// <summary>
    /// Upper cap on the retry delay when backoff is in use.
    /// </summary>
    public TimeSpan MaxRetryDelay { get; set; }
}
Basic Configuration
// Simple DLQ: 3 delivery attempts with a fixed 1 minute retry delay
DeadLetterQueueConfiguration dlqConfig = new()
{
    Enabled = true,
    MaxDeliveryAttempts = 3,
    RetryDelay = TimeSpan.FromMinutes(1)
};

// Apply the settings to the "orders" stream
StreamConfiguration ordersConfig = new()
{
    StreamName = "orders",
    DeadLetterQueue = dlqConfig
};

await configStore.SetConfigurationAsync(ordersConfig);
Exponential Backoff
Gradually increase retry delays:
// Backoff starts at 1 minute, doubles on every retry, and is capped at 1 hour.
//
// Delay before each delivery attempt:
//   Attempt 1:  immediate
//   Attempt 2:  1 minute
//   Attempt 3:  2 minutes
//   Attempt 4:  4 minutes
//   Attempt 5:  8 minutes
//   Attempt 6:  16 minutes
//   Attempt 7:  32 minutes
//   Attempt 8:  1 hour (capped)
//   Attempt 9:  1 hour (capped)
//   Attempt 10: 1 hour (capped)
// After attempt 10 the event is dead lettered.
DeadLetterQueueConfiguration dlqConfig = new()
{
    Enabled = true,
    MaxDeliveryAttempts = 10,
    RetryDelay = TimeSpan.FromMinutes(1),
    EnableExponentialBackoff = true,
    BackoffMultiplier = 2.0,
    MaxRetryDelay = TimeSpan.FromHours(1)
};

var paymentStream = new StreamConfiguration
{
    StreamName = "payment-processing",
    DeadLetterQueue = dlqConfig
};

await configStore.SetConfigurationAsync(paymentStream);
Custom DLQ Stream
Specify a custom dead letter stream name:
// Without DeadLetterStreamName the default naming applies:
// "{streamName}-dlq" (e.g., "orders-dlq").
DeadLetterQueueConfiguration dlqConfig = new()
{
    Enabled = true,
    DeadLetterStreamName = "orders-failed", // Overrides the default name
    MaxDeliveryAttempts = 5,
    RetryDelay = TimeSpan.FromMinutes(5)
};
Domain-Specific Examples
Payment Processing
// Critical stream: 10 delivery attempts, exponential backoff starting at
// 30 seconds and capped at 2 hours; failures land in "payment-failures".
await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "payment-processing",
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = true,
        DeadLetterStreamName = "payment-failures",
        MaxDeliveryAttempts = 10,
        RetryDelay = TimeSpan.FromSeconds(30),
        EnableExponentialBackoff = true,
        BackoffMultiplier = 2.0,
        MaxRetryDelay = TimeSpan.FromHours(2)
    },
    Tags = new List<string> { "critical", "payments" }
});
Email Notifications
// Non-critical stream: 3 delivery attempts, fixed 1 minute delay, no backoff
DeadLetterQueueConfiguration emailDlqConfig = new()
{
    Enabled = true,
    MaxDeliveryAttempts = 3,
    RetryDelay = TimeSpan.FromMinutes(1),
    EnableExponentialBackoff = false
};

StreamConfiguration emailStream = new()
{
    StreamName = "email-notifications",
    DeadLetterQueue = emailDlqConfig,
    Tags = new List<string> { "notifications" }
};

await configStore.SetConfigurationAsync(emailStream);
Order Fulfillment
// Business-critical stream: 7 delivery attempts, backoff factor 1.5 starting
// at 5 minutes and capped at 30 minutes.
await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "order-fulfillment",
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = true,
        DeadLetterStreamName = "order-fulfillment-dlq",
        MaxDeliveryAttempts = 7,
        RetryDelay = TimeSpan.FromMinutes(5),
        EnableExponentialBackoff = true,
        BackoffMultiplier = 1.5,
        MaxRetryDelay = TimeSpan.FromMinutes(30)
    }
});
External API Integration
// Transient failures are expected here, so allow many attempts: 15 deliveries,
// backoff doubling from 10 seconds up to a 10 minute cap.
DeadLetterQueueConfiguration apiDlqConfig = new()
{
    Enabled = true,
    DeadLetterStreamName = "external-api-failures",
    MaxDeliveryAttempts = 15,
    RetryDelay = TimeSpan.FromSeconds(10),
    EnableExponentialBackoff = true,
    BackoffMultiplier = 2.0,
    MaxRetryDelay = TimeSpan.FromMinutes(10)
};

var apiStream = new StreamConfiguration
{
    StreamName = "external-api-calls",
    DeadLetterQueue = apiDlqConfig
};

await configStore.SetConfigurationAsync(apiStream);
Processing Dead Lettered Events
Manual Inspection
var eventStore = serviceProvider.GetRequiredService<IEventStreamStore>();

// Metadata entries to print for each dead lettered event: (label, metadata key)
var failureFields = new[]
{
    ("Original Stream", "OriginalStream"),
    ("Failure Reason", "FailureReason"),
    ("Attempt Count", "AttemptCount"),
    ("Last Attempt", "LastAttemptTime")
};

// Read the dead letter queue starting at offset 0
var deadLetters = eventStore.ReadStreamAsync("orders-dlq", fromOffset: 0);
await foreach (var failedEvent in deadLetters)
{
    Console.WriteLine($"Event: {failedEvent.EventId}");
    Console.WriteLine($"Type: {failedEvent.EventType}");

    foreach (var (label, key) in failureFields)
    {
        Console.WriteLine($"{label}: {failedEvent.Metadata[key]}");
    }

    Console.WriteLine();
}
Reprocessing DLQ
// Once the underlying issue is fixed, replay the dead lettered events
var eventStore = serviceProvider.GetRequiredService<IEventStreamStore>();

await foreach (var deadLettered in eventStore.ReadStreamAsync("orders-dlq"))
{
    try
    {
        // Run the event through processing again
        await ProcessEventAsync(deadLettered);

        // Processing succeeded, so remove the event from the DLQ
        await AcknowledgeDlqEventAsync(deadLettered.EventId);

        _logger.LogInformation("Successfully reprocessed DLQ event {EventId}", deadLettered.EventId);
    }
    catch (Exception ex)
    {
        // The event stays in the DLQ for manual review
        _logger.LogError(ex, "Failed to reprocess DLQ event {EventId}", deadLettered.EventId);
    }
}
Automated Redriving
/// <summary>
/// Background service that periodically re-attempts processing of the events
/// in the "orders-dlq" dead letter stream.
/// </summary>
public class DlqRedriveService : BackgroundService
{
    /// <summary>
    /// Waits one hour, runs a redrive pass, and repeats until the host
    /// requests shutdown through <paramref name="stoppingToken"/>.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Wait 1 hour between redrive attempts
            await Task.Delay(TimeSpan.FromHours(1), stoppingToken);

            try
            {
                await RedriveDeadLetterQueueAsync("orders-dlq", stoppingToken);
            }
            catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
            {
                // A failed pass (e.g. the stream could not be read) must not end
                // the service; log it and try again on the next cycle.
                _logger.LogError(ex, "DLQ redrive pass failed; retrying on the next cycle");
            }
        }
    }

    /// <summary>
    /// Reprocesses every event in <paramref name="dlqStreamName"/>, acknowledging
    /// the ones that succeed and leaving the rest in the DLQ.
    /// </summary>
    /// <param name="dlqStreamName">Name of the dead letter stream to redrive.</param>
    /// <param name="ct">Checked before each event so shutdown interrupts the pass.</param>
    private async Task RedriveDeadLetterQueueAsync(
        string dlqStreamName,
        CancellationToken ct)
    {
        int successCount = 0;
        int failureCount = 0;

        await foreach (var failedEvent in _eventStore.ReadStreamAsync(dlqStreamName))
        {
            // Stop promptly on shutdown instead of draining the whole stream.
            ct.ThrowIfCancellationRequested();

            try
            {
                await ProcessEventAsync(failedEvent);
                await AcknowledgeDlqEventAsync(failedEvent.EventId);
                successCount++;
            }
            catch (Exception ex) when (!ct.IsCancellationRequested)
            {
                // Record why the event failed again; it remains in the DLQ.
                failureCount++;
                _logger.LogError(ex,
                    "Failed to reprocess DLQ event {EventId}",
                    failedEvent.EventId);
            }
        }

        _logger.LogInformation(
            "DLQ redrive complete: {Success} succeeded, {Failed} failed",
            successCount,
            failureCount);
    }
}
Monitoring DLQ
Metrics
// Gather DLQ metrics: total size, arrivals in the last hour, and oldest age
var totalDeadLettered = await GetDlqCountAsync("orders-dlq");
var recentlyDeadLettered = await GetDlqCountSince("orders-dlq", TimeSpan.FromHours(1));
var oldestDeadLetteredAge = await GetOldestDlqAgeAsync("orders-dlq");

var dlqMetrics = new
{
    TotalDeadLettered = totalDeadLettered,
    RecentlyDeadLettered = recentlyDeadLettered,
    OldestDeadLetteredAge = oldestDeadLetteredAge
};

// Warn once the DLQ holds more than 100 events
var needsInvestigation = dlqMetrics.TotalDeadLettered > 100;
if (needsInvestigation)
{
    _logger.LogWarning("DLQ has {Count} events - investigation needed", dlqMetrics.TotalDeadLettered);
}
Alerts
// Send an alert when the DLQ grows past the configured threshold
var currentDlqCount = await GetDlqCountAsync("orders-dlq");
var thresholdExceeded = currentDlqCount > _threshold;

if (thresholdExceeded)
{
    var details = $"DLQ has {currentDlqCount} events (threshold: {_threshold})";
    await SendAlertAsync("DLQ Threshold Exceeded", details);
}
Health Check
/// <summary>
/// Health check that reports on the size of the "orders-dlq" dead letter stream.
/// </summary>
public class DlqHealthCheck : IHealthCheck
{
    /// <summary>
    /// Reports Healthy when the DLQ count is 0, Degraded when it is below 10,
    /// and Unhealthy otherwise.
    /// </summary>
    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken ct = default)
    {
        var dlqCount = await GetDlqCountAsync("orders-dlq");

        if (dlqCount == 0)
        {
            return HealthCheckResult.Healthy("No dead lettered events");
        }

        if (dlqCount < 10)
        {
            return HealthCheckResult.Degraded($"{dlqCount} dead lettered events");
        }

        return HealthCheckResult.Unhealthy($"{dlqCount} dead lettered events - investigation required");
    }
}
// Add the DLQ check to the application's health checks under the name "dlq"
var healthChecks = builder.Services.AddHealthChecks();
healthChecks.AddCheck<DlqHealthCheck>("dlq");
Disabling DLQ
// DLQ switched off for this stream: events fail immediately without retry
await configStore.SetConfigurationAsync(new StreamConfiguration
{
    StreamName = "non-critical-logs",
    DeadLetterQueue = new DeadLetterQueueConfiguration
    {
        Enabled = false
    }
});
Best Practices
✅ DO
- Enable DLQ for critical streams
- Use exponential backoff for transient failures
- Set reasonable max retry delays
- Monitor DLQ size regularly
- Set up alerts for DLQ growth
- Investigate and fix root causes
- Periodically redrive DLQ
- Document failure patterns
- Test retry logic thoroughly
❌ DON'T
- Don't disable DLQ for critical streams
- Don't retry indefinitely
- Don't use short delays for rate-limited APIs
- Don't ignore growing DLQs
- Don't automatically delete DLQ events
- Don't retry permanent failures (e.g., bad data)
- Don't use the same retry configuration for all streams
- Don't forget to log failure reasons