using Codex.Dal; using Codex.Dal.Entities; using Codex.Dal.Enums; using Codex.Dal.Services; using FluentValidation; using Microsoft.EntityFrameworkCore; using OpenHarbor.CQRS.Abstractions; using System.Text; namespace Codex.CQRS.Commands; /// /// Sends a user message to an agent and receives a response. /// Creates a new conversation if ConversationId is not provided. /// public record SendMessageCommand { /// /// ID of the agent to send the message to /// public Guid AgentId { get; init; } /// /// ID of existing conversation, or null to create a new conversation /// public Guid? ConversationId { get; init; } /// /// User's message content /// public string Message { get; init; } = string.Empty; /// /// Optional user identifier for future authentication support /// public string? UserId { get; init; } } /// /// Result containing the user message, agent response, and conversation metadata /// public record SendMessageResult { /// /// ID of the conversation (new or existing) /// public Guid ConversationId { get; init; } /// /// ID of the stored user message /// public Guid MessageId { get; init; } /// /// ID of the stored agent response message /// public Guid AgentResponseId { get; init; } /// /// The user's message that was sent /// public MessageDto UserMessage { get; init; } = null!; /// /// The agent's response /// public AgentResponseDto AgentResponse { get; init; } = null!; } /// /// Simplified message data transfer object /// public record MessageDto { /// /// Message content /// public string Content { get; init; } = string.Empty; /// /// When the message was created /// public DateTime Timestamp { get; init; } } /// /// Agent response with token usage and cost information /// public record AgentResponseDto { /// /// Response content from the agent /// public string Content { get; init; } = string.Empty; /// /// When the response was generated /// public DateTime Timestamp { get; init; } /// /// Number of input tokens processed /// public int? InputTokens { get; init; } /// /// Number of output tokens generated /// public int? OutputTokens { get; init; } /// /// Estimated cost of the request in USD /// public decimal? EstimatedCost { get; init; } } /// /// Handles sending a message to an agent and storing the conversation /// public class SendMessageCommandHandler : ICommandHandler { private readonly CodexDbContext _dbContext; private readonly IOllamaService _ollamaService; public SendMessageCommandHandler(CodexDbContext dbContext, IOllamaService ollamaService) { _dbContext = dbContext; _ollamaService = ollamaService; } public async Task HandleAsync(SendMessageCommand command, CancellationToken cancellationToken) { // A. Validate agent exists and is active var agent = await _dbContext.Agents .FirstOrDefaultAsync(a => a.Id == command.AgentId && !a.IsDeleted, cancellationToken); if (agent == null) { throw new InvalidOperationException($"Agent with ID {command.AgentId} not found or has been deleted."); } if (agent.Status != AgentStatus.Active) { throw new InvalidOperationException($"Agent '{agent.Name}' is not active. Current status: {agent.Status}"); } // B. Get or create conversation Conversation conversation; if (command.ConversationId.HasValue) { var existingConversation = await _dbContext.Conversations .FirstOrDefaultAsync(c => c.Id == command.ConversationId.Value, cancellationToken); if (existingConversation == null) { throw new InvalidOperationException($"Conversation with ID {command.ConversationId.Value} not found."); } conversation = existingConversation; } else { // Create new conversation with title from first message var title = command.Message.Length > 50 ? command.Message.Substring(0, 50) + "..." : command.Message; conversation = new Conversation { Id = Guid.NewGuid(), Title = title, StartedAt = DateTime.UtcNow, LastMessageAt = DateTime.UtcNow, MessageCount = 0, IsActive = true }; _dbContext.Conversations.Add(conversation); await _dbContext.SaveChangesAsync(cancellationToken); } // C. Store user message var userMessage = new ConversationMessage { Id = Guid.NewGuid(), ConversationId = conversation.Id, Role = MessageRole.User, Content = command.Message, MessageIndex = conversation.MessageCount, IsInActiveWindow = true, CreatedAt = DateTime.UtcNow }; _dbContext.ConversationMessages.Add(userMessage); conversation.MessageCount++; conversation.LastMessageAt = DateTime.UtcNow; await _dbContext.SaveChangesAsync(cancellationToken); // D. Build conversation context (get messages in active window) var contextMessages = await _dbContext.ConversationMessages .AsNoTracking() .Where(m => m.ConversationId == conversation.Id && m.IsInActiveWindow) .OrderByDescending(m => m.MessageIndex) .Take(agent.ConversationWindowSize) .OrderBy(m => m.MessageIndex) .ToListAsync(cancellationToken); // E. Create execution record var execution = new AgentExecution { Id = Guid.NewGuid(), AgentId = agent.Id, ConversationId = conversation.Id, UserPrompt = command.Message, StartedAt = DateTime.UtcNow, Status = ExecutionStatus.Running }; _dbContext.AgentExecutions.Add(execution); await _dbContext.SaveChangesAsync(cancellationToken); // F. Execute agent via Ollama var stopwatch = System.Diagnostics.Stopwatch.StartNew(); OllamaResponse ollamaResponse; try { ollamaResponse = await _ollamaService.GenerateAsync( agent.ModelEndpoint ?? "http://localhost:11434", agent.ModelName, agent.SystemPrompt, contextMessages, command.Message, agent.Temperature, agent.MaxTokens, cancellationToken ); stopwatch.Stop(); } catch (Exception ex) { stopwatch.Stop(); // Update execution to failed status execution.Status = ExecutionStatus.Failed; execution.ErrorMessage = ex.Message; execution.CompletedAt = DateTime.UtcNow; execution.ExecutionTimeMs = stopwatch.ElapsedMilliseconds; await _dbContext.SaveChangesAsync(cancellationToken); throw new InvalidOperationException($"Failed to get response from agent: {ex.Message}", ex); } // G. Store agent response var agentMessage = new ConversationMessage { Id = Guid.NewGuid(), ConversationId = conversation.Id, Role = MessageRole.Assistant, Content = ollamaResponse.Content, MessageIndex = conversation.MessageCount, IsInActiveWindow = true, TokenCount = ollamaResponse.OutputTokens, ExecutionId = execution.Id, CreatedAt = DateTime.UtcNow }; _dbContext.ConversationMessages.Add(agentMessage); conversation.MessageCount++; conversation.LastMessageAt = DateTime.UtcNow; // H. Complete execution record execution.Output = ollamaResponse.Content; execution.CompletedAt = DateTime.UtcNow; execution.ExecutionTimeMs = stopwatch.ElapsedMilliseconds; execution.InputTokens = ollamaResponse.InputTokens; execution.OutputTokens = ollamaResponse.OutputTokens; execution.TotalTokens = (ollamaResponse.InputTokens ?? 0) + (ollamaResponse.OutputTokens ?? 0); execution.Status = ExecutionStatus.Completed; await _dbContext.SaveChangesAsync(cancellationToken); // I. Return result return new SendMessageResult { ConversationId = conversation.Id, MessageId = userMessage.Id, AgentResponseId = agentMessage.Id, UserMessage = new MessageDto { Content = userMessage.Content, Timestamp = userMessage.CreatedAt }, AgentResponse = new AgentResponseDto { Content = agentMessage.Content, Timestamp = agentMessage.CreatedAt, InputTokens = execution.InputTokens, OutputTokens = execution.OutputTokens, EstimatedCost = execution.EstimatedCost } }; } } /// /// Validates SendMessageCommand input /// public class SendMessageCommandValidator : AbstractValidator { public SendMessageCommandValidator() { RuleFor(x => x.AgentId) .NotEmpty() .WithMessage("Agent ID is required."); RuleFor(x => x.Message) .NotEmpty() .WithMessage("Message is required.") .MaximumLength(10000) .WithMessage("Message must not exceed 10,000 characters."); } }