聊聊flink Table的Over Windows

本文主要研究一下flink table的over windows

聊聊flink Table的Over Windows

实例代码语言:javascript代码运行次数:0运行复制

Table table = input  .window([OverWindow w].as("w"))           // define over window with alias w  .select("a, b.sum over w, c.min over w"); // aggregate over the over window w

Over Windows类似SQL的over子句,它可以基于event-time、processing-time或者row-count;具体可以通过Over类来构造,其中必须设置orderBy、preceding及as方法;它有Unbounded及Bounded两大类Unbounded Over Windows实例代码语言:javascript代码运行次数:0运行复制

​// Unbounded Event-time over window (assuming an event-time attribute "rowtime").window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));​// Unbounded Processing-time over window (assuming a processing-time attribute "proctime").window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));​// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime").window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w")); // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime").window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));

对于event-time及processing-time使用unbounded_range来表示Unbounded,对于row-count使用unbounded_row来表示UnboundedBounded Over Windows实例代码语言:javascript代码运行次数:0运行复制

// Bounded Event-time over window (assuming an event-time attribute "rowtime").window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))​// Bounded Processing-time over window (assuming a processing-time attribute "proctime").window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))​// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime").window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w")) // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime").window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))

对于event-time及processing-time使用诸如1.minutes来表示Bounded,对于row-count使用诸如10.rows来表示BoundedTable.window

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

代码语言:javascript代码运行次数:0运行复制

class Table(    private[flink] val tableEnv: TableEnvironment,    private[flink] val logicalPlan: LogicalNode) {​  //......  ​  @varargs  def window(overWindows: OverWindow*): OverWindowedTable = {​    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {      throw new TableException("Over-windows for batch tables are currently not supported.")    }​    if (overWindows.size != 1) {      throw new TableException("Over-Windows are currently only supported single window.")    }​    new OverWindowedTable(this, overWindows.toArray)  }​  //......​}    

Table提供了OverWindow参数的window方法,用来进行Over Windows操作,它创建的是OverWindowedTableOverWindow

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/windows.scala

智谱清言 - 免费全能的AI助手 智谱清言 - 免费全能的AI助手

智谱清言 - 免费全能的AI助手

智谱清言 - 免费全能的AI助手 2 查看详情 智谱清言 - 免费全能的AI助手 代码语言:javascript代码运行次数:0运行复制

/**  * Over window is similar to the traditional OVER SQL.  */case class OverWindow(    private[flink] val alias: Expression,    private[flink] val partitionBy: Seq[Expression],    private[flink] val orderBy: Expression,    private[flink] val preceding: Expression,    private[flink] val following: Expression)

OverWindow定义了alias、partitionBy、orderBy、preceding、following属性Over

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/java/windows.scala

代码语言:javascript代码运行次数:0运行复制

object Over {​  /**    * Specifies the time attribute on which rows are grouped.    *    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.    *    * For batch tables, refer to a timestamp or long attribute.    */  def orderBy(orderBy: String): OverWindowWithOrderBy = {    val orderByExpr = ExpressionParser.parseExpression(orderBy)    new OverWindowWithOrderBy(Array[Expression](), orderByExpr)  }​  /**    * Partitions the elements on some partition keys.    *    * @param partitionBy some partition keys.    * @return A partitionedOver instance that only contains the orderBy method.    */  def partitionBy(partitionBy: String): PartitionedOver = {    val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray    new PartitionedOver(partitionByExpr)  }}​class OverWindowWithOrderBy(  private val partitionByExpr: Array[Expression],  private val orderByExpr: Expression) {​  /**    * Set the preceding offset (based on time or row-count intervals) for over window.    *    * @param preceding preceding offset relative to the current row.    * @return this over window    */  def preceding(preceding: String): OverWindowWithPreceding = {    val precedingExpr = ExpressionParser.parseExpression(preceding)    new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)  }​}​class PartitionedOver(private val partitionByExpr: Array[Expression]) {​  /**    * Specifies the time attribute on which rows are grouped.    *    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.    *    * For batch tables, refer to a timestamp or long attribute.    */  def orderBy(orderBy: String): OverWindowWithOrderBy = {    val orderByExpr = ExpressionParser.parseExpression(orderBy)    new OverWindowWithOrderBy(partitionByExpr, orderByExpr)  }}​class OverWindowWithPreceding(    private val partitionBy: Seq[Expression],    private val orderBy: Expression,    private val preceding: Expression) {​  private[flink] var following: Expression = _​  /**    * Assigns an alias for this window that the following `select()` clause can refer to.    *    * @param alias alias for this over window    * @return over window    */  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))​  /**    * Assigns an alias for this window that the following `select()` clause can refer to.    *    * @param alias alias for this over window    * @return over window    */  def as(alias: Expression): OverWindow = {​    // set following to CURRENT_ROW / CURRENT_RANGE if not defined    if (null == following) {      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {        following = CURRENT_ROW      } else {        following = CURRENT_RANGE      }    }    OverWindow(alias, partitionBy, orderBy, preceding, following)  }​  /**    * Set the following offset (based on time or row-count intervals) for over window.    *    * @param following following offset that relative to the current row.    * @return this over window    */  def following(following: String): OverWindowWithPreceding = {    this.following(ExpressionParser.parseExpression(following))  }​  /**    * Set the following offset (based on time or row-count intervals) for over window.    *    * @param following following offset that relative to the current row.    * @return this over window    */  def following(following: Expression): OverWindowWithPreceding = {    this.following = following    this  }}

Over类是创建over window的帮助类,它提供了orderBy及partitionBy两个方法,分别创建的是OverWindowWithOrderBy及PartitionedOverPartitionedOver提供了orderBy方法,创建的是OverWindowWithOrderBy;OverWindowWithOrderBy提供了preceding方法,创建的是OverWindowWithPrecedingOverWindowWithPreceding则包含了partitionBy、orderBy、preceding属性,它提供了as方法创建OverWindow,另外还提供了following方法用于设置following offsetOverWindowedTable

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

代码语言:javascript代码运行次数:0运行复制

class OverWindowedTable(    private[flink] val table: Table,    private[flink] val overWindows: Array[OverWindow]) {​  def select(fields: Expression*): Table = {    val expandedFields = expandProjectList(      fields,      table.logicalPlan,      table.tableEnv)​    if(fields.exists(_.isInstanceOf[WindowProperty])){      throw new ValidationException(        "Window start and end properties are not available for Over windows.")    }​    val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)​    new Table(      table.tableEnv,      Project(        expandedOverFields.map(UnresolvedAlias),        table.logicalPlan,        // required for proper projection push down        explicitAlias = true)        .validate(table.tableEnv)    )  }​  def select(fields: String): Table = {    val fieldExprs = ExpressionParser.parseExpressionList(fields)    //get the correct expression for AggFunctionCall    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))    select(withResolvedAggFunctionCall: _*)  }}

OverWindowedTable构造器需要overWindows参数;它只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的projectList为expandedOverFields.map(UnresolvedAlias),而expandedOverFields则通过resolveOverWindows(expandedFields, overWindows, table.tableEnv)得到小结Over Windows类似SQL的over子句,它可以基于event-time、processing-time或者row-count;具体可以通过Over类来构造,其中必须设置orderBy、preceding及as方法;它有Unbounded及Bounded两大类(

对于event-time及processing-time使用unbounded_range来表示Unbounded,对于row-count使用unbounded_row来表示Unbounded;对于event-time及processing-time使用诸如1.minutes来表示Bounded,对于row-count使用诸如10.rows来表示Bounded

)Table提供了OverWindow参数的window方法,用来进行Over Windows操作,它创建的是OverWindowedTable;OverWindow定义了alias、partitionBy、orderBy、preceding、following属性;Over类是创建over window的帮助类,它提供了orderBy及partitionBy两个方法,分别创建的是OverWindowWithOrderBy及PartitionedOver,而PartitionedOver提供了orderBy方法,创建的是OverWindowWithOrderBy;OverWindowWithOrderBy提供了preceding方法,创建的是OverWindowWithPreceding;OverWindowWithPreceding则包含了partitionBy、orderBy、preceding属性,它提供了as方法创建OverWindow,另外还提供了following方法用于设置following offsetOverWindowedTable构造器需要overWindows参数;它只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的projectList为expandedOverFields.map(UnresolvedAlias),而expandedOverFields则通过resolveOverWindows(expandedFields, overWindows, table.tableEnv)得到docOver Windows

以上就是聊聊flink Table的Over Windows的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/424400.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月7日 10:52:48
下一篇 2025年11月7日 10:57:20

相关推荐

  • C++ 函数库与第三方库的比较

    标准模板库 (stl) 和第三方库为 c++++ 提供了广泛的可用功能。stl 提供基本数据结构和算法,而第三方库提供了更高级的功能,例如并发和图形。stl 是随 c++ 标准化的,提供可靠性和稳定性,而第三方库的标准化和稳定性可能因库而异。在许可方面,stl 通常在 apache license …

    2025年12月18日
    000
  • java的的高精度除法

    Java 高精度除法可通过以下方法解决:使用第三方库:如 BigDecimal(标准库)或 Fraction(Apache Commons Math)。自定义算法:包括长除法和 Knuth 算法,需进行编码实现。 Java 高精度除法 在 Java 中,处理高精度除法时会遇到挑战,因为 Java 的…

    2025年12月18日
    000
  • C++框架的未来发展趋势:行业洞察与预测

    c++++框架的未来发展趋势:跨平台支持增强,便于开发跨平台应用程序。ai和ml集成,允许构建智能应用程序。云计算优化,提供可扩展、可用解决方案。物联网设备支持提升,轻松连接和管理物联网设备。 C++ 框架的未来发展趋势:行业洞察与预测 引言 C++ 是软件开发中备受推崇的编程语言,其稳健性和高性能…

    2025年12月18日
    000
  • 如何将C++框架与分布式系统集成

    在分布式系统中,c++++框架可与分布式系统集成:通信库集成:使用第三方库建立节点间通信,封装库函数并创建自定义接口。分布式数据存储整合:使用分布式数据库技术存储和检索数据,利用框架的dal集成访问和操作数据。消息队列集成:使用消息队列平台实现异步通信,利用框架的事件处理功能监听消息并采取相应操作。…

    2025年12月18日
    000
  • 如何将 C++ 框架与大数据技术集成?

    通过以下步骤将 c++++ 框架与大数据技术集成:选择 c++ hadoop 框架,例如 apache hadoop c++ api、spark c++ 连接器或 hbase c++ 客户端。安装选定的 c++ hadoop 框架。编写 c++ hadoop 代码,例如使用 hadoop c++ a…

    2025年12月18日
    000
  • 如何将C++框架与HTML技术集成

    可无缝集成 c++++ 框架和 html,步骤如下:使用 web 服务器(如 apache)将 html 文件提供给客户端。在框架中设置路由表,映射 url 路径到 c++ 处理程序。c++ 处理程序接收来自客户端的 html 请求并生成响应。利用 c++ 框架的工具生成动态 html 响应。 如何…

    2025年12月18日
    000
  • 如何将C++框架与大数据处理集成

    将 c++++ 框架与大数据处理集成可高效处理海量数据。步骤包括:选择合适的框架,如 apache beam、dask 或 apache spark。用所选框架的 api 编写 c++ 代码定义数据处理管道。利用框架的分布式计算机制处理大量数据。使用框架的 i/o 方法读取和写入数据源。 如何将 C…

    2025年12月18日
    000
  • 如何将 C++ 框架与 Web 开发技术集成?

    将 c++++ 框架与 web 开发技术集成是一种创建高性能、可扩展 web 应用程序的方法。通过选择一个如 boost.asio 的 c++ 框架,并将其与restful api框架(如 restbed)和 web 服务器(如 apache)结合,您可以构建全栈应用程序。实战案例展示了使用 boo…

    2025年12月18日
    000
  • C++框架如何通过代码生成提高开发效率?

    在 c++++ 开发中,代码生成技术可显著提高效率,自动生成复杂代码结构,减少冗余和错误,提高一致性,并加快开发。流行的代码生成工具包括 protobuf 和 gcloud,可通过示例流程演示 protobuf 与 grpc 服务的代码生成过程,为应用程序构建提供极大便利,并享受减少开发时间、提高一…

    2025年12月18日
    000
  • C++ 框架中常见性能瓶颈及其优化方法

    常见的 c++++ 框架性能瓶颈包括:内存分配瓶颈:使用内存池分配对象。虚拟函数调用瓶颈:使用非虚方法或替代调度策略。过度使用 stl 容器瓶颈:在关键路径上优先使用原始数组。过度使用锁瓶颈:仅在必要时使用锁。数据序列化瓶颈:使用序列化库或 c++17 特性实现二进制兼容性。 C++ 框架中常见的性…

    2025年12月18日
    000
  • C++ 框架与其他编程语言和工具的互操作性问题和解决方案

    如何解决 c++++ 框架与其他语言的互操作性问题:数据类型不兼容: 使用桥接转换器或数据转换库转换不同语言的数据类型。命名空间冲突: 重命名或避免使用相同名称,以解决不同语言中类和函数的命名空间冲突。内存管理: 使用智能指针或对象池管理 c++ 中的内存,消除内存泄漏和访问已释放内存的风险。 C+…

    2025年12月18日
    000
  • 如何在网站或 Web 应用中集成 C++ 框架

    在网站或 web 应用程序中集成 c++++ 框架的方法包括:使用 cgi:创建 cgi 脚本,处理 http 请求,生成 html 响应。使用 fastcgi:创建 fastcgi 脚本,创建长驻型进程,处理请求,提供更高性能。使用 web 服务器 api:使用 web 服务器(如 apache …

    2025年12月18日
    000
  • 如何将C++框架与大数据处理系统集成?

    集成 c++++ 框架与大数据处理系统可显著提高大数据处理性能。具体步骤如下:选择 c++ 框架:apache spark、hadoop mapreduce、apache flink 等。安装大数据处理系统:根据所选框架安装软件包。编写 c++ 代码:针对所选框架编写代码。将 c++ 代码与大数据处…

    2025年12月18日
    000
  • C++开源框架的许可类型有哪些?

    c++++ 开源框架的许可类型决定了框架的使用及修改方式。常见的许可类型包括:apache 许可证 2.0:允许商业和非商业使用、修改和 re发布,但需保留版权声明。mit 许可证:允许商业和非商业使用、修改和 re发布,无需保留版权声明。bsd 许可证:允许商业和非商业使用、修改和 re发布,需保…

    2025年12月18日
    000
  • C++框架中的不同许可条款如何比较?

    答案:选择 c++++ 框架的许可条款取决于项目的特定需求和目标。详细描述:mit 许可证:免费使用、修改和分发,无版税或其他要求。bsd 许可证:与 mit 类似,但要求保留版权声明和免责条款。apache 许可证 2.0 版:更严格,要求保留版权和许可声明,以及清晰说明关联。gnu 通用公共许可…

    2025年12月18日
    000
  • 限制C++框架商业使用的许可条款有哪些?

    使用 c++++ 框架进行商业开发时,了解许可条款至关重要,因为它规定了代码的使用和分发方式。常见许可条款包括:mit 许可证:允许商业和非商业使用、复制和修改代码。apache 2.0 许可证:类似 mit 许可证,但要求包括版权和许可证通知。gnu 通用公共许可证 (gpl):要求衍生作品也采用…

    2025年12月18日
    000
  • C++框架的双重许可类型介绍

    c++++ 框架提供双重许可:开源许可(如 mit)和商业许可。开源许可允许自由使用和修改,而商业许可提供支持和保护,并限制了使用。根据项目的类型、修改限制和支持需求来选择许可类型。 C++ 框架的双重许可类型:简介 许多开源的 C++ 框架同时提供两种许可类型:开源许可(例如,MIT、GPL)和商…

    2025年12月18日
    000
  • C++框架的Apache许可类型综述

    c++++ 框架的 apache 许可证允许用户在遵守特定条款的情况下修改和分发软件,包括:apache 2.0 许可证:允许用户在任何情况下自由使用、修改和分发软件,但必须保留版权和许可证声明。apache 1.1 许可证:允许用户免费使用和修改软件,但限制商业使用,并且需要保留版权和许可证声明。…

    2025年12月18日
    000
  • C++框架的混合许可类型解读

    混合许可类型是指在一个项目中同时使用多个许可条款,例如 gpl 和 bsd 许可证。在使用混合许可项目时,必须考虑许可兼容性以确保许可证相互兼容。开发者必须了解所有许可证条款、评估兼容性、标记许可信息并遵守许可限制,例如源代码分发要求和版权归属。 C++框架的混合许可类型解读 简介 在开发C++项目…

    2025年12月18日
    000
  • C++框架在分布式系统设计中的作用是什么?

    c++++ 框架在分布式系统设计中至关重要,优势包括强大性能、可扩展性和跨平台性。实战案例中,apache thrift 提供跨语言服务创建功能,其操作步骤包括定义 thrift 接口、生成代码、实现服务接口、连接客户端和服务器、传输数据。 C++ 框架在分布式系统设计中的作用 在分布式系统的设计中…

    2025年12月18日
    000

发表回复

登录后才能评论
关注微信