RocketMQ消息为什么会被重复消费?

在使用rocketmq时,消息可能会被重复消费。让我们从全局视角探讨消息发送和消费的过程。rocketmq-dashboard是一个非常实用的图形化界面工具

RocketMQ消息为什么会被重复消费?

首先,我们在RocketMQ-Dashboard上创建一个topic,每个topic下有4个队列。

每个topic是一类消息的集合,topic下再细分queue是为了提高消息消费的并发度。

RocketMQ消息为什么会被重复消费?

「当producer发送topic消息时,应该发送到topic下的哪个queue呢?」

producer会采用轮询策略来发送消息。

「那么consumer应该消费哪个queue下的消息呢?」

当只有一个消费者时,它会消费所有queue中的消息。

RocketMQ消息为什么会被重复消费?

「如果有多个消费者呢?」

只需根据各种负载均衡策略将队列分配给消费者即可,如下图展示了两种负载均衡方式。

RocketMQ消息为什么会被重复消费?RocketMQ消息为什么会被重复消费?RocketMQ消息为什么会被重复消费?

你问我这两种负载策略是如何实现的?去查看源码吧,我就不详细分析了。

「如果消费者数量超过队列的数量会发生什么?」

多余的消费者将不会消费任何队列。

RocketMQ消息为什么会被重复消费?

为什么一个consumer只能消费一个queue呢?」

多个消费者消费一个queue肯定会有并发问题,所以需要加锁,这样还不如将topic下的队列数量设置得更多一些。

「我在运行过程中可以设置topic下queue的数量吗?」

当然可以。不仅可以重新设置queue的数量,还可以实时增减consumer,以应对不同流量的场景。

「那这样说当queue或者consumer的数量发生变化的时候,需要重新执行负载均衡吧?」

是的,大家一般把这个过程称为重平衡。

下面我们来分享一下详细的细节。

消息发送流程主要有三种方式:单向发送(只发送,不管结果)、同步发送和异步发送。

RocketMQ消息为什么会被重复消费?

消息消费流程基于推还是拉?消息消费的模式有两种方式:

拉取:Consumer不断从Broker拉取。推送:Broker向Consumer推送。

这两种方式都有各自的缺点:

拉取:拉取的间隔不好确定,间隔太短没消息时会造成带宽浪费,间隔太长又会造成消息不能及时被消费。推送:「推送和速率难以适配消费速率」,推的太快,消费者消费不过来怎么办?推的太慢消息不能及时被消费。

「看起来拉取和推送难以抉择。」

降重鸟 降重鸟

要想效果好,就用降重鸟。AI改写智能降低AIGC率和重复率。

降重鸟 113 查看详情 降重鸟

然后就有大佬把拉取模式改了一下,即不会造成带宽浪费,也能基于消费的速率来决定拉取的频率!

「你猜怎么改的?」

其实很简单,Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(例如5s)。

如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取。「对了,这种策略就叫做长轮询。」

「RocketMQ中有拉和推两种消费方式,但是推是基于长轮询做的。」

具体消费流程如下图所示。

RocketMQ消息为什么会被重复消费?

「拉取到消息后是怎么处理的呢?」

PullRequest类的成员变量如下图所示。

RocketMQ消息为什么会被重复消费?

当拉取到消息后,消息会被放入msgTreeMap,其中key为消息的offset,value为消息实体。

「另外还有一个重要的属性dropped,和重平衡相关,重平衡的时候会造成消息的重复消费,具体机制不分析了,看专栏吧。」

msgCount(未消费消息总数)和msgSize(未消费消息大小)是和流控相关的。

「什么是流控呢?」

就是流量控制,当消费者消费的比较慢时,减缓拉取的速度。如下图所示。

RocketMQ消息为什么会被重复消费?

当从阻塞队列中获取PullRequest时,并不会直接发起网络请求,而是先看看是否触发流控的规则,比如未消费的消息总数超过一定值,未消费的消息大小超过一定值等。

RocketMQ消息为什么会被重复消费?

接着就是收到响应,处理消息,并将PullRequest再次放入阻塞队列。

「是不是落了一个步骤?就是Consumer告诉Broker这部分消息我消费了?」

嗯嗯,你是不是以为提交offset的过程是同步的?其实并不是,「是异步的。」

Consumer怎么提交offset?

RocketMQ消息为什么会被重复消费?

当consumer消费完消息只是将offset存在本地,通过定时任务将offset提交到broker,另外broker收到提交offset的请求后,也仅仅是将offset存在map中,通过定时任务持久化到文件中。

「这样就会造成消息的重复消费。」

Consumer消费完消息并不是实时同步到Broker的,而是将offset先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致offset没提交,下次没提交offset的这部分消息会被再次消费。

即使offset被提交到了Broker,在还没来得及持久化的时候Broker宕机了,当重启的时候Broker会读取consumerOffset.json中保存的offset信息,这就会导致没持久化offset的这部分消息会被再次消费。

以上就是RocketMQ消息为什么会被重复消费?的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/565557.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月10日 03:12:30
下一篇 2025年11月10日 03:13:34

相关推荐

  • Go语言并发UDP通信中的竞态条件与深度复制解决方案

    本文深入探讨了在go语言中实现并发udp读写时可能遇到的竞态条件问题,特别是由于`net.udpaddr`结构体及其内部`ip`字段的共享复用导致的潜在风险。文章分析了竞态检测器报告的详细信息,并提出了一种通过深度复制`net.udpaddr`来有效解决数据竞态的专业方案,同时提供了示例代码和实践建…

    2025年12月16日
    000
  • Go语言中实现HTTP Basic Auth的规范方法

    本文详细介绍了在go语言中实现http basic auth的规范方法。通过构建一个可复用的中间件函数,您可以轻松地为特定的http路由添加硬编码的用户名和密码保护。文章涵盖了认证逻辑、安全比较技巧以及如何将此中间件应用于您的http处理器,同时提供了示例代码和重要的安全注意事项,确保认证过程的健壮…

    2025年12月16日
    000
  • Go CLI程序构建与部署指南

    本教程详细阐述了如何将go语言编写的命令行界面(cli)程序从开发阶段的`go run`命令转换为可直接执行的独立二进制文件。文章将深入讲解`go build`和`go install`这两个核心命令的使用方法、它们之间的区别,以及如何将编译后的程序部署到系统路径中,使其能像原生命令一样被调用,从而…

    2025年12月16日
    000
  • Golang如何实现测试报告生成_Golang测试报告生成实践详解

    使用go test生成测试与覆盖率报告,结合gotestsum输出XML供CI解析,通过脚本聚合多维度数据形成可视化总览,实现从本地到持续集成的闭环质量监控。 在Go语言开发中,测试是保障代码质量的重要环节。但仅有测试用例还不够,生成清晰、可读的测试报告能帮助团队快速定位问题、评估覆盖率和提升交付信…

    2025年12月16日
    000
  • 如何在Golang中测试接口实现_Golang 接口实现测试实践

    在 Go 语言中,接口是实现多态和解耦的重要机制。确保某个类型正确实现了特定接口,对于维护代码的稳定性至关重要。虽然 Go 编译器会在类型实际使用时检查接口实现,但在某些场景下——比如重构或公共库开发——我们希望提前确认实现关系是否成立。这就需要通过测试来验证接口实现。 使用空结构体断言在编译期验证…

    2025年12月16日
    000
  • Go语言点对点网络实现指南:入门与并发特性应用

    本文旨在为使用go语言实现点对点(p2p)网络提供入门指导。我们将探讨构建p2p网络的最佳起点,并重点介绍go语言中如何利用其并发特性,特别是通道(channels),来简化异步网络事件的处理,从而提升开发效率和系统稳定性。文章还将推荐权威学习资源,帮助开发者高效开启go语言p2p网络编程之旅。 1…

    2025年12月16日
    000
  • Golang如何实现结构体字段动态赋值_Golang 结构体字段动态赋值实践

    Golang通过reflect包实现结构体字段动态赋值,核心在于使用reflect.ValueOf获取值的反射表示,并通过Elem()、FieldByName()和Set()等方法操作可导出字段,需传入结构体指针以确保可设置性。示例中定义了SetField函数,对User结构体的Name和Age字段…

    2025年12月16日
    000
  • 如何用Golang实现单例模式保证唯一实例_Golang 单例模式实例保证实践

    使用 sync.Once 可实现线程安全的懒加载单例,保证全局唯一实例;通过 init 函数可实现饿汉式单例,启动即初始化;推荐 sync.Once 方式,兼顾并发安全与延迟加载。 在 Golang 中实现单例模式,关键在于确保一个类(结构体)在整个程序生命周期中只创建一个实例,并提供一个全局访问点…

    2025年12月16日 好文分享
    000
  • Go语言中UDP连接的并发读写:解决数据竞争问题

    本文深入探讨了go语言中并发读写udp连接时可能遇到的数据竞争问题,特别是net.udpaddr结构体在多goroutine间共享导致的竞态。通过分析go的竞态检测器报告,文章阐明了问题根源,并提出了一种健壮的解决方案:对udpaddr进行深度拷贝。文章提供了详细的go语言示例代码,展示了如何构建一…

    2025年12月16日
    000
  • Go CLI 程序结构、构建与运行:从 go run 到独立可执行文件

    本文详细介绍了go语言命令行界面(cli)程序的标准结构、包管理,以及如何将其构建为独立可执行文件。通过对比`go run`、`go build`和`go install`命令,指导开发者将go程序部署为系统命令,提升用户体验和便捷性,实现类似`myprogram`直接运行的效果。 在Go语言中开发…

    2025年12月16日
    000
  • 使用Go语言构建点对点(P2P)网络:入门与核心特性

    go语言凭借其强大的并发原语,特别是`channels`,在实现点对点(p2p)网络时展现出独特优势,能够显著简化异步网络事件的处理。本文旨在为希望使用go构建p2p网络的开发者提供一个清晰的入门指南,重点介绍go语言在此领域的关键特性及其应用,并推荐权威的学习资源,帮助读者高效地开启p2p网络编程…

    2025年12月16日
    000
  • Golang如何应用单例模式管理配置_Golang 单例模式配置管理实践

    使用单例模式管理配置可避免重复加载、节省内存并保证一致性。通过 sync.Once 实现初始化一次的线程安全单例,确保多个 goroutine 获取同一实例;若需热更新,可结合 sync.RWMutex 支持动态重载,兼顾安全与灵活性,是 Go 项目中推荐的配置管理方式。 在 Go 语言项目中,配置…

    2025年12月16日
    000
  • 如何在WSL中运行Golang开发环境_WSL Golang跨系统开发配置详解

    首先安装WSL并选择Ubuntu发行版,以管理员身份运行wsl –install -d Ubuntu-22.04完成安装;启动后更新系统包,通过wget下载Go官方包并解压至/usr/local;配置PATH、GOPATH环境变量至~/.bashrc并执行source ~/.bashrc…

    2025年12月16日
    000
  • Golang如何在Docker中搭建开发环境_Golang Docker开发环境实践

    使用Docker进行Go开发可确保环境一致性、提升CI/CD效率。通过多阶段构建优化镜像体积,结合docker-compose与air实现热重载,支持快速迭代,适用于现代Golang项目开发。 在现代Go语言开发中,使用Docker搭建开发环境能有效保证团队协作的一致性,避免“在我机器上能跑”的问题…

    2025年12月16日
    000
  • Golang如何搭建本地Web开发环境_Golang Web开发环境配置实践

    首先安装Go并配置环境,设置模块模式与代理;接着创建项目并初始化模块,编写基于net/http的简单Web服务;然后引入Gin框架提升开发效率,通过go get添加依赖并改写服务逻辑;最后配置air实现热重载,完成从环境搭建到高效开发的全流程。 搭建Golang本地Web开发环境其实并不复杂,只要理…

    2025年12月16日
    000
  • Golang如何进行HTTP负载测试_Golang HTTP负载测试实践

    答案:使用Go语言进行HTTP负载测试可评估服务性能。通过标准库实现并发请求,统计吞吐量、响应时间与错误率;也可用go-wrk等工具高效压测,支持梯度加压与结果分析,注意资源限制与服务端监控。 进行HTTP负载测试的目的是评估服务在高并发场景下的性能表现,比如吞吐量、响应时间、错误率等。Golang…

    2025年12月16日
    000
  • Go语言常量声明:避免函数调用与复杂表达式初始化

    本文深入探讨了go语言中常量(`const`)声明的限制,特别是不能直接使用函数调用,尤其是涉及多返回值或潜在错误的函数。文章解释了`const`值必须在编译时确定,并对比了`const`与`var`在包级别声明时的差异。针对需要将函数结果(如url解析)作为“常量”使用的场景,文章提供了两种解决方…

    2025年12月16日
    000
  • 如何在Golang中通过反射获取函数参数类型_Golang 函数参数类型获取实践

    答案:通过reflect包可动态获取函数参数类型,示例代码展示了如何用reflect.TypeOf、NumIn、In等方法解析函数签名,包括处理可变参数和接收器作为首参的情况,适用于RPC、ORM等需运行时类型信息的场景。 要在Golang中动态获取一个函数的参数类型,我们主要依赖标准库中的refl…

    2025年12月16日
    000
  • 如何用Golang配置多版本Go切换工具_Golang 多版本Go切换实践

    推荐使用轻量级开源工具g管理多Go版本,通过g install安装指定版本如1.18.10和1.21.5,g use切换版本,结合.go-version文件与shell钩子实现项目级自动切换,简化版本控制。 在实际开发中,不同项目可能依赖不同版本的Go语言环境,比如一个老项目使用Go 1.18,而新…

    2025年12月16日
    000
  • 如何在Golang中使用包内函数_Golang 包函数使用实践

    在Golang中调用包内函数需遵循导出规则和模块路径,1. 只有首字母大写的函数才能被外部访问;2. 通过import导入自定义包如”yourproject/utils”;3. 使用别名可简化长包名引用;4. 匿名导入_用于触发初始化副作用。 在 Golang 中使用包内函数…

    2025年12月16日
    000

发表回复

登录后才能评论
关注微信