架构师系列-消息中间件(11)- RocketMQ 进阶(5)-深入分析(2)

3. 文件刷盘机制

RocketMQ 的消息是存储在磁盘上的,这样做有两个优点:

  • 保证断电后恢复

  • 让存储的消息量超出内存的限制

RocketMQ 存储与读写是基于 JDK NIO 的内存映射机制,具体使用 MappedByteBuffer(基于 MappedByteBuffer 操作大文件的方式,其读写性能极高)RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息 超出内存的限制 RocketMQ 为了提高性能,会尽可能地保证 磁盘的顺序写 消息在通过 Producer 写人 RocketMQ 的时候,有两种写磁盘方式:

 

3.1 同步刷盘方式

如上图所示,只有在消息真正持久化至磁盘后,RocketMQ的Broker端才会真正地返回给Producer端一个成功的ACK响应,同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用领域。

3.2 异步刷盘

能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。

消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。异步和同步刷盘的区别在于,异步刷盘时,主线程并不会阻塞,在将刷盘线程wakeup后,就会继续执行。

3.3 刷盘方式对比
同步刷盘异步刷盘
消息情况在返回写成功状态时,消息已经被写入磁盘中,即消息被写入内存的PAGECACHE 中后,立刻通知刷新线程刷盘,等待刷盘完成,才会唤醒等待的线程并返回成功状态在返回写成功状态时,消息可能只是被写入内存的 PAGECACHE 中,当内存的消息量积累到一定程度时,触发写操作快速写入
性能需要等待刷盘才能返回结果消息写入内存后立刻返回结果,吞吐量更高
可靠性可以保持MQ的消息状态和生产者/消费者的消息状态一致Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

4. 过期文件删除

4.1 为什么删除过期文件

由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制。

4.2 删除文件的思路

RocketMQ顺序写CommitLog文件、ComsumeQueue文件,所有的写操作都会落到最后一个文件上,因此在当前写文件之前的文件将不会有数据插入,也就不会有任何变动,因此可通过时间来做判断,比如超过72小时未更新的文件将会被删除

注意:RocketMQ删除过期文件时不会关注该文件的内容是否全部被消费

4.3 如何防止重复投递

客户端通过broker的消费进度确定自己需要拉取那些消息

消息的存储是一直存在于CommitLog中的,而由于CommitLog是以文件为单位(而非消息)存在的,CommitLog的设计是只允许顺序写的,且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降,逻辑也非常复杂),所以消息被消费了,消息所占据的物理空间并不会立刻被回收

但消息既然一直没有删除,那RocketMQ怎么知道应该投递过的消息就不再投递?

答案是客户端自身维护——客户端拉取完消息之后,在响应体中,broker会返回下一次应该拉取的位置,PushConsumer通过这一个位置,更新自己下一次的pull请求,这样就保证了正常情况下,消息只会被投递一次。

4.4 过期文件删除流程

删除过期文件的整体流程如下:

  • 开启定时任务每10s扫描是否有文件需要删除
  • 有三种情况会进入删除文件操作:到了deleteWhere指定的时间点(默认是凌晨4点)、磁盘不足、手动触发
  • 对于磁盘不足的情况,当磁盘使用率大于磁盘空间警戒线水位(默认是90%),会阻止消息写入,当超过85%时会强制删除文件(需要设置允许强制删除参数,否者不生效),其他两种情况都只能删除过期的文件(文件最后更新时间+文件最大的存活时间 < 当前时间)
  • 当被删除的文件存在引用时,会有一个文件删除缓存时间,在这段时间内,该文件不会被删除,主要是留给引用该文件程序一些时间,当超过了文件删除缓存时间后,每次都会将该文件的引用减少1000,直到减少小于等于0后才释放该文件引用的相关资源,然后将该文件放入一个“文件删除集合”中
  • 一次连续删除文件中间会存在一定的间隔,不会连续释放文件相关的资源
  • 一次连续删除的文件总和不大于10
  • 将“文件删除集合”中的文件从硬盘上删除

5. 高可用

5.1 NameServer 高可用

由于 NameServer 节点是无状态的,且各个节点直接的数据是一致的,故存在多个 NameServer 节点的情况下,部分 NameServer 不可用也可以保证 MQ 服务正常运行

5.1.1 BrokerServer 高可用

RocketMQ是通过 Master 和 Slave 的配合达到 BrokerServer 模块的高可用性的,一个 Master 可以配置多个 Slave,同时也支持配置多个 Master-Slave 组。

当其中一个 Master 出现问题时:

  • 由于Slave只负责读,当 Master 不可用,它对应的 Slave 仍能保证消息被正常消费
  • 由于配置多组 Master-Slave 组,其他的 Master-Slave 组也会保证消息的正常发送和消费
5.2 消息消费高可用

Consumer 的高可用是依赖于 Master-Slave 配置的,由于 Master 能够支持读写消息,Slave 支持读消息,当 Master 不可用或繁忙时, Consumer 会被自动切换到从 Slave 读取(自动切换,无需配置)。故当 Master 的机器故障后,消息仍可从 Slave 中被消费

5.3 消息发送高可用

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息,RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

 

5.4 消息主从复制
5.4.1 同步复制和异步复制

若一个 Broker 组有一个 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步复制和异步复制两种方式

同步复制异步复制
概念即等 Master 和 Slave 均写成功后才反馈给客户端写成功状态只要 Master 写成功,就反馈客户端写成功状态
可靠性可靠性高,若 Master 出现故障,Slave 上有全部的备份数据,容易恢复若 Master 出现故障,可能存在一些数据还没来得及写入 Slave,可能会丢失
效率由于是同步复制,会增加数据写入延迟,降低系统吞吐量由于只要写入 Master 即可,故数据写入延迟较低,吞吐量较高
5.4.2 配置方式

可以对 broker 配置文件里的 brokerRole 参数进行设置,提供的值有:

  • ASYNC_MASTER:异步复制
  • SYNC_MASTER:同步复制
  • SLAVE:表明当前是从节点,无需配置 brokerRole
5.4.3 实际应用

在实际应用中,由于同步刷盘方式会频繁触发磁盘写操作,明显降低性能,故通常配置为:

  • 刷盘方式:ASYNC_FLUSH(异步刷盘)
  • 主从复制:SYNC_MASTER(同步复制)

异步刷盘能够避免频繁触发磁盘写操作,除非服务器宕机,否则不会造成消息丢失。

主从同步复制能够保证消息不丢失,即使 Master 节点异常,也能保证 Slave 节点存储所有消息并被正常消费掉。

6. 业务上保障

6.1 为什么从业务上保证
6.1.1 消息丢失问题

RocketMQ虽然号称消息不会丢失,但是还是有几率存在MQ宕机以及rocketMQ使用上的问题可能存在消息丢失等,对于类似于支付确认的消息一般来说是一条都不允许丢失的

6.1.2 消息幂等性

在网络环境中,由于网络不稳定等因素,消息队列的消息有可能出现重复,大概有以下几种:

6.1.2.1 发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败, 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

6.1.2.2 投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断,为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

6.1.2.3 负载均衡时消息重复

包括但不限于网络抖动、Broker 重启以及订阅方应用重启

当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息

结合三种情况,可以发现消息重发的最后结果都是消费者接收到了重复消息,那么,我们只需要在消费者端统一进行幂等处理就能够实现消息幂等。

6.2 整体架构

上面我们是从MQ本身来保证消息的的可靠性,下面我们从业务上来分析如何保障MQ的可靠性,Mq 的消息成功投递和消费是比较难的,这里提供一个思路,如何保证消息成功投递并且消息是幂等的。

 

6.2.1 表结构设计

这里面涉及到两张表

6.2.1.1 消息存根

消息发送到 mq 之前先把消息存储到 mongodb 或者 mysql 中,消息有一个存根是为了避免消息在 mq 中丢失后就找不到发送的消息,主要需要保存几个关键信息:

  • messageId: 消息流水号,用来在消息的生产端和消费端串联使用,根据这个id找到消费端和生产端唯一的消息
  • messageContent:消息内容
  • count:发送到 broker 的次数,如果超过特定次数就不往 broker 发
  • status:消息状态,已确认、未确认、已作废,如果消费端已经确认就需要修改该状态
  • offset:偏移量,发送的时候记录消息的偏移量。
  • resendTime:重发时间,需要加上消息可能在队列的堆积时间,否则可能造成消息还未被消费到就被重发了
  • createDatre:记录消息的发送时间
6.2.1.2 消息日志

为了保障消息能够成功发送到MQ,需要在拉取到消息后的第一时间将消息ID保存到消息日志表,用来让发送端知道消息消息是否成功发送到了MQ,需要包含一下字段:

  • messageId: 消息流水号,为了和消息存根来保持一致
  • offset:用来记录当前接收到的消息的偏移量,重发时需要通过统计最大偏移量来确定消费者是否堆积来确定是否重发
  • createDate:用来记录当前消息接收到的时间
6.3 幂等性校验
6.3.1 Redis幂等性校验

首先进行Redis的setNX进行幂等性校验,有以下情况

  • 成功:代表redis幂等性校验通过,为了防止Redis数据误删或者过期,需要进行数据库的幂等性校验,检查消息存根是否是未处理状态,如果是未处理状态则进行业务,则说明消息是未处理状态,否则幂等性校验失败
  • 失败:代表redis幂等性校验未通过,则不通过数据库直接进行校验失败处理
6.3.2 DB幂等性校验

如果Redis校验没有通过,还需要DB进行幂等性校验,有以下情况

  • 成功:说明消息确定时未处理的,需要进行处理
  • 失败:说明Redis缓存已经过期或者误删,这里做兜底处理
6.3.3 校验失败处理

幂等性校验失败则说明消息是重复,存在一下两种情况

6.3.3.1 Redis校验失败

如果直接Redis的setNX校验失败,说明是重复消息,但是这个时候消息是不知道消息是否处理完成,有以下两种情况:

  • 消息处理完成:需要删除消息日志以及返回成功消费的标志
  • 消息未处理完成:这个时候说明已经有一个线程在处理消息了,直接结束并返回成功消费的标志。

正确处理逻辑如下

这个时候的处理业务应该是Redis校验失败后,但是并不能确定消息是否真的已经处理完成还是处理中的消息,需要先检查消息存根状态:

  • 如果消息处理成功需要删除消息日志,返回成功标志。
  • 如果没有处理成功,则说明另一个线程正在处理,直接返回成功标志。
6.3.3.2 DB校验失败

这个时候说明消息已经处理完成了,redis并且已经通过setNX已经设置标志了,需要删除消息日志,并且返回成功标志。

6.3.4 校验成功

幂等性校验成功后就需要处理业务操作了

6.4 业务处理

业务操作分为两种情况:

6.4.1 操作成功

如果操作成功,需要修改消息存根状态,并且删除消息日志,然后返回成功标志

6.4.2 操作失败(异常)

如果操作失败,需要消费端重试,这个时候删除redis的setNX的值,并且返回重试指令,让消费端进行重试,但是重试可能一直不成功,RocketMQ的消费端重试机制,达到上限后会投递到死信队列,后期需要人工处理。

6.5 定时重发消息
6.5.1 重发消息筛选

需要符合一下条件的数据才会被筛选出来

  • 消息存根中未确认的消息
  • 消息存根中发送次数小于最大发送次数的消息
  • 消息存根中下次发送时间大于当前时间的消息
  • 消息存根中的偏移量小于消息日志中的最大偏移量的消息
  • 消息存根中的消息ID在消息日志中不存在的消息
6.5.2 重发消息

符合以上条件的消息需要进行重发,调用MQProducer客户端进行重发消息,重发完成后还需要做一下事情

  • 修改消息的重发次数,当前次数+1
  • 修改消息的下次重发时间,通过规则计算需要间隔多长时间才需要重发
  • 修改消息的偏移量,设置为当前发送消息的的偏移量
6.5.3 人工处理

如果消息的发送次数达到最大的发送次数,将无法进行重发,需要人工进行处理,可以通过发邮件以及其他的方式通知开发人员进行后续的人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/575368.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

通配符HTTPS安全证书

众多类型的SSL证书&#xff0c;要说适用或者说省钱肯定是通配符了&#xff0c;因为谁都想一本SSL证书包括了整条域名&#xff0c;而且也不用一条一条单独管理。 通配符HTTPS安全证书&#xff0c;其实就是通配符SSL证书&#xff0c;SSL证书主流CA的参数都一样&#xff0c;通配符…

python中如何用matplotlib写雷达图

#代码 import numpy as np # import matplotlib as plt # from matplotlib import pyplot as plt import matplotlib.pyplot as pltplt.rcParams[font.sans-serif].insert(0, SimHei) plt.rcParams[axes.unicode_minus] Falselabels np.array([速度, 力量, 经验, 防守, 发球…

迅雷不限速破解方法

背景&#xff1a;现在迅雷和百度云的下载速度真的太恶心了&#xff0c;所以总有大佬可以采用厉害的方法进行破解&#xff0c;在网上看了一圈&#xff0c;很多都是骗人或者是无效的&#xff0c;找了一个靠谱的方法&#xff0c;亲测速度能达到10M以上&#xff0c;非常给力。 以下…

数据库工程师的工作职责(合集)

数据库工程师的工作职责1 职责&#xff1a; 1. 日常数据库的基本安装&#xff0c;维护&#xff0c;升级&#xff0c;监控的; 2. 配合研发部门进行数据库设计支持&#xff0c;协助开发、设计和进行SQL语言优化; 3. 配合相关部门数据库相关的任务&#xff0c;比如数据导入导出&am…

怎么使用下载视频号视频?详细视频下载使用教程

越来越多的人开始使用视频号等平台来分享和观看视频内容。然而&#xff0c;有时候我们可能会遇到需要将视频保存到本地设备以便离线观看或进一步编辑的情况。 本文将为您详细介绍如何使用视频下载plus&#xff0c;来下载视频号的视频内容。 一、了解视频号下载功能 首先&…

SublimeText - 汉化插件安装教程

第一步&#xff1a;快捷键CTRLShiftp&#xff0c;&#xff08;如果是Mac&#xff0c;则是command Shiftp&#xff09; 弹出查找栏—找到 install Package&#xff0c;并点击选择。 如下图&#xff1a; 第二步&#xff1a;再次弹出的框中&#xff0c;选择 ChineseLocalizations…

飞鹤与满趣健达成战略合作 加速深化国际化布局

继获得加拿大地区首张婴幼儿配方奶粉生产执照后&#xff0c;中国飞鹤的海外征途再添新动作。4月25日&#xff0c;中国飞鹤加拿大皇家妙克与美国婴童用品巨头满趣健&#xff08;Munchkin&#xff09;在北京正式达成战略合作。此次合作彰显了中国乳企的硬核实力&#xff0c;也是飞…

前后缀分离,CF1209 C. Maximal Intersection

目录 一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 二、解题报告 1、思路分析 2、复杂度 3、代码详解 一、题目 1、题目描述 2、输入输出 2.1输入 2.2输出 3、原题链接 Problem - 1029C - Codeforces 二、解题报告 1、思路分析 线段相交具有可…

亚马逊风控有哪些?如何在账号风控种避免封号?

如今商业竞争愈发激烈的时代&#xff0c;数据的准确性和可靠性已经成为商家和消费者共同追求的目标。为了达到这一目标&#xff0c;亚马逊采取了一系列风险管控措施&#xff0c;旨在杜绝恶意行为、虚假交易等违规情况&#xff0c;从而确保交易在平台上的安全与诚信。许多亚马逊…

汇隆晶片授权世强硬创,代理产品工作温度范围覆盖工业/车规/航天级

凭借独特的线上线下技术分销以及团队优异的推新能力&#xff0c;世强先进&#xff08;深圳&#xff09;科技股份有限公司&#xff08;下称“世强先进”&#xff09;获得浙江汇隆晶片技术有限公司&#xff08;下称“汇隆晶片”&#xff0c;英文名&#xff1a;HLC&#xff09;授权…

【JAVA】一文掌握Java并发编程

Java 开发中&#xff0c;并发编程属于相当重要的一个知识点&#xff0c;可以说&#xff0c;Java 的并发能力&#xff0c;是成就今日 Java 地位的因素之一。Java 的并发编程由浅入深实质上是包含 Java&#xff08;API&#xff09;层、JVM&#xff08;虚拟机&#xff09;层、内核…

网络攻击近在咫尺:数据加密与SSL成为信息安全之盾

随着互联网的日益普及和科技的迅猛发展&#xff0c;网络攻击已经成为信息安全领域面临的一大难题。近期&#xff0c;一场网络安全实验让我们对网络攻击有了更为深刻的认识。在实验中&#xff0c;网络安全工程师通过模拟攻击&#xff0c;展示了木马植入、文件浏览、键盘监听、病…

激活IDM下载器并配置百度网盘

前言&#xff1a; 最近想下载一些软件&#xff0c;奈何不充钱的百度网盘的速度实在太慢了&#xff0c;不到一个G的文件夹奈何下了一晚上&#xff0c;只能重新找一下idm的下载了。 但是idm的正版是需要收费的&#xff0c;所以有白嫖党的破解版就横空出世了。 正文&#xff1a…

JavaEE——Spring Boot + jwt

目录 什么是Spring Boot jwt&#xff1f; 如何实现Spring Boot jwt&#xff1a; 1. 添加依赖 2、创建JWT工具类 3. 定义认证逻辑 4. 添加过滤器 5、 http请求测试 什么是Spring Boot jwt&#xff1f; Spring Boot和JWT&#xff08;JSON Web Token&#xff09;是一对常…

HarmonyOS hsp制作与引用

1. HarmonyOS hsp制作与引用 1.1 介绍 HSP动态共享包&#xff08;模块&#xff09;,应用内HSP指的是专门为某一应用开发的HSP&#xff0c;只能被该应用内部其他HAP/HSP使用&#xff0c;用于应用内部代码、资源的共享。应用内HSP跟随其宿主应用的APP包一起发布&#xff0c;与该…

阶跃星辰:探索智能科技的星辰大海

引言 在当今快速发展的科技时代&#xff0c;人工智能已经成为推动社会进步的重要力量。阶跃星辰&#xff0c;正是在这一背景下诞生的。 阶跃星辰是一家专注于通用人工智能探索的公司&#xff0c;成立于2023年4月。该公司的创始团队由一群对人工智能充满热情和渴望的人组成&am…

【Python】异常、模块与包

目录 捕获异常 异常的传递 Python中的模块 模块的导入方式 as定义别名 自定义模块 Python包 第三方包 综合案例 当我们的程序遇到了BUG, 那么接下来有两种情况: ① 整个程序因为一个BUG停止运行 ② 对BUG进行提醒, 整个程序继续运行 但是在真实工作中, 我们肯定不能…

快解析搭建网站解决方案

在如今网络时代下&#xff0c;各行各业都需要有自己的门户网站。 企业搭建自己的门户网站&#xff0c;有着众多实际意义: 1.可以全面详细地介绍企业及企业产品&#xff0c;这是企业网站的一个最基本的功能。企业可以把任何想让大众知道的信息放到网站&#xff0c;当人们想知道…

http忽略ssl认证

我们在发请求时&#xff0c;会遇到需要ssl证书验证的报错&#xff0c;针对该错误以及所使用的不同的创建连接的方式&#xff0c;进行ssl证书忽略 忽略SSL证书的流程 简介&#xff1a;需要告诉client使用一个不同的TrustManager。TrustManager是一个检查给定的证书是否有效的类…

pytest参数化数据驱动(数据库/execl/yaml)

常见的数据驱动 数据结构&#xff1a; 列表、字典、json串 文件&#xff1a; txt、csv、excel 数据库&#xff1a; 数据库链接 数据库提取 参数化&#xff1a; pytest.mark.parametrize() pytest.fixture()…