RabbitMQ com ASP.NET Core e Worker
Visão Geral
Neste cenário, vamos:
- Executar o RabbitMQ em um contêiner Docker
- Criar uma API ASP.NET Core que publica mensagens em uma fila RabbitMQ
- Criar um aplicativo console (Worker) que consome essas mensagens e as processa
Para tornarmos o sistema mais robusto, abordaremos conceitos de durabilidade (para que as mensagens não sejam perdidas caso o broker reinicie) e configuração de prefetch (fair dispatch, para distribuir tarefas de forma mais equilibrada entre múltiplos consumidores).
Subindo o RabbitMQ via Docker
Para rodar o RabbitMQ usando Docker, pode-se utilizar um comando semelhante ao abaixo (adapte se necessário):
docker run -d --hostname my-rabbit --name rabbitmq-container -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
- 5672: Porta padrão para conexões do protocolo AMQP (utilizada pelas aplicações).
- 15672: Porta para acesso ao Management Plugin (interface web do RabbitMQ).
Após executar o comando:
- Acesse
http://localhost:15672
no navegador. - Faça login com
guest / guest
(credenciais padrão, caso não tenha alterado).
Criando a API (Publicadora de Mensagens)
Estrutura do Projeto
- Trata-se de uma Web API ASP.NET Core contendo um Controller (
OrderController
). - Ela recebe objetos do tipo
Order
via POST, converte em JSON e envia à fila RabbitMQ.
Exemplo de Código
namespace WebAppOrder.Controllers;
[Route("api/[controller]")]
[ApiController]
public class OrderController : ControllerBase
{
private readonly ILogger<OrderController> _logger;
public OrderController(ILogger<OrderController> logger)
{
_logger = logger;
}
[HttpPost("insert")]
public async Task<IActionResult> InsertOrder(Order order)
{
try
{
// 1. Conexão com o RabbitMQ
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
// 2. Declara a fila com durabilidade ativada
await channel.QueueDeclareAsync(
queue: "orderQueue",
durable: true, // <-- Torna a fila durável
exclusive: false,
autoDelete: false,
arguments: null
);
// 3. Converte o objeto em JSON e depois em bytes
var message = JsonSerializer.Serialize(order);
var body = Encoding.UTF8.GetBytes(message);
// 4. Configura as propriedades da mensagem para persistência
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // <-- Mensagens persistentes
// 5. Publica na fila
await channel.BasicPublishAsync(
exchange: string.Empty,
routingKey: "orderQueue",
mandatory: false,
basicProperties: properties,
body: body
);
// Retorna HTTP 202 indicando aceite do processamento assíncrono
return Accepted(order);
}
catch (Exception ex)
{
_logger.LogError("Error inserting order", ex);
return StatusCode(500);
}
}
}
Pontos de Observação
- Fila durável (
durable: true
): assim, se o RabbitMQ for reiniciado, a definição da fila permanece. - Mensagens persistentes (
properties.Persistent = true
): reduz a chance de perda de mensagens em caso de crash do RabbitMQ (não é uma garantia 100%, mas na maioria dos casos é suficiente). return Accepted
: status202
indicando que o pedido foi aceito; o processamento efetivo será feito pelo Worker.
Criando o Worker (Consumidor de Mensagens)
Estrutura do Projeto
- Um projeto Console que se conecta ao RabbitMQ, escuta a fila
orderQueue
e processa cada mensagem. - Ao final do processamento, envia um ack confirmando que a mensagem foi processada com sucesso.
Exemplo de Código
namespace AppOrderWorker;
public class Program
{
public static async Task Main(string[] args)
{
try
{
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
// Declara a fila como durável
await channel.QueueDeclareAsync(
queue: "orderQueue",
durable: true, // <-- fila durável
exclusive: false,
autoDelete: false,
arguments: null
);
// Fair dispatch (prefetch=1): só entrega nova mensagem
// ao worker quando ele terminar a anterior
await channel.BasicQosAsync(
prefetchSize: 0,
prefetchCount: 1,
global: false
);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var order = JsonSerializer.Deserialize<Order>(message);
Console.WriteLine($" [x] Received: #{order.OrderNumber}, " +
$"Item: {order.ItemName}, Price: {order.Price:N2}");
// Simula processamento (ex.: await Task.Delay(...))
// Manual ACK
await channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
}
catch (Exception)
{
// Se algo falhar, devolve a mensagem para a fila
await channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: true);
}
};
// Consumo com autoAck=false para ACK manual
await channel.BasicConsumeAsync(
queue: "orderQueue",
autoAck: false,
consumer: consumer
);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
}
}
}
Pontos de Observação
- Fila Durável: mesmo se o broker reiniciar, a definição da fila permanece.
- autoAck = false: A confirmação da mensagem (ACK) é manual, garantindo que a mensagem seja removida da fila apenas depois de processada.
- PrefetchCount = 1 (Fair Dispatch): Impede que o RabbitMQ envie várias mensagens de uma vez a um mesmo consumidor; assim, cada Worker só recebe a próxima mensagem depois de concluir (e dar ACK) na anterior.
- BasicAckAsync: ACK final para liberar a mensagem, que então é removida da fila.
- BasicNackAsync: Se houver falha, podemos devolver a mensagem para reprocessamento (ou descartar, dependendo da estratégia).
Testando o Fluxo
-
Inicie o contêiner com o RabbitMQ (caso ainda não esteja em execução).
-
Rode o Worker (Aplicação Console). Ele ficará aguardando as mensagens.
-
Execute a API e faça uma requisição POST no endpoint configurado, por exemplo:
POST http://localhost:5000/api/order/insert Content-Type: application/json { "orderNumber": 123, "itemName": "Mouse Gamer", "price": 199.90 }
-
Verifique no console do Worker o recebimento e o processamento da mensagem.
Se você iniciar múltiplos Workers, poderá observar como o RabbitMQ distribui as mensagens entre eles, respeitando a configuração de prefetch e durabilidade.
Conclusão
Com poucos ajustes, é possível:
- Tornar filas e mensagens duráveis, garantindo maior resiliência contra reinícios ou falhas do RabbitMQ.
- Configurar prefetch para implementar o fair dispatch, evitando que um único Worker receba uma carga desproporcional de mensagens.
- Manter a arquitetura desacoplada, publicando na API e consumindo no Worker, com confirmação e reenvio em caso de falhas.
Use este exemplo como ponto de partida para configurações avançadas (exchanges personalizadas, bindings, dead-letter queues, publisher confirms, etc.) e crie sistemas altamente escaláveis e tolerantes a falhas.