本文旨在探讨flink table的group windows。

Table table = input .window([Window w].as("w")) // 定义窗口并为其赋予别名 w .groupBy("w") // 按窗口 w 分组表 .select("b.sum"); // 聚合Table table = input .window([Window w].as("w")) // 定义窗口并为其赋予别名 w .groupBy("w, a") // 按属性 a 和窗口 w 分组表 .select("a, b.sum"); // 聚合Table table = input .window([Window w].as("w")) // 定义窗口并为其赋予别名 w .groupBy("w, a") // 按属性 a 和窗口 w 分组表 .select("a, w.start, w.end, w.rowtime, b.count"); // 聚合并添加窗口的开始、结束和行时间戳
窗口操作可以为Window设置别名,并在groupBy及select中引用该别名。窗口具有start、end和rowtime属性,其中start和rowtime是包含的,而end是排外的。
Tumbling Windows:
// 事件时间的Tumbling窗口.window(Tumble.over("10.minutes").on("rowtime").as("w"));// 处理时间的Tumbling窗口(假设有一个处理时间属性 "proctime").window(Tumble.over("10.minutes").on("proctime").as("w"));// 基于行数的Tumbling窗口(假设有一个处理时间属性 "proctime").window(Tumble.over("10.rows").on("proctime").as("w"));
Tumbling Windows按照固定窗口大小移动,因此窗口之间不重叠;over方法用于指定窗口大小;窗口大小可以基于事件时间、处理时间或行数来定义。
Sliding Windows:
// 事件时间的Sliding窗口.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));// 处理时间的Sliding窗口(假设有一个处理时间属性 "proctime").window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));// 基于行数的Sliding窗口(假设有一个处理时间属性 "proctime").window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
当滑动间隔小于窗口大小时,Sliding Windows会导致窗口重叠,因此行可能属于多个窗口;over方法用于指定窗口大小,窗口大小可以基于事件时间、处理时间或行数来定义;every方法用于指定滑动间隔。
Session Windows:
// 事件时间的Session窗口.window(Session.withGap("10.minutes").on("rowtime").as("w"));// 处理时间的Session窗口(假设有一个处理时间属性 "proctime").window(Session.withGap("10.minutes").on("proctime").as("w"));
Session Windows没有固定的窗口大小,它基于非活动时间的长度来关闭窗口,withGap方法用于指定两个窗口之间的间隔,作为时间间隔;Session Windows只能使用事件时间或处理时间。
Table类提供了window操作,接收Window参数,并创建WindowedTable对象。
class Table( private[flink] val tableEnv: TableEnvironment, private[flink] val logicalPlan: LogicalNode) { //...... def window(window: Window): WindowedTable = { new WindowedTable(this, window) } //......}
WindowedTable类仅提供groupBy操作,groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最终调用的是Expression类型的groupBy方法;如果groupBy操作除了窗口之外没有其他属性,则其并行度为1,只会在单个任务上执行;groupBy方法创建WindowGroupedTable对象。
class WindowedTable( private[flink] val table: Table, private[flink] val window: Window) { def groupBy(fields: Expression*): WindowGroupedTable = { val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_)) if (fields.size != fieldsWithoutWindow.size + 1) { throw new ValidationException("GroupBy must contain exactly one window alias.") } new WindowGroupedTable(table, fieldsWithoutWindow, window) } def groupBy(fields: String): WindowGroupedTable = { val fieldsExpr = ExpressionParser.parseExpressionList(fields) groupBy(fieldsExpr: _*) }}
WindowGroupedTable类仅提供select操作,select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最终调用的是Expression类型的select方法;select方法创建新的Table对象,其Project操作的子节点为WindowAggregate。
class WindowGroupedTable( private[flink] val table: Table, private[flink] val groupKeys: Seq[Expression], private[flink] val window: Window) { def select(fields: Expression*): Table = { val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv) val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv) val projectsOnAgg = replaceAggregationsAndProperties( expandedFields, table.tableEnv, aggNames, propNames) val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField) new Table(table.tableEnv, Project( projectsOnAgg, WindowAggregate( groupKeys, window.toLogicalWindow, propNames.map(a => Alias(a._1, a._2)).toSeq, aggNames.map(a => Alias(a._1, a._2)).toSeq, Project(projectFields, table.logicalPlan).validate(table.tableEnv) ).validate(table.tableEnv), // required for proper resolution of the time attribute in multi-windows explicitAlias = true ).validate(table.tableEnv)) } def select(fields: String): Table = { val fieldExprs = ExpressionParser.parseExpressionList(fields) //get the correct expression for AggFunctionCall val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv)) select(withResolvedAggFunctionCall: _*) }}
总结:窗口操作可以为Window设置别名,并在groupBy及select中引用该别名。窗口具有start、end和rowtime属性,其中start和rowtime是包含的,而end是排外的。Tumbling Windows按固定窗口大小移动,不重叠;Sliding Windows在滑动间隔小于窗口大小的情况下会重叠;Session Windows基于非活动时间关闭窗口。Table类提供window操作,创建WindowedTable;WindowedTable提供groupBy操作,创建WindowGroupedTable;WindowGroupedTable提供select操作,创建新的Table,其Project操作的子节点为WindowAggregate。
以上就是聊聊flink Table的Group Windows的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/174829.html
微信扫一扫
支付宝扫一扫