快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

不知不觉,这已经是快速入门flink系列的第7篇博客了。早在第4篇博客中,博主就已经为大家介绍了在批处理中,数据输入data sources 与数据输出data sinks的各种分类(传送门:flink批处理的datasources和datasinks)。但是大家是否还记得flink的概念?flink是 分布式、 高性能、 随时可用以及准确的为流处理应用程序打造的开源流处理框架。所以光介绍了批处理哪里行呢!本篇博客,我们就来学习flink流处理的datasources和datasinks~

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

1、DataStream API开发1.1 入门案例1.1.1 Flink流处理程序的一般流程

1) 获取 Flink 流处理执行环境

2) 构建 source

3) 数据处理

4) 构建 sink

1.1.2 示例

编写 Flink 程序,用来统计单词的数量。

1.1.3 步骤

1) 获取 Flink 批处理运行环境

2) 构建一个 socket 源

3) 使用 flink 操作进行单词统计

4) 打印

说明:如果 linux 上没有安装 nc 服务 ,使用 yum 安装

代码语言:javascript代码运行次数:0运行复制

yum install -y nc

1.1.4 参考代码代码语言:javascript代码运行次数:0运行复制

import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindow/* * @Author: Alice菌 * @Date: 2020/7/9 08:40 * @Description:      */// 入门案例,单词统计object StreamWordCount {  def main(args: Array[String]): Unit = {    // 1、 创建流处理的执行环境    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 2、 构建数据源,使用的socket    val socketDataStream: DataStream[String] = env.socketTextStream("node01",9999, 0)    // 3、 数据的处理    val wordDataStream: DataStream[(String, Int)] = socketDataStream.flatMap(_.split(" ")).map(_ -> 1)    //4. 使用keyBy 进行分流(分组)    // 在批处理中针对于dataset, 如果分组需要使用groupby    // 在流处理中针对于datastream, 如果分组(分流)使用keyBy    val groupedDataStream: KeyedStream[(String, Int), Tuple] = wordDataStream.keyBy(0)    //5. 使用timeWinodw 指定窗口的长度(每5秒计算一次)    // spark-》reduceBykeyAndWindow    val windowDataStream: WindowedStream[(String,Int),Tuple,TimeWindow]= groupedDataStream.timeWindow(      Time.seconds(5)    )    //6. 使用sum执行累加    val sumDataStream: DataStream[(String, Int)] = windowDataStream.sum(1)    sumDataStream.print()    env.execute("StreamWordCount")  }}

我们来测试下效果如何~

首先我们在linux上开启9999端口

nc -lk 9999

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

然后我们启动我们的程序,发现也是毫无波澜。

接下来就是见证奇迹的时候了,当我以飞快的速度在命令行中敲下这些字母

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

然后观察程序的控制台,发现打印出了每5秒内,所有的字符数的个数

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

有朋友肯定会好奇,为什么scala一次显示为3次,后面只显示了1次?。哈哈,注意观察我上方留下的代码,我只设置了窗口的大小,滑动距离可还没有设置呢~所以,每次都是对单独一个5秒时间内所有字母求WordCount。

OK,看到了上方的效果图,我们可以继续深入学习。

1.2 输入数据集 Data Sources

在Flink中我们可以使用 StreamExecutionEnvironment.addSource(source) 来为程序添加数据来源。

Flink 已 经 提 供 了 若 干 实 现 好 了 的 source functions ,当 然 你 也 可 以 通 过 实 现 SourceFunction 来自定义非并行的 source 或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

1.2.1 Flink 在流处理上常见的 Source

Flink 在流处理上的 source 和在批处理上的 source 基本一致。

大致有 4 大类

基于本地集合的 source基于文件的 source基于网络套接字的 source自定义的 source1.2.2 基于集合的 source示例代码代码语言:javascript代码运行次数:0运行复制

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import scala.collection.immutable.{Queue, Stack}import scala.collection.mutableimport scala.collection.mutable.{ArrayBuffer, ListBuffer}import org.apache.flink.api.scala._/* * @Author: Alice菌 * @Date: 2020/8/8 17:02 * @Description:      */object StreamDataSourceDemo {  def main(args: Array[String]): Unit = {    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 0. 用element创建DataStream    val ds0: DataStream[String] = senv.fromElements("spark","flink")    ds0.print()    // 1. 用Tuple创建DataStream    val ds1: DataStream[(Int, String)] = senv.fromElements((1,"spark"),(2,"flink"))    ds1.print()    // 2. 用Array创建DataStream    val ds2: DataStream[String] = senv.fromCollection(Array("spark","flink"))    ds2.print()    // 3. 用ArrayBuffer 创建DataStream    val ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark","flink"))    ds3.print()    // 4. 用List创建DataStream    val ds4: DataStream[String] = senv.fromCollection(List("spark","flink"))    ds4.print()    // 5. 用List创建DataStreamm    val ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark","flink"))    ds5.print()    // 6. 用Vector创建DataStream    val ds6: DataStream[String] = senv.fromCollection(Vector("spark","flink"))    ds6.print()    // 7. 用Queue创建DataStream    val ds7: DataStream[String] = senv.fromCollection(Queue("spark","flink"))    ds7.print()    // 8. 用Stack创建DataStream    val ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))    // 9. 用Stream创建DataStream(Stream相当于lazy List,避免在中间过程中生 成不必要的集合)    val ds9: DataStream[String] = senv.fromCollection(Stream("spark","flink"))    ds9.print()    // 10. 用Seq创建DataStream    val ds10: DataStream[String] = senv.fromCollection(Seq("spark","flink"))    ds10.print()    // 11. 用Set创建DataStream(不支持)    // val ds11: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))    // ds11.print()    // 12.用Iterable创建DataStream(不支持)    // val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))    // ds12.print()    // 13.用ArraySeq创建DataStream    val ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark","flink"))    ds13.print()    // 14.用 ArrayStack 创建DataStream    val ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark","flink"))    ds14.print()    // 15.用Map 创建 DataStream(不支持)    //val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink"))    //ds15.print()    // 16.用Range创建DataStream    val ds16: DataStream[Int] = senv.fromCollection(Range(1,9))    ds16.print()    // 17.用fromElements创建DataStream    val ds17: DataStream[Long] = senv.generateSequence(1,9)    ds17.print()    senv.execute("StreamDataSourceDemo")  }}

特别注意:

1、DataStream流式应用需要显示指定execute()方法运行程序,如果不调用则Flink流式程序不会执行。

2、无法通过Set,Iterable,Map 来创建 DataStream

1.2.3 基于文件的 source示例代码代码语言:javascript代码运行次数:0运行复制

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}/* * @Author: Alice菌 * @Date: 2020/8/8 17:42 * @Description:    基于文件的source */object StreamFileSourceDemo {  def main(args: Array[String]): Unit = {    // 1、构建流处理的环境    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 2、基于文件的source,构建数据集    val textDStream: DataStream[String] = senv.readTextFile("data/input/wordcount.txt")    // 3、打印输出    textDStream.print()          // 4、执行程序    senv.execute("StreamFileSourceDemo")    //3> Final Memory Finished at    //10> Total time BUILD SUCCESS    //4> Flink Flink Flink Flink Flink    //9> Final Memory Finished at    //1> Total time BUILD SUCCESS    //8> Total time BUILD SUCCESS    //12> Final Memory Finished at    //6> Hive Hive Hive Hive Hive  }}

1.2.4 基于网络套接字的 source

这里的代码跟入门案例的代码是一样哒~已经浏览过入门案例代码的朋友可以跳过啦。

其中构建数据源,使用socket : val source = env.socketTextStream("IP", PORT)

小门道AI 小门道AI

小门道AI是一个提供AI服务的网站

小门道AI 117 查看详情 小门道AI 示例代码代码语言:javascript代码运行次数:0运行复制

import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindow/* * @Author: Alice菌 * @Date: 2020/7/9 08:40 * @Description:     基于网络套接字的 source */// 入门案例,单词统计object StreamWordCount {  def main(args: Array[String]): Unit = {    // 1、 创建流处理的执行环境    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 2、 构建数据源,使用的socket    val socketDataStream: DataStream[String] = env.socketTextStream("node01",9999, 0)    // 3、 数据的处理    val wordDataStream: DataStream[(String, Int)] = socketDataStream.flatMap(_.split(" ")).map(_ -> 1)    //4. 使用keyBy 进行分流(分组)    // 在批处理中针对于dataset, 如果分组需要使用groupby    // 在流处理中针对于datastream, 如果分组(分流)使用keyBy    val groupedDataStream: KeyedStream[(String, Int), Tuple] = wordDataStream.keyBy(0)    //5. 使用timeWinodw 指定窗口的长度(每5秒计算一次)    // spark-》reduceBykeyAndWindow    val windowDataStream: WindowedStream[(String,Int),Tuple,TimeWindow]= groupedDataStream.timeWindow(      Time.seconds(5)    )    //6. 使用sum执行累加    val sumDataStream: DataStream[(String, Int)] = windowDataStream.sum(1)    sumDataStream.print()    env.execute("StreamWordCount")  }}

1.2.5 自定义的 source

除了预定义的 Source 外,我们还可以通过实现 SourceFunction 来自定义 Source,然后通过 StreamExecutionEnvironment.addSource(sourceFunction)添加进来。

比如读取 Kafka 数据的 Source:addSource(new FlinkKafkaConsumer08);。我们可以实现以下三个接口来自定义 Source:

1.2.5.1 SourceFunction:创建非并行数据源参考代码代码语言:javascript代码运行次数:0运行复制

import org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala._/* * @Author: Alice菌 * @Date: 2020/8/8 21:51 * @Description:     自定义非并行数据源 */object StreamCustomerNoParallelSourceDemo {  def main(args: Array[String]): Unit = {    // 1、创建流处理的执行环境    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 2、基于自定义数据源构建数据    val longDStream: DataStream[Long] = senv.addSource(new MyNoParallelSource()).setParallelism(1)    // 3、输出打印    longDStream.print()    // 4、执行程序    senv.execute("StreamCustomerNoParallelSourceDemo")        //10> 1    //11> 2    //12> 3    //1> 4    //2> 5    //3> 6    //4> 7    //5> 8    //6> 9      }    /*  创建一个并行度为1的数据源 * 实现从1开始产生递增数字  */  class MyNoParallelSource extends SourceFunction[Long]{    // 申明一个变量number    var number:Long = 1L    var isRunning:Boolean = true    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {            while (isRunning){        ctx.collect(number)        number += 1        // 休眠1秒        Thread.sleep(1000)        if (number == 10){          cancel()        }      }    }         override def cancel(): Unit = {      isRunning = false    }  }}

1.2.5.2 ParallelSourceFunction:创建并行数据源参考代码代码语言:javascript代码运行次数:0运行复制

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala._/* * @Author: Alice菌 * @Date: 2020/8/8 22:05 * @Description:     自定义创建并行数据源 */object StreamCustomerParallelSourceDemo {  def main(args: Array[String]): Unit = {    // 1、创建流处理的执行环境    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 2、基于自定义ParallelSource数据源创建并行的数据    val parallelSource: DataStream[Long] = senv.addSource(new MyParallelSource()).setParallelism(5)    // 3、打印输出    parallelSource.print()        // 4、执行程序    senv.execute("StreamCustomerParallelSourceDemo")  }  /*  创建一个并行度为1的数据源 * 实现从1开始产生递增数字   */  class MyParallelSource extends ParallelSourceFunction[Long] {    // 声明一个Long类型的变量    var number:Long = 1L    // 申明一个初始化为true的Boolean变量    var isRunning: Boolean = true        override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {      while (isRunning) {        ctx.collect(number)        number += 1        if (number > 20) {          cancel()        }      }    }    override def cancel(): Unit = {      isRunning = false    }  }}

1.2.5.3 RichParallelSourceFunction:创建并行数据源参考代码代码语言:javascript代码运行次数:0运行复制

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction,SourceFunction}import org.apache.flink.streaming.api.scala.{DataStream,StreamExecutionEnvironment}import org.apache.flink.api.scala._import org.apache.flink.configuration.Configuration/* * @Author: Alice菌 * @Date: 2020/8/8 22:23 * @Description:    创建并行数据源 */object StreamCustomerRichParallelSourceDemo {  def main(args: Array[String]): Unit = {    // 1、 创建流处理运行环境    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 2、 基于 RichParallelSource 并行数据源构建数据集    val richParallelSource: DataStream[Long] = senv.addSource(new MyRichParallelSource()).setParallelism(2)    // 3、 打印输出    richParallelSource.map(line => {      println("接收到的数据:" + line)      line    })    // 4、执行程序    senv.execute("StreamCustomerRichParallelSourceDemo")  }  /*     创建一个并行度为1 的数据源     实现从 1 开始产生递增数字   */  class MyRichParallelSource extends RichParallelSourceFunction[Long] {    var count: Long = 1L    var isRunning: Boolean = true    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {      while (isRunning){        ctx.collect(count)        count += 1        Thread.sleep(1000)              }    }    override def cancel(): Unit = {      isRunning = false    }    override def open(parameters: Configuration): Unit = {      super.close()    }  }}

1.2.6 基于 kafka 的 source示例代码代码语言:javascript代码运行次数:0运行复制

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.api.scala._import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011/* * @Author: Alice菌 * @Date: 2020/8/8 22:51 * @Description:    基于 kafka 的 source 操作 */object StreamKafkaSourceDemo {  def main(args: Array[String]): Unit = {    // 1、构建流处理执行环境    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 指定消费者主题    val topic: String = "test"    // 设置参数    val props: Properties = new Properties    props.setProperty("bootstrap.servers", "node01:9092")    props.setProperty("group.id", "test")    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")    // 基于 Flink,创建 Kafka消费者    val kafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic,new SimpleStringSchema(),props)    // Flink 从 topic 中最新的数据开始消费    kafkaConsumer.setStartFromLatest()    // 构建基于 kafka 的数据源    val kafkaDataStream: DataStream[String] = senv.addSource(kafkaConsumer)    // 打印输出消费的数据    kafkaDataStream.print()    // 执行流处理的程序    senv.execute("StreamKafkaSourceDemo")  }}

演示效果

我们启动kafka,模拟生产者来生产数据。

node01 服务器执行以下命令来模拟生产者进行生产数据。

代码语言:javascript代码运行次数:0运行复制

cd /export/servers/kafka_2.11-1.0.0bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

然后我们启动所写的程序

同时,在kafka中生产一些数据

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

观察程序的控制台

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

看到这样的效果就说明我们的代码是OK了~

1.2.7 基于 mysql 的 source 操作

上面就是 Flink 自带的 Kafka source,那么接下来就模仿着写一个从 MySQL 中读取数据 的 Source。

首先我们先确定需要查询指定数据库下的某张表。

这里我们以 blogs 数据库下的 notice表为例。

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

下面,我们通过Flink来获取到该数据表的内容。

示例代码代码语言:javascript代码运行次数:0运行复制

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}/* * @Author: Alice菌 * @Date: 2020/8/8 23:52 * @Description:     基于mysql的source操作 */object StreamFromMysqlSource {  def main(args: Array[String]): Unit = {    // 1、创建流处理执行环境    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    import org.apache.flink.api.scala._        // 2、添加自定义的 mysql 数据源对象    val studentDataStream: DataStream[Student] = senv.addSource(new MysqlSource())    studentDataStream.print()    senv.execute("StreamFromMysqlSource")  }  // 3、创建mysql自定义数据源对象  class MysqlSource extends RichSourceFunction[Student](){    // 3.1 声明Connection对象    var connection:Connection = _    // 3.2 声明PreparedStatement对象    var ps: PreparedStatement = _        // 在 open 方法中进行配置链接信息 drive  url username password    // 加载驱动 Class.forName(),DriveManager 获取链接,调用prepareStatement,预编译执行sql    override def open(parameters: Configuration): Unit = {      val driver: String = "com.mysql.jdbc.Driver"      val url: String = "jdbc:mysql://localhost:3306/blogs"      val username: String = "root"      val password: String = "root"      Class.forName(driver)      connection = DriverManager.getConnection(url,username,password)      val sql: String =        """          |select nid,ntitle,content from notice        """.stripMargin             ps = connection.prepareStatement(sql)          }    // 在run方法中进行查询,结果封装成样例类,ctx进行collect    override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {      // 执行 SQL 查询      val queryResultSet: ResultSet = ps.executeQuery()             while (queryResultSet.next()){                // 分别获取到查询的值        val nid: Int = queryResultSet.getInt("nid")        val ntitle: String = queryResultSet.getString("ntitle")        val content: String = queryResultSet.getString("content")        // 将获取到的值,封装成样例类        val student: Student = Student(nid,ntitle,content)        ctx.collect(student)      }    }    override def close(): Unit = {      if (connection != null){        connection.close()      }      if (ps != null){        ps.close()      }    }    override def cancel(): Unit = {    }  }  case class Student(nid: Int, ntitle: String, content: String) {    override def toString: String = {      "文章id:" + nid + " 标题:" + ntitle + " 内容:" + content }  }}

运行效果

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

看到这样的效果,说明我们的代码是OK的。

1.3 数据输出 Data Sinks

介绍完了常用的数据输入DataSources,我们接下里来讲Flink流处理常用的数据输出 DataSinks。

大致分为以下几类

将数据sink到本地文件sink到本地集合sink到hdfssink到kafkasink到MySQL

前三种我们可以参考批处理,方式都是一样的(传送门:Flink批处理的DataSources和DataSinks),这里我们就介绍第四、五种,如何 sink 到 kafak 和 mysql 。

1.3.1 sink 到 kafka参考代码代码语言:javascript代码运行次数:0运行复制

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011/* * @Author: Alice菌 * @Date: 2020/8/10 10:08 * @Description:     sink 到 kafka */object StreamKafkaSink {  def main(args: Array[String]): Unit = {    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 导入隐式转换    import org.apache.flink.api.scala._    val source: DataStream[String] = senv.fromElements("1,小丽,北京,女")    val properties: Properties = new Properties()    properties.setProperty("bootstrap.servers","node01:9092")    val flinkKafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String]("test",new SimpleStringSchema(),properties)    source.addSink(flinkKafkaProducer)    // 打印    source.print()    // 执行    senv.execute("StreamKafkaSink")  }}

演示效果

在运行程序前,我们通过以下命令,开启 kafka 的消费者,进行消费数据

代码语言:javascript代码运行次数:0运行复制

cd /export/servers/kafka_2.11-1.0.0bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

我们可以发现,当前 test 主题下的内容,仍是我们之前手动生产的数据。当我们启动程序,通过使用flink往kafka的 test 分区下打入数据 ,再观察消费数据的变化。

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

可以发现多了一条我们在程序中指定的数据~说明我们的代码是ok的。

1.3.2 sink 到 mysql参考代码代码语言:javascript代码运行次数:0运行复制

import java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.flink.api.scala._import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.sink.RichSinkFunctionimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}/* * @Author: Alice菌 * @Date: 2020/8/10 10:29 * @Description:      */object StreamMysqlSink {  // 定义一个样例类,用于封装数据  case class Student(id:Int,name:String,addr:String,sex:String)  def main(args: Array[String]): Unit = {    // 1、创建执行环境    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment    // 2、准备数据    val studentDStream: DataStream[Student] = senv.fromElements(      Student(4, "小明", "上海", "男"),      Student(5, "小青", "广州", "女"),      Student(6, "小月", "深圳", "女")    )    studentDStream.addSink(new StudentSlinkToMySql)    senv.execute("StreamMysqlSink")  }  class StudentSlinkToMySql extends RichSinkFunction[Student]{    private var connection:Connection = _    private var ps:PreparedStatement = _    override def open(parameters: Configuration): Unit = {      // 设置驱动,连接地址,用户名,密码      var driver: String = "com.mysql.jdbc.Driver"      var url:String = "jdbc:mysql://localhost:3306/blogs?characterEncoding=utf-8&useSSL=false"      var username: String = "root"      var password: String = "root"      // 1、加载驱动      Class.forName(driver)      // 2、创建连接      connection = DriverManager.getConnection(url,username,password)      // 书写SQL语句      val sql: String = "insert into student(id,name,addr,sex) values(?,?,?,?);"      // 3、获得执行语句      ps = connection.prepareStatement(sql)    }    // 关闭连接操作    override def close(): Unit = {      if (connection != null){        connection.close()      }      if (ps != null){        ps.close()      }    }    // 每个元素的插入,都要触发一次 invoke,这里主要进行 invoke 插入    override def invoke(stu: Student): Unit = {      try{        // 4、组装数据,执行插入操作        ps.setInt(1,stu.id)        ps.setString(2,stu.name)        ps.setString(3,stu.addr)        ps.setString(4,stu.sex)        ps.executeUpdate()      } catch {        case e:Exception => println(e.getMessage)      }    }  }}

演示效果

在程序运行前,student表中还没有数据

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

运行程序后,可以观察到指定的数据被添加到了MySQL指定的数据库下的数据表里。

快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks

结语

本篇博客,博主为大家介绍了Flink在流处理过程中,常用的数据输入和输出的几种方式,这块的知识非常基础,也同样非常重要,初学Flink的朋友们可要勤加练习咯~

如果以上过程中出现了任何的纰漏错误,烦请大佬们指正?

受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波?

希望我们都能在学习的道路上越走越远?

以上就是快速入门Flink (7) —— 小白都喜欢看的Flink流处理之DataSources和DataSinks的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/220815.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月3日 16:53:01
下一篇 2025年11月3日 16:58:09

相关推荐

  • 如何用dom2img解决网页打印样式不显示的问题?

    用dom2img解决网页打印样式不显示的问题 想将网页以所见即打印的的效果呈现,需要采取一些措施,特别是在使用了bootstrap等大量采用外部css样式的框架时。 问题根源 在常规打印操作中,浏览器通常会忽略css样式等非必要的页面元素,导致打印出的结果与网页显示效果不一致。这是因为打印机制只识别…

    2025年12月24日
    800
  • Bootstrap 中如何让文字浮于阴影之上?

    文字浮于阴影之上 文中提到的代码片段中 元素中的文字被阴影元素 所遮挡,如何让文字显示在阴影之上? bootstrap v3和v5在处理此类问题方面存在差异。 解决方法 在bootstrap v5中,给 元素添加以下css样式: .banner-content { position: relativ…

    2025年12月24日
    000
  • Bootstrap 5:如何将文字置于阴影之上?

    文字重叠阴影 在 bootstrap 5 中,将文字置于阴影之上时遇到了困难。在 bootstrap 3 中,此问题并不存在,但升级到 bootstrap 5 后却无法实现。 解决方案 为了解决这个问题,需要给 元素添加以下样式: .banner-content { position: relati…

    2025年12月24日
    400
  • Bootstrap 5 如何将文字置于阴影上方?

    如何在 bootstrap 5 中让文字位于阴影上方? 在将网站从 bootstrap 3 升级到 bootstrap 5 后,用户遇到一个问题:文字内容无法像以前那样置于阴影层之上。 解决方案: 为了将文字置于阴影层上方,需要给 banner-content 元素添加以下 css 样式: .ban…

    2025年12月24日
    100
  • HTMLrev 上的免费 HTML 网站模板

    HTMLrev 是唯一的人工策划的库专门专注于免费 HTML 模板,适用于由来自世界各地慷慨的模板创建者制作的网站、登陆页面、投资组合、博客、电子商务和管理仪表板世界。 这个人就是我自己 Devluc,我已经工作了 1 年多来构建、改进和更新这个很棒的免费资源。我自己就是一名模板制作者,所以我知道如…

    2025年12月24日
    300
  • 如何用 CSS 禁止手机端页面屏幕拖动?

    css 禁止手机端屏幕拖动 在手机端浏览网页时,常常会遇到屏幕拖动导致页面内容错乱或无法操作的情况。为了解决这个问题,可以使用 css 的 overflow 属性来禁止屏幕拖动。 解决方案 针对给定的代码,可以在 元素中添加以下 css 样式: 立即学习“前端免费学习笔记(深入)”; body{ov…

    2025年12月24日
    000
  • 如何禁用手机端屏幕拖动功能?

    解决手机端屏幕拖动问题 在移动设备上,当设备屏幕存在内容超出边界时,可以通过拖动屏幕来浏览。但有时,我们希望禁用这种拖动功能,例如当导航菜单展开时。 实施方法 要禁止屏幕拖动,可以为 body 元素添加 overflow:hidden 样式。这将禁用滚动条并阻止屏幕拖动,无论内容是否超出边界。 以下…

    2025年12月24日
    000
  • 如何用纯 CSS 替代 SCSS 中的 @import?

    如何在 css 中替代 scss 中的 @import 在项目中仅有一个文件使用 scss 的情况下,我们可能希望使用纯 css 来替代它。该 scss 文件通常包含对第三方 css 库的导入,如: /* this file is for your main application css. */@…

    2025年12月24日
    000
  • 如何用 CSS 替代 SCSS 中的 @import?

    用 css 替代 scss 中的 @import 在 scss 文件中,@import 语句用于导入其他 css 文件。然而,如果项目中只有一个文件使用 scss,我们可以考虑使用普通 css 来替代它,从而消除对 sass 和 sass-loader 的依赖。 要使用纯 css 替代 scss 文…

    2025年12月24日
    000
  • 如何用纯CSS替代scss中的@import?

    用纯css替代scss中的@import 在一个包含scss文件的项目中,我们可能需要找到一种方法来用纯css替代掉它。为了消除对scss的依赖,可以使用css中的@import指令。 /css中使用@import 纯css中的@import语法与scss中的类似: 立即学习“前端免费学习笔记(深入…

    2025年12月24日
    000
  • 如何构建一个可重复使用的 CSS 容器元素?

    探索可重复使用的 css 容器元素 在前端开发中,css 容器是一个重要的元素,它为应用程序的内容提供了一个可重复使用的布局和样式基础。让我们探讨一下一个典型容器应该包含哪些核心属性。 通常,一个容器元素仅限于定义页面内容的布局和留白。一些常见的属性包括: padding:设置容器内元素与边框之间的…

    2025年12月24日
    000
  • 什么是可重复使用的 CSS 容器?它包含哪些属性?

    什么是可重复使用的 css container? 容器在 css 中扮演着重要的角色,负责容纳页面内容并控制其布局。一个可重复使用的 container 是一组预定义的样式,可以应用于多个组件,以确保一致性和可维护性。 可重复使用的 container 包含哪些属性? 通常,可重复使用的 conta…

    2025年12月24日
    000
  • Bootstrap 4 表格中如何实现列向右对齐?

    表格对齐问题 在bootstrap 4中构建表格时,有时会遇到列不对齐的问题。本文将介绍一个解决此问题的方法,以实现列向右对齐。 问题: 假设我们有一个带有四列的表格,前两列使用 th 标签作为标题,后两列使用 td 标签表示数据。然而,我们希望后两列数据向右对齐。 解决方法: 要解决此问题,我们可…

    2025年12月24日
    000
  • Bootstrap 表格中如何实现列对齐不一致?

    表格设计中的对齐问题 使用 Bootstrap 框架创建表格时,有时会遇到列对齐不一致的问题。例如,将最后两列向右对齐,以下方法可以解决此问题: 将表格设置为 100% 宽度,以覆盖整个容器。为 1、3、4 列设置固定宽度,以确保这些列的对齐。将 2 列设置为自动宽度(不设置宽度),使其自动填充剩余…

    2025年12月24日
    000
  • 如何使用 CSS 将 HTML 表格中的特定列右对齐?

    表格对齐问题:如何将表格中的特定列右对齐? 在 html 表格中,您可以使用 css 样式来控制内容对齐方式。在这种情况下,要将最后两列向右对齐,可以使用以下步骤: 确保表格为 100% 宽度。这将允许表格占用可用空间的全部宽度。设置需要右对齐的列为固定宽度。这将为列分配一个指定宽度,确保内容始终在…

    2025年12月24日
    000
  • CSS 中的响应式屏幕尺寸类:如何利用它们创建适应各种设备的网页设计?

    css中的响应式屏幕尺寸 在网页设计中,css 提供了一组用于定义不同屏幕尺寸的类,例如 sm、md、lg、xl 和 2xl。这些类对应于特定设备屏幕的宽度范围: sm(small):代表小屏幕,通常为 576px 及以下md(medium):代表中等屏幕,通常为 576px 至 768pxlg(l…

    2025年12月24日
    000
  • ## CSS 中 sm md lg xl 2xl 屏幕尺寸究竟代表什么?

    CSS中sm md lg xl 2xl 屏幕尺寸详解 在网页设计中,CSS常用sm md lg xl 2xl等尺寸表示不同的屏幕大小范围,以便针对不同设备进行响应式设计。 具体而言: sm:代表小屏幕,通常指手机屏幕尺寸(640px)md:代表中屏幕,通常指平板电脑屏幕尺寸(768px)lg:代表大…

    2025年12月24日
    000
  • ## CSS 中 sm、md、lg、xl、2xl 代表什么尺寸?

    CSS中屏幕尺寸断点规定 CSS 中使用 sm、md、lg、xl、2xl 等表示不同屏幕尺寸,这些尺寸在响应式设计中用于控制元素在特定屏幕宽度下的显示方式。 具体屏幕尺寸如下: xs: 超小屏幕,通常指手机屏幕,宽度小于 576pxsm: 小屏幕,通常指平板电脑或手机横屏模式,宽度介于 576px …

    2025年12月24日
    000
  • 创建响应式布局的关键技术,让您不必依赖繁重的 CSS 框架

    您不需要繁重的 css 框架来构建响应式布局。 像 tailwind 和 bootstrap 这样的 css 框架确实很强大,但有时,它们对于较小的网站来说太过分了。您可以通过纯 css 代码实现它们提供的所有功能。在幕后,它们都使用相同的响应式网站基本技术。 事实上,如果你真的想知道这些框架和响应…

    2025年12月24日
    000
  • 如何在 VS Code 中解决折叠代码复制问题?

    解决 VS Code 折叠代码复制问题 在 VS Code 中使用折叠功能可以帮助组织长代码,但使用复制功能时,可能会遇到只复制可见部分的问题。以下是如何解决此问题: 当代码被折叠时,可以使用以下简单操作复制整个折叠代码: 按下 Ctrl + C (Windows/Linux) 或 Cmd + C …

    2025年12月24日
    000

发表回复

登录后才能评论
关注微信