Exchange Direct no RabbitMQ
Introdução
Em tutoriais anteriores, exploramos o Fanout Exchange (que envia cada mensagem para todas as filas bindadas). Agora, examinamos o Direct Exchange, que permite encaminhar mensagens para filas específicas com base em uma routing key exata.
Isso é útil quando queremos, por exemplo, ter filas diferentes para tipos distintos de mensagens, sem fazer broadcast para todas as filas.
Visão Geral do Exchange Direct
O Direct Exchange roteará a mensagem apenas para filas cujo binding key corresponda exatamente à routing key fornecida ao publicar.
Exemplo Simplificado
- Fila A está vinculada ao Exchange com binding key
"order_new". - Fila B está vinculada ao Exchange com binding key
"order_upd".
Se publicarmos uma mensagem com routing key "order_new", apenas a Fila A a receberá.
No caso do código abaixo, temos:
- Uma fila “order” (que recebe
"order_new"e"order_upd"). - Uma fila “finance_orders” (que recebe apenas
"order_new").
Dessa forma, as mensagens com routing key "order_new" são enviadas para duas filas (porque "order" e "finance_orders" ambas possuem binding key "order_new"). Já as mensagens com routing key "order_upd" são enviadas somente para a fila "order".
Código de Exemplo
Classe de Domínio: Order
namespace Produtor;
public class Order
{
public long Id { get; private set; }
public DateTime CreatedDate { get; }
public DateTime LastUpdated { get; private set; }
public long Amount { get; private set; }
public Order(long id, long amount)
{
Id = id;
CreatedDate = DateTime.UtcNow;
LastUpdated = CreatedDate;
Amount = amount;
}
public void UpdateOrder(long amount)
{
Amount = amount;
LastUpdated = DateTime.UtcNow;
}
}Observação
Essa classe representa um pedido simples com Id, CreatedDate, LastUpdated e Amount.
Produtor
namespace Produtor;
public static class Program
{
public static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var manualResetEvent = new ManualResetEvent(false);
// Mantém a thread principal bloqueada
manualResetEvent.Reset();
using var connection = await factory.CreateConnectionAsync();
// Setup do canal e do Exchange
var channel1 = await SetupChannel(connection);
var channel2 = await SetupChannel(connection);
// Cria dois publishers (Produtor A e B)
BuildPublishers(channel1, "Produtor A", manualResetEvent);
BuildPublishers(channel2, "Produtor B", manualResetEvent);
manualResetEvent.WaitOne();
}
public static async Task<IChannel> SetupChannel(this IConnection connection)
{
var channel = await connection.CreateChannelAsync();
// Declara filas
await channel.QueueDeclareAsync("order", durable: false, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueDeclareAsync("finance_orders", durable: false, exclusive: false, autoDelete: false, arguments: null);
// Declara Exchange do tipo Direct
await channel.ExchangeDeclareAsync("order", ExchangeType.Direct);
// Binds: relaciona a fila com uma routing key específica
await channel.QueueBindAsync("order", "order", "order_new");
await channel.QueueBindAsync("order", "order", "order_upd");
// "finance_orders" só recebe mensagens com "order_new"
await channel.QueueBindAsync("finance_orders", "order", "order_new");
return channel;
}
public static void BuildPublishers(IChannel channel, string publisherName, ManualResetEvent manual)
{
Task.Run(async () =>
{
int idIndex = 1;
var random = new Random(DateTime.UtcNow.Millisecond * DateTime.UtcNow.Second);
while (true)
{
try
{
Console.WriteLine($"Press [Enter] to send messages from {publisherName}...");
Console.ReadLine();
// Cria um pedido "novo"
var order = new Order(idIndex++, random.Next(1000, 9999));
var message1 = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order));
// Publica a mensagem com routing key "order_new"
await channel.BasicPublishAsync(
exchange: "order",
routingKey: "order_new",
body: message1
);
Console.WriteLine($"New order Id {order.Id}: Amount {order.Amount} | Created: {order.CreatedDate:o}");
// Atualiza o pedido
order.UpdateOrder(random.Next(100, 999));
var message2 = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(order));
// Publica com routing key "order_upd"
await channel.BasicPublishAsync(
exchange: "order",
routingKey: "order_upd",
body: message2
);
Console.WriteLine($"Updated order Id {order.Id}: Amount {order.Amount} | LastUpdated: {order.LastUpdated:o}");
}
catch (Exception ex)
{
Console.WriteLine($"[ERROR in {publisherName}]: {ex.Message}");
manual.Set();
}
}
});
}
}Observações Principais
-
Exchange: criada com
ExchangeType.Direct. -
Bindings:
"order"é bindada com routing key"order_new"e"order_upd"."finance_orders"é bindada apenas com routing key"order_new".
-
Publicações:
- Mensagens com routing key
"order_new"chegam a duas filas ("order"e"finance_orders"). - Mensagens com routing key
"order_upd"chegam somente à fila"order".
- Mensagens com routing key
Consumidor
namespace Consumidor;
public static class Program
{
public static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
// Nome da fila por parâmetro (ex: "order" ou "finance_orders")
var queueName = args.Length > 0 ? args[0] : "order";
await channel.QueueDeclareAsync(
queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// BasicQos para melhorar balanceamento
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[Queue: {queueName}] [x] Received {message}");
// ACK manual
channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
return Task.CompletedTask;
}
catch (Exception ex)
{
Console.WriteLine($"[Queue: {queueName}] [x] Error: {ex.Message}");
channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: true);
return Task.CompletedTask;
}
};
await channel.BasicConsumeAsync(
queue: queueName,
autoAck: false,
consumer: consumer
);
Console.WriteLine($"Consumer on queue '{queueName}' is running...");
Console.ReadLine();
}
}Notas
-
Consumidor continua quase idêntico a outros exemplos anteriores.
-
Usamos
autoAck = falseeBasicAckAsync(ouBasicNackAsync) para confirmar ou reenviar mensagens em caso de falha. -
Podemos rodar este consumidor múltiplas vezes, cada vez apontando para
"order"ou"finance_orders"(ou ambos).
Funcionamento Prático
-
Ao gerar um novo pedido (routing key =
"order_new"), a mensagem é entregue às filas"order"e"finance_orders". -
Ao atualizar um pedido (routing key =
"order_upd"), a mensagem é entregue apenas à fila"order". -
Cada consumidor atrelado a essas filas recebe as mensagens correspondentes.
Esse roteamento é feito automaticamente pelo RabbitMQ com base nos binds e routing keys definidas.
Conclusão
O Direct Exchange é indicado quando precisamos despachar mensagens para filas específicas, dependendo de uma routing key exata. Isso viabiliza cenários onde cada tipo de mensagem (por exemplo, "new", "upd", "error", etc.) vai para filas diferentes, sem broadcast para todos.
Dicas e Práticas
-
Combine Direct Exchanges com outras estratégias (como filas de Dead Letter para erros).
-
Em cenários complexos, avalie se Topic Exchange não atenderia melhor (pois permite wildcards).
-
Sempre monitore e teste cenários de publicação/consumo para verificar throughput e garantir roteamento conforme esperado.