Pulsar中间件入门学习

pulsar 是一种高效的服务器到服务器消息系统,具有多租户和高性能等特点,最初由 yahoo 开发,现由 apache 软件基金会管理。它是 apache 的顶级项目,定位为下一代云原生分布式消息流平台,融合了消息传递、存储和轻量级函数计算功能,采用计算与存储分离的架构设计,支持多租户、持久化存储、跨区域数据复制,具备强一致性、高吞吐、低延迟和高扩展性等流数据存储特性,被视为云原生时代实时消息流传输、存储和计算的理想解决方案。

Pulsar 的特性包括:

单实例支持多个集群,支持跨机房的消息复制。极低的发布和端到端延迟。可扩展到超过一百万个 topic。提供简单易用的客户端 API,支持 Java、Go、Python 和 C++。支持多种 topic 订阅模式,包括独占订阅、共享订阅、故障转移订阅和键共享订阅。通过 Apache BookKeeper 提供持久化消息存储,确保消息传递的可靠性。Pulsar Functions 和 Pulsar IO 提供 serverless 计算框架和数据集成解决方案。支持分层存储,可将旧数据从热存储卸载到冷/长期存储(如 S3、GCS)。

Pulsar 的架构主要包括以下组件:

Broker:负责消息传输、Topic 管理和负载均衡,不存储消息,是无状态组件。Bookie:使用 Apache BookKeeper 组件,负责消息的持久化存储。Producer:生产者,封装并发送消息到 Broker。Consumer:消费者,通过订阅 Topic 消费消息并确认。Pulsar 还定义了 Reader 角色,允许从指定位置获取消息,无需确认。Zookeeper:用于元数据存储和集群配置管理,包括租户和命名空间的一致性协调。

Pulsar 支持四种订阅模式:

独占(Exclusive)订阅:同一时间只有一个消费者可以消费数据,适用范围较小。共享(Shared)订阅:多个消费者可以同时运行,消息按轮询方式分配,但无法保证消息顺序。故障转移(Failover)订阅:在独占模式的基础上,允许启动多个消费者,当一个消费者失败时,其他消费者可以接管。键共享(KeyShared)订阅:基于共享模式,消息按键分组,同组消息由同一个消费者有序消费。

下载和安装 Pulsar 2.9.1 版本后,可以在 Linux 服务器上解压并启动单机版 Pulsar。使用命令行可以启动和终止 Pulsar 服务。

立即进入“豆包AI人工智官网入口”;

立即学习“豆包AI人工智能在线问答入口”;

在 Spring Boot 中集成 Pulsar 需要以下步骤:

引入 Maven 依赖

    io.github.majusko    pulsar-java-spring-boot-starter    1.1.0

配置 application.yml

pulsar:  service-url: pulsar://192.168.0.105:6650

创建 Pulsar 配置类

@Configurationpublic class PulsarConfig {    @Bean    public ProducerFactory producerFactory() {        return new ProducerFactory().addProducer("testTopic", String.class);    }}

定义 Topic 名称常量类

public class TopicName {    private TopicName(){}    public static final String TEST_TOPIC = "testTopic";}

创建消息生产者类

@Componentpublic class PulsarProducer {    @Resource    private PulsarTemplate template;    public void send(String topic, T message) {        try {            template.send(topic, message);        } catch (PulsarClientException e) {            e.printStackTrace();        }    }}

创建消息消费者类

@Componentpublic class TestTopicPulsarConsumer {    private static final Logger log = LoggerFactory.getLogger(TestTopicPulsarConsumer.class);    @PulsarConsumer(topic = TopicName.TEST_TOPIC, subscriptionType = SubscriptionType.Shared, clazz = String.class)    public void consume(String message) {        log.info("PulsarRealConsumer content:{}", message);    }}

创建 PulsarController 测试发送消息

@RestController@RequestMapping("/pulsar")public class PulsarController {    @Resource    private PulsarProducer pulsarProducer;    @PostMapping(value = "/sendMessage")    public CommonResponse sendMessage(@RequestParam(name = "message") String message) {        pulsarProducer.send(TopicName.TEST_TOPIC, message);        return CommonResponse.success("done");    }}

定义公共响应体类

public class CommonResponse {    private String code;    private Boolean success;    private T data;    public static  CommonResponse success(T t){        return new CommonResponse("200",true,t);    }    public CommonResponse(String code, Boolean success, T data) {        this.code = code;        this.success = success;        this.data = data;    }    //getter、setter方法}

启动项目后,可以使用 Postman 测试消息发送和接收功能。

Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习Pulsar中间件入门学习

以上就是Pulsar中间件入门学习的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/33311.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月4日 10:06:10
下一篇 2025年11月4日 10:10:01

相关推荐

发表回复

登录后才能评论
关注微信