Spring Boot整合RocketMQ事务消息教程

spring boot整合rocketmq事务消息的核心在于利用其两阶段提交机制解决分布式系统中的数据一致性问题。1. 引入rocketmq spring boot starter依赖简化配置;2. 在application.yml中配置nameserver地址和生产者组;3. 实现rocketmqlocaltransactionlistener接口,重写executelocaltransaction和checklocaltransaction方法处理本地事务及状态回查;4. 在业务代码中使用rocketmqtemplate发送事务消息。rocketmq通过“半消息”机制确保消息发送与本地事务的原子性:发送半消息后执行本地事务,成功则提交,失败则回滚,若状态未知则由broker定期回查。关键点包括注解@rocketmqtransactionlistener的正确使用、本地事务的完整执行、checklocaltransaction的幂等设计。实际应用中需应对幂等性、事务超时、异常监控和性能开销等问题,合理配置参数并结合日志监控保障最终一致性。

Spring Boot整合RocketMQ事务消息教程

Spring Boot整合RocketMQ事务消息,说白了,就是为了解决分布式系统里数据一致性的那个老大难问题。我们都知道,在微服务架构下,一个操作可能涉及到多个服务和多个数据库,如果其中一个环节出错了,怎么保证整个业务流程的数据状态是正确的、一致的?RocketMQ的事务消息机制,提供了一个两阶段提交的变种方案,让这个事情变得相对可靠。它不是万能药,但确实是处理特定场景下分布式事务的一个非常实用的工具

Spring Boot整合RocketMQ事务消息教程

解决方案

整合Spring Boot和RocketMQ事务消息,核心在于利用RocketMQ提供的两阶段提交能力,确保本地事务和消息发送的原子性。

Spring Boot整合RocketMQ事务消息教程

首先,你需要引入Spring Boot RocketMQ Starter的依赖。这个是基础,省去了很多繁琐的配置。

立即进入“豆包AI人工智官网入口”;

立即学习“豆包AI人工智能在线问答入口”;

    org.apache.rocketmq    rocketmq-spring-boot-starter    2.2.2 

接着,在你的application.ymlapplication.properties里配置RocketMQ的NameServer地址和一些生产者组信息。

Spring Boot整合RocketMQ事务消息教程

rocketmq:  name-server: 127.0.0.1:9876 # 你的NameServer地址  producer:    group: my_transaction_producer_group # 事务消息专用的生产者组    send-message-timeout: 3000

然后,关键一步是实现RocketMQLocalTransactionListener接口。这个接口有两个方法,executeLocalTransactioncheckLocalTransaction,它们是事务消息机制的核心。

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;@Component@RocketMQTransactionListener(txProducerGroup = "my_transaction_producer_group")public class OrderTransactionListener implements RocketMQLocalTransactionListener {    // 假设这是你的本地服务,用来处理业务逻辑和查询状态    // @Autowired    // private OrderService orderService;    /**     * 执行本地事务     * 在发送半消息成功后,Broker会回调这个方法     */    @Override    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {        String messageBody = new String((byte[]) msg.getPayload());        String transactionId = (String) msg.getHeaders().get("MQ_TRANSACTION_ID"); // 获取事务ID        try {            // 1. 解析消息,获取业务参数            // 2. 执行本地事务,比如:创建订单,扣减库存等            //    boolean success = orderService.createOrderAndDeductStock(messageBody, transactionId);            System.out.println("执行本地事务,消息体: " + messageBody + ", 事务ID: " + transactionId);            // 模拟本地事务执行结果            boolean success = true; // 假设本地事务成功            if (success) {                // 如果本地事务执行成功,返回COMMIT,Broker会投递消息                return RocketMQLocalTransactionState.COMMIT;            } else {                // 如果本地事务执行失败,返回ROLLBACK,Broker会删除半消息                return RocketMQLocalTransactionState.ROLLBACK;            }        } catch (Exception e) {            // 出现异常,返回UNKNOW,让Broker进行回查            System.err.println("本地事务执行异常: " + e.getMessage());            return RocketMQLocalTransactionState.UNKNOWN;        }    }    /**     * 检查本地事务状态     * 当Broker没有收到COMMIT/ROLLBACK指令,或者Producer宕机后重启,Broker会回调这个方法     */    @Override    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {        String messageBody = new String((byte[]) msg.getPayload());        String transactionId = (String) msg.getHeaders().get("MQ_TRANSACTION_ID");        // 1. 根据消息的唯一标识(通常是业务ID或事务ID)查询本地事务的真实状态        //    比如:查询订单是否已创建成功,或者库存是否已扣减        //    OrderState state = orderService.getOrderState(transactionId);        System.out.println("检查本地事务状态,消息体: " + messageBody + ", 事务ID: " + transactionId);        // 模拟根据事务ID查询本地事务状态        // 假设通过transactionId可以查询到本地事务是否已成功        boolean transactionCompleted = true; // 假设本地事务已经成功完成        if (transactionCompleted) {            // 如果本地事务已成功,返回COMMIT            return RocketMQLocalTransactionState.COMMIT;        } else {            // 如果本地事务未完成或失败,返回ROLLBACK            // 这里要特别注意,如果业务逻辑是幂等的,即使重复执行checkLocalTransaction也不会有问题            return RocketMQLocalTransactionState.ROLLBACK;            // 也可以返回UNKNOWN,让Broker稍后再次回查,但通常建议直接判断最终状态        }    }}

最后,在你的业务代码中,使用RocketMQTemplate发送事务消息。

import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Service;@Servicepublic class OrderService {    @Autowired    private RocketMQTemplate rocketMQTemplate;    public void createOrder(String orderId, String userId, double amount) {        // 构建消息体        String messageBody = String.format("{"orderId":"%s", "userId":"%s", "amount":%s}", orderId, userId, amount);        Message message = MessageBuilder.withPayload(messageBody)                                                .setHeader("orderId", orderId) // 可以在这里设置业务ID,方便回查                                                .build();        // 发送事务消息,指定事务生产者组和目标Topic        // 第二个参数arg可以传递给executeLocalTransaction方法,用于传递一些额外上下文信息        rocketMQTemplate.sendMessageInTransaction(            "my_transaction_producer_group", // 对应监听器上的txProducerGroup            "order_created_topic",           // 消息的Topic            message,            null // 附加参数,这里可以为空,或者传递业务相关数据        );        System.out.println("已发送订单创建事务消息: " + orderId);    }}

这样一套流程下来,当createOrder方法被调用时:

RocketMQ会先发送一个“半消息”到Broker。半消息发送成功后,Broker会回调OrderTransactionListenerexecuteLocalTransaction方法,此时你执行本地的订单创建和库存扣减等业务逻辑。根据本地事务的执行结果,返回COMMIT(本地事务成功,消息可投递)、ROLLBACK(本地事务失败,消息删除)或UNKNOWN(状态不明,待回查)。如果返回UNKNOWN,或者Producer在返回COMMIT/ROLLBACK之前宕机,Broker会定期调用checkLocalTransaction方法来查询本地事务的最终状态,以决定是提交还是回滚消息。

如何理解RocketMQ事务消息的核心机制?

RocketMQ的事务消息,我个人觉得它最精妙的地方就在于那个“半消息”和“回查”机制。它不像传统的分布式事务协议那么重,但又能在一定程度上保证消息发送和本地事务的原子性。

想象一下这个过程:当你的生产者要发一条事务消息时,它并不是直接把消息发出去让消费者立马就能看到。它首先发的是一个所谓的“半消息”(Half Message)。这个半消息,消费者是看不到的,它躺在Broker那里,处于一种“待定”状态。Broker收到半消息后,会给生产者一个确认,告诉它“我收到了”。

接下来,生产者就会去执行自己的本地事务,比如你创建订单、扣减库存这些数据库操作。这个本地事务的成功与否,是决定半消息命运的关键。

如果本地事务成功了,生产者会通知Broker:“好了,我这边搞定了,那个半消息可以转正了,你把它投递给消费者吧!”Broker收到这个“提交”指令,就会把半消息变成普通消息,消费者就能消费了。

如果本地事务失败了,生产者就会通知Broker:“哎呀,我这边没搞定,那个半消息就别发了,直接删了吧!”Broker收到“回滚”指令,就会把半消息删掉。

但这里有个细节,你得搞清楚:万一生产者在执行完本地事务后,还没来得及告诉Broker是提交还是回滚,它自己就宕机了呢?或者网络突然抖了一下,指令没发出去呢?这时候,Broker会很聪明地启动一个“回查”机制。它会定期地去问生产者:“喂,你那个半消息到底是个什么情况?是提交还是回滚?”此时,生产者(或者说,生产者重启后)就会通过实现checkLocalTransaction方法来回答Broker。在这个方法里,你需要根据消息里带的业务唯一标识(比如订单ID),去查询你的本地数据库,看看对应的业务操作到底成功了没有。如果成功了,就告诉Broker提交;如果失败了,就告诉Broker回滚。

所以,这个checkLocalTransaction方法,在我看来,就是整个RocketMQ事务消息的“灵魂”所在。它解决了生产者在提交或回滚指令发出前宕机的极端情况,确保了最终的一致性。没有它,事务消息的可靠性就会大打折扣。

在Spring Boot中实现事务监听器有哪些关键点?

在Spring Boot里实现RocketMQLocalTransactionListener,确实有几个地方是需要特别注意的,否则很容易踩坑。

首先,@RocketMQTransactionListener这个注解是核心。你必须把它加到你的监听器类上,并且txProducerGroup这个属性一定要和你在RocketMQTemplate里调用sendMessageInTransaction时传入的生产者组名称保持一致。这是RocketMQ用来识别哪个监听器对应哪个事务生产者的关键。如果名字对不上,Broker是无法正确回调你的监听器的。

其次,就是executeLocalTransaction方法。这个方法是你在发送半消息后,立即执行本地业务逻辑的地方。这里面的代码,应该是一个完整的本地事务单元。比如,如果你要创建订单并扣减库存,那这两个操作应该在一个数据库事务里完成。这个方法最终返回的RocketMQLocalTransactionState,直接决定了半消息的命运。

返回COMMIT:意味着你的本地事务成功了,Broker可以放心地把消息投递出去。返回ROLLBACK:意味着你的本地事务失败了,Broker应该删除半消息,不让它被投递。返回UNKNOWN:这是个很重要的状态。通常在你无法确定本地事务结果(比如代码抛异常了,或者依赖的服务调用超时了)时返回。返回UNKNOWN会让Broker稍后发起回查,给你一个补救的机会。所以,异常捕获在这里非常重要,不要轻易地把所有异常都直接导致ROLLBACK,有时候UNKNOWN是更好的选择。

再来就是checkLocalTransaction方法。这个方法是幂等性设计和最终一致性的保障。当Broker回查时,它会把之前发送的半消息传给你。在这个方法里,你必须能够根据消息中的业务唯一标识(比如订单号、业务流水号等),去你的本地数据库查询该业务的真实状态。

如果查询到业务已经成功完成,就返回COMMIT。如果查询到业务确实失败了(比如订单创建失败),就返回ROLLBACK。理论上,你也可以在这里返回UNKNOWN,让Broker再次回查。但实际应用中,如果能明确判断出最终状态,直接返回COMMITROLLBACK会更高效,也能避免不必要的多次回查。

一个常见的误区是,有人会把executeLocalTransaction里的业务逻辑写得过于简单,或者没有做好异常处理,导致返回UNKNOWN的场景被忽视。而checkLocalTransaction的实现如果不够健壮,不能准确判断本地事务状态,那么RocketMQ的事务消息机制就形同虚设了,最终还是可能导致数据不一致。确保这两个方法能正确、幂等地反映本地事务的真实状态,是实现事务消息的关键。

RocketMQ事务消息在实际应用中会遇到哪些挑战及应对策略?

RocketMQ事务消息虽然好用,但在实际落地中,我们还是会遇到一些挑战,需要提前考虑并做好应对策略。

首先,幂等性是绕不开的话题。这不仅仅是消费者需要考虑的,在事务消息的checkLocalTransaction回调中,本地事务查询也需要具备幂等性。因为Broker可能会多次回查,或者Producer在发送COMMIT/ROLLBACK指令前多次尝试发送半消息。你的本地业务操作(比如创建订单、扣减库存)必须能够承受重复执行的风险。常见的做法是,利用业务唯一ID(如订单号、业务流水号)在数据库中做唯一约束,或者在更新时加入状态判断,避免重复处理。比如,插入数据前先查询是否存在,或者更新时只更新状态为“待处理”的记录。

其次是事务超时与检查频率。RocketMQ Broker对事务消息有默认的超时时间,超过这个时间如果Producer没有给出明确指令,就会触发回查。同时,回查的频率也是可配置的。在实际业务中,如果你的本地事务执行时间可能比较长,或者依赖的服务响应慢,就可能导致频繁的UNKNOWN状态和回查。你需要根据业务特点合理配置这些超时参数,并且确保你的checkLocalTransaction方法能够快速、准确地返回结果,避免成为性能瓶颈。如果本地事务确实需要长时间才能完成,可能需要考虑更复杂的异步处理或状态机模式,而不是单纯依赖事务消息的短时回查。

再一个挑战是异常处理与监控。在executeLocalTransactioncheckLocalTransaction方法中,任何未捕获的异常都可能导致意外的行为。我们应该尽可能地捕获异常,并根据异常类型返回ROLLBACKUNKNOWN。同时,对事务消息的整个生命周期进行有效的监控非常重要。你需要能够实时知道有多少半消息处于UNKNOWN状态,有多少回查失败,或者有多少事务最终被回滚。通过日志、Metrics和告警系统,及时发现并处理这些异常情况,避免潜在的数据不一致。比如,可以针对checkLocalTransaction中返回UNKNOWN的次数或持续时间设置告警,提示人工介入排查。

最后,性能考量也是一个实际问题。事务消息相比普通消息,增加了两阶段提交的开销,这会带来一定的性能损耗。并不是所有的消息发送都需要强一致性保障。在设计系统时,需要权衡业务对一致性的要求和系统性能的需求。对于那些可以接受最终一致性的场景,使用普通消息结合消费者幂等性设计可能更简单高效。只有那些对数据一致性要求极高、本地事务和消息发送必须原子性的场景,才应该考虑使用事务消息。过度使用事务消息,反而可能成为系统的瓶颈。

以上就是Spring Boot整合RocketMQ事务消息教程的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/150777.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月3日 08:57:46
下一篇 2025年12月3日 09:29:23

相关推荐

  • Uniapp 中如何不拉伸不裁剪地展示图片?

    灵活展示图片:如何不拉伸不裁剪 在界面设计中,常常需要以原尺寸展示用户上传的图片。本文将介绍一种在 uniapp 框架中实现该功能的简单方法。 对于不同尺寸的图片,可以采用以下处理方式: 极端宽高比:撑满屏幕宽度或高度,再等比缩放居中。非极端宽高比:居中显示,若能撑满则撑满。 然而,如果需要不拉伸不…

    2025年12月24日
    400
  • 如何让小说网站控制台显示乱码,同时网页内容正常显示?

    如何在不影响用户界面的情况下实现控制台乱码? 当在小说网站上下载小说时,大家可能会遇到一个问题:网站上的文本在网页内正常显示,但是在控制台中却是乱码。如何实现此类操作,从而在不影响用户界面(UI)的情况下保持控制台乱码呢? 答案在于使用自定义字体。网站可以通过在服务器端配置自定义字体,并通过在客户端…

    2025年12月24日
    800
  • 如何在地图上轻松创建气泡信息框?

    地图上气泡信息框的巧妙生成 地图上气泡信息框是一种常用的交互功能,它简便易用,能够为用户提供额外信息。本文将探讨如何借助地图库的功能轻松创建这一功能。 利用地图库的原生功能 大多数地图库,如高德地图,都提供了现成的信息窗体和右键菜单功能。这些功能可以通过以下途径实现: 高德地图 JS API 参考文…

    2025年12月24日
    400
  • 如何使用 scroll-behavior 属性实现元素scrollLeft变化时的平滑动画?

    如何实现元素scrollleft变化时的平滑动画效果? 在许多网页应用中,滚动容器的水平滚动条(scrollleft)需要频繁使用。为了让滚动动作更加自然,你希望给scrollleft的变化添加动画效果。 解决方案:scroll-behavior 属性 要实现scrollleft变化时的平滑动画效果…

    2025年12月24日
    000
  • 如何为滚动元素添加平滑过渡,使滚动条滑动时更自然流畅?

    给滚动元素平滑过渡 如何在滚动条属性(scrollleft)发生改变时为元素添加平滑的过渡效果? 解决方案:scroll-behavior 属性 为滚动容器设置 scroll-behavior 属性可以实现平滑滚动。 html 代码: click the button to slide right!…

    2025年12月24日
    500
  • 如何选择元素个数不固定的指定类名子元素?

    灵活选择元素个数不固定的指定类名子元素 在网页布局中,有时需要选择特定类名的子元素,但这些元素的数量并不固定。例如,下面这段 html 代码中,activebar 和 item 元素的数量均不固定: *n *n 如果需要选择第一个 item元素,可以使用 css 选择器 :nth-child()。该…

    2025年12月24日
    200
  • 使用 SVG 如何实现自定义宽度、间距和半径的虚线边框?

    使用 svg 实现自定义虚线边框 如何实现一个具有自定义宽度、间距和半径的虚线边框是一个常见的前端开发问题。传统的解决方案通常涉及使用 border-image 引入切片图片,但是这种方法存在引入外部资源、性能低下的缺点。 为了避免上述问题,可以使用 svg(可缩放矢量图形)来创建纯代码实现。一种方…

    2025年12月24日
    100
  • 如何让“元素跟随文本高度,而不是撑高父容器?

    如何让 元素跟随文本高度,而不是撑高父容器 在页面布局中,经常遇到父容器高度被子元素撑开的问题。在图例所示的案例中,父容器被较高的图片撑开,而文本的高度没有被考虑。本问答将提供纯css解决方案,让图片跟随文本高度,确保父容器的高度不会被图片影响。 解决方法 为了解决这个问题,需要将图片从文档流中脱离…

    2025年12月24日
    000
  • 为什么 CSS mask 属性未请求指定图片?

    解决 css mask 属性未请求图片的问题 在使用 css mask 属性时,指定了图片地址,但网络面板显示未请求获取该图片,这可能是由于浏览器兼容性问题造成的。 问题 如下代码所示: 立即学习“前端免费学习笔记(深入)”; icon [data-icon=”cloud”] { –icon-cl…

    2025年12月24日
    200
  • 如何利用 CSS 选中激活标签并影响相邻元素的样式?

    如何利用 css 选中激活标签并影响相邻元素? 为了实现激活标签影响相邻元素的样式需求,可以通过 :has 选择器来实现。以下是如何具体操作: 对于激活标签相邻后的元素,可以在 css 中使用以下代码进行设置: li:has(+li.active) { border-radius: 0 0 10px…

    2025年12月24日
    100
  • 如何模拟Windows 10 设置界面中的鼠标悬浮放大效果?

    win10设置界面的鼠标移动显示周边的样式(探照灯效果)的实现方式 在windows设置界面的鼠标悬浮效果中,光标周围会显示一个放大区域。在前端开发中,可以通过多种方式实现类似的效果。 使用css 使用css的transform和box-shadow属性。通过将transform: scale(1.…

    2025年12月24日
    200
  • 为什么我的 Safari 自定义样式表在百度页面上失效了?

    为什么在 Safari 中自定义样式表未能正常工作? 在 Safari 的偏好设置中设置自定义样式表后,您对其进行测试却发现效果不同。在您自己的网页中,样式有效,而在百度页面中却失效。 造成这种情况的原因是,第一个访问的项目使用了文件协议,可以访问本地目录中的图片文件。而第二个访问的百度使用了 ht…

    2025年12月24日
    000
  • 如何用前端实现 Windows 10 设置界面的鼠标移动探照灯效果?

    如何在前端实现 Windows 10 设置界面中的鼠标移动探照灯效果 想要在前端开发中实现 Windows 10 设置界面中类似的鼠标移动探照灯效果,可以通过以下途径: CSS 解决方案 DEMO 1: Windows 10 网格悬停效果:https://codepen.io/tr4553r7/pe…

    2025年12月24日
    000
  • 使用CSS mask属性指定图片URL时,为什么浏览器无法加载图片?

    css mask属性未能加载图片的解决方法 使用css mask属性指定图片url时,如示例中所示: mask: url(“https://api.iconify.design/mdi:apple-icloud.svg”) center / contain no-repeat; 但是,在网络面板中却…

    2025年12月24日
    000
  • 如何用CSS Paint API为网页元素添加时尚的斑马线边框?

    为元素添加时尚的斑马线边框 在网页设计中,有时我们需要添加时尚的边框来提升元素的视觉效果。其中,斑马线边框是一种既醒目又别致的设计元素。 实现斜向斑马线边框 要实现斜向斑马线间隔圆环,我们可以使用css paint api。该api提供了强大的功能,可以让我们在元素上绘制复杂的图形。 立即学习“前端…

    2025年12月24日
    000
  • 图片如何不撑高父容器?

    如何让图片不撑高父容器? 当父容器包含不同高度的子元素时,父容器的高度通常会被最高元素撑开。如果你希望父容器的高度由文本内容撑开,避免图片对其产生影响,可以通过以下 css 解决方法: 绝对定位元素: .child-image { position: absolute; top: 0; left: …

    2025年12月24日
    000
  • CSS 帮助

    我正在尝试将文本附加到棕色框的左侧。我不能。我不知道代码有什么问题。请帮助我。 css .hero { position: relative; bottom: 80px; display: flex; justify-content: left; align-items: start; color:…

    2025年12月24日 好文分享
    200
  • 前端代码辅助工具:如何选择最可靠的AI工具?

    前端代码辅助工具:可靠性探讨 对于前端工程师来说,在HTML、CSS和JavaScript开发中借助AI工具是司空见惯的事情。然而,并非所有工具都能提供同等的可靠性。 个性化需求 关于哪个AI工具最可靠,这个问题没有一刀切的答案。每个人的使用习惯和项目需求各不相同。以下是一些影响选择的重要因素: 立…

    2025年12月24日
    300
  • 如何用 CSS Paint API 实现倾斜的斑马线间隔圆环?

    实现斑马线边框样式:探究 css paint api 本文将探究如何使用 css paint api 实现倾斜的斑马线间隔圆环。 问题: 给定一个有多个圆圈组成的斑马线图案,如何使用 css 实现倾斜的斑马线间隔圆环? 答案: 立即学习“前端免费学习笔记(深入)”; 使用 css paint api…

    2025年12月24日
    000
  • 如何使用CSS Paint API实现倾斜斑马线间隔圆环边框?

    css实现斑马线边框样式 想定制一个带有倾斜斑马线间隔圆环的边框?现在使用css paint api,定制任何样式都轻而易举。 css paint api 这是一个新的css特性,允许开发人员创建自定义形状和图案,其中包括斑马线样式。 立即学习“前端免费学习笔记(深入)”; 实现倾斜斑马线间隔圆环 …

    2025年12月24日
    100

发表回复

登录后才能评论
关注微信