using Codex.Dal;
using Codex.Dal.Entities;
using Codex.Dal.Enums;
using Codex.Dal.Services;
using FluentValidation;
using Microsoft.EntityFrameworkCore;
using OpenHarbor.CQRS.Abstractions;
using System.Text;
namespace Codex.CQRS.Commands;
/// <summary>
/// Sends a user message to an agent and receives a response.
/// Creates a new conversation if ConversationId is not provided.
/// </summary>
public record SendMessageCommand
{
/// <summary>
/// ID of the agent to send the message to.
/// The handler rejects the command when no non-deleted agent has this ID
/// or when the agent's status is not Active.
/// </summary>
public Guid AgentId { get; init; }
/// <summary>
/// ID of existing conversation, or null to create a new conversation.
/// When a value is supplied and no conversation with that ID exists, the handler throws.
/// </summary>
public Guid? ConversationId { get; init; }
/// <summary>
/// User's message content. Defaults to an empty string; the validator rejects
/// empty values and values longer than 10,000 characters.
/// </summary>
public string Message { get; init; } = string.Empty;
/// <summary>
/// Optional user identifier for future authentication support.
/// NOTE(review): not read by the handler in this file.
/// </summary>
public string? UserId { get; init; }
}
/// <summary>
/// Result containing the user message, agent response, and conversation metadata
/// </summary>
public record SendMessageResult
{
/// <summary>
/// ID of the conversation (new or existing)
/// </summary>
public Guid ConversationId { get; init; }
/// <summary>
/// ID of the stored user message
/// </summary>
public Guid MessageId { get; init; }
/// <summary>
/// ID of the stored agent response message
/// </summary>
public Guid AgentResponseId { get; init; }
/// <summary>
/// The user's message that was sent.
/// The null! initializer only silences the nullable warning; the handler always assigns this.
/// </summary>
public MessageDto UserMessage { get; init; } = null!;
/// <summary>
/// The agent's response.
/// The null! initializer only silences the nullable warning; the handler always assigns this.
/// </summary>
public AgentResponseDto AgentResponse { get; init; } = null!;
}
/// <summary>
/// Simplified message data transfer object
/// </summary>
public record MessageDto
{
/// <summary>
/// Message content. Defaults to an empty string.
/// </summary>
public string Content { get; init; } = string.Empty;
/// <summary>
/// When the message was created. The handler fills this from the stored
/// message's CreatedAt, which it sets from DateTime.UtcNow.
/// </summary>
public DateTime Timestamp { get; init; }
}
/// <summary>
/// Agent response with token usage and cost information
/// </summary>
public record AgentResponseDto
{
/// <summary>
/// Response content from the agent. Defaults to an empty string.
/// </summary>
public string Content { get; init; } = string.Empty;
/// <summary>
/// When the response was generated. The handler fills this from the stored
/// assistant message's CreatedAt, which it sets from DateTime.UtcNow.
/// </summary>
public DateTime Timestamp { get; init; }
/// <summary>
/// Number of input tokens processed; null when the model response did not report it.
/// </summary>
public int? InputTokens { get; init; }
/// <summary>
/// Number of output tokens generated; null when the model response did not report it.
/// </summary>
public int? OutputTokens { get; init; }
/// <summary>
/// Estimated cost of the request in USD.
/// NOTE(review): the handler copies this from the execution record but never assigns it
/// in this file, so it is presumably computed elsewhere or left null - confirm.
/// </summary>
public decimal? EstimatedCost { get; init; }
}
/// <summary>
/// Handles <see cref="SendMessageCommand"/>: validates the target agent, resolves or creates
/// the conversation, stores the user message, calls the model through
/// <see cref="IOllamaService"/>, and stores the assistant reply together with an
/// execution record.
/// </summary>
public class SendMessageCommandHandler : ICommandHandler<SendMessageCommand, SendMessageResult>
{
    /// <summary>Endpoint used when the agent has no ModelEndpoint configured.</summary>
    private const string DefaultModelEndpoint = "http://localhost:11434";

    /// <summary>Maximum number of message characters copied into a new conversation's title.</summary>
    private const int MaxTitleLength = 50;

    private readonly CodexDbContext _dbContext;
    private readonly IOllamaService _ollamaService;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="dbContext">Database context used for all reads and writes.</param>
    /// <param name="ollamaService">Service used to generate the agent's response.</param>
    public SendMessageCommandHandler(CodexDbContext dbContext, IOllamaService ollamaService)
    {
        _dbContext = dbContext;
        _ollamaService = ollamaService;
    }

    /// <summary>
    /// Sends the user's message to the agent and persists both sides of the exchange.
    /// </summary>
    /// <param name="command">The message to send, the target agent, and an optional conversation ID.</param>
    /// <param name="cancellationToken">Token used to cancel database and model calls.</param>
    /// <returns>IDs and content of the stored user message and agent response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="command"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The agent is missing, deleted, or not active; the requested conversation does not exist;
    /// or the model call failed (the original exception is the inner exception).
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// The model call was cancelled through <paramref name="cancellationToken"/>.
    /// </exception>
    public async Task<SendMessageResult> HandleAsync(SendMessageCommand command, CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(command);

        // A. Validate agent exists and is active
        var agent = await _dbContext.Agents
            .FirstOrDefaultAsync(a => a.Id == command.AgentId && !a.IsDeleted, cancellationToken);
        if (agent == null)
        {
            throw new InvalidOperationException($"Agent with ID {command.AgentId} not found or has been deleted.");
        }

        if (agent.Status != AgentStatus.Active)
        {
            throw new InvalidOperationException($"Agent '{agent.Name}' is not active. Current status: {agent.Status}");
        }

        // B. Get or create conversation
        var conversation = await GetOrCreateConversationAsync(command, cancellationToken);

        // C. Store user message (one timestamp so the message and the conversation agree)
        var userMessageTime = DateTime.UtcNow;
        var userMessage = new ConversationMessage
        {
            Id = Guid.NewGuid(),
            ConversationId = conversation.Id,
            Role = MessageRole.User,
            Content = command.Message,
            MessageIndex = conversation.MessageCount,
            IsInActiveWindow = true,
            CreatedAt = userMessageTime
        };
        _dbContext.ConversationMessages.Add(userMessage);
        conversation.MessageCount++;
        conversation.LastMessageAt = userMessageTime;
        await _dbContext.SaveChangesAsync(cancellationToken);

        // D. Build conversation context: the newest ConversationWindowSize messages that are
        //    still in the active window, returned in chronological order.
        // NOTE(review): the user message saved in step C is part of this result AND
        // command.Message is passed separately to GenerateAsync below. Whether that duplicates
        // the prompt depends on IOllamaService, which is not visible here - confirm.
        var contextMessages = await _dbContext.ConversationMessages
            .AsNoTracking()
            .Where(m => m.ConversationId == conversation.Id && m.IsInActiveWindow)
            .OrderByDescending(m => m.MessageIndex)
            .Take(agent.ConversationWindowSize)
            .OrderBy(m => m.MessageIndex)
            .ToListAsync(cancellationToken);

        // E. Create execution record (persisted as Running before the model call)
        var execution = new AgentExecution
        {
            Id = Guid.NewGuid(),
            AgentId = agent.Id,
            ConversationId = conversation.Id,
            UserPrompt = command.Message,
            StartedAt = DateTime.UtcNow,
            Status = ExecutionStatus.Running
        };
        _dbContext.AgentExecutions.Add(execution);
        await _dbContext.SaveChangesAsync(cancellationToken);

        // F. Execute agent via Ollama
        var stopwatch = System.Diagnostics.Stopwatch.StartNew();
        OllamaResponse ollamaResponse;
        try
        {
            ollamaResponse = await _ollamaService.GenerateAsync(
                agent.ModelEndpoint ?? DefaultModelEndpoint,
                agent.ModelName,
                agent.SystemPrompt,
                contextMessages,
                command.Message,
                agent.Temperature,
                agent.MaxTokens,
                cancellationToken
            );
            stopwatch.Stop();
        }
        catch (Exception ex)
        {
            stopwatch.Stop();
            await MarkExecutionFailedAsync(execution, ex, stopwatch.ElapsedMilliseconds);

            // Let cancellation surface as cancellation instead of wrapping it as an agent failure.
            if (ex is OperationCanceledException && cancellationToken.IsCancellationRequested)
            {
                throw;
            }

            throw new InvalidOperationException($"Failed to get response from agent: {ex.Message}", ex);
        }

        // G. Store agent response
        var completedAt = DateTime.UtcNow;
        var agentMessage = new ConversationMessage
        {
            Id = Guid.NewGuid(),
            ConversationId = conversation.Id,
            Role = MessageRole.Assistant,
            Content = ollamaResponse.Content,
            MessageIndex = conversation.MessageCount,
            IsInActiveWindow = true,
            TokenCount = ollamaResponse.OutputTokens,
            ExecutionId = execution.Id,
            CreatedAt = completedAt
        };
        _dbContext.ConversationMessages.Add(agentMessage);
        conversation.MessageCount++;
        conversation.LastMessageAt = completedAt;

        // H. Complete execution record
        execution.Output = ollamaResponse.Content;
        execution.CompletedAt = completedAt;
        execution.ExecutionTimeMs = stopwatch.ElapsedMilliseconds;
        execution.InputTokens = ollamaResponse.InputTokens;
        execution.OutputTokens = ollamaResponse.OutputTokens;
        // Token counts are nullable; a missing count contributes 0 to the total.
        execution.TotalTokens = (ollamaResponse.InputTokens ?? 0) + (ollamaResponse.OutputTokens ?? 0);
        execution.Status = ExecutionStatus.Completed;
        await _dbContext.SaveChangesAsync(cancellationToken);

        // I. Return result
        return new SendMessageResult
        {
            ConversationId = conversation.Id,
            MessageId = userMessage.Id,
            AgentResponseId = agentMessage.Id,
            UserMessage = new MessageDto
            {
                Content = userMessage.Content,
                Timestamp = userMessage.CreatedAt
            },
            AgentResponse = new AgentResponseDto
            {
                Content = agentMessage.Content,
                Timestamp = agentMessage.CreatedAt,
                InputTokens = execution.InputTokens,
                OutputTokens = execution.OutputTokens,
                EstimatedCost = execution.EstimatedCost
            }
        };
    }

    /// <summary>
    /// Loads the conversation named by the command, or creates and saves a new one
    /// titled from the message when no conversation ID was supplied.
    /// </summary>
    /// <exception cref="InvalidOperationException">A conversation ID was supplied but not found.</exception>
    private async Task<Conversation> GetOrCreateConversationAsync(
        SendMessageCommand command,
        CancellationToken cancellationToken)
    {
        if (command.ConversationId.HasValue)
        {
            var existingConversation = await _dbContext.Conversations
                .FirstOrDefaultAsync(c => c.Id == command.ConversationId.Value, cancellationToken);
            if (existingConversation == null)
            {
                throw new InvalidOperationException($"Conversation with ID {command.ConversationId.Value} not found.");
            }

            return existingConversation;
        }

        var startedAt = DateTime.UtcNow;
        var conversation = new Conversation
        {
            Id = Guid.NewGuid(),
            Title = BuildTitle(command.Message),
            StartedAt = startedAt,
            LastMessageAt = startedAt,
            MessageCount = 0,
            IsActive = true
        };
        _dbContext.Conversations.Add(conversation);
        await _dbContext.SaveChangesAsync(cancellationToken);
        return conversation;
    }

    /// <summary>
    /// Builds a conversation title: the message itself when it is at most
    /// <see cref="MaxTitleLength"/> characters, otherwise its leading characters followed by "...".
    /// </summary>
    private static string BuildTitle(string message)
    {
        if (message.Length <= MaxTitleLength)
        {
            return message;
        }

        // Do not cut between the two halves of a surrogate pair; that would leave an
        // unpaired high surrogate at the end of the title.
        var cut = MaxTitleLength;
        if (char.IsHighSurrogate(message[cut - 1]))
        {
            cut--;
        }

        return message[..cut] + "...";
    }

    /// <summary>
    /// Records a failed model call on the execution row and saves it.
    /// </summary>
    private async Task MarkExecutionFailedAsync(AgentExecution execution, Exception error, long elapsedMilliseconds)
    {
        execution.Status = ExecutionStatus.Failed;
        execution.ErrorMessage = error.Message;
        execution.CompletedAt = DateTime.UtcNow;
        execution.ExecutionTimeMs = elapsedMilliseconds;

        // Deliberately not the caller's token: when the failure IS a cancellation, saving with
        // that token would be aborted as well and the execution would stay in Running.
        await _dbContext.SaveChangesAsync(CancellationToken.None);
    }
}
/// <summary>
/// Validates <see cref="SendMessageCommand"/> input: the agent ID must be set and the
/// message must be non-empty and at most 10,000 characters.
/// </summary>
public class SendMessageCommandValidator : AbstractValidator<SendMessageCommand>
{
    /// <summary>
    /// Registers the validation rules for <see cref="SendMessageCommand"/>.
    /// </summary>
    public SendMessageCommandValidator()
    {
        // NotEmpty on a Guid rejects the default value (Guid.Empty).
        RuleFor(x => x.AgentId)
            .NotEmpty()
            .WithMessage("Agent ID is required.");

        // Each WithMessage applies to the rule immediately before it in the chain.
        RuleFor(x => x.Message)
            .NotEmpty()
            .WithMessage("Message is required.")
            .MaximumLength(10000)
            .WithMessage("Message must not exceed 10,000 characters.");
    }
}