快速掌握flink sql基础
00
前言
Flink 作为一个统一的批流处理框架,其 Table API 和 SQL 是高层次的处理 API。尽管当前功能仍在积极开发中,但已经可以支持批流统一处理。Table API 允许在 Java 和 Scala 中使用直观的查询 API,结合关系运算符如 select、filter 和 join 进行查询。而 Flink SQL 则允许直接在代码中编写 SQL 实现查询操作,基于 Apache Calcite 实现 SQL 标准支持。
01
1、导入必要的依赖包
org.apache.flink flink-table-planner_2.12 1.10.1 org.apache.flink flink-table-api-scala-bridge_2.12 1.10.1 org.apache.flink flink-csv 1.10.1
flink-table-planner 是 Table API 的核心部分,提供运行时环境和执行计划生成;flink-table-api-scala-bridge 则负责 Table API 与 DataStream/DataSet API 之间的连接支持。这些依赖在 IDE 开发环境中需要添加,而在生产环境中,lib 目录通常已包含 planner,只需添加 bridge 即可。如果需要使用自定义函数或连接 Kafka,还需要 flink-table-common 中的 SQL client。
02
2、两种 planner(旧版与 Blink)的区别
Blink 将批处理视为流处理的特殊情况,不支持表与 DataSet 之间的转换,批处理作业直接转换为 DataStream 程序处理。Blink planner 不支持 BatchTableSource,使用有界的 Blink planner 只支持新目录,不支持旧的 ExternalCatalog。旧版 planner 和 Blink planner 在 FilterableTableSource 的实现上不兼容,旧版会将 PlannerExpressions 下推到 filterableTableSource,而 Blink planner 则下推 Expressions。基于字符串的配置选项仅适用于 Blink planner,PlannerConfig 在两种 planner 中实现不同。Blink planner 支持在单个 DAG 中优化多个 sink(仅在 TableEnvironment 中支持),而旧版 planner 则为每个 sink 创建独立的 DAG,不支持目录统计,而 Blink planner 支持。
03
3、表(Table)的概念
TableEnvironment 可以注册 Catalog,并基于 Catalog 注册表,维护 Catalog-Table 映射。表由标识符指定,包含 Catalog 名、数据库名和对象名(表名)。如果未指定目录或数据库,使用当前默认值。
04
4、连接文件系统(Csv 格式)
通过
tableEnv.connect()
调用 ConnectorDescriptor 来连接外部系统。对于文件系统,使用内置的 FileSystem() connector。
05
5、测试案例(新)
需求:从 txt 文件读取数据,过滤掉 id 不为 sensor_1 的数据。
实现思路:首先创建 table 环境,通过 connect 方法读取数据,设置表结构并注册为表,然后进行数据过滤(可使用 SQL 或流处理方式)。
小门道AI
小门道AI是一个提供AI服务的网站
117 查看详情
准备数据
sensor_1,1547718199,35.8sensor_6,1547718201,15.4sensor_7,1547718202,6.7sensor_10,1547718205,38.1sensor_1,1547718206,32sensor_1,1547718208,36.2sensor_1,1547718210,29.7sensor_1,1547718213,30.9
代码实现
import org.apache.flink.streaming.api.scala._import org.apache.flink.table.api.{DataTypes}import org.apache.flink.table.api.scala._import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}object FlinkSqlTable {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval tableEnv = StreamTableEnvironment.create(env)
tableEnv.connect(new FileSystem().path("D:d12FlinkFlinkSqlsrcmainresourcessensor.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("time", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ).createTemporaryTable("inputTable")val resTable = tableEnv.from("inputTable") .select("*").filter('id === "sensor_1")var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'")resTable.toAppendStream[(String, Long, Double)].print("resTable")resSql.toAppendStream[(String, Long, Double)].print("resSql")env.execute("FlinkSqlWrodCount")
}}
06
6、TableEnvironment 的作用
TableEnvironment 用于注册 Catalog、在内部 Catalog 中注册表、执行 SQL 查询、注册用户自定义函数、保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用。创建 TableEnv 时,可以通过 EnvironmentSettings 或 TableConfig 参数配置其特性。
07
7、老版本创建流处理和批处理
7.1
老版本流处理
val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()val tableEnv = StreamTableEnvironment.create(env, settings)
7.2
老版本批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironmentval batchTableEnv = BatchTableEnvironment.create(batchEnv)
7.3
Blink 版本的流处理环境
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
7.4
Blink 版本的批处理环境
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()val bbTableEnv = TableEnvironment.create(bbSettings)
00
总结:
本文介绍了 Flink SQL 的入门操作,后续将分享更多关于 Flink SQL 连接 Kafka、输出到 Kafka、MySQL 等内容。我们下期见~~~
以上就是十分钟入门Fink SQL的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/403623.html
微信扫一扫
支付宝扫一扫