Ephemeral Streams
Message queue semantics with at-least-once delivery.
Overview
Ephemeral streams provide message queue functionality for background jobs, notifications, and asynchronous task processing. Messages are delivered with visibility timeouts and support acknowledge/nack operations for reliable processing.
Key Features:
- ✅ At-least-once delivery - Messages redelivered on failure
- ✅ Visibility timeout - Hide messages during processing
- ✅ Acknowledge/Nack - Confirm or reject processing
- ✅ Dead letter queue - Failed messages moved to DLQ
- ✅ Ordered processing - FIFO within same stream
Message Queue Basics
Enqueue Messages
public class EmailService
{
    private readonly IEventStreamStore _eventStore;

    public EmailService(IEventStreamStore eventStore)
    {
        _eventStore = eventStore;
    }

    public async Task QueueWelcomeEmailAsync(string email, string name)
    {
        var command = new SendEmailCommand
        {
            To = email,
            Subject = "Welcome!",
            Body = $"Hello {name}, thanks for registering!",
            TemplateId = "welcome-email"
        };

        // Enqueue to ephemeral stream
        await _eventStore.EnqueueAsync(
            streamName: "email-queue",
            message: command);
    }

    public async Task QueuePasswordResetEmailAsync(string email, string resetToken)
    {
        var command = new SendEmailCommand
        {
            To = email,
            Subject = "Password Reset",
            Body = $"Your reset token: {resetToken}",
            TemplateId = "password-reset"
        };

        await _eventStore.EnqueueAsync("email-queue", command);
    }
}
Dequeue Messages
public class EmailWorker : BackgroundService
{
    private readonly IEventStreamStore _eventStore;
    private readonly IEmailSender _emailSender;
    private readonly ILogger<EmailWorker> _logger;

    public EmailWorker(
        IEventStreamStore eventStore,
        IEmailSender emailSender,
        ILogger<EmailWorker> logger)
    {
        _eventStore = eventStore;
        _emailSender = emailSender;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Email worker started");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Dequeue message with 5-minute visibility timeout
                var message = await _eventStore.DequeueAsync(
                    streamName: "email-queue",
                    visibilityTimeout: TimeSpan.FromMinutes(5),
                    cancellationToken: stoppingToken);

                if (message == null)
                {
                    // No messages available - wait before polling again
                    await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
                    continue;
                }

                // Process message
                var command = JsonSerializer.Deserialize<SendEmailCommand>(message.Data)
                    ?? throw new InvalidOperationException("Invalid message");

                await _emailSender.SendAsync(
                    command.To,
                    command.Subject,
                    command.Body);

                // Acknowledge successful processing
                await _eventStore.AcknowledgeAsync("email-queue", message.MessageId);
                _logger.LogInformation("Email sent to {Email}", command.To);
            }
            catch (OperationCanceledException)
            {
                // Shutdown requested
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing email");
                // Not acknowledged: message will be redelivered after visibility timeout
            }
        }

        _logger.LogInformation("Email worker stopped");
    }
}
Visibility Timeout
Visibility timeout hides messages from other consumers during processing:
[Message enqueued]
↓
[Dequeue with 5-min timeout]
↓
Message invisible to other consumers
↓
Then one of two outcomes:
Processing succeeds (< 5 minutes) → [Acknowledge] → Message deleted
Processing fails / timeout expires → [Message visible again] → Redelivered
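The lifecycle above can be modeled with a small in-memory queue. This is a sketch for illustration only: `InMemoryQueue`, its members, and the injected clock are hypothetical names, it is not thread-safe, and it does not reflect how the library actually stores messages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of visibility-timeout semantics.
// Not thread-safe and not the library's implementation.
public sealed class InMemoryQueue
{
    private sealed class Entry
    {
        public Entry(string id, string body, DateTimeOffset visibleAt)
        {
            Id = id;
            Body = body;
            VisibleAt = visibleAt;
        }

        public string Id { get; }
        public string Body { get; }
        public DateTimeOffset VisibleAt { get; set; }
        public int Attempts { get; set; }
    }

    private readonly List<Entry> _entries = new();
    private readonly Func<DateTimeOffset> _now;

    // The clock is injected so timeouts can be exercised without waiting.
    public InMemoryQueue(Func<DateTimeOffset> now) => _now = now;

    public string Enqueue(string body)
    {
        var id = Guid.NewGuid().ToString("N");
        _entries.Add(new Entry(id, body, _now()));
        return id;
    }

    // Returns the oldest visible message and hides it for the given timeout.
    public (string Id, string Body, int Attempt)? Dequeue(TimeSpan visibilityTimeout)
    {
        var now = _now();
        var entry = _entries.FirstOrDefault(e => e.VisibleAt <= now);
        if (entry is null) return null;

        entry.VisibleAt = now + visibilityTimeout;
        entry.Attempts++;
        return (entry.Id, entry.Body, entry.Attempts);
    }

    // Acknowledge: the message is deleted and never redelivered.
    public bool Acknowledge(string id) => _entries.RemoveAll(e => e.Id == id) > 0;
}
```

With this model, a second `Dequeue` issued inside the timeout returns `null`. Once the clock passes the timeout without an `Acknowledge`, the same message comes back with `Attempt` incremented, which is the at-least-once behavior consumers have to tolerate.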
Setting Appropriate Timeout
// Short tasks (< 30 seconds)
var quickMessage = await _eventStore.DequeueAsync(
    "quick-jobs",
    visibilityTimeout: TimeSpan.FromSeconds(30));

// Medium tasks (1-5 minutes)
var mediumMessage = await _eventStore.DequeueAsync(
    "medium-jobs",
    visibilityTimeout: TimeSpan.FromMinutes(5));

// Long tasks (up to 30 minutes)
var longMessage = await _eventStore.DequeueAsync(
    "long-running-jobs",
    visibilityTimeout: TimeSpan.FromMinutes(30));
Extending Visibility Timeout
For tasks that may outlast any fixed timeout, extend the visibility timeout periodically while processing:
public async Task ProcessLongRunningJobAsync(StoredMessage message, CancellationToken ct)
{
    // Start background task to extend visibility.
    // Assumes the message was dequeued with a visibility timeout of at least 5 minutes.
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var extendTask = Task.Run(async () =>
    {
        while (!cts.Token.IsCancellationRequested)
        {
            // Wake up 1 minute before the current 5-minute window expires
            await Task.Delay(TimeSpan.FromMinutes(4), cts.Token);

            // Extend visibility by another 5 minutes
            await _eventStore.ExtendVisibilityTimeoutAsync(
                "long-jobs",
                message.MessageId,
                TimeSpan.FromMinutes(5));
        }
    }, cts.Token);

    try
    {
        // Process job (may take 20+ minutes)
        await ProcessJobAsync(message);

        // Acknowledge
        await _eventStore.AcknowledgeAsync("long-jobs", message.MessageId);
    }
    finally
    {
        // Stop extending visibility
        cts.Cancel();

        // Cancellation surfaces as an exception here; it is expected and ignored
        try { await extendTask; } catch { }
    }
}
Acknowledge and Nack
Acknowledge (Success)
var message = await _eventStore.DequeueAsync("orders", TimeSpan.FromMinutes(5));
if (message == null)
    return; // Queue is empty

try
{
    await ProcessOrderAsync(message);

    // Success - remove message from queue
    await _eventStore.AcknowledgeAsync("orders", message.MessageId);
}
catch (Exception ex)
{
    _logger.LogError(ex, "Failed to process order");
    // Don't acknowledge - message will be redelivered after the visibility timeout
}
Nack (Failure)
Explicitly reject a message and make it visible again, either immediately or after a delay:
var message = await _eventStore.DequeueAsync("orders", TimeSpan.FromMinutes(5));
if (message == null)
    return; // Queue is empty

try
{
    await ProcessOrderAsync(message);
    await _eventStore.AcknowledgeAsync("orders", message.MessageId);
}
catch (ValidationException ex)
{
    _logger.LogWarning(ex, "Invalid order - rejecting for immediate redelivery");

    // Nack with immediate visibility (retry right away).
    // A message that keeps failing reaches the DLQ once MaxDeliveryAttempts is exhausted.
    await _eventStore.NackAsync(
        streamName: "orders",
        messageId: message.MessageId,
        redeliverAfter: TimeSpan.Zero);
}
catch (Exception ex)
{
    _logger.LogError(ex, "Transient error - retry after delay");

    // Nack with delay (retry after 1 minute)
    await _eventStore.NackAsync(
        "orders",
        message.MessageId,
        redeliverAfter: TimeSpan.FromMinutes(1));
}
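A fixed one-minute delay works for occasional transient errors, but a delay that grows with each attempt is often gentler on a struggling dependency. The following is a minimal sketch of a capped exponential backoff whose result could be passed as `redeliverAfter`. `RetryBackoff` and `DelayFor` are hypothetical names, and how the current attempt number is obtained from a message is an assumption that depends on what the store exposes.

```csharp
using System;

// Hypothetical helper, not part of the library: computes a capped
// exponential backoff delay for use as a redelivery delay.
public static class RetryBackoff
{
    // attempt is 1-based: attempt 1 waits baseDelay, attempt 2 waits 2x, and so on.
    public static TimeSpan DelayFor(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1)
            throw new ArgumentOutOfRangeException(nameof(attempt));
        if (baseDelay <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(baseDelay));
        if (maxDelay < baseDelay)
            throw new ArgumentOutOfRangeException(nameof(maxDelay));

        // Clamp the exponent so the multiplication stays well inside double range.
        var exponent = Math.Min(attempt - 1, 30);
        var ticks = baseDelay.Ticks * Math.Pow(2, exponent);

        // Compare before casting so the cast to long can never overflow.
        return ticks >= maxDelay.Ticks
            ? maxDelay
            : TimeSpan.FromTicks((long)ticks);
    }
}
```

For example, `RetryBackoff.DelayFor(attempt, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(1))` doubles the delay on each attempt until it reaches the one-minute cap. Adding random jitter is a common refinement when many workers retry at once; it is left out here to keep the sketch deterministic.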
Dead Letter Queue
Messages that fail repeatedly are moved to a dead letter queue (DLQ) once they exceed the configured number of delivery attempts.
Configure DLQ
builder.Services.AddPostgresEventStreaming(
    connectionString,
    options =>
    {
        options.DefaultDeadLetterQueueName = "dlq";
        options.MaxDeliveryAttempts = 5; // Move to DLQ after 5 failures
    });
Process DLQ Messages
public class DeadLetterWorker : BackgroundService
{
    // _eventStore, _logger, and _deadLetterRepository are injected
    // through the constructor (omitted for brevity)

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = await _eventStore.DequeueAsync("dlq", TimeSpan.FromMinutes(10), stoppingToken);

            if (message == null)
            {
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
                continue;
            }

            // Log failed message
            _logger.LogError(
                "Dead letter message: {MessageId}, Original stream: {StreamName}, Attempts: {Attempts}",
                message.MessageId,
                message.Metadata["OriginalStreamName"],
                message.Metadata["DeliveryAttempts"]);

            // Optionally store in database for manual investigation
            await _deadLetterRepository.SaveAsync(message);

            // Acknowledge to remove from DLQ
            await _eventStore.AcknowledgeAsync("dlq", message.MessageId);
        }
    }
}
Background Job Processing
Job Queue Pattern
// Job definition
public record ProcessVideoJob
{
    public string VideoId { get; init; } = string.Empty;
    public string InputUrl { get; init; } = string.Empty;
    public string OutputFormat { get; init; } = string.Empty;
}

// Enqueue job
public class VideoService
{
    // _storage and _eventStore are injected through the constructor (omitted for brevity)

    public async Task<string> UploadVideoAsync(Stream videoStream, string format)
    {
        var videoId = Guid.NewGuid().ToString();

        // Save video to storage
        var inputUrl = await _storage.SaveAsync(videoId, videoStream);

        // Queue processing job
        await _eventStore.EnqueueAsync("video-processing", new ProcessVideoJob
        {
            VideoId = videoId,
            InputUrl = inputUrl,
            OutputFormat = format
        });

        return videoId;
    }
}
// Process jobs
public class VideoProcessingWorker : BackgroundService
{
    private const int MaxConcurrentJobs = 4;

    // _eventStore, _videoProcessor, and _logger are injected
    // through the constructor (omitted for brevity)

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Run 4 independent consumer loops, so up to 4 videos are processed concurrently
        var tasks = Enumerable.Range(0, MaxConcurrentJobs)
            .Select(_ => ProcessJobsAsync(stoppingToken))
            .ToList();

        await Task.WhenAll(tasks);
    }

    private async Task ProcessJobsAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = await _eventStore.DequeueAsync(
                "video-processing",
                TimeSpan.FromMinutes(30), // Long timeout for video processing
                stoppingToken);

            if (message == null)
            {
                await Task.Delay(100, stoppingToken);
                continue;
            }

            try
            {
                var job = JsonSerializer.Deserialize<ProcessVideoJob>(message.Data)
                    ?? throw new InvalidOperationException("Invalid message");

                // Process video (may take several minutes)
                await _videoProcessor.ProcessAsync(
                    job.InputUrl,
                    job.OutputFormat,
                    stoppingToken);

                await _eventStore.AcknowledgeAsync("video-processing", message.MessageId);
                _logger.LogInformation("Video {VideoId} processed", job.VideoId);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown requested mid-job: leave the message unacknowledged for redelivery
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Video processing failed");
                // Will retry after visibility timeout
            }
        }
    }
}
Notification Pattern
// Notification types
public record UserNotification
{
    public int UserId { get; init; }
    public string Type { get; init; } = string.Empty;
    public string Title { get; init; } = string.Empty;
    public string Message { get; init; } = string.Empty;
    public Dictionary<string, string> Data { get; init; } = new();
}

// Enqueue notifications
public class NotificationService
{
    // _eventStore is injected through the constructor (omitted for brevity)

    public async Task NotifyUserAsync(int userId, string title, string message)
    {
        await _eventStore.EnqueueAsync("user-notifications", new UserNotification
        {
            UserId = userId,
            Type = "info",
            Title = title,
            Message = message
        });
    }

    public async Task NotifyOrderShippedAsync(int userId, int orderId, string trackingNumber)
    {
        await _eventStore.EnqueueAsync("user-notifications", new UserNotification
        {
            UserId = userId,
            Type = "order-shipped",
            Title = "Order Shipped",
            Message = $"Your order #{orderId} has been shipped",
            Data = new Dictionary<string, string>
            {
                ["orderId"] = orderId.ToString(),
                ["trackingNumber"] = trackingNumber
            }
        });
    }
}
// Send notifications
public class NotificationWorker : BackgroundService
{
    // _eventStore and _logger are injected through the constructor (omitted for brevity)

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = await _eventStore.DequeueAsync(
                "user-notifications",
                TimeSpan.FromMinutes(2),
                stoppingToken);

            if (message == null)
            {
                await Task.Delay(100, stoppingToken);
                continue;
            }

            try
            {
                var notification = JsonSerializer.Deserialize<UserNotification>(message.Data)
                    ?? throw new InvalidOperationException("Invalid message");

                // Send via multiple channels.
                // If any channel fails, the message is redelivered and every channel
                // runs again, so each sender should tolerate duplicates.
                await Task.WhenAll(
                    SendPushNotificationAsync(notification),
                    SendEmailNotificationAsync(notification),
                    SaveToNotificationCenterAsync(notification));

                await _eventStore.AcknowledgeAsync("user-notifications", message.MessageId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to send notification");
                // Not acknowledged: message will be redelivered after visibility timeout
            }
        }
    }
}
Batch Processing
Process multiple messages in a batch:
public async Task ProcessBatchAsync(CancellationToken ct)
{
    const int batchSize = 100;
    var batch = new List<StoredMessage>();

    // Dequeue up to batchSize messages; stop early if the queue is empty.
    // The 10-minute timeout must cover dequeuing, processing, and acknowledging the whole batch.
    for (int i = 0; i < batchSize; i++)
    {
        var message = await _eventStore.DequeueAsync("analytics", TimeSpan.FromMinutes(10), ct);
        if (message == null)
            break;

        batch.Add(message);
    }

    if (batch.Count == 0)
        return;

    try
    {
        // Process entire batch
        await ProcessAnalyticsBatchAsync(batch);

        // Acknowledge all
        foreach (var message in batch)
        {
            await _eventStore.AcknowledgeAsync("analytics", message.MessageId);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Batch processing failed");
        // Every message not yet acknowledged will be redelivered after the visibility timeout
    }
}
Best Practices
✅ DO
- Set appropriate visibility timeouts for your workload
- Always acknowledge or nack messages
- Implement idempotent message handlers
- Use dead letter queues for failed messages
- Monitor queue depth and processing lag
- Scale workers based on queue size
❌ DON'T
- Don't process messages without acknowledging
- Don't use visibility timeouts shorter than the handler's worst-case processing time
- Don't ignore dead letter queue messages
- Don't store large payloads in messages
- Don't assume exactly-once delivery
- Don't block the worker loop with synchronous waits or blocking I/O
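
Because delivery is at-least-once, a handler can receive the same message more than once. The sketch below shows one way to make a handler idempotent by remembering which message IDs have already been handled. `IdempotencyGuard` and `HandleOnceAsync` are hypothetical names, the message ID is assumed to be a string, and the in-memory dictionary is only for illustration: a real deployment would need a durable, shared record (for example a table with a unique key) so that duplicates are detected across restarts and across workers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical idempotency guard, not part of the library.
// Tracks handled message IDs in memory only.
public sealed class IdempotencyGuard
{
    private readonly ConcurrentDictionary<string, bool> _handled = new();

    // Runs the handler unless this message ID was already claimed.
    // Returns true if the handler ran, false if the message was a duplicate.
    public async Task<bool> HandleOnceAsync(string messageId, Func<Task> handler)
    {
        // Claim the ID first so a concurrent duplicate is skipped.
        if (!_handled.TryAdd(messageId, true))
            return false;

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Release the claim so a later redelivery can retry the work.
            _handled.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

A worker would wrap its processing step in `HandleOnceAsync` and acknowledge the message whether the handler ran or the message turned out to be a duplicate, since in both cases the work has been done.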