使用 Reactor Kafka 消费指定范围消息后停止 Consumer

使用 reactor kafka 消费指定范围消息后停止 consumer

本文介绍了如何使用 Reactor Kafka 从指定 Topic 的起始位置开始消费消息,直到达到该 Topic Partition 的最新 Offset,并在消费完成后优雅地停止 Consumer。通过结合 seekToBeginning、endOffsets 和 takeUntil 等 Reactor Kafka 的特性,可以实现精确的消息消费控制。

在某些场景下,我们需要消费 Kafka Topic 中的全部或部分消息,并在消费完成后停止 Consumer,例如数据迁移、历史数据分析等。Reactor Kafka 提供了强大的 API 来实现这种需求。以下是一个示例,展示了如何使用 Reactor Kafka 从 Topic 的起始位置消费到最新 Offset,然后停止 Consumer。

代码示例

import org.apache.kafka.common.TopicPartition;import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;import org.springframework.kafka.listener.ConsumerProperties;import org.springframework.kafka.support.Acknowledgment;import reactor.core.Disposable;import reactor.core.publisher.Flux;import reactor.core.publisher.Mono;import reactor.kafka.receiver.ReceiverOptions;import reactor.kafka.receiver.ReceiverPartition;import reactor.kafka.receiver.ReceiverRecord;import java.time.Duration;import java.util.Collections;import java.util.Map;public class KafkaConsumerExample {    public Disposable consumeMessages(String topic, String groupId, String bootstrapServers) {        TopicPartition topicPartition = new TopicPartition(topic, 0); // 假设只有一个 Partition        // 配置 Consumer 属性        Map consumerProps = Map.of(                "bootstrap.servers", bootstrapServers,                "group.id", groupId,                "key.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,                "value.deserializer", org.apache.kafka.common.serialization.StringDeserializer.class,                "auto.offset.reset", "earliest" // 从最早的 Offset 开始消费        );        // 创建 ReceiverOptions        ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps)                .subscription(Collections.singleton(topic))                .addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning));        // 创建 ReactiveKafkaConsumerTemplate        ReactiveKafkaConsumerTemplate kafkaConsumer = new ReactiveKafkaConsumerTemplate(receiverOptions);        // 消费消息并停止 Consumer        return kafkaConsumer                .receive()                .flatMap(record -> {                    // 获取当前 Partition 的最新 Offset                    Mono<Map> endOffsetsMono = kafkaConsumer.doOnConsumer(consumer -> consumer.endOffsets(Collections.singleton(topicPartition)));                    return endOffsetsMono.map(topicPartitionToLastOffset -> {                        long lastOffset = topicPartitionToLastOffset.get(topicPartition);                        return new RecordWithLastOffset(record, lastOffset);                    });                })                .takeUntil(recordWithLastOffset -> recordWithLastOffset.record.offset() >= (recordWithLastOffset.lastOffset - 1))                .subscribe(recordWithLastOffset -> {                    ReceiverRecord record = recordWithLastOffset.record;                    Acknowledgment acknowledgment = record.receiverOffset();                    System.out.printf("Received message: topic-partition=%s offset=%d key=%s value=%sn",                            acknowledgment.topicPartition(),                            acknowledgment.offset(),                            record.key(),                            record.value());                    acknowledgment.acknowledge();                });    }    private static class RecordWithLastOffset {        private final ReceiverRecord record;        private final long lastOffset;        public RecordWithLastOffset(ReceiverRecord record, long lastOffset) {            this.record = record;            this.lastOffset = lastOffset;        }    }    public static void main(String[] args) {        String topic = "your-topic-name";        String groupId = "your-group-id";        String bootstrapServers = "localhost:9092";        KafkaConsumerExample example = new KafkaConsumerExample();        Disposable disposable = example.consumeMessages(topic, groupId, bootstrapServers);        // 保持程序运行一段时间,以便消费消息        try {            Thread.sleep(10000);        } catch (InterruptedException e) {            e.printStackTrace();        }        // 取消订阅,停止消费        disposable.dispose();    }}

代码解释

配置 Consumer 属性: 设置 Kafka Consumer 的连接信息、序列化方式、GroupId 以及 Offset 重置策略。auto.offset.reset = earliest 确保从 Topic 的起始位置开始消费。创建 ReceiverOptions: 使用配置的 Consumer 属性创建 ReceiverOptions,并通过 subscription 指定要消费的 Topic。addAssignListener 用于在 Partition 分配后,通过 seekToBeginning 将 Consumer 的 Offset 重置到起始位置。创建 ReactiveKafkaConsumerTemplate: 使用 ReceiverOptions 创建 ReactiveKafkaConsumerTemplate,用于消费 Kafka 消息。消费消息并停止 Consumer:kafkaConsumer.receive(): 从 Kafka Topic 接收消息,返回一个 Flux<ReceiverRecord>。flatMap: 对于每个接收到的消息,使用kafkaConsumer.doOnConsumer来获取当前TopicPartition的最新Offset。doOnConsumer允许你访问底层的KafkaConsumer对象,从而可以调用consumer.endOffsets方法。map: 将ReceiverRecord和获取到的最新Offset封装到一个自定义的RecordWithLastOffset对象中。takeUntil: 使用 takeUntil 操作符,当消费到最新 Offset 的前一个位置时,停止消费。record.offset() >= (lastOffset – 1) 判断当前消息的 Offset 是否已经达到或超过了最新 Offset 的前一个位置。subscribe: 订阅 Flux,处理接收到的消息。在 subscribe 方法中,可以执行消息处理逻辑,并使用 record.receiverOffset().acknowledge() 提交 Offset。取消订阅: 使用 disposable.dispose() 取消订阅,停止 Consumer。

注意事项

示例代码假设 Topic 只有一个 Partition。如果 Topic 有多个 Partition,需要根据实际情况进行调整。endOffsets 方法返回的是一个 Map,其中 Long 值是每个 Partition 的最新 Offset。Offset 的提交方式有多种,示例代码中使用的是手动提交,即在 subscribe 方法中调用 record.receiverOffset().acknowledge() 提交 Offset。也可以使用自动提交,通过设置 Consumer 的 enable.auto.commit 属性来实现。在实际应用中,需要处理可能出现的异常情况,例如 Kafka 连接失败、消息处理失败等。

总结

通过结合 Reactor Kafka 的 seekToBeginning、endOffsets 和 takeUntil 等特性,可以实现精确的消息消费控制,并在消费完成后优雅地停止 Consumer。这种方式适用于需要消费指定范围消息的场景,例如数据迁移、历史数据分析等。在实际应用中,需要根据具体的需求进行调整和优化。

以上就是使用 Reactor Kafka 消费指定范围消息后停止 Consumer的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/79224.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月15日 07:44:24
下一篇 2025年11月15日 08:04:48

相关推荐

发表回复

登录后才能评论
关注微信