# Ephemeral Streams

Message queue semantics with at-least-once delivery.

## Overview

Ephemeral streams provide message queue functionality for background jobs, notifications, and asynchronous task processing. Messages are delivered with visibility timeouts and support acknowledge/nack operations for reliable processing.

**Key Features:**

- ✅ **At-least-once delivery** - Messages redelivered on failure
- ✅ **Visibility timeout** - Hide messages during processing
- ✅ **Acknowledge/Nack** - Confirm or reject processing
- ✅ **Dead letter queue** - Failed messages moved to DLQ
- ✅ **Ordered processing** - FIFO within same stream

## Message Queue Basics

### Enqueue Messages

```csharp
public class EmailService
{
    private readonly IEventStreamStore _eventStore;

    public EmailService(IEventStreamStore eventStore) => _eventStore = eventStore;

    public async Task QueueWelcomeEmailAsync(string email, string name)
    {
        var command = new SendEmailCommand
        {
            To = email,
            Subject = "Welcome!",
            Body = $"Hello {name}, thanks for registering!",
            TemplateId = "welcome-email"
        };

        // Enqueue to ephemeral stream
        await _eventStore.EnqueueAsync(
            streamName: "email-queue",
            message: command);
    }

    public async Task QueuePasswordResetEmailAsync(string email, string resetToken)
    {
        var command = new SendEmailCommand
        {
            To = email,
            Subject = "Password Reset",
            Body = $"Your reset token: {resetToken}",
            TemplateId = "password-reset"
        };

        await _eventStore.EnqueueAsync("email-queue", command);
    }
}
```

### Dequeue Messages

```csharp
public class EmailWorker : BackgroundService
{
    private readonly IEventStreamStore _eventStore;
    private readonly IEmailSender _emailSender;
    private readonly ILogger<EmailWorker> _logger;

    public EmailWorker(
        IEventStreamStore eventStore,
        IEmailSender emailSender,
        ILogger<EmailWorker> logger)
    {
        _eventStore = eventStore;
        _emailSender = emailSender;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Email worker started");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                // Dequeue message with 5-minute visibility timeout
                var message = await _eventStore.DequeueAsync(
                    streamName: "email-queue",
                    visibilityTimeout: TimeSpan.FromMinutes(5),
                    cancellationToken: stoppingToken);

                if (message == null)
                {
                    // No messages available; wait before polling again
                    await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
                    continue;
                }

                // Process message
                var command = JsonSerializer.Deserialize<SendEmailCommand>(message.Data)
                    ?? throw new InvalidOperationException("Invalid message");

                await _emailSender.SendAsync(
                    command.To,
                    command.Subject,
                    command.Body);

                // Acknowledge successful processing
                await _eventStore.AcknowledgeAsync("email-queue", message.MessageId);

                _logger.LogInformation("Email sent to {Email}", command.To);
            }
            catch (OperationCanceledException)
            {
                // Shutdown requested
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing email");
                // Not acknowledged: message will be redelivered after the visibility timeout
            }
        }

        _logger.LogInformation("Email worker stopped");
    }
}
```

## Visibility Timeout

A visibility timeout hides a dequeued message from other consumers while it is being processed. If the message is not acknowledged before the timeout expires, it becomes visible again and is redelivered:

```
[Message enqueued]
        ↓
[Dequeue with 5-min timeout]
        ↓
Message invisible to other consumers
        ↓
Processing (< 5 minutes)
        ↓
[Acknowledge] → Message deleted
        ↓
       OR
        ↓
Processing fails / timeout expires
        ↓
[Message visible again] → Redelivered
```

### Setting Appropriate Timeout

Choose a timeout longer than the worst-case processing time for the stream:

```csharp
// Short tasks (< 30 seconds)
var quickMessage = await _eventStore.DequeueAsync(
    "quick-jobs",
    visibilityTimeout: TimeSpan.FromSeconds(30));

// Medium tasks (1-5 minutes)
var mediumMessage = await _eventStore.DequeueAsync(
    "medium-jobs",
    visibilityTimeout: TimeSpan.FromMinutes(5));

// Long tasks (up to 30 minutes)
var longMessage = await _eventStore.DequeueAsync(
    "long-running-jobs",
    visibilityTimeout: TimeSpan.FromMinutes(30));
```

### Extending Visibility Timeout

For tasks whose duration is hard to bound, extend the timeout periodically while processing continues:

```csharp
public async Task ProcessLongRunningJobAsync(StoredMessage message, CancellationToken ct)
{
    // Start background task to extend visibility
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);

    var extendTask = Task.Run(async () =>
    {
        while (!cts.Token.IsCancellationRequested)
        {
            // Wake up one minute before the current 5-minute window expires
            await Task.Delay(TimeSpan.FromMinutes(4), cts.Token);

            // Extend visibility by another 5 minutes
            await _eventStore.ExtendVisibilityTimeoutAsync(
                "long-jobs",
                message.MessageId,
                TimeSpan.FromMinutes(5));
        }
    }, cts.Token);

    try
    {
        // Process job (may take 20+ minutes)
        await ProcessJobAsync(message);

        // Acknowledge
        await _eventStore.AcknowledgeAsync("long-jobs", message.MessageId);
    }
    finally
    {
        // Stop extending visibility
        cts.Cancel();
        try { await extendTask; }
        catch (OperationCanceledException) { /* expected when the extender is stopped */ }
    }
}
```

## Acknowledge and Nack

### Acknowledge (Success)

```csharp
var message = await _eventStore.DequeueAsync("orders", TimeSpan.FromMinutes(5));
if (message == null) return; // Queue is empty

try
{
    await ProcessOrderAsync(message);

    // Success - remove message from queue
    await _eventStore.AcknowledgeAsync("orders", message.MessageId);
}
catch (Exception ex)
{
    _logger.LogError(ex, "Failed to process order");
    // Don't acknowledge - message will be redelivered after the visibility timeout
}
```

### Nack (Failure)

Explicitly reject a message and make it visible again, either immediately or after a delay:

```csharp
var message = await _eventStore.DequeueAsync("orders", TimeSpan.FromMinutes(5));
if (message == null) return; // Queue is empty

try
{
    await ProcessOrderAsync(message);
    await _eventStore.AcknowledgeAsync("orders", message.MessageId);
}
catch (ValidationException ex)
{
    _logger.LogWarning(ex, "Invalid order - rejecting");

    // Nack with immediate visibility (retry right away).
    // A message that keeps failing ends up in the DLQ (see Dead Letter Queue below).
    await _eventStore.NackAsync(
        streamName: "orders",
        messageId: message.MessageId,
        redeliverAfter: TimeSpan.Zero);
}
catch (Exception ex)
{
    _logger.LogError(ex, "Transient error - retry after delay");

    // Nack with delay (retry after 1 minute)
    await _eventStore.NackAsync(
        "orders",
        message.MessageId,
        redeliverAfter: TimeSpan.FromMinutes(1));
}
```

## Dead Letter Queue

Messages that fail repeatedly are moved to a dead letter queue (DLQ) once they reach the configured maximum number of delivery attempts.

### Configure DLQ

```csharp
builder.Services.AddPostgresEventStreaming(
    connectionString,
    options =>
    {
        options.DefaultDeadLetterQueueName = "dlq";
        options.MaxDeliveryAttempts = 5; // Move to DLQ after 5 failures
    });
```

### Process DLQ Messages

```csharp
public class DeadLetterWorker : BackgroundService
{
    // _eventStore, _logger and _deadLetterRepository are injected via the constructor (omitted)

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = await _eventStore.DequeueAsync("dlq", TimeSpan.FromMinutes(10), stoppingToken);

            if (message == null)
            {
                await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
                continue;
            }

            // Log failed message
            _logger.LogError(
                "Dead letter message: {MessageId}, Original stream: {StreamName}, Attempts: {Attempts}",
                message.MessageId,
                message.Metadata["OriginalStreamName"],
                message.Metadata["DeliveryAttempts"]);

            // Optionally store in database for manual investigation
            await _deadLetterRepository.SaveAsync(message);

            // Acknowledge to remove from DLQ
            await _eventStore.AcknowledgeAsync("dlq", message.MessageId);
        }
    }
}
```

## Background Job Processing

### Job Queue Pattern

```csharp
// Job definition
public record ProcessVideoJob
{
    public string VideoId { get; init; } = string.Empty;
    public string InputUrl { get; init; } = string.Empty;
    public string OutputFormat { get; init; } = string.Empty;
}

// Enqueue job
public class VideoService
{
    // _storage and _eventStore are injected via the constructor (omitted)

    public async Task<string> UploadVideoAsync(Stream videoStream, string format)
    {
        var videoId = Guid.NewGuid().ToString();

        // Save video to storage
        var inputUrl = await _storage.SaveAsync(videoId, videoStream);

        // Queue processing job
        await _eventStore.EnqueueAsync("video-processing", new ProcessVideoJob
        {
            VideoId = videoId,
            InputUrl = inputUrl,
            OutputFormat = format
        });

        return videoId;
    }
}

// Process jobs
public class VideoProcessingWorker : BackgroundService
{
    // _eventStore, _videoProcessor and _logger are injected via the constructor (omitted)

    private const int MaxConcurrentJobs = 4;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Process up to 4 videos concurrently
        var tasks = Enumerable.Range(0, MaxConcurrentJobs)
            .Select(_ => ProcessJobsAsync(stoppingToken))
            .ToList();

        await Task.WhenAll(tasks);
    }

    private async Task ProcessJobsAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = await _eventStore.DequeueAsync(
                "video-processing",
                TimeSpan.FromMinutes(30), // Long timeout for video processing
                stoppingToken);

            if (message == null)
            {
                await Task.Delay(100, stoppingToken);
                continue;
            }

            try
            {
                var job = JsonSerializer.Deserialize<ProcessVideoJob>(message.Data)
                    ?? throw new InvalidOperationException("Invalid job payload");

                // Process video (may take several minutes)
                await _videoProcessor.ProcessAsync(
                    job.InputUrl,
                    job.OutputFormat,
                    stoppingToken);

                await _eventStore.AcknowledgeAsync("video-processing", message.MessageId);

                _logger.LogInformation("Video {VideoId} processed", job.VideoId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Video processing failed");
                // Will retry after visibility timeout
            }
        }
    }
}
```

## Notification Pattern

```csharp
// Notification types
public record UserNotification
{
    public int UserId { get; init; }
    public string Type { get; init; } = string.Empty;
    public string Title { get; init; } = string.Empty;
    public string Message { get; init; } = string.Empty;
    public Dictionary<string, string> Data { get; init; } = new();
}

// Enqueue notifications
public class NotificationService
{
    // _eventStore is injected via the constructor (omitted)

    public async Task NotifyUserAsync(int userId, string title, string message)
    {
        await _eventStore.EnqueueAsync("user-notifications", new UserNotification
        {
            UserId = userId,
            Type = "info",
            Title = title,
            Message = message
        });
    }

    public async Task NotifyOrderShippedAsync(int userId, int orderId, string trackingNumber)
    {
        await _eventStore.EnqueueAsync("user-notifications", new UserNotification
        {
            UserId = userId,
            Type = "order-shipped",
            Title = "Order Shipped",
            Message = $"Your order #{orderId} has been shipped",
            Data = new Dictionary<string, string>
            {
                ["orderId"] = orderId.ToString(),
                ["trackingNumber"] = trackingNumber
            }
        });
    }
}

// Send notifications
public class NotificationWorker : BackgroundService
{
    // _eventStore and _logger are injected via the constructor (omitted)

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = await _eventStore.DequeueAsync(
                "user-notifications",
                TimeSpan.FromMinutes(2),
                stoppingToken);

            if (message == null)
            {
                await Task.Delay(100, stoppingToken);
                continue;
            }

            try
            {
                var notification = JsonSerializer.Deserialize<UserNotification>(message.Data)
                    ?? throw new InvalidOperationException("Invalid notification payload");

                // Send via multiple channels. If any channel fails, the whole message
                // is redelivered, so each sender should tolerate duplicates.
                await Task.WhenAll(
                    SendPushNotificationAsync(notification),
                    SendEmailNotificationAsync(notification),
                    SaveToNotificationCenterAsync(notification));

                await _eventStore.AcknowledgeAsync("user-notifications", message.MessageId);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to send notification");
                // Will retry after visibility timeout
            }
        }
    }
}
```

## Batch Processing

Process multiple messages in a batch:

```csharp
public async Task ProcessBatchAsync(CancellationToken ct)
{
    const int batchSize = 100;
    var batch = new List<StoredMessage>();

    // Dequeue batch (stops early if the queue runs empty)
    for (int i = 0; i < batchSize; i++)
    {
        var message = await _eventStore.DequeueAsync("analytics", TimeSpan.FromMinutes(10), ct);
        if (message == null) break;

        batch.Add(message);
    }

    if (batch.Count == 0) return;

    try
    {
        // Process entire batch
        await ProcessAnalyticsBatchAsync(batch);

        // Acknowledge all
        foreach (var message in batch)
        {
            await _eventStore.AcknowledgeAsync("analytics", message.MessageId);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Batch processing failed");
        // Unacknowledged messages will be redelivered after the visibility timeout
    }
}
```

## Best Practices

### ✅ DO

- Set appropriate visibility timeouts for your workload
- Always acknowledge or nack messages
- Implement idempotent message handlers
- Use dead letter queues for failed messages
- Monitor queue depth and processing lag
- Scale workers based on queue size

### ❌ DON'T

- Don't process messages without acknowledging
- Don't use very short visibility timeouts (messages are redelivered while still being processed)
- Don't ignore dead letter queue messages
- Don't store large payloads in messages
- Don't assume exactly-once delivery
- Don't block message processing

## See Also

- [Getting Started](getting-started.md)
- [Persistent Streams](persistent-streams.md)
- [Consumer Groups](../consumer-groups/README.md)
- [Stream Configuration](../stream-configuration/dead-letter-queues.md)
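
## Example: Idempotent Handler Sketch

Because delivery is at-least-once, a handler can see the same message more than once, which is why Best Practices recommends idempotent handlers. The following is a minimal sketch of one way to do that, not part of the library: `IdempotentHandler` and `HandleOnceAsync` are hypothetical names, the message ID is assumed to be representable as a `string`, and the in-memory dictionary stands in for whatever durable store a real deployment would need.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not a library type): runs a handler at most once per message ID.
public sealed class IdempotentHandler
{
    // In-memory record of claimed IDs. Assumption: a real system would persist this,
    // since an in-memory set is lost on restart and is not shared between workers.
    private readonly ConcurrentDictionary<string, bool> _processed = new();

    // Returns true if the handler ran, false if this message ID was already handled.
    public async Task<bool> HandleOnceAsync(string messageId, Func<Task> handler)
    {
        // TryAdd is atomic, so only one caller can claim a given ID.
        if (!_processed.TryAdd(messageId, true))
        {
            return false;
        }

        try
        {
            await handler();
            return true;
        }
        catch
        {
            // Release the claim so a later redelivery can retry the work.
            _processed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

A worker could wrap its processing step in `HandleOnceAsync` and acknowledge the message whether the call returns `true` or `false`, since `false` only means the work was already done by an earlier delivery.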