pulsar 是一种高效的服务器到服务器消息系统,具有多租户和高性能等特点,最初由 yahoo 开发,现由 apache 软件基金会管理。它是 apache 的顶级项目,定位为下一代云原生分布式消息流平台,融合了消息传递、存储和轻量级函数计算功能,采用计算与存储分离的架构设计,支持多租户、持久化存储、跨区域数据复制,具备强一致性、高吞吐、低延迟和高扩展性等流数据存储特性,被视为云原生时代实时消息流传输、存储和计算的理想解决方案。
Pulsar 的特性包括:
单实例支持多个集群,支持跨机房的消息复制。极低的发布和端到端延迟。可扩展到超过一百万个 topic。提供简单易用的客户端 API,支持 Java、Go、Python 和 C++。支持多种 topic 订阅模式,包括独占订阅、共享订阅、故障转移订阅和键共享订阅。通过 Apache BookKeeper 提供持久化消息存储,确保消息传递的可靠性。Pulsar Functions 和 Pulsar IO 提供 serverless 计算框架和数据集成解决方案。支持分层存储,可将旧数据从热存储卸载到冷/长期存储(如 S3、GCS)。
Pulsar 的架构主要包括以下组件:
Broker:负责消息传输、Topic 管理和负载均衡,不存储消息,是无状态组件。Bookie:使用 Apache BookKeeper 组件,负责消息的持久化存储。Producer:生产者,封装并发送消息到 Broker。Consumer:消费者,通过订阅 Topic 消费消息并确认。Pulsar 还定义了 Reader 角色,允许从指定位置获取消息,无需确认。Zookeeper:用于元数据存储和集群配置管理,包括租户和命名空间的一致性协调。
Pulsar 支持四种订阅模式:
独占(Exclusive)订阅:同一时间只有一个消费者可以消费数据,适用范围较小。共享(Shared)订阅:多个消费者可以同时运行,消息按轮询方式分配,但无法保证消息顺序。故障转移(Failover)订阅:在独占模式的基础上,允许启动多个消费者,当一个消费者失败时,其他消费者可以接管。键共享(KeyShared)订阅:基于共享模式,消息按键分组,同组消息由同一个消费者有序消费。
下载和安装 Pulsar 2.9.1 版本后,可以在 Linux 服务器上解压并启动单机版 Pulsar。使用命令行可以启动和终止 Pulsar 服务。
立即进入“豆包AI人工智官网入口”;
立即学习“豆包AI人工智能在线问答入口”;
在 Spring Boot 中集成 Pulsar 需要以下步骤:
引入 Maven 依赖:
io.github.majusko pulsar-java-spring-boot-starter 1.1.0
配置 application.yml:
pulsar: service-url: pulsar://192.168.0.105:6650
创建 Pulsar 配置类:
@Configurationpublic class PulsarConfig { @Bean public ProducerFactory producerFactory() { return new ProducerFactory().addProducer("testTopic", String.class); }}
定义 Topic 名称常量类:
public class TopicName { private TopicName(){} public static final String TEST_TOPIC = "testTopic";}
创建消息生产者类:
@Componentpublic class PulsarProducer { @Resource private PulsarTemplate template; public void send(String topic, T message) { try { template.send(topic, message); } catch (PulsarClientException e) { e.printStackTrace(); } }}
创建消息消费者类:
@Componentpublic class TestTopicPulsarConsumer { private static final Logger log = LoggerFactory.getLogger(TestTopicPulsarConsumer.class); @PulsarConsumer(topic = TopicName.TEST_TOPIC, subscriptionType = SubscriptionType.Shared, clazz = String.class) public void consume(String message) { log.info("PulsarRealConsumer content:{}", message); }}
创建 PulsarController 测试发送消息:
@RestController@RequestMapping("/pulsar")public class PulsarController { @Resource private PulsarProducer pulsarProducer; @PostMapping(value = "/sendMessage") public CommonResponse sendMessage(@RequestParam(name = "message") String message) { pulsarProducer.send(TopicName.TEST_TOPIC, message); return CommonResponse.success("done"); }}
定义公共响应体类:
public class CommonResponse { private String code; private Boolean success; private T data; public static CommonResponse success(T t){ return new CommonResponse("200",true,t); } public CommonResponse(String code, Boolean success, T data) { this.code = code; this.success = success; this.data = data; } //getter、setter方法}
启动项目后,可以使用 Postman 测试消息发送和接收功能。








以上就是Pulsar中间件入门学习的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/33311.html
微信扫一扫
支付宝扫一扫