
本教程旨在指导您如何在 apache flink 中高效消费带有键的 kafka 记录。文章详细介绍了使用自定义 `kafkarecorddeserializationschema` 来解析 kafka `consumerrecord` 中的键、值、时间戳等信息,并提供了完整的 flink 应用程序代码示例。通过遵循本文的步骤,您可以轻松地构建能够处理复杂 kafka 消息结构的 flink 流处理应用。
1. 理解带键 Kafka 记录及其重要性
在 Kafka 中,消息(记录)通常包含一个可选的键(Key)和一个值(Value)。键在许多场景下都至关重要,例如:
消息顺序保证:同一个键的所有消息会被发送到同一个分区,从而保证了这些消息的消费顺序。状态管理:在 Flink 等流处理框架中,键是进行有状态操作(如聚合、连接)的基础。数据路由:消费者可以根据键来过滤或路由消息。
当使用 kafka-console-producer.sh 并指定 –property “parse.key=true” –property “key.separator=:” 时,生产者会从输入中解析出键和值,并将它们作为独立的字段发送到 Kafka。例如,myKey:myValue 会被解析为键 myKey 和值 myValue。
2. Flink KafkaSource 的默认行为与限制
Apache Flink 提供了 KafkaSource 作为消费 Kafka 数据的首选连接器。然而,当您使用 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 这样的默认配置时,KafkaSource 仅会反序列化 Kafka 记录的值部分,而忽略其键、时间戳、分区、偏移量以及头部信息。这对于只需要处理消息值的场景是足够的,但对于需要访问键或其它元数据的应用来说,这种方式就显得力不从心。
以下是仅读取非带键记录的示例代码,它无法获取 Kafka 记录的键:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;import org.apache.kafka.common.serialization.StringDeserializer;public class FlinkValueOnlyKafkaConsumer { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String bootstrapServers = "localhost:9092"; // 替换为您的Kafka地址 KafkaSource source = KafkaSource.builder() .setBootstrapServers(bootstrapServers) .setTopics("test3") .setGroupId("1") .setStartingOffsets(OffsetsInitializer.earliest()) .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) .build(); DataStream stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); stream.map((MapFunction) value -> "Receiving from Kafka : " + value).print(); env.execute("Flink Value-Only Kafka Consumer"); }}
3. 自定义 KafkaRecordDeserializationSchema 读取带键记录
要从 Kafka 记录中获取键、值、时间戳等所有信息,您需要实现一个自定义的 KafkaRecordDeserializationSchema。这个接口的 deserialize 方法会接收一个 ConsumerRecord 对象,该对象提供了对原始字节形式的键、值、时间戳、分区、偏移量以及头部信息的完全访问。
3.1 定义自定义反序列化器
首先,创建一个实现 KafkaRecordDeserializationSchema 接口的类。在这个示例中,我们将反序列化键和值都为 String 类型,并将它们与时间戳一起封装到一个 Tuple3 对象中输出。
PicDoc
AI文本转视觉工具,1秒生成可视化信息图
6214 查看详情
import org.apache.flink.api.common.serialization.DeserializationSchema;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.util.Collector;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;import java.io.IOException;/** * 自定义 Kafka 记录反序列化器,用于解析键、值和时间戳。 * 输出类型为 Tuple3 */public class KeyedKafkaRecordDeserializationSchema implements KafkaRecordDeserializationSchema<Tuple3> { // transient 关键字确保这些反序列化器不会被 Flink 的序列化机制尝试序列化 private transient StringDeserializer keyDeserializer; private transient StringDeserializer valueDeserializer; /** * 在反序列化器初始化时调用,用于设置内部状态。 * 通常在这里初始化 Kafka 客户端的反序列化器。 */ @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { // 根据 Kafka 生产者实际使用的序列化器来选择这里的反序列化器 // 假设键和值都是字符串,使用 StringDeserializer keyDeserializer = new StringDeserializer(); valueDeserializer = new StringDeserializer(); } /** * 核心反序列化逻辑。 * * @param record Kafka 原始的 ConsumerRecord 对象,包含字节数组形式的键和值。 * @param out 用于收集反序列化结果的 Collector。 * @throws IOException 如果反序列化过程中发生 I/O 错误。 */ @Override public void deserialize(ConsumerRecord record, Collector<Tuple3> out) throws IOException { // 反序列化键 String key = (record.key() != null) ? keyDeserializer.deserialize(record.topic(), record.key()) : null; // 反序列化值 String value = (record.value() != null) ? valueDeserializer.deserialize(record.topic(), record.value()) : null; // 获取时间戳 long timestamp = record.timestamp(); // 将反序列化后的键、值和时间戳封装成 Tuple3 并发出 out.collect(new Tuple3(key, value, timestamp)); } /** * 返回此反序列化器生产的数据类型信息。 * Flink 使用此信息进行类型检查和序列化。 */ @Override public TypeInformation<Tuple3> getProducedType() { // 使用 TypeHint 来获取泛型类型信息 return TypeInformation.of(new org.apache.flink.api.java.typeutils.TypeHint<Tuple3>() {}); }}
注意事项:
open 方法:在反序列化器首次使用时调用,用于初始化资源。将 Kafka 客户端的反序列化器(如 StringDeserializer)放在这里初始化可以避免在每次 deserialize 调用时重复创建对象,提高效率。deserialize 方法:这是核心逻辑所在。ConsumerRecord 提供了 key()、value()、timestamp()、topic()、partition()、offset() 和 headers() 等方法。您可以使用 Kafka 客户端提供的反序列化器(例如 StringDeserializer、LongDeserializer 或自定义的 Avro/Protobuf 反序列化器)来将 byte[] 转换为实际的数据类型。getProducedType 方法:必须返回此反序列化器将发出的数据流的 TypeInformation。这对于 Flink 的类型系统至关重要。
3.2 在 Flink KafkaSource 中使用自定义反序列化器
接下来,将我们自定义的 KeyedKafkaRecordDeserializationSchema 应用到 KafkaSource 中:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;public class FlinkKeyedKafkaConsumer { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 地址 String topic = "test3"; String groupId = "1"; // 构建 KafkaSource,并指定我们自定义的反序列化器 KafkaSource<Tuple3> source = KafkaSource.<Tuple3>builder() .setBootstrapServers(bootstrapServers) .setTopics(topic) .setGroupId(groupId) .setStartingOffsets(OffsetsInitializer.earliest()) .setDeserializer(new KeyedKafkaRecordDeserializationSchema()) // 使用自定义反序列化器 .build(); // 从 KafkaSource 创建数据流 DataStream<Tuple3> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source"); // 对数据流进行操作,现在可以访问键、值和时间戳 stream.map(record -> "Key: " + record.f0 + ", Value: " + record.f1 + ", Timestamp: " + record.f2) .print(); // 执行 Flink 作业 env.execute("Flink Keyed Kafka Consumer"); }}
3.3 Kafka 生产者示例(用于测试)
为了测试上述 Flink 消费者,您可以使用以下命令启动一个 Kafka 控制台生产者,它会生成带键的记录:
bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
然后,在控制台中输入 myKey:myValue 这样的消息,Flink 消费者将能够正确解析出 myKey 作为键,myValue 作为值。
4. 总结
通过实现自定义的 KafkaRecordDeserializationSchema,您可以完全控制 Flink 如何从 Kafka 的原始 ConsumerRecord 中提取和反序列化数据。这不仅限于键和值,还可以包括时间戳、主题、分区、偏移量甚至自定义头部信息。这种灵活性使得 Flink 能够处理各种复杂的 Kafka 消息格式,为构建强大的流处理应用提供了坚实的基础。在实际应用中,请确保自定义反序列化器中使用的 Kafka 客户端反序列化器与生产者使用的序列化器保持一致。
以上就是在 Apache Flink 中消费带键 Kafka 记录的实践教程的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/988581.html
微信扫一扫
支付宝扫一扫