阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?

大家好,我是君哥。

最近有读者参加面试时被问了一个问题,如果消费者拉取了一批消息,比如 100 条,第 100 条消息消费成功了,但是第 50 条消费失败,偏移量会怎样更新?就着这个问题,今天来聊一下,如果一批消息有消费失败的情况时,偏移量怎么保存。

1 拉取消息

1.1 封装拉取请求

以 RocketMQ 推模式为例,RocketMQ 消费者启动代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context){ try{System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); }catch (Exception e){return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }); consumer.start();}

上面的 DefaultMQPushConsumer 是一个推模式的消费者,启动方法是 start。消费者启动后会触发重平衡线程(RebalanceService),这个线程的任务是在死循环中不停地进行重平衡,最终封装拉取消息的请求到 pullRequestQueue。这个过程涉及到的 UML 类图如下:

☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

图片

Typewise.app Typewise.app

面向客户服务和销售团队的AI写作解决方案。

Typewise.app 39 查看详情 Typewise.app

1.2 处理拉取请求

封装好拉取消息的请求 PullRequest 后,RocketMQ 就会不停地从 pullRequestQueue 获取消息拉取请求进行处理。UML 类图如下:

图片

拉取消息的入口方法是一个死循环,代码如下:

//PullMessageServicepublic void run(){ log.info(this.getServiceName() + " service started"); while (!this.isStopped()) {try { PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest);} catch (InterruptedException ignored) {} catch (Exception e) { log.error("Pull Message Service Run Method exception", e);} } log.info(this.getServiceName() + " service end");}

这里拉取到消息后,提交给 PullCallback 这个回调函数进行处理。

拉取到的消息首先被 put 到 ProcessQueue 中的 msgTreeMap 上,然后被封装到 ConsumeRequest 这个线程类来处理。把代码精简后,ConsumeRequest 处理逻辑如下:

//ConsumeMessageConcurrentlyService.javapublic void run(){ MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; try {//1.执行消费逻辑,这里的逻辑是在文章开头的代码中定义的status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { } if (!processQueue.isDropped()) {//2.处理消费结果ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else {log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); }}

2 处理消费结果

2.1 并发消息

并发消息处理消费结果的代码做精简后如下:

//ConsumeMessageConcurrentlyService.javapublic void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest){ int ackIndex = context.getAckIndex(); switch (status) {case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) {ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; break;case RECONSUME_LATER: break;default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { } break;case CLUSTERING: List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i = 0 && !consumeRequest.getProcessQueue().isDropped()) {this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }}

从上面的代码可以看出,如果处理消息的逻辑是串行的,比如文章开头的代码使用 for 循环来处理消息,那如果在某一条消息处理失败了,直接退出循环,给 ConsumeConcurrentlyContext 的 ackIndex 变量赋值为消息列表中失败消息的位置,这样这条失败消息后面的消息就不再处理了,发送给 Broker 等待重新拉取。代码如下:

public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setConsumeTimestamp("20181109221800"); consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context){ for (int i = 0; i < msgs.size(); i++) {try{ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);}catch (Exception e){ context.setAckIndex(i); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }); consumer.start();}

消费成功的消息则从 ProcessQueue 中的 msgTreeMap 中移除,并且返回 msgTreeMap 中最小的偏移量(firstKey)去更新。注意:集群模式偏移量保存在 Broker 端,更新偏移量需要发送消息到 Broker,而广播模式偏移量保存在 Consumer 端,只需要更新本地偏移量就可以。

如果处理消息的逻辑是并行的,处理消息失败后给 ackIndex 赋值是没有意义的,因为可能有多条消息失败,给 ackIndex 变量赋值并不准确。最好的方法就是给 ackIndex 赋值 0,整批消息全部重新消费,这样又可能带来冥等问题。

2.2 顺序消息

对于顺序消息,从 msgTreeMap 取出消息后,先要放到 consumingMsgOrderlyTreeMap 上面,更新偏移量时,是从 consumingMsgOrderlyTreeMap 上取最大的消息偏移量(lastKey)。

3 总结

回到开头的问题,如果一批消息按照顺序消费,是不可能出现第 100 条消息消费成功了,但第 50 条消费失败的情况,因为第 50 条消息失败的时候,应该退出循环,不再继续进行消费。

如果是并发消费,如果出现了这种情况,建议是整批消息全部重新消费,也就是给 ackIndex 赋值 0,这样必须考虑冥等问题。

以上就是阿里二面:RocketMQ 消费者拉取一批消息,其中部分消费失败了,偏移量怎样更新?的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/838453.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月27日 11:29:16
下一篇 2025年11月27日 11:33:11

相关推荐

  • 如何用HTML插入标签云组件_HTML CSS3变换与随机颜色生成算法

    使用HTML构建标签结构,CSS3添加旋转与过渡效果,JavaScript生成随机HSL颜色并设置字体大小,实现动态交互的标签云组件。 要在网页中实现一个动态的标签云组件,结合 HTML、CSS3 变换和随机颜色生成算法,可以按照以下步骤操作。这个组件不仅能提升页面视觉效果,还能通过色彩和旋转增加交…

    2025年12月23日
    000
  • 如何在Go Gin应用中集成前端JavaScript模块(如Sentry)

    本文探讨了在Go Gin框架下,通过HTML模板服务前端页面时,如何有效集成JavaScript模块(如Sentry)。针对浏览器不直接支持Node.js模块导入语法的问题,文章详细阐述了利用CDN引入Sentry SDK的解决方案,并提供了具体的代码示例,帮助开发者实现前端错误监控功能,避免了复杂…

    2025年12月23日
    000
  • html官网浏览入口_html网站设计免费平台

    html官网浏览入口在https://www.codepen.io,该平台支持实时预览代码、创建Pen项目、Fork开源示例,可添加外部资源,具备点赞评论收藏等社区互动功能,设有挑战活动与作品集分类,开放API接口,界面简洁适合初学者,在线编写无需配置环境,支持多种预处理器和响应式测试。 html官…

    2025年12月23日
    000
  • html如何修改日期样式

    在html中,可以使用“::-webkit-datetime-edit”伪元素选择器来修改日期格式,只需要用该选择器选中元素,在设置具体样式即可,具体语法为“::-webkit-datetime-edit{属性:属性值}”。 本教程操作环境:windows7系统、CSS3&&HTML…

    2025年12月21日
    100
  • 单选框的type属性值为什么

    单选框的type属性值为“radio”。html type属性可以规定要显示的输入框“”元素的类型;值为“radio”时显示为单选框、“checkbox”时显示为复选框、“select”时显示为下拉式选框等等。 本教程操作环境:windows7系统、HTML5版、Dell G3电脑。 在HTML中,…

    2025年12月21日
    000
  • HTML中type是什么意思

    在HTML中,type是类型的意思,是一个标签属性,主要用于定义标签元素的类型或文档(脚本)的MIME类型;例在input标签中type属性可以规定input元素的类型,在script标签中type属性可以规定脚本的MIME类型。 本教程操作环境:windows7系统、html5版、Dell G3电…

    2025年12月21日
    000
  • HTML中ul标签如何去掉点?HTML无序列表的样式实例解析

    本篇文章主要讲述的是关于html中的ul标签的默认小点给取消掉,还有关于html的无序列表ul标签的样式解释,给出了ul标签中的type属性三种值的介绍。现在就让我们一起来看本篇文章吧 首先这篇文章一开始我们就开始介绍在html中是怎么把ul标签的点给去掉的: 大家应该都使用过ul无序列表标签,ul…

    2025年12月21日 好文分享
    000
  • html中的ol标签如何去掉标号呢?标签的使用方法总结

    本篇文章介绍了html的ol标签是怎么去掉序号标号的,这里还有代码的详细解释,还有介绍了关于html ol有序列表标签如何更改序号,下文介绍了三种序号,大家也可以自己去想填写怎样的序号。现在来看这篇文章吧 一、我们先看看html中的ol标签是如何去掉标号的呢: 我们都知道html的ol标签是个有序列…

    2025年12月21日 好文分享
    000
  • HTML ul标签的什么意思?HTML ul标签的作用详解

    本篇文章主要的为大家讲解了关于html ul标签的三种重要的用法,还有关于html ul标签的解释,包含li标签的还有type属性对ul标签的使用情况,好了,下面大家一起来看文章吧 首先让我们先来解释一下HTML ul标签的意思: ul标签定义的是表格当中无序列表,表格当中的无序列表都是在 标签之中…

    2025年12月21日
    000
  • javascript框架和库是什么_如何选择React、Vue或Angular?

    JavaScript框架与库分别提供按需调用的功能集合和约束性开发结构;React是UI组件库,生态灵活但需自行整合工具;Vue渐进式易上手,兼顾原型与工程化;Angular是全功能TypeScript框架,适合强规范企业级项目。 JavaScript框架和库是封装好的代码集合,用来简化前端开发——…

    2025年12月21日
    000
  • React应用生产环境环境变量配置深度指南

    本文针对react应用在生产环境中无法读取`.env`文件配置的环境变量问题,深入剖析其工作原理、常见原因及排查方法。通过详细的步骤和示例代码,指导开发者正确配置和使用环境变量,解决api调用层面的`null`响应问题,确保应用在生产环境下的稳定运行。 在React应用开发中,环境变量(如API密钥…

    2025年12月21日
    000
  • JS注解怎么实现文档化_ JS注解生成开发文档的流程与工具

    JSDoc是一种JavaScript结构化注释规范,通过@param、@returns等标签描述代码元素,并借助工具生成HTML文档,结合IDE支持和CI/CD可提升团队协作效率。 JavaScript本身不支持原生注解(Annotation)像Java那样的语法,但通过约定的注释格式和配套工具,可…

    2025年12月21日
    000
  • JS注解怎么标注联合类型_ JS联合类型的注解书写与使用技巧

    在JavaScript中可通过JSDoc使用联合类型注解,如string|number表示多类型支持,结合@param、@typedef等标签提升代码可读性与编辑器提示,适用于函数参数、返回值等场景。 在JavaScript中,虽然原生不支持类型注解,但在使用JSDoc配合现代编辑器(如VS Cod…

    2025年12月21日
    000
  • VS Code主题开发:告别JSON,拥抱脚本化生成

    vs code主题扩展最终需json格式定义,但开发者可通过javascript或typescript等脚本语言生成此json文件。这种方法有效解决了大型json文件难以维护、不支持注释等问题,并能实现颜色动态计算,显著提升主题开发的灵活性与效率。 为什么选择脚本化生成VS Code主题? 在开发V…

    2025年12月20日
    000
  • 如何用Quasar框架开发一个跨平台应用?

    Quasar基于Vue.js用一套代码构建多平台应用,支持响应式网站、PWA、移动App和桌面应用。通过quasar create创建项目,利用模式(SPA、PWA、Electron等)切换目标平台,使用Quasar组件库编写通用UI,配合Pinia管理状态,最后通过不同构建命令发布到各平台,实现高…

    2025年12月20日
    000
  • 怎么利用JavaScript进行前端代码覆盖率统计?

    答案:利用JavaScript进行前端代码覆盖率统计的核心是通过Istanbul/nyc等工具对代码插桩,结合测试框架收集执行数据并生成报告。具体流程包括:在代码执行前通过Babel或Webpack插件(如babel-plugin-istanbul)插入计数器实现插桩;运行测试时记录哪些代码被执行;…

    2025年12月20日
    100
  • typescript中的参数分享

    TypeScript 中的参数共享允许组件间共享参数,实现跨组件状态维护和数据变更共享。通过 @Input 装饰器传递父组件参数,使用 @Output 装饰器定义子组件事件,以便在子组件状态改变时通知父组件。参数共享提高复用性,简化状态管理,允许子组件向父组件发出通知,但应谨慎使用,避免大量数据共享…

    2025年12月19日
    000
  • 手机如何运行typescript方法

    要在手机上运行 TypeScript 方法,可以使用 TypeScript 编译器或第三方库:TypeScript 编译器: 将 TypeScript 代码编译成 JavaScript,然后集成到移动应用程序中。第三方库: 如 React Native 或 NativeScript,允许使用 Typ…

    2025年12月19日
    000
  • typescript用来干嘛_typescript的作用

    TypeScript 是一种用于构建大型复杂应用程序的开源编程语言,它扩展了 JavaScript 的功能,具有以下作用:类型系统:编译时检查类型错误,提高代码可靠性。面向对象编程特性:支持类、接口、抽象类,增强代码组织性和维护性。模块系统:分解程序为可重用模块,提升可维护性和可扩展性。全面的类型推…

    2025年12月19日
    000
  • TypeScript基本用法和语法

    TypeScript 是一种具有类型系统的 JavaScript 超集,提供以下特性:类型注解:确保变量、函数和类的类型一致。接口:定义方法和属性,供类实现。枚举:提供命名常量集。泛型:创建可重用且类型安全的组件。 TypeScript 基本用法和语法 TypeScript 是一种超集 JavaSc…

    2025年12月19日
    000

发表回复

登录后才能评论
关注微信