加入星计划,您可以享受以下权益:

  • 创作内容快速变现
  • 行业影响力扩散
  • 作品版权保护
  • 300W+ 专业用户
  • 1.5W+ 优质创作者
  • 5000+ 长期合作伙伴
立即加入
  • 正文
    •  RocketMQ如何生产和消费消息 ● 
    • 浅析各类情况 ●
    •  总结 ●
  • 推荐器件
  • 相关推荐
  • 电子产业图谱
申请入驻 产业图谱

五分钟技术趣谈 | RocketMQ中各类重复消费的原理浅析

2023/12/31
4529
阅读需 10 分钟
加入交流群
扫码加入
获取工程师必备礼包
参与热点资讯讨论

作者:李佳斌,单位:中国移动智慧家庭运营中心

随着大数据云计算时代的到来,我国的各个产业每天都在产生不可估计的数据,以及对数据的各式各样的需求,消息中间件在处理数据、消费数据的过程中越来越受到重视。在高并发、微服务、分布式的场景下,如何合理地利用消息中间件,如何保证MQ消费消息的幂等性?所谓知其然,才能知其所以然,本文将通过RocketMQ作为例子,来扒一扒什么情况下会导致重复消费。

 RocketMQ如何生产和消费消息  

先简单介绍下RocketMQ正常生产消息和消费消息的流程,如下图。

1.生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个Queue,然后跟这个Queue所在的机器建立连接,把消息发送到这个Queue上。

2.消费者消费这个Queue,就能获取到对应的消息。

- 问题出现

当异常情况出现时,如消息发送超时或者消息消费超时,RocketMQ为保证消息发送成功,会启动重试机制,选择另一台机器的Queue重发。现在假设有这样一种情况,消费者实际正确接收到了消息,只是由于网络波动导致响应超时了,那就会出现消息重复发送,导致消费者重复消费的情况出现。

那除此之外,还有没有其他情况会导致消息重复消费的情况呢?总结起来一共有如下几种情况。

1)消息发送异常时的重复消费

2)消费消息时抛出了异常

3)消费者提交offset失败

4)Broker持久化offset失败

5)主从同步失败

6)重平衡

7)清理长时间消费的消息

浅析各类情况 

- 消费消息时抛出异常

问题分析一

RocketMQ在并发消费的模式下会调用MessageListenerConcurrently的consumeMessage方法,入参是msgs集合。当调用该方法消费消息出现异常时,返回的结果status就会是null。这种情况下会导致status被设置为RECONSUME_LATER,也就是说消息之后会被重复消费。

问题分析二

传入的是msgs集合。上述原因一中消息处理之后,不管成功失败,都会对结果进行处理。而集合中的任意一个失败,都会导致status被设置为RECONSUME_LATER。在对结果处理是,判断到RECONSUME_LATER时,就会对msgs重新遍历并发送消息,重新消费,从而导致之前成功处理的消息都会被重复消费。不过好在msgs消息的数量默认情况下是1。

- 消费者提交offset失败

何为offset

producer发送消息到broker,Rocketmq会将消息的内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数传入的起始offset来获取需要消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。消费成功之后,消费者提交offset,来记录这个queue消费到哪个位置了。

问题分析

RocketMq设计的时候,消费完消息,并不是同步提交offset,而是将offset保存到内存中,通过一个定时任务(默认是5S一次),以网络请求的方式将offset提交给broker。如果最新的offset还没提交,此时服务器宕机了,那么重启之后,就会从broker中读取到之前的提交的offset,并从此处开始消费,此时就会出现重复消费的情况了。

- broker持久化offset失败

问题分析

与消费者提交offset同理,Broker为了防止数据丢失,会将offset持久化到磁盘中。同样的也是通过一个默认5S的定时任务来处理持久化操作。所以offset的完整过程就如下图。当broker宕机时,就会导致offset丢失,此时如果消费者重新拉取消费进度,就会比实际消费的进度要低,导致重复消费。

- 主从同步失败

问题分析

为保证RocketMQ服务的高可用,一般项目中都会启用主从备份的模式,当主节点挂掉之后,从节点就会升级为主节点对外提供服务。因此就需要进行主从同步,保证数据的一致性。默认情况下每隔10S,从节点会向主节点请求,同步元数据,包括消费进度。此时如果主节点宕机了,从节点就无法获取到10S之内的消费进度,自然也就会导致重复消费。

- 重平衡

何为重平衡

RocketMQ的消费者有两种模式,集群消费模式和广播消费模式,绝大多数场景采用的都是集群消费模式。前面提到的消费进度就是在集群消费模式下才会存在。集群消费模式中有一个消费组的概念。一个消费组可以有多个消费者,不同消费组之间消费消息互不干扰,而同一消费组的消费者按照一定的算法分配消息队列进行消息消费,保证一个消息只能被一个消费组消费一次。当消费组中的消费组增加或者减少时就会触发重平衡。如图,原先消费组中有两个消费者,平均消费4个队列,每个消费组2个队列;当加入了一个新的消费者时,为了保证新的消费者能够消费消息,就会进行重平衡,重新分配消息队列。

问题分析

假设在重平衡发生时,此时消费者2还在正常消费Queue4,当消费者3加入,重平衡完成时,此时消费者2判断到Queue4已经不属于自己消费了,就会将Queue4设置为dropped,消费完成时,发现队列是dropped状态,那么消费者2的消费进度offset就不会被提交。成功消费了消息,但是消费进度却没有被提交,于是当消费者3开始消费消息时,就会从服务端拉取到之前的消费进度,造成队列4的消息被重复消费。

- 清理长时间消费的消息

清理机制讲解

RocketMQ中有一个机制会定时清理长时间正在消费的消息,默认是15分钟执行一次清理任务。之所以这么做,是有原因的。我们说过,消息被消费之后,就会提交offset。当一个线程消费了所有消息时,就会把消息从集合中移除,提交的消息进度offset就是msg5的offset+1。

假设,现在是两个线程消费,线程2消费完成,之后提交offset,但是此时线程1还在处理前两条消息,因此为了保证消费消息的不丢失,移除之后发现集合中还有剩余消息,就会把msg1的offset 返回提交上去。而一旦集合最前面的消息长时间处理,就会导致这个消费进度一直在最前面。此时如果服务器重启,就会导致很多消费过的消息都会被重复消费。因此引入了清理长时间消费的机制。

问题分析

引入清理长时间消费的消息机制后,一旦发现某个消息已经处理超过15分钟了,就会将消息移除,保障后续消息消费进度的正常提交,之后会隔一定的时间再次消费这个被移除的消息。但是,这个消息虽然被移除了,却并不是没有消费过,因此再次消费就会导致重复消费的问题出现。

 总结 

RocketMq的官方文档中对消息传递有这样的解释:RocketMq确保所有消息至少被传递一次,在大多数情况下,消息不会重复。可见RocketMq为了保证消息的不丢失,牺牲了消息投递的重复率。因此我们在使用RokcetMq时需要合理使用它的特点,设计合理的幂等技术方案来解决重复消费的问题。

推荐器件

更多器件
器件型号 数量 器件厂商 器件描述 数据手册 ECAD模型 风险等级 参考价格 更多信息
NCV7344D10R2G 1 onsemi CAN FD Transceiver, High Speed, Low Power with NC, long filter time, 3000-REEL
$0.82 查看
TJA1050T/CM,118 1 NXP Semiconductors TJA1050 - High-speed CAN transceiver SOIC 8-Pin

ECAD模型

下载ECAD模型
$1.96 查看
LAN8742A-CZ 1 SMSC Ethernet Transceiver, 4 X 4 MM, 0.90 MM HEIGHT, HALOGEN FREE AND ROHS COMPLIANT, SQFN-24
$1.51 查看
中国移动

中国移动

中国移动有限公司(「本公司」,包括子公司合称为「本集团」)于1997年9月3日在香港成立,本集团在中国内地所有三十一个省、自治区、直辖市以及香港特别行政区提供通信和信息服务,业务主要涵盖个人、家庭、政企和新兴市场的语音、数据、宽带、专线、IDC、云计算、物联网等,是中国内地最大的通信和信息服务供应商,亦是全球网络和客户规模最大、盈利能力领先、市值排名位居前列的世界级通信和信息运营商。

中国移动有限公司(「本公司」,包括子公司合称为「本集团」)于1997年9月3日在香港成立,本集团在中国内地所有三十一个省、自治区、直辖市以及香港特别行政区提供通信和信息服务,业务主要涵盖个人、家庭、政企和新兴市场的语音、数据、宽带、专线、IDC、云计算、物联网等,是中国内地最大的通信和信息服务供应商,亦是全球网络和客户规模最大、盈利能力领先、市值排名位居前列的世界级通信和信息运营商。收起

查看更多

相关推荐

电子产业图谱

移动Labs是中国移动的社交化新媒体平台,是面向外部行业及产业链合作伙伴的信息发布、业务发展和产业推进门户。