RabbitMQ的应用场景以及基本原理介绍

RabbitMQ的应用场景以及基本原理介绍

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queuing Protocol)的开源实现。

AMQP :高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

一、应用场景

异步处理应用解耦流量削峰

二、RabbitMQ 特性

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

 - 可靠性(Reliability)RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 - 灵活的路由(Flexible Routing)在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 - 消息集群(Clustering)多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 - 高可用(Highly Available Queues)队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 - 多种协议(Multi-protocol)RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 - 多语言客户端(Many Clients)RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 - 管理界面(Management UI)RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 - 跟踪机制(Tracing)如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 - 插件机制(Plugin System)RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

三、RabbitMQ 基本概念

 - Message消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 - Publisher消息的生产者,也是一个向交换器发布消息的客户端应用程序。 - Exchange交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 - Routing Key路由关键字,exchange根据这个关键字进行消息投递。 - Binding绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 - Queue消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 - Connection网络连接,比如一个TCP连接。 - Channel信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 - Consumer消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 - Virtual Host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 - Broker表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,

四、Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

direct

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

topic

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

五、ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。

Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

六、任务分发机制

1、Round-robin dispathching循环分发

RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理

2、Message acknowledgment 消息 确认

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑…
另外pub message是没有ack的。

3、Message durability 消息持久化

如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
要持久化队列queue的持久化需要在声明时指定durable=True;
这里要注意,队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性.
队列和交换机有一个创建时候指定的标志durable,durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复
消息持久化包括3部分

 - 1.exchange持久化,在声明时指定durable => truehannel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);//声明消息队列,且为可持久化的 - 2.queue持久化,在声明时指定durable => truechannel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);//声明消息队列,且为可持久化的 - 3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化).channel.basic_publish(exchange='',                      routing_key="task_queue",                      body=message,                      properties=pika.BasicProperties(                         delivery_mode = 2, # make message persistent                      ))

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定.
注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。

关于持久化的进一步讨论:为了数据不丢失,我们采用了:在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。持久化Message,理由同上。但是这样能保证数据100%不丢失吗?答案是否定的。问题就在与RabbitMQ需要时间去把这些信息存到磁盘上,这个time window虽然短,但是它的确还是有。在这个时间窗口内如果数据没有保存,数据还会丢失。还有另一个原因就是RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。因此这个持久化还是有问题。但是对于大多数应用来说,这已经足够了。当然为了保持一致性,你可以把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes。那么商业系统会做什么呢?一种可能的方案是在系统panic时或者异常重启时或者断电时,应该给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。

4、Fair dispath 公平分发

你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它不管Consumer是否还有unacked Message,只是按照这个默认的机制进行分发.
那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却毫无休息的机会,那么,Rabbit是如何处理这种问题呢?

AppMall应用商店 AppMall应用商店

AI应用商店,提供即时交付、按需付费的人工智能应用服务

AppMall应用商店 56 查看详情 AppMall应用商店 4.1 Prefetch count

前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它

1
channel.basic_qos(prefetch_count=1)

注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

七、消息序列化

RabbitMQ使用ProtoBuf序列化消息,它可作为RabbitMQ的Message的数据格式进行传输,由于是结构化的数据,这样就极大的方便了Consumer的数据高效处理,当然也可以使用XML,与XML相比, ProtoBuf有以下优势:
1.简单
2.size小了3-10倍
3.速度快了20-100倍
4.易于编程
6.减少了语义的歧义.
,ProtoBuf具有速度和空间的优势,使得它现在应用非常广泛

八、RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ  中实现RPC 的机制是:
客户端发送请求(消息)时,在消息的属性(MessageProperties ,在AMQP 协议中定义了14中properties ,这些属性会随着消息一起发送)中设置两个值replyTo (一个Queue 名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue 中)和correlationId (此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
服务器端收到消息并处理
服务器端处理完消息后,将生成一条应答消息到replyTo 指定的Queue ,同时带上correlationId 属性
客户端之前已订阅replyTo 指定的Queue ,从中收到服务器的应答消息后,根据其中的correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理

九、RabbitMQ 选型和对比

1.从社区活跃度

按照目前网络上的资料,RabbitMQ 、activeM 、ZeroMQ 三者中,综合来看,RabbitMQ 是首选。

2.持久化消息比较

ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指我们机器在不可抗力因素等情况下挂掉了,消息不会丢失的机制。

3.综合技术实现

可靠性、灵活的路由、集群、事务、高可用的队列、消息排序、问题追踪、可视化管理工具、插件系统等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。当然ZeroMq 也可以做到,不过自己必须手动写代码实现,代码量不小。尤其是可靠性中的:持久性、投递确认、发布者证实和高可用性。

4.高并发

毋庸置疑,RabbitMQ 最高,原因是它的实现语言是天生具备高并发高可用的erlang 语言。

5.比较关注的比较, RabbitMQ 和 Kafka

RabbitMq 比Kafka 成熟,在可用性上,稳定性上,可靠性上,  RabbitMq  胜于  Kafka  (理论上)。
另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq 。
还有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出来很多。
选型最后总结:
如果我们系统中已经有选择  Kafka  ,或者   RabbitMq  ,并且完全可以满足现在的业务,建议就不用重复去增加和造轮子。
可以在  Kafka  和   RabbitMq  中选择一个适合自己团队和业务的,这个才是最重要的。但是毋庸置疑现阶段,综合考虑没有第三选择。

更多Laravel相关技术文章,请访问Laravel教程栏目进行学习!

以上就是RabbitMQ的应用场景以及基本原理介绍的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/366614.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月6日 05:35:33
下一篇 2025年11月6日 05:36:20

相关推荐

  • SOAP与消息队列?如何结合RabbitMQ?

    SOAP与RabbitMQ结合的核心在于通过消息队列实现异步化,解决传统SOAP同步阻塞、紧耦合、扩展性差等痛点。该方案引入适配层(如API Gateway),将SOAP请求转换为轻量消息发布至RabbitMQ,由消费者异步调用SOAP服务,并通过关联ID(Correlation ID)实现响应匹配…

    2025年12月17日
    000
  • 快速入门:使用Go语言操作RabbitMQ队列

    如何在go语言中操作rabbitmq队列?1. 建立连接:使用amqp.dial函数并传入正确的连接字符串,通过go get github.com/rabbitmq/amqp091-go安装依赖库;2. 声明队列:通过conn.channel()创建通道后,使用ch.queuedeclare声明队列…

    2025年12月15日 好文分享
    000
  • Python怎样操作消息队列?RabbitMQ连接指南

    python操作rabbitmq最常见方式是使用pika库,具体步骤如下:1. 安装pika并启动rabbitmq服务;2. 建立连接和通道,本地连接用localhost,远程需配置ip和认证信息;3. 发送消息前声明队列,通过basic_publish发送消息到指定队列;4. 接收消息使用basi…

    2025年12月14日 好文分享
    000
  • Python中如何操作RabbitMQ?pika消息队列实践

    在 python 中操作 rabbitmq 最常用的方式是使用 pika 库,它功能稳定且简单易用。1. 安装 pika 使用 pip install pika,并通过 blockingconnection 建立同步连接;2. 声明队列时设置 durable=true 以实现持久化,声明交换机时使用…

    2025年12月14日 好文分享
    000
  • PHP消息队列:RabbitMQ实战

    php结合rabbitmq构建异步处理系统需安装rabbitmq和amqp扩展1,通过amqp类连接服务器2,声明交换机和队列并绑定路由键3,使用publish方法发布消息4,利用consume消费消息并调用ack确认5,为避免消息丢失应启用持久化、发布者确认或事务机制6,可通过rabbitmq m…

    2025年12月11日 好文分享
    000
  • Symfony 怎么将RabbitMQ消息转数组

    答案:将Symfony中RabbitMQ消息转为数组需根据消息体格式选择反序列化方式,常见为JSON或PHP序列化;若为JSON,使用json_decode($messageBody, true)转换并校验错误;若为PHP序列化,使用unserialize()但需注意安全风险;其他格式则用对应解析器…

    2025年12月10日
    000
  • PHP中的消息队列:如何集成RabbitMQ处理异步任务

    使用RabbitMQ集成PHP,可以有效处理异步任务,提升应用响应速度和可扩展性。关键在于将耗时操作放入消息队列,由消费者异步处理,主应用流程无需等待。 解决方案: 首先,你需要安装RabbitMQ服务器以及PHP的AMQP扩展。然后,在你的PHP应用中,创建一个生产者,负责将任务信息(例如,需要处…

    2025年12月10日 好文分享
    100
  • PHP中如何操作RabbitMQ?

    在php中使用rabbitmq可以通过phpamqplib库实现,步骤如下:1. 安装rabbitmq服务器和phpamqplib库;2. 创建连接和通道,声明队列;3. 编写生产者发送消息和消费者接收消息的代码。使用rabbitmq时需注意消息持久化、重复消费和顺序性问题,并通过日志记录和监控提升…

    2025年12月10日
    000
  • PHP 函数如何使用 RabbitMQ 调用外部函数?

    是的,可以通过 rabbitmq 在 php 函数中调用外部函数。具体步骤如下:建立与 rabbitmq 的连接。声明一个队列。将函数名作为消息体发送到队列中。接收函数的应用程序或服务将调用此函数并返回结果。 使用 PHP 函数通过 RabbitMQ 调用外部函数 RabbitMQ 是一种消息队列系…

    2025年12月9日
    000
  • Java中RabbitMQ的特点 分析AMQP实现

    java中使用rabbitmq的特点在于其便捷的客户端api和与spring生态的无缝集成,适合构建异步、解耦的微服务架构。1. rabbitmq作为amqp协议实现,提供消息路由、持久化和可靠性机制;2. 使用java操作rabbitmq需关注连接管理、交换机和队列声明、消息发布与消费、错误处理;…

    2025年12月4日 java
    000
  • YII框架的消息队列是什么?YII框架如何集成RabbitMQ?

    yii框架集成rabbitmq需安装php-amqplib扩展并配置连接信息;2. 创建生产者类发送持久化消息到指定队列;3. 创建消费者类接收并处理消息,启用手动ack确认机制;4. 选择队列类型时,direct适用于精确路由,fanout用于广播,topic支持模式匹配,headers满足复杂路…

    2025年12月4日
    000
  • rabbitmq 的消息是怎么发送的?

    RabbitMQ消息发送的核心组件包括生产者、连接、信道、交换机、队列和绑定。生产者通过连接建立信道,将消息发布到交换机,交换机根据类型和路由键将消息路由至队列,消费者从队列中获取消息。交换机是消息路由的“指挥官”,主要有四种类型:Direct Exchange(直连交换机)精确匹配路由键与绑定键,…

    2025年12月2日 java
    000
  • rabbitmq 怎么保证消息的稳定性?

    答案:RabbitMQ通过持久化、确认机制和镜像队列保障消息稳定性。需配置交换机、队列和消息均持久化,并启用Publisher Confirms机制确保消息送达,结合镜像队列提升高可用性,同时通过监控与调优应对消息积压,保障系统稳定运行。 RabbitMQ保证消息稳定性的核心在于持久化、确认机制和镜…

    2025年12月1日 java
    100
  • RabbitMQ消息确认机制详细配置方法

    rabbitmq消息确认机制通过生产者确认和消费者确认确保消息可靠传输。1. 生产者确认(publisher confirms):开启confirm模式后,可通过异步监听或同步等待确认消息是否到达服务器,支持批量确认和单条确认;2. 消费者确认(consumer acknowledgements):…

    2025年12月1日 java
    000
  • Swoole与RabbitMQ集成实践:打造高可用性消息队列系统

    随着互联网时代的到来,消息队列系统变得越来越重要。它可以使不同的应用之间实现异步操作、降低耦合度、提高可扩展性,进而提升整个系统的性能和用户体验。在消息队列系统中,rabbitmq是一个强大的开源消息队列软件,它支持多种消息协议、被广泛应用于金融交易、电子商务、在线游戏等领域。 在实际应用中,往往需…

    2025年11月29日
    000
  • Linux中怎么安装RabbitMQ

    安装Erlang 由于RabbitMQ依赖Erlang, 所以需要先安装Erlang。 Erlang的安装方式大概有两种: 1.从Erlang Solution安装(推荐) # 添加erlang solutions源 $ wget https://packages.erlang-solutions.…

    运维 2025年11月28日
    000
  • rabbitmq 有哪些重要的组件?

    RabbitMQ的交换机类型包括Direct、Fanout、Topic和Headers Exchange,分别用于点对点路由、广播、模式匹配路由和基于消息头的路由;队列用于存储消息,具备持久化、独占性、自动删除和TTL等特性,实现解耦、异步处理和流量控制;绑定则通过绑定键连接交换机与队列,定义消息路…

    2025年11月27日 java
    000
  • rabbitmq 中 vhost 的作用是什么?

    vhost是RabbitMQ中实现多租户和权限隔离的核心机制,通过创建多个虚拟主机,实现用户间资源、权限和环境的完全隔离。每个vhost拥有独立的队列、交换机和绑定关系,支持不同应用或团队在单一RabbitMQ实例上安全共存。可通过rabbitmqctl命令或Web管理界面创建和管理vhost,并为…

    2025年11月27日 java
    100
  • windows rabbitmq安装

    windows系统上安装rabbitmq的关键步骤之一是确保rabbitmq服务器与erlang版本的兼容性,否则可能会导致无法正常使用。 RabbitMQ的下载页面可以在GitHub上找到:https://www.php.cn/link/16e4e18e4779ab2edc89086b198975…

    2025年11月27日 系统教程
    000
  • thinkPHP5如何使用rabbitmq

    thinkPHP5如何使用rabbitmq? 安装好 tp5 的 rabbitmq 扩展后,在项目根目录文件添加文件 rabbitmq.php 引导启动 rabbitmq。 <?phpdefine('APP_PATH', __DIR__ . '/applicati…

    2025年11月23日
    000

发表回复

登录后才能评论
关注微信