《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事

大家好,我是小菜。
一个希望能够成为 吹着牛X谈架构 的男人!如果你也想成为我想成为的人,不然点个关注做个伴,让小菜不再孤单!

本文主要介绍 RabbitMQ的消息丢失问题
如有需要,可以参考
如有帮助,不忘 点赞 ?
微信公众号已开启,小菜良记,没关注的同学们记得关注哦!
是的,最终是对 RabbitMQ 下手了!
面试中常见的RabbitMQ面试题也是多了去了,常见的如下:
  • 消息可靠性问题:如何确保发送的消息至少被消费一次?
  • 延迟消息问题:如何实现消息的延迟投递?
  • 高可用问题:如何避免单点的MQ故障而导致的不可用问题?
  • 消息堆积问题:如何解决数百万级以上消息堆积,无法及时消费问题?
这几个问题又得让你脑壳疼一阵子,是不是也在网上看了挺多博文介绍这方面的解决方案,但是却看了又忘,实际便是因为缺少实操,这篇小菜便重点讲述下 RabbitMQ 如何解决消息丢失问题~
一、消息可靠性问题
消息可靠性问题我们又可能将其理解为如何防止消息丢失?那为什么消息会丢失呢?我们可以先看看消息投递的整个过程:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

我们从图中可以从三个阶段分析可能造成消息丢失:
  • publisher 发送消息到 exchange
  • exchange 分发到 queue
  • queue 投递到 customer
既然我们知道了哪些阶段可能造成数据丢失,那我们就可以从源头防范于未然~!
工程结构
工程结构很简单,就是一个简单的 Spring Boot 项目,里面有个 消费者生产者 两个模块
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

1、生产者发送丢失 RabbitMQ 中提供了 publisher confirm 机制来避免消息发送到 MQ 的过程中丢失的问题。消息发送到 MQ 以后,会返回一个确认结果给生产者,用于表示消息是否确认成功。该确认结果存在两种请求:
  • publisher-confirm
该类型是 发送者确认 ,存在两种情况
  1. 消息成功投递到交换机,返回 ack
  2. 消息未投递到交换机,返回 nack
  • publisher-return
该类型是 发送者回执 ,存在两种情况
  1. 消息投递到交换机,且成功分发到队列,返回 ack
  2. 消息投递到交换机,但未成功分发到队列,返回 nack
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

注意:确认机制发送消息时,需要给每个消息设置一个全局唯一ID,以区分不同消息,避免ack冲突
接下来我们用代码来说明具体的操作方式
1)配置文件 我们首先看下 生产者 的配置文件
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

前面几个配置 RabbitMQ 的连接信息没啥好讲的,我们来看几个比较陌生的配置
  • publisher-confirm-type
开启发送确认,这里可以支持两种类型
  1. simple:同步等待 confirm 结果,直到超时
  2. correlated:异步回调,定义 ConfirmCallback,MQ返回结果时会回调这个 ConfirmCallback
  • publisher-returns
开启 public-return,同样是基于 CallBack 机制,不过是定义 ReturnCallback
  • template.mandatory
定义路由失败时的策略。
  • true:调用 ReturnCallback
  • false:直接丢弃消息
2)定义回调事件 每个 RabbitTemplate 只能配置一个 ReturnCallback
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

3)发送消息 《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

执行发送代码之前,我们确保已经创建了(一个直连交换机 direct-exchange,一个队列 direct-queue,且绑定的 key 为 direct
正常情况下,我们执行代码肯定是发送成功的,可以看到控制台绿色输出
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

且我们在消息队列中也成功接收到了消息:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

到这步是没有任何问题的,那我们就需要手动给它制造点问题~ 我们可以修改 交换机名称,这个时候发送消息的时候找不到交换机,那么交换机肯定就会返回 nack ,再看是否可以进入到我们代码中的判断:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

代码执行虽然是绿色的,但因为rabbitMQ找不到正确的交换机,而导致消息发送失败,也就是下图的这个过程:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

这一个是 publish -> exchange 失败我们顺利的捕获到了,那么 exchange -> queue 这步的失败是我们是否能够正常捕获?我们可以通过修改 路由 key 使交换机路由不到对应的 queue
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

可以发现当交换机没有路由到相对应的 queue 时,也成功触发了我们自定义的回调函数,然后看 rabbitMQ 控制台是可以发现消息已经成功投递到交换机
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

到这里,我们通过两种简单的错误模拟,使程序都能顺利的进入到我们预先定义的回调中,如果遇到发送失败的情况,我们可以在失败的回调中自定义消息重发机制,最大程度上避免消息丢失的问题
4)总结 我们可以通过 publisher-confirmpublisher-return 两种错误捕获机制,来避免 生产者 -> exchange -> queue 这条链路的消息丢失
  • publisher-confirm
    1. 消息成功发送到 exchange,返回 ack
    2. 消息未能成功发送到 exchange,返回 nack
    3. 消息发送过程中出现异常,没有收到回执,则进入 failureCallback 回调
  • 【《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事】publisher-return
    1. 消息成功发送到 exchange,但没有路由到 queue,调用自定义回调函数 returnCallback
2、消息存储丢失 消息存储丢失是啥意思?其实就是持久化 的概念,当消息已经成功发送到 queue 时,这个时候如果消费者没有及时进行消费,rabbitMQ 又刚好宕机重启了,那么这个时候就会发现消息丢失了。
这是因为 MQ 默认是内存存储消息,我们可以通过开启持久化的功能来确保在 MQ 中的消息不丢失
其实我们通过 RabbitMQ 提供的 GUI 创建交换机或队列的时候就可以发现有持久化的这个选项
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

如果将 durability 设为 durable 后,我们可以发现无论如何重启 MQ,重启后交换机和队列依然存在。
但是很多时候我们交换机队列 的创建并非在 GUI 上创建,而是通过应用代码的方式创建
  • 交换机持久化
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

  • 队列持久化
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

  • 消息持久化
默认情况下,AMQP 发出的消息都是持久化的,不用特意指定
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

3、消费者消费丢失 RabbitMQ 采取的机制是当确认消息被消费者消费后就会立即删除
那么如何确认消息已被消费者消费?那就还得依靠回执来确认,消费者获取消息后,需要向 RabbitMQ 发送 ack 回执,表明自己已经处理消息。其中 ack 在 AMQP 中有三种确认模式:
  • manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack
  • auto:自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack,反之返回 nack
  • none:关闭 ack,MQ 在消息投递后会立即删除消息
上述三种方式都是通过修改配置文件:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

1)manual 该方式需要用户自己手动确认,灵活性较好
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

这个时候如果执行逻辑是正常的,那么在 RabbitMQ 上就会将该消息删除,但是如果执行的逻辑抛出了异常,没有进入到手动确认的环节,RabbitMQ 将会把该消息保留:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

2)auto 该方式在没有异常发生时会自动进行消息确认
我们在配置文件中将确认方式改为 auto 进行测试:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

正常情况下接收消息是没有任何问题的,那我们同样制造些非正常情况:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

我们手动制造了点异常,发现消息没有被 RabbitMQ 删除的同时,而且控制台一直在报错,无止境的在尝试重新消费,这如果放在线上环境难免有些令人崩溃。
当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,就会导致 MQ 的消息处理飙升
而发生这种情况的原因所在便是因为 RabbitMQ的消息失败重试机制,但很多时候我们可能不想一直重试,只需要经过几次尝试,如果失败就放弃处理,这个时候我们就需要在配置文件中配置失败重试机制:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

开启该配置后,我们重启项目进行观察
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

通过控制台可以看到在重试 3 次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试机制生效了。而且我们回到 RabbitMQ 控制台可以看到对应消息被删除了,说明最后 SpringAMQP 返回的是 ack,导致消息被 MQ 删除
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

但是这种处理方式并不优雅,重试后直接删除消息过于 暴力,那么有没有更好的处理方式?答案是有的!
我们可以利用 AMQP 提供的 MessageRecovery 接口来实现,该接口有三种不同的实现方式:
  • RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢失消息。默认方式,以上就是采用这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
三种方式可以根据不同场景进行采用,分析一下,不难发现第三种 RepublishMessageRecoverer 是比较优雅的~ 当重试失败后会将消息投递到一个指定专门存放异常消息的队列,后续由人工集中进行处理!具体使用方式如下:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

通过自定义异常处理后,我们重启项目查看控制台:
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

可以发现重试3次后,我们的异常消息进入到了我们自定义的异常队列中
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

3)none 该方式没啥好讲的~ 无论消息异常与否 MQ 都会进行删除!
4、总结 假如这个时候面试再问你,如何确保 RabbitMQ消息的可靠性?那你可得好好唠嗑唠嗑
如何保证消息不丢失?
1)首先分析丢失的场景有哪些? 消息丢失可能发生在 发送时丢失(未送达 exchange / 未路由到 queue)消息未持久化而MQ宕机消费者接收消息未能正确消费
2)然后如何预防
  • 开启生产者确认机制,确保生产者的消息能到达队列
确认机制包括 publisher-confirmpublisher-return
当未送达到 交换机 我们可以通过 publisher-confirm 返回的 acknack 来确认
当 交换机 未成功路由到 队列,我们可以通过 publisher-return 自定义的回调函数来确认,每个 RabbitTemplate 只能配置一个 ReturnCallback
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
持久化功能分为 交换机持久化队列持久化消息持久化,我们都需要将 durable 设置为 true
  • 开启消费者确认机制最低为 auto 级别
消费者确认机制有三种类型:manual (手动确认)auto (自动确认)none (关闭 ack)
  • 失败重试机制
我们手动设置 MessageResoverer 为 RepublishMessageRecoverer 方式,将投递失败的消息转到异常队列中,交由人工处理
这一套组合拳回答下来,面试官还不得默默承认你有点东西?
当然这只是 RabbitMQ 的问题之一,我们下篇继续其他几个问题的解决方式~
不要空谈,不要贪懒,和小菜一起做个吹着牛X做架构的程序猿吧~点个关注做个伴,让小菜不再孤单。咱们下文见!
《RabbitMQ》|《RabbitMQ》 | 消息丢失也就这么回事
文章图片

今天的你多努力一点,明天的你就能少说一句求人的话!
我是小菜,一个和你一起变强的男人。
微信公众号已开启, 小菜良记,没关注的同学们记得关注哦!

    推荐阅读