基于RabbitMQ实现延迟队列

1. 延迟队列介绍

延迟队列顾名思义就是进入队列后,不会马上被消费,而是有一定的时间延迟,时间到期后再被消费。

1.1 应用场景

延迟队列可应用于一系列需要后期验证的功能,比如,账单支付超时确认、邮件发出后延迟确认等等。目前通用的解决方案是使用定时任务框架,或者采用时间轮询的方式,实现的成本较高,也不利于出错后自动重试。

1.2 解决方案

本文的实现方式是使用RabbitMQ提供的死信路由机制,即当一个消息的时间戳到期时,还没有被消费,则转发到死信路由,消费者绑定到这个死信路由上即可!

2. RabbitMQ实现延迟队列

2.1 生产者发送

对于生产者的发送过程,对于每条消息设置TTL值,表明了这条消息在normal_queue中的生存时间。

image

带注释的详细代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package delayQueue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import config.Config;
import config.ExchangeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

/**
* Created by zhaoyh on 2017/12/20
* @author zhaoyh
*/
public class Sender {

private static Logger LOG = LoggerFactory.getLogger(Sender.class);

private static final String EXCHANGE_NAME = "normal_exchange";
private static final String QUEUE_NAME = "normal_queue";
private static final String ROUTING_KEY = "normal_routing_key";


private static final String DLX_EXCHANGE_NAME = "dlx_exchange";
private static final String DLX_ROUTING_KEY = "dlx_routing_key";



public static void main(String[] argv) throws Exception {
ConnectionFactory factory = Config.getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, ExchangeType.DIRECT.getExchangeType());

Map<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
map.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
channel.queueDeclare(QUEUE_NAME, false, false, false, map);

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

LOG.info("Waiting for input...");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String content = scanner.nextLine();
// 以空格为分割
String message = content.split("\\ ")[0];

// 单位毫秒
String expiration = content.split("\\ ")[1];
try {
byte[] bytes = message.getBytes("UTF-8");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(expiration).build();

// ExchangeName
// RoutingKey
// BasicProperties
// byte[] body
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, bytes);
LOG.info("Send: " + message);
} catch (UnsupportedEncodingException e) {
LOG.error("getBytes failed!", e);
} catch (IOException e) {
LOG.error("basicPublish failed!", e);
}
}
channel.close();
connection.close();
}

}

2.2 消费者接收

在normal_queue上的消息到期时,这条消息就会被RabbitMQ系统转发到normal_queue绑定的死信路由上。然后由消费者绑定这个路由,就实现了这条消息的消费过程。

image

带注释的详细代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package delayQueue;

import com.rabbitmq.client.*;
import config.Config;
import config.ExchangeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Created by zhaoyh on 2018/04/21
* 消费者
* @author zhaoyh
*/
public class Receiver {

private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

private static final String EXCHANGE_NAME = "dlx_exchange";
private static final String ROUTING_KEY = "dlx_routing_key";
private static final String QUEUE_NAME = "dlx_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = Config.getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, ExchangeType.DIRECT.getExchangeType());
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 直连交换机使用routing key来标记
// String queue
// String exchange
// String routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

LOG.info("Waiting for messages...");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
LOG.info("Received: " + message);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}

}

2.3 测试

发送:

image

接收:

image

first这条消息的延迟是12秒,second消息的延时是0.13秒,但是接收的顺序还是first,second,这是因为first是第一个放到消息队列的。由此可见,即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入队列的顺序让消费者消费。

参考官方文档:

“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”
只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。

所以当使用RabbitMQ实现延迟队列的时候,尽量确保每个队列的延迟时间是一致的。如果需要不同的延迟,需要为每个延迟时间单独建立队列。

以上内容就是基于RabbitMQ实现延迟队列的全部内容了,谢谢你阅读到了这里!

Author:zhaoyh