Exchange Fanout no RabbitMQ
Introdução
Até agora, exploramos cenários de comunicação em que enviamos mensagens a filas específicas (ou ao default exchange, que faz um direct implícito). Entretanto, há casos em que precisamos replicar a mesma mensagem para várias filas ao mesmo tempo. Para isso, o RabbitMQ disponibiliza diferentes tipos de Exchanges, entre eles o Fanout.
O que é um Exchange Fanout?
-
Quando publicamos mensagens em uma Exchange do tipo Fanout, o RabbitMQ distribui (replica) automaticamente cada mensagem para todas as filas que estiverem bindadas a essa Exchange.
-
Em outras palavras, independentemente de routing key, todas as filas associadas ao Fanout Exchange recebem uma cópia da mensagem.
Exemplos de Uso
-
Logs: Quando queremos enviar a mesma mensagem de log para múltiplos destinos (ex.: filas de monitoring, storage, analytics).
-
Broadcast: Em aplicações de broadcast em tempo real, cada consumidor (fila) recebe o mesmo dado ao mesmo tempo.
-
Integrações: Cópia de uma mesma mensagem de “Pedido” para a fila de “Financeiro”, “Logística” e “Auditoria”, por exemplo.
Exemplo de Código
Produtor (Criando Exchange Fanout e Bindando Filas)
namespace Produtor;
public static class Program
{
public static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
var manualResetEvent = new ManualResetEvent(false);
// Bloqueia a thread principal
manualResetEvent.Reset();
// Abre 1 conexão TCP
using var connection = await factory.CreateConnectionAsync();
// Exemplo: Vamos criar/ligar 3 filas: "order", "log", "finance_orders"
var channel1 = await SetupChannel(connection);
var channel2 = await SetupChannel(connection);
// Cria dois publicadores (Produtor A e B)
BuildPublishers(channel1, "Produtor A", manualResetEvent);
BuildPublishers(channel2, "Produtor B", manualResetEvent);
// Aguarda até chamarmos manualResetEvent.Set()
manualResetEvent.WaitOne();
}
public static async Task<IChannel> SetupChannel(this IConnection connection)
{
var channel = await connection.CreateChannelAsync();
// Declara 3 filas
await channel.QueueDeclareAsync("order", durable: false, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueDeclareAsync("log", durable: false, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueDeclareAsync("finance_orders", durable: false, exclusive: false, autoDelete: false, arguments: null);
// Declara Exchange do tipo Fanout
await channel.ExchangeDeclareAsync("order", ExchangeType.Fanout);
// Faz o 'bind' de cada fila no Exchange "order"
// (string.Empty) como routingKey pois no Fanout ela é ignorada
await channel.QueueBindAsync("order", "order", string.Empty);
await channel.QueueBindAsync("log", "order", string.Empty);
await channel.QueueBindAsync("finance_orders", "order", string.Empty);
return channel;
}
public static void BuildPublishers(IChannel channel, string publisherName, ManualResetEvent manual)
{
Task.Run(async () =>
{
int count = 0;
while (true)
{
try
{
Console.WriteLine($"Press [Enter] to send 100 messages from {publisherName}...");
Console.ReadLine();
for (int i = 0; i < 100; i++)
{
string message = $"OrderNumber: {count++} from {publisherName}";
var body = Encoding.UTF8.GetBytes(message);
// Publica no Exchange "order" (tipo Fanout)
// routingKey é ignorado neste modo
await channel.BasicPublishAsync(
exchange: "order",
routingKey: string.Empty,
body: body
);
Console.WriteLine($"{publisherName} - [x] Sent: {message}");
}
}
catch (Exception ex)
{
Console.WriteLine($"[ERROR in {publisherName}]: {ex.Message}");
// Caso queiramos encerrar a aplicação se algo falhar
manual.Set();
}
}
});
}
}
Notas
-
ExchangeDeclareAsync("order", ExchangeType.Fanout)
cria uma Exchange chamada “order” do tipo Fanout. -
Bindings:
QueueBindAsync("order", "order", string.Empty)
indica que a fila"order"
recebe mensagens publicadas em"order"
Exchange. Podemos repetir para quantas filas quisermos. -
Quando publicamos (
BasicPublishAsync
) no Exchange “order”, todas as filas bindadas recebem cópias da mensagem.
Consumidor (Consumindo cada Fila)
namespace Consumidor;
public static class Program
{
public static async Task Main(string[] args)
{
var factory = new ConnectionFactory { HostName = "localhost" };
// Abre 1 conexão
using var connection = await factory.CreateConnectionAsync();
var channel = await connection.CreateChannelAsync();
// Nome da fila vem por parâmetro, ex. "order" ou "log" ou "finance_orders"
var queueName = args.Length > 0 ? args[0] : "order";
// Declara a fila (garante que exista)
await channel.QueueDeclareAsync(
queue: queueName,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null
);
// Configura prefetchCount para 1 (opcional, melhora balanceamento)
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
try
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[Queue: {queueName}] [x] Received {message}");
// ACK manual da mensagem
channel.BasicAckAsync(ea.DeliveryTag, multiple: false);
return Task.CompletedTask;
}
catch (Exception ex)
{
Console.WriteLine($"[Queue: {queueName}] [x] Error: {ex.Message}");
channel.BasicNackAsync(ea.DeliveryTag, multiple: false, requeue: true);
return Task.CompletedTask;
}
};
await channel.BasicConsumeAsync(
queue: queueName,
autoAck: false,
consumer: consumer
);
Console.WriteLine($"Consumer on queue '{queueName}' is running...");
Console.ReadLine();
}
}
Notas
-
Cada consumidor é ligado a uma fila específica. Como o Exchange Fanout envia mensagens para todas as filas associadas, cada consumidor (ligado a cada fila) receberá as mesmas mensagens.
-
Podemos rodar este executável múltiplas vezes, cada vez passando um parâmetro diferente (
"order"
,"log"
,"finance_orders"
) para consumir de filas distintas.
Observações Importantes
Uso do Fanout
-
routingKey é ignorada nesse tipo de exchange. A mensagem vai para todas as filas bindadas.
-
Bom para broadcast, mas pode gerar grande volume de mensagens duplicadas.
Outros Tipos de Exchange
-
Direct: Roteia conforme
routingKey
exata. -
Topic: Roteia por pattern (curinga) de
routingKey
. -
Headers: Usa cabeçalhos (Headers) para determinar roteamento.
Produtor
- Agora publica em uma Exchange (
exchange: "order"
) em vez de publicar diretamente na fila.
Consumidor
- Continua consumindo de uma fila normal. As filas recebem as mensagens da Fanout Exchange via binding.
Balanceamento
- É possível combinar com
BasicQos
(prefetch) para cada consumidor gerenciar melhor seu volume de mensagens, especialmente quando há múltiplos consumidores por fila.
Escalabilidade
- Se preciso replicar a mesma informação para várias filas (e, consequentemente, para vários serviços), o Fanout Exchange simplifica a arquitetura.
Conclusão
O Fanout Exchange é perfeito para casos em que desejamos copiar uma mesma mensagem para diversos destinos (filas). Ele oferece uma forma simples de broadcast sem precisar enviar manualmente a mensagem para cada fila. Cada nova fila que se bindar ao Exchange passará a receber todas as mensagens publicadas nele.
Esse padrão facilita cenários como:
-
Logs centralizados: toda mensagem é enviada a várias filas, cada qual tratada por equipes ou propósitos diferentes (monitoramento, auditoria, etc.).
-
Notificações simultâneas em diferentes sistemas.