Bài viết được sự cho phép của tác giả Giang Phan
Trong các bài viết trước chúng ta đã tìm hiểu về Default Exchange, cách tạo Work Queue với RabbitMQ. Trong bài này, chúng ta sẽ cùng tìm hiểu về Direct Exchange trong RabbitMQ.
Flow của một Message trong Direct Exchange
Direct Exchange (trao đổi trực tiếp – amq.direct) định tuyến message đến Queue dựa vào routing key.
Một Exchange không xác định tên (empty String), đây là loại Default Exchange, một dạng đặc biệt của là Direct Exchange. Default Exchange được liên kết ngầm định với mọi Queue với khóa định tuyến bằng với tên Queue.
Flow của một Message trong Direct Exchange như sau:
- Một Producer sẽ tạo một Message và publish tới Exchange.
- Một Queue sẽ binding tới Exchange sử dụng routing key. Chúng ta có thể tạo nhiều Queue và binding tới Exchange, có thể sử dụng cùng routing key, hoặc các routing key khác nhau.
- Một Message tới Exchange với thông tin routing key. Dựa vào thông tin routing key, message sẽ được chuyển đến một hoặc nhiều Queue đã binding có cùng routing key với routing key của Message.
Chúng ta đã thấy cách hoạt động của Default Exchange ở các bài viết trước. Trong phần tiếp theo của bài viết này, chúng ta sẽ thấy cách hoạt động của Direct Exchange với routing key cụ thể.
Ví dụ Direct Exchange trong RabbitMQ
Trong ví dụ này, tôi tạo một Direct Exchange có tên GPCoderDirectExchange, tạo 3 Queue binding tới Direct Exchange này với 3 routing key:
- QDeveloper sẽ binding với routing key devGroup.
- QManager sẽ binding với routing key managerGroup.
- QGeneral sẽ binding với routing key generalGroup.
Một số class của chương trình:
- ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
- DirectExchangeChannel : class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, publish/ subscribe message, …
- Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue, Routing Key.
- Producer: để gửi Message đến Exchange.
- Consumer: để nhận Message từ Queue.
- App: giả lập việc gửi nhận Message thông qua Direct Exchange của RabbitMQ.
ConnectionManager.java
package com.gpcoder.directexchange; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionManager { private ConnectionManager() { super(); } public static Connection createConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); return factory.newConnection(); } }
DirectExchangeChannel.java
package com.gpcoder.directexchange; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DirectExchangeChannel { private String exchangeName; private Channel channel; private Connection connection; public DirectExchangeChannel(Connection connection, String exchangeName) throws IOException { this.exchangeName = exchangeName; this.connection = connection; this.channel = connection.createChannel(); } public void declareExchange() throws IOException { // exchangeDeclare( exchange, builtinExchangeType, durable) channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true); } public void declareQueues(String ...queueNames) throws IOException { for (String queueName : queueNames) { // queueDeclare - (queueName, durable, exclusive, autoDelete, arguments) channel.queueDeclare(queueName, true, false, false, null); } } public void performQueueBinding(String queueName, String routingKey) throws IOException { // Create bindings - (queue, exchange, routingKey) channel.queueBind(queueName, exchangeName, routingKey); } public void subscribeMessage(String queueName) throws IOException { // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback) channel.basicConsume(queueName, true, ((consumerTag, message) -> { System.out.println("[Received] [" + queueName + "]: " + consumerTag); System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody())); }), consumerTag -> { System.out.println(consumerTag); }); } public void publishMessage(String message, String routingKey) throws IOException { // basicPublish - ( exchange, routingKey, basicProperties, body) System.out.println("[Send] [" + routingKey + "]: " + message); channel.basicPublish(exchangeName, routingKey, null, message.getBytes()); } }
Constant.java
package com.gpcoder.directexchange; public final class Constant { // Exchange public static final String EXCHANGE_NAME = "GPCoderDirectExchange"; // Routing key public static final String DEV_ROUTING_KEY = "devGroup"; public static final String MANAGER_ROUTING_KEY = "managerGroup"; public static final String GENERAL_ROUTING_KEY = "generalGroup"; // Queue public static final String DEV_QUEUE_NAME = "QDeveloper"; public static final String MANAGER_QUEUE_NAME = "QManager"; public static final String GENERAL_QUEUE_NAME = "QGeneral"; private Constant() { super(); } }
Producer.java
package com.gpcoder.directexchange; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.gpcoder.directexchange.Constant.*; public class Producer { private DirectExchangeChannel channel; public void start() throws IOException, TimeoutException { // Create connection Connection connection = ConnectionManager.createConnection(); // Create channel channel = new DirectExchangeChannel(connection, EXCHANGE_NAME); // Create direct exchange channel.declareExchange(); // Create queues channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, GENERAL_QUEUE_NAME); // Binding queues with routing keys channel.performQueueBinding(DEV_QUEUE_NAME, DEV_ROUTING_KEY); channel.performQueueBinding(MANAGER_QUEUE_NAME, MANAGER_ROUTING_KEY); channel.performQueueBinding(GENERAL_QUEUE_NAME, GENERAL_ROUTING_KEY); } public void send(String message, String routingKey) throws IOException { // Send message channel.publishMessage(message, routingKey); } }
Consumer.java
package com.gpcoder.directexchange; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.gpcoder.directexchange.Constant.*; public class Consumer { private DirectExchangeChannel channel; public void start() throws IOException, TimeoutException { // Create connection Connection connection = ConnectionManager.createConnection(); // Create channel channel = new DirectExchangeChannel(connection, EXCHANGE_NAME); // Create direct exchange channel.declareExchange(); // Create queues channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, GENERAL_QUEUE_NAME); // Binding queues with routing keys channel.performQueueBinding(DEV_QUEUE_NAME, DEV_ROUTING_KEY); channel.performQueueBinding(MANAGER_QUEUE_NAME, MANAGER_ROUTING_KEY); channel.performQueueBinding(GENERAL_QUEUE_NAME, GENERAL_ROUTING_KEY); } public void subscribe() throws IOException { // Subscribe message channel.subscribeMessage(DEV_QUEUE_NAME); channel.subscribeMessage(MANAGER_QUEUE_NAME); channel.subscribeMessage(GENERAL_QUEUE_NAME); } }
App.java
package com.gpcoder.directexchange; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.gpcoder.directexchange.Constant.*; public class App { public static void main(String[] args) throws IOException, TimeoutException { // Create producer Producer producer = new Producer(); producer.start(); // Publish some message producer.send("This message for all developers", DEV_ROUTING_KEY); producer.send("This message for all managers", MANAGER_ROUTING_KEY); producer.send("This message for everyone", GENERAL_ROUTING_KEY); Consumer consumer = new Consumer(); consumer.start(); consumer.subscribe(); } }
Output chương trình:
[Send] [devGroup]: This message for all developers [Send] [managerGroup]: This message for all managers [Send] [generalGroup]: This message for everyone [Received] [QDeveloper]: amq.ctag-F_4tm402_GYRx8FBn6rLPw [Received] [QDeveloper]: This message for all developers [Received] [QManager]: amq.ctag-DbVV5-XhzLyFtFd5Kij8DQ [Received] [QManager]: This message for all managers [Received] [QGeneral]: amq.ctag-feJgzR4t-P2tjvBWEb13yA [Received] [QGeneral]: This message for everyone
Như bạn thấy, Consumer/ Producer chỉ gửi/nhận đúng Message ở Queue mà nó binding.
Tài liệu tham khảo:
Bài viết gốc được đăng tải tại gpcoder.com
Có thể bạn quan tâm:
- Sử dụng binding Exchange to Exchange trong RabbitMQ
- Sử dụng Alternate Exchange trong RabbitMQ
- Sử dụng Headers Exchange trong RabbitMQ
Xem thêm Việc làm IT hấp dẫn trên TopDev