Sử dụng Dead Letter Exchange trong RabbitMQ

231

Bài viết được sự cho phép của tác giả Giang Phan

Dead Letter Exchange trong RabbitMQ

Dead Letter Exchange là gì?

Dead Letter là một tin nhắn không thể gửi đến người nhận. Dead Letter Queue (DLQ), là hàng đợi chứa tin nhắn chưa được gửi, không thể được gửi đến đích của chúng vì lý do này hay lý do khác.

Trong hàng đợi tin nhắn, DLQ là một dịch vụ được cài đặt để lưu trữ các tin nhắn đáp ứng một hoặc nhiều sự kiện sau:

  • Tin nhắn bị từ chối (rejected) bởi một Queue Exchange.
  • Message hết hạn (expire) do Time to live (TTL).
  • Vượt quá giới hạn chiều dài hàng đợi (length limit).
  Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud
  Sử dụng binding Exchange to Exchange trong RabbitMQ

Dead Letter Exchange là một Exchange bình thường, có thể là một trong 4 loại Exchange (Direct, Fanout, Topic, Headers).

Điều gì xảy ra với Dead Letter Message?

  • Gửi tới một Dead Letter Exchange.
  • Thêm một số thông tin vào header của Message trước khi gửi đến Dead Letter Exchange.

Cấu hình Dead Letter Exchange sử dụng Optional Queue Arguments

Để gán một Dead Letter Exchange cho một Queue sử dụng agruments x-dead-letter-exchange khi định nghĩa Queue.

channel.exchangeDeclare("gpcoder.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "gpcoder.exchange.name");
channel.queueDeclare("GPCoderQueue", false, false, false, args);

Đoạn code trên đơn giản tạo một Exchange mới gọi là gpcoder.exchange.name và Exchange mới này là Dead Letter Exchange cho hàng đợi mới được tạo. Lưu ý rằng Exchange  không phải được khai báo khi hàng đợi được khai báo, nhưng nó phải tồn tại tại thời điểm các Message được Dead Letter, nếu không thì các Message sẽ bị hủy bỏ.

Theo mặc định RabbitMQ sẽ lấy khoá định tuyến từ Message ban đầu được gửi đến Exchange. Chúng ta cũng có thể chỉ định một khóa định tuyến sẽ được sử dụng khi Dead Letter Message nếu muốn.

args.put("x-dead-letter-routing-key", "some-routing-key");

Routing Dead-Lettered Message

Dead-lettered message được định tuyến tới Dead Letter Exchange theo thứ tự ưu tiên sau:

  • Theo khoá định tuyến được chỉ định cho hàng đợi Dead Letter Message được chỉ định.
  • Theo khoá định tuyến ban đầu Message được publish.

Ví dụ: Nếu một Message ban đầu được publish đến Exchange với khoá định tuyến foo. Sau đó Message này bị reject và nó trở thành một Dead Letter Message. Và nó được publish đến Dead Letter Exchange với khoá định tuyến foo. Giả sử chúng ta đã chỉ định một khóa định tuyến sẽ được sử dụng khi Dead Letter Message là bar, lúc này Message được publish đến Dead Letter Exchange với khoá định tuyến bar.

Dead-lettered message được re-published với tính năng publisher confirm được bật mặc định để đảm bảo rằng Dead-letter queue phải gửi xác nhận Message (ack) đã được lưu trữ trước khi nó bị xoá ở Queue gốc.

Message bị thay đổi như thế nào khi chuyển sang Dead-Lettered Message?

Header bị thay đổi:

  • Exchange name bị thay thế bởi Dead-letter exchange name.
  • Routing key có thể bị bởi một Routing key khác.
  • Nếu điều trên xảy ra, thì CC và BCC header cũng sẽ bị remove.

Thêm header x-death, với một mảng các giá trị:

  • queue: tên của Queue mà Message được publish trước khi nó trở thành Dead-Lettered Message.
  • reason: lý do xảy ra Dead-Lettered Message. Có thể là: rejected, expired, maxlen.
  • time: thời gian Message bị dead lettered.
  • exchange: tên Exchange mà Message được publish, nó có thể là dead letter exchange nếu Message bị dead lettered nhiều lần.
  • routing-keys: là routing key (bao gồm CC keys nhưng không bao gồm BCC keys) của Message được publish.
  • count: số lần mà Message bị dead-lettered trong Queue này vì lý do này.
  • original-expiration: nếu Message bị dead-lettered vì lý do TTL, nào là giá trị của thuộc tính expiration ban đầu của Message. Thuộc tính expiration sẽ bị remove từ dead-lettering message để ngăn việc expiring lần nữa trong Queue mà nó được định tuyến đến.

3 header được thêm cho mỗi dead-lettering event đầu tiên:

  • x-first-death-reason
  • x-first-death-queue
  • x-first-death-exchange

Chúng có cùng các giá trị như reason, queue, và exchange của event xảy ra Dead-Lettered ban đầu. Sau khi thêm, các header này không bao giờ được sửa đổi.

Lưu ý rằng: mảng giá trị x-death được sắp xếp gần đây nhất trước, do đó, Dead-Lettered gần đây nhất sẽ được ghi lại trong mục đầu tiên.

Ví dụ sử dụng Dead Letter Exchange trong RabbitMQ

Trong ví dụ này tôi sử dụng Dead Letter Exchange để mô phỏng trường hợp Retry xử lý sau mỗi 300 millisecond nếu ngay lần nhận Message đó không thể xử lý thành công.

Sử dụng Dead Letter Exchange trong RabbitMQ

  1. Tạo một WorkQueue và bind đến WorkExchange.
  2. Tạo một RetryQueue và bind đến RetryExchange.
    • Gán agruments: x-dead-letter-exchange đến WorkExchange
    • Gán agruments: x-message-ttl là 300 ms.
  3. Producer publish một Message đến WorkExchange. Sau đó, WorkExchange sẽ chuyển Message đến WorkQueue.
  4. Consumer nhận Message từ WorkQueue và cố gắng xử lý nó.
  5. Trường hợp xử lý thất bại, Consumer sẽ publish Message đó đến RetryExchange. Sau đó, RetryExchange sẽ chuyển Message đến RetryQueue.
  6. Message sẽ lưu tại RetryQueue trong 300 ms.
  7. Khi Message bị expire, nó sẽ được chuyển đến WorkExchange và được chuyển đến WorkQueue.
  8. Khi đó Consumer có thể nhận lại Message từ WorkQueue và xử lý lại.

Hãy xem code implement:

package com.gpcoder.deadletterexchange;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class App {

// Exchange
private static final String WORK_EXCHANGE_NAME = "GPcoder.WorkExchange";
private static final String RETRY_EXCHANGE_NAME = "GPcoder.RetryExchange";

// Queue
private static final String WORK_QUEUE_NAME = "WorkQueue";
private static final String RETRY_QUEUE_NAME = "RetryQueue";

private static final int RETRY_DELAY = 300; // in ms

private static Channel channel;
private static int RETRY_COUNT = 0;

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

channel = connection.createChannel();

// Create the WorkQueue
channel.exchangeDeclare(WORK_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(WORK_QUEUE_NAME, true, false, false, null);
channel.queueBind(WORK_QUEUE_NAME, WORK_EXCHANGE_NAME, "", null);

// Create the RetryQueue
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", WORK_EXCHANGE_NAME);
arguments.put("x-message-ttl", RETRY_DELAY);
channel.exchangeDeclare(RETRY_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(RETRY_QUEUE_NAME, true, false, false, arguments);
channel.queueBind(RETRY_QUEUE_NAME, RETRY_EXCHANGE_NAME, "", null);

// basicPublish - ( exchange, routingKey, basicProperties, body)
String message = "GPCoder Message";
System.out.println("[" + LocalDateTime.now() + "] [Work] [Send]: " + message);
channel.basicPublish(WORK_EXCHANGE_NAME, "", null, message.getBytes());

consumer(WORK_QUEUE_NAME);
}

private static void consumer(String queueName) throws IOException {
// basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
DeliverCallback deliverCallback = (consumerTag, message) -> {
String content = new String(message.getBody());
System.out.println("[" + LocalDateTime.now() + "] [Received] [" + queueName + "]: " + content);
System.out.println("");
if (RETRY_COUNT < 5) { publishToRetryExchange(content); RETRY_COUNT++; } else { RETRY_COUNT = 0; } }; CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag);
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}

// Publish to RetryQueue on failure
private static void publishToRetryExchange(String message) throws IOException {
System.out.println("[" + LocalDateTime.now() + "] [Retry" + RETRY_COUNT + "] [Re-Publish]: " + message);
channel.basicPublish(RETRY_EXCHANGE_NAME, "", null, message.getBytes());
}
}

Output chương trình:

[2020-05-03T22:57:03.614] [Work] [Send]: GPCoder Message
[2020-05-03T22:57:03.620] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:03.620] [Retry0] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:03.922] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:03.922] [Retry1] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:04.224] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:04.224] [Retry2] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:04.526] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:04.526] [Retry3] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:04.828] [Received] [WorkQueue]: GPCoder Message

[2020-05-03T22:57:04.828] [Retry4] [Re-Publish]: GPCoder Message
[2020-05-03T22:57:05.131] [Received] [WorkQueue]: GPCoder Message

Như bạn thấy, Message được tự động chuyển từ WorkQueue sang RetryQueue sau mỗi 300 ms nhờ vào Dead Letter Message. Đây là ví dụ cho trường hợp một Message hết hạn (expire) do Time to live (TTL).

Tương tự các bạn có thể set agruments là agruments.put(“x-max-length”, 10) để test trường hợp số lượng Message trong Queue vượt quá giới hạn chiều dài hàng đợi (length limit).

Tài liệu tham khảo:

Bài viết gốc được đăng tải tại gpcoder.com

Có thể bạn quan tâm:

Xem thêm Việc làm Developer hấp dẫn trên TopDev