在 Apache Flink 中消费带键 Kafka 记录的实践教程

在 Apache Flink 中消费带键 Kafka 记录的实践教程

本教程旨在指导您如何在 apache flink 中高效消费带有键的 kafka 记录。文章详细介绍了使用自定义 `kafkarecorddeserializationschema` 来解析 kafka `consumerrecord` 中的键、值、时间戳等信息,并提供了完整的 flink 应用程序代码示例。通过遵循本文的步骤,您可以轻松地构建能够处理复杂 kafka 消息结构的 flink 流处理应用。

1. 理解带键 Kafka 记录及其重要性

在 Kafka 中,消息(记录)通常包含一个可选的键(Key)和一个值(Value)。键在许多场景下都至关重要,例如:

消息顺序保证:同一个键的所有消息会被发送到同一个分区,从而保证了这些消息的消费顺序。状态管理:在 Flink 等流处理框架中,键是进行有状态操作(如聚合、连接)的基础。数据路由:消费者可以根据键来过滤或路由消息。

当使用 kafka-console-producer.sh 并指定 –property “parse.key=true” –property “key.separator=:” 时,生产者会从输入中解析出键和值,并将它们作为独立的字段发送到 Kafka。例如,myKey:myValue 会被解析为键 myKey 和值 myValue。

2. Flink KafkaSource 的默认行为与限制

Apache Flink 提供了 KafkaSource 作为消费 Kafka 数据的首选连接器。然而,当您使用 KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) 这样的默认配置时,KafkaSource 仅会反序列化 Kafka 记录的值部分,而忽略其键、时间戳、分区、偏移量以及头部信息。这对于只需要处理消息值的场景是足够的,但对于需要访问键或其它元数据的应用来说,这种方式就显得力不从心。

以下是仅读取非带键记录的示例代码,它无法获取 Kafka 记录的键:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;import org.apache.kafka.common.serialization.StringDeserializer;public class FlinkValueOnlyKafkaConsumer {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        String bootstrapServers = "localhost:9092"; // 替换为您的Kafka地址        KafkaSource source = KafkaSource.builder()                .setBootstrapServers(bootstrapServers)                .setTopics("test3")                .setGroupId("1")                .setStartingOffsets(OffsetsInitializer.earliest())                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))                .build();        DataStream stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");        stream.map((MapFunction) value -> "Receiving from Kafka : " + value).print();        env.execute("Flink Value-Only Kafka Consumer");    }}

3. 自定义 KafkaRecordDeserializationSchema 读取带键记录

要从 Kafka 记录中获取键、值、时间戳等所有信息,您需要实现一个自定义的 KafkaRecordDeserializationSchema。这个接口的 deserialize 方法会接收一个 ConsumerRecord 对象,该对象提供了对原始字节形式的键、值、时间戳、分区、偏移量以及头部信息的完全访问。

3.1 定义自定义反序列化器

首先,创建一个实现 KafkaRecordDeserializationSchema 接口的类。在这个示例中,我们将反序列化键和值都为 String 类型,并将它们与时间戳一起封装到一个 Tuple3 对象中输出。

PicDoc PicDoc

AI文本转视觉工具,1秒生成可视化信息图

PicDoc 6214 查看详情 PicDoc

import org.apache.flink.api.common.serialization.DeserializationSchema;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.util.Collector;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.serialization.StringDeserializer;import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;import java.io.IOException;/** * 自定义 Kafka 记录反序列化器,用于解析键、值和时间戳。 * 输出类型为 Tuple3 */public class KeyedKafkaRecordDeserializationSchema implements KafkaRecordDeserializationSchema<Tuple3> {    // transient 关键字确保这些反序列化器不会被 Flink 的序列化机制尝试序列化    private transient StringDeserializer keyDeserializer;    private transient StringDeserializer valueDeserializer;    /**     * 在反序列化器初始化时调用,用于设置内部状态。     * 通常在这里初始化 Kafka 客户端的反序列化器。     */    @Override    public void open(DeserializationSchema.InitializationContext context) throws Exception {        // 根据 Kafka 生产者实际使用的序列化器来选择这里的反序列化器        // 假设键和值都是字符串,使用 StringDeserializer        keyDeserializer = new StringDeserializer();        valueDeserializer = new StringDeserializer();    }    /**     * 核心反序列化逻辑。     *     * @param record Kafka 原始的 ConsumerRecord 对象,包含字节数组形式的键和值。     * @param out    用于收集反序列化结果的 Collector。     * @throws IOException 如果反序列化过程中发生 I/O 错误。     */    @Override    public void deserialize(ConsumerRecord record, Collector<Tuple3> out) throws IOException {        // 反序列化键        String key = (record.key() != null) ? keyDeserializer.deserialize(record.topic(), record.key()) : null;        // 反序列化值        String value = (record.value() != null) ? valueDeserializer.deserialize(record.topic(), record.value()) : null;        // 获取时间戳        long timestamp = record.timestamp();        // 将反序列化后的键、值和时间戳封装成 Tuple3 并发出        out.collect(new Tuple3(key, value, timestamp));    }    /**     * 返回此反序列化器生产的数据类型信息。     * Flink 使用此信息进行类型检查和序列化。     */    @Override    public TypeInformation<Tuple3> getProducedType() {        // 使用 TypeHint 来获取泛型类型信息        return TypeInformation.of(new org.apache.flink.api.java.typeutils.TypeHint<Tuple3>() {});    }}

注意事项:

open 方法:在反序列化器首次使用时调用,用于初始化资源。将 Kafka 客户端的反序列化器(如 StringDeserializer)放在这里初始化可以避免在每次 deserialize 调用时重复创建对象,提高效率。deserialize 方法:这是核心逻辑所在。ConsumerRecord 提供了 key()、value()、timestamp()、topic()、partition()、offset() 和 headers() 等方法。您可以使用 Kafka 客户端提供的反序列化器(例如 StringDeserializer、LongDeserializer 或自定义的 Avro/Protobuf 反序列化器)来将 byte[] 转换为实际的数据类型。getProducedType 方法:必须返回此反序列化器将发出的数据流的 TypeInformation。这对于 Flink 的类型系统至关重要。

3.2 在 Flink KafkaSource 中使用自定义反序列化器

接下来,将我们自定义的 KeyedKafkaRecordDeserializationSchema 应用到 KafkaSource 中:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;public class FlinkKeyedKafkaConsumer {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        String bootstrapServers = "localhost:9092"; // 替换为您的 Kafka 地址        String topic = "test3";        String groupId = "1";        // 构建 KafkaSource,并指定我们自定义的反序列化器        KafkaSource<Tuple3> source = KafkaSource.<Tuple3>builder()                .setBootstrapServers(bootstrapServers)                .setTopics(topic)                .setGroupId(groupId)                .setStartingOffsets(OffsetsInitializer.earliest())                .setDeserializer(new KeyedKafkaRecordDeserializationSchema()) // 使用自定义反序列化器                .build();        // 从 KafkaSource 创建数据流        DataStream<Tuple3> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Keyed Kafka Source");        // 对数据流进行操作,现在可以访问键、值和时间戳        stream.map(record -> "Key: " + record.f0 + ", Value: " + record.f1 + ", Timestamp: " + record.f2)              .print();        // 执行 Flink 作业        env.execute("Flink Keyed Kafka Consumer");    }}

3.3 Kafka 生产者示例(用于测试)

为了测试上述 Flink 消费者,您可以使用以下命令启动一个 Kafka 控制台生产者,它会生成带键的记录:

bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092

然后,在控制台中输入 myKey:myValue 这样的消息,Flink 消费者将能够正确解析出 myKey 作为键,myValue 作为值。

4. 总结

通过实现自定义的 KafkaRecordDeserializationSchema,您可以完全控制 Flink 如何从 Kafka 的原始 ConsumerRecord 中提取和反序列化数据。这不仅限于键和值,还可以包括时间戳、主题、分区、偏移量甚至自定义头部信息。这种灵活性使得 Flink 能够处理各种复杂的 Kafka 消息格式,为构建强大的流处理应用提供了坚实的基础。在实际应用中,请确保自定义反序列化器中使用的 Kafka 客户端反序列化器与生产者使用的序列化器保持一致。

以上就是在 Apache Flink 中消费带键 Kafka 记录的实践教程的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/988581.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年12月1日 21:11:52
下一篇 2025年12月1日 21:12:16

相关推荐

  • 如何用dom2img解决网页打印样式不显示的问题?

    用dom2img解决网页打印样式不显示的问题 想将网页以所见即打印的的效果呈现,需要采取一些措施,特别是在使用了bootstrap等大量采用外部css样式的框架时。 问题根源 在常规打印操作中,浏览器通常会忽略css样式等非必要的页面元素,导致打印出的结果与网页显示效果不一致。这是因为打印机制只识别…

    2025年12月24日
    800
  • Uniapp 中如何不拉伸不裁剪地展示图片?

    灵活展示图片:如何不拉伸不裁剪 在界面设计中,常常需要以原尺寸展示用户上传的图片。本文将介绍一种在 uniapp 框架中实现该功能的简单方法。 对于不同尺寸的图片,可以采用以下处理方式: 极端宽高比:撑满屏幕宽度或高度,再等比缩放居中。非极端宽高比:居中显示,若能撑满则撑满。 然而,如果需要不拉伸不…

    2025年12月24日
    400
  • 如何让小说网站控制台显示乱码,同时网页内容正常显示?

    如何在不影响用户界面的情况下实现控制台乱码? 当在小说网站上下载小说时,大家可能会遇到一个问题:网站上的文本在网页内正常显示,但是在控制台中却是乱码。如何实现此类操作,从而在不影响用户界面(UI)的情况下保持控制台乱码呢? 答案在于使用自定义字体。网站可以通过在服务器端配置自定义字体,并通过在客户端…

    2025年12月24日
    800
  • 如何在地图上轻松创建气泡信息框?

    地图上气泡信息框的巧妙生成 地图上气泡信息框是一种常用的交互功能,它简便易用,能够为用户提供额外信息。本文将探讨如何借助地图库的功能轻松创建这一功能。 利用地图库的原生功能 大多数地图库,如高德地图,都提供了现成的信息窗体和右键菜单功能。这些功能可以通过以下途径实现: 高德地图 JS API 参考文…

    2025年12月24日
    400
  • 如何使用 scroll-behavior 属性实现元素scrollLeft变化时的平滑动画?

    如何实现元素scrollleft变化时的平滑动画效果? 在许多网页应用中,滚动容器的水平滚动条(scrollleft)需要频繁使用。为了让滚动动作更加自然,你希望给scrollleft的变化添加动画效果。 解决方案:scroll-behavior 属性 要实现scrollleft变化时的平滑动画效果…

    2025年12月24日
    000
  • 如何为滚动元素添加平滑过渡,使滚动条滑动时更自然流畅?

    给滚动元素平滑过渡 如何在滚动条属性(scrollleft)发生改变时为元素添加平滑的过渡效果? 解决方案:scroll-behavior 属性 为滚动容器设置 scroll-behavior 属性可以实现平滑滚动。 html 代码: click the button to slide right!…

    2025年12月24日
    500
  • 如何选择元素个数不固定的指定类名子元素?

    灵活选择元素个数不固定的指定类名子元素 在网页布局中,有时需要选择特定类名的子元素,但这些元素的数量并不固定。例如,下面这段 html 代码中,activebar 和 item 元素的数量均不固定: *n *n 如果需要选择第一个 item元素,可以使用 css 选择器 :nth-child()。该…

    2025年12月24日
    200
  • 使用 SVG 如何实现自定义宽度、间距和半径的虚线边框?

    使用 svg 实现自定义虚线边框 如何实现一个具有自定义宽度、间距和半径的虚线边框是一个常见的前端开发问题。传统的解决方案通常涉及使用 border-image 引入切片图片,但是这种方法存在引入外部资源、性能低下的缺点。 为了避免上述问题,可以使用 svg(可缩放矢量图形)来创建纯代码实现。一种方…

    2025年12月24日
    100
  • Bootstrap 中如何让文字浮于阴影之上?

    文字浮于阴影之上 文中提到的代码片段中 元素中的文字被阴影元素 所遮挡,如何让文字显示在阴影之上? bootstrap v3和v5在处理此类问题方面存在差异。 解决方法 在bootstrap v5中,给 元素添加以下css样式: .banner-content { position: relativ…

    2025年12月24日
    000
  • 如何让“元素跟随文本高度,而不是撑高父容器?

    如何让 元素跟随文本高度,而不是撑高父容器 在页面布局中,经常遇到父容器高度被子元素撑开的问题。在图例所示的案例中,父容器被较高的图片撑开,而文本的高度没有被考虑。本问答将提供纯css解决方案,让图片跟随文本高度,确保父容器的高度不会被图片影响。 解决方法 为了解决这个问题,需要将图片从文档流中脱离…

    2025年12月24日
    000
  • 为什么 CSS mask 属性未请求指定图片?

    解决 css mask 属性未请求图片的问题 在使用 css mask 属性时,指定了图片地址,但网络面板显示未请求获取该图片,这可能是由于浏览器兼容性问题造成的。 问题 如下代码所示: 立即学习“前端免费学习笔记(深入)”; icon [data-icon=”cloud”] { –icon-cl…

    2025年12月24日
    200
  • 如何利用 CSS 选中激活标签并影响相邻元素的样式?

    如何利用 css 选中激活标签并影响相邻元素? 为了实现激活标签影响相邻元素的样式需求,可以通过 :has 选择器来实现。以下是如何具体操作: 对于激活标签相邻后的元素,可以在 css 中使用以下代码进行设置: li:has(+li.active) { border-radius: 0 0 10px…

    2025年12月24日
    100
  • 如何模拟Windows 10 设置界面中的鼠标悬浮放大效果?

    win10设置界面的鼠标移动显示周边的样式(探照灯效果)的实现方式 在windows设置界面的鼠标悬浮效果中,光标周围会显示一个放大区域。在前端开发中,可以通过多种方式实现类似的效果。 使用css 使用css的transform和box-shadow属性。通过将transform: scale(1.…

    2025年12月24日
    200
  • 为什么我的 Safari 自定义样式表在百度页面上失效了?

    为什么在 Safari 中自定义样式表未能正常工作? 在 Safari 的偏好设置中设置自定义样式表后,您对其进行测试却发现效果不同。在您自己的网页中,样式有效,而在百度页面中却失效。 造成这种情况的原因是,第一个访问的项目使用了文件协议,可以访问本地目录中的图片文件。而第二个访问的百度使用了 ht…

    2025年12月24日
    000
  • Bootstrap 5:如何将文字置于阴影之上?

    文字重叠阴影 在 bootstrap 5 中,将文字置于阴影之上时遇到了困难。在 bootstrap 3 中,此问题并不存在,但升级到 bootstrap 5 后却无法实现。 解决方案 为了解决这个问题,需要给 元素添加以下样式: .banner-content { position: relati…

    2025年12月24日
    400
  • 如何用前端实现 Windows 10 设置界面的鼠标移动探照灯效果?

    如何在前端实现 Windows 10 设置界面中的鼠标移动探照灯效果 想要在前端开发中实现 Windows 10 设置界面中类似的鼠标移动探照灯效果,可以通过以下途径: CSS 解决方案 DEMO 1: Windows 10 网格悬停效果:https://codepen.io/tr4553r7/pe…

    2025年12月24日
    000
  • 使用CSS mask属性指定图片URL时,为什么浏览器无法加载图片?

    css mask属性未能加载图片的解决方法 使用css mask属性指定图片url时,如示例中所示: mask: url(“https://api.iconify.design/mdi:apple-icloud.svg”) center / contain no-repeat; 但是,在网络面板中却…

    2025年12月24日
    000
  • Bootstrap 5 如何将文字置于阴影上方?

    如何在 bootstrap 5 中让文字位于阴影上方? 在将网站从 bootstrap 3 升级到 bootstrap 5 后,用户遇到一个问题:文字内容无法像以前那样置于阴影层之上。 解决方案: 为了将文字置于阴影层上方,需要给 banner-content 元素添加以下 css 样式: .ban…

    2025年12月24日
    100
  • 如何用CSS Paint API为网页元素添加时尚的斑马线边框?

    为元素添加时尚的斑马线边框 在网页设计中,有时我们需要添加时尚的边框来提升元素的视觉效果。其中,斑马线边框是一种既醒目又别致的设计元素。 实现斜向斑马线边框 要实现斜向斑马线间隔圆环,我们可以使用css paint api。该api提供了强大的功能,可以让我们在元素上绘制复杂的图形。 立即学习“前端…

    2025年12月24日
    000
  • 图片如何不撑高父容器?

    如何让图片不撑高父容器? 当父容器包含不同高度的子元素时,父容器的高度通常会被最高元素撑开。如果你希望父容器的高度由文本内容撑开,避免图片对其产生影响,可以通过以下 css 解决方法: 绝对定位元素: .child-image { position: absolute; top: 0; left: …

    2025年12月24日
    000

发表回复

登录后才能评论
关注微信