MQ基础

1. 初识 RabbitMQ

1.1 同步调用和异步调用

同步调用的方式存在下列问题:

  • 拓展性差
  • 性能下降
  • 级联失败

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息 Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

异步调用的优势包括:

  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,避免级联失败

当然,异步通信也并非完美无缺,它存在下列缺点:

  • 完全依赖于 Broker 的可靠性、安全性和性能
  • 架构复杂,后期维护和调试麻烦

1.2 常见 MQ

image.png

2. RabbitMQ

2.1 RabbitMQ 安装

1
2
3
4
5
6
7
8
9
10
11
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall\
-d \
rabbitmq:3.8-management

2.2 收发消息

创建队列之和和交换机建立绑定关系,然后发送消息到交换机,交换机再将消息发送到队列。

2.3 数据隔离

对于小型企业而言,出于成本考虑,我们通常只会搭建一套 MQ 集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用 virtual host 的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。
  • 给每个项目创建不同的 virtual host,将每个项目的数据隔离。

3.SpringAMQP

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于 RabbitMQ 采用了 AMQP 协议,因此它具备跨语言的特性。任何语言只要遵循 AMQP 协议收发消息,都可以与 RabbitMQ 交互。并且 RabbitMQ 官方也提供了各种不同语言的客户端。
但是,RabbitMQ 官方提供的 Java 客户端编码相对复杂,一般生产环境下我们更多会结合 Spring 来使用。而 Spring 的官方刚好基于 RabbitMQ 提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于 SpringBoot 对其实现了自动装配,使用起来非常方便。

SpringAMQP 提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了 RabbitTemplate 工具,用于发送消息

3.1 WorkQueues 模型

消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致 1 个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了 1 秒。这样显然是有问题的。

Work 模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置 prefetch 来控制消费者预取的消息数量

在 spring 中有一个简单的配置,可以解决这个问题。我们修改 consumer 服务的 application.yml 文件,添加配置:

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

3.2 交换机类型

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是 Fanout 交换机
  • Direct:订阅,基于 RoutingKey(路由 key)发送给订阅了消息的队列
  • Topic:通配符订阅,与 Direct 类似,只不过 RoutingKey 可以使用通配符
  • Headers:头匹配,基于 MQ 的消息头匹配,用的较少。

3.3 声明队列和交换机

基于注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

4. MQ 的可靠性

消息到达 MQ 以后,如果 MQ 不能及时保存,也会导致消息丢失,所以 MQ 的可靠性也非常重要。

4.1.数据持久化

为了提升性能,默认情况下 MQ 的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么 MQ 会在消息持久化以后才发送 ACK 回执,进一步确保消息的可靠性。
不过出于性能考虑,为了减少 IO 次数,发送到 MQ 的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在 100 毫秒左右,这就会导致 ACK 有一定的延迟,因此建议生产者确认全部采用异步方式。

4.2 LazyQueue

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障
  • 消息发送量激增,超过了消费者处理速度
  • 消费者处理业务发生阻塞

一旦出现消息堆积问题,RabbitMQ 的内存占用就会越来越高,直到触发内存预警上限。此时 RabbitMQ 会将内存消息刷到磁盘上,这个行为成为 PageOut. PageOut 会耗费一段时间,并且会阻塞队列进程。因此在这个过程中 RabbitMQ 不会再处理新的消息,生产者的所有请求都会被阻塞。

为了解决这个问题,从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的模式,也就是惰性队列。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储
    而在 3.12 版本之后,LazyQueue 已经成为所有队列的默认格式。因此官方推荐升级 MQ 为 3.12 版本或者所有队列都设置为 LazyQueue 模式。
1
2
3
4
5
6
7
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启Lazy模式
.build();
}

5.消费者的可靠性

5.1 消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ 从队列中删除该消息
  • nack:消息处理失败,RabbitMQ 需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

一般 reject 方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过 try catch 机制捕获,消息处理成功时返回 ack,处理失败时返回 nack.

由于消息回执的处理代码比较统一,因此 SpringAMQP 帮我们实现了消息确认。并允许我们通过配置文件设置 ACK 处理方式,有三种模式:

  • none:不处理。即消息投递给消费者后立刻 ack,消息会立刻从 MQ 删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack. 当业务出现异常时,根据异常判断返回不同结果:
    • 如果是业务异常,会自动返回 nack;
    • 如果是消息处理或校验异常,自动返回 reject;

5.2 消费者重试机制

当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次 requeue 到队列,再次投递,直到消息处理成功为止。
极端情况就是消费者一直无法执行成功,那么消息 requeue 就会无限循环,导致 mq 的消息处理飙升,带来不必要的压力:可以配置消息的最大重试次数,超过次数后,消息会被丢弃。

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

5.3 失败处理策略

Spring 也允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由 MessageRecovery 接口来定义的,它有 3 个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是 RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。

5.4 业务幂等性

5.4.1 唯一消息 ID

这个思路非常简单:

  1. 每一条消息都生成一个唯一的 id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 ID 保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

5.4.2 业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

相比较而言,消息 ID 的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。

6. 延迟消息

6.1 死信交换机与延迟消息

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

image.png

RabbitMQ 的消息过期是基于追溯方式来实现的,也就是说当一个消息的 TTL 到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的 TTL 时间不一定准确。

6.2 DelayExchange 插件

先查看 RabbitMQ 的插件目录对应的数据卷。
docker volume inspect mq-plugins
结果如下,把插件放到这个目录/var/lib/docker/volumes/mq-plugins/_data

1
2
3
4
5
6
7
8
9
10
11
[
{
"CreatedAt": "2025-03-02T16:35:49+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]

接下来执行命令,安装插件:

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

声明延迟交换机

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送延迟消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}

延迟消息插件内部会维护一个本地数据库表,同时使用 Elang Timers 功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的 CPU 开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。

6.3 处理超时订单

image.png


MQ基础
https://kongshuilinhua.github.io/2025/03/02/MQ基础/
作者
FireFLy
发布于
2025年3月2日
许可协议