发布于 

Java整合rabbitMQ实现死信队列

这里依旧使用 springboot 框架,同时使用spring提供的rabbitmq-starter来实现rabbitMQ死信队列。

pom依赖

在项目依赖文件 pom.xml 添加下面的依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.0.1</version>
</dependency>

配置rabbitmq信息

1
2
3
4
5
6
7
8
spring:
rabbitmq:
password: admin
publisher-returns: true
publisher-confirm-type: correlated
host: 127.0.0.1
port: 5672
username: admin

消息可靠性保障

消息可靠性需要在生产者与消费者两个地方进行配置,这里仅配置了生产者确认,后面单独开文章写一下消费者手动确认与重试机制的问题。

这里将 publisher-returnspublisher-confirm-type配置了一下,保证在生产者产生消息之后消息中间件可以给程序反馈,确保生产者不丢消息(或者说,在生产者丢消息之后能够进行处理)。

在配置类里面配置回调逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    @Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// Mandatory为true时,消息通过交换器无法匹配到队列会返回给生产者,为false时匹配不到会直接被丢弃
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* ConfirmCallback机制只确认消息是否到达exchange(交换器),不保证消息可以路由到正确的queue;
* 需要设置:publisher-confirm-type: CORRELATED;
* springboot版本较低 参数设置改成:publisher-confirms: true
*
* 以实现方法confirm中ack属性为标准,true到达
* config : 需要开启rabbitmq得ack publisher-confirm-type
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
} else {
log.error("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
}
});

rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* ReturnsCallback 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定路由 key 路由不到,这个时候我们需要监听这种不可达的消息
* 就需要这种return机制
*
* config : 需要开启rabbitmq发送失败回退; publisher-returns 或rabbitTemplate.setMandatory(true); 设置为true
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
// 实现接口ReturnCallback,重写 returnedMessage() 方法,
// 方法有五个参数
// message(消息体)、
// replyCode(响应code)、
// replyText(响应内容)、
// exchange(交换机)、
// routingKey(队列)。

log.info("ReturnsCallback returned : {}", returned);
}
});

return rabbitTemplate;
}

通过 @Bean注解把 RabbitTemplate交给spring容器管理,配置确认逻辑。

绑定消息交换机和消息队列

在绑定之前,我们需要首先进行创建,创建若干个“业务”消息队列,同时配置私信队列,便于消息中间件在处理失败的过程中能够可以进行“兜底”处理。

  • 创建私信交换机和私信队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public Exchange deadLetterExchange() {
return ExchangeBuilder
.topicExchange("dead-letter-exchange")
.durable(true)
.build();
}

@Bean
public Queue deadLetterQueue() {
return QueueBuilder
.durable("dead-letter-queue")
.build();
}

这里同样使用 @Bean注解交给Spring容器管理,然后我们对上述内容执行绑定。

1
2
3
4
5
6
7
8
9
@Bean
public Binding userDeadLetterBinding() {
return BindingBuilder
.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dead-letter-routing-key")
.noargs();
}

with中的参数为routingKey,可以进行自定义,在转入死信队列的时候会携带这个参数。

  • 创建“业务”消息交换机

这里使用了topic类型的交换机,具体信息暂时挖个坑,后面更新文章填坑吧。

1
2
3
4
@Bean
TopicExchange exchange() {
return new TopicExchange("topic-ex");
}
  • 创建消息队列,并且绑定死信队列
1
2
3
4
5
6
7
8
9
@Bean
public Queue firstQueue() {
return QueueBuilder.durable("first-queue")
.withArgument("x-dead-letter-exchange", "dead-letter-exchange")
//声明该队列死信消息在交换机的 路由键
.withArgument("x-dead-letter-routing-key", "dead-letter-routing-key")
.build();
}

  • 消息队列和交换机绑定
1
2
3
4
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with("first-queue");
}

这里注意,由于是topic交换机,需要保证routingKey与消息队列的一致,才能正确把消息投入到指定的队列。

生产者发送消息

1
2
3
4
5
6
7
8
@RequestMapping("/test")
public String test() {
String messageData = "message: rabbitmq message";
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageData", messageData);
rabbitTemplate.convertAndSend("topic-ex", "first-queue", manMap, new CorrelationData("1001"));
return "ok";
}

访问对应的地址,即可将消息投入到指定的消息队列了。

消费者获取消息

1
2
3
4
5
6
7
@RabbitListener(queues = "first-queue")
@RabbitHandler
public void receiver(@Payload HashMap dataMsg, Channel channel, Message message) throws IOException, InterruptedException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("消费者 deliveryTag:{} dataMsg:{} ",deliveryTag ,dataMsg);

}

将消费者在程序初始化的时候保持后台运行,即可正常消费消息。