聊聊flink的Tumbling Window

本文主要研究一下flink的tumbling window

聊聊flink的Tumbling Window

WindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic abstract class WindowAssigner implements Serializable {    private static final long serialVersionUID = 1L;​    /**     * Returns a {@code Collection} of windows that should be assigned to the element.     *     * @param element The element to which windows should be assigned.     * @param timestamp The timestamp of the element.     * @param context The {@link WindowAssignerContext} in which the assigner operates.     */    public abstract Collection assignWindows(T element, long timestamp, WindowAssignerContext context);​    /**     * Returns the default trigger associated with this {@code WindowAssigner}.     */    public abstract Trigger getDefaultTrigger(StreamExecutionEnvironment env);​    /**     * Returns a {@link TypeSerializer} for serializing windows that are assigned by     * this {@code WindowAssigner}.     */    public abstract TypeSerializer getWindowSerializer(ExecutionConfig executionConfig);​    /**     * Returns {@code true} if elements are assigned to windows based on event time,     * {@code false} otherwise.     */    public abstract boolean isEventTime();​    /**     * A context provided to the {@link WindowAssigner} that allows it to query the     * current processing time.     *     * 

This is provided to the assigner by its containing * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator}, * which, in turn, gets it from the containing * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. */ public abstract static class WindowAssignerContext {​ /** * Returns the current processing time. */ public abstract long getCurrentProcessingTime();​ }}

WindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型Window

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic abstract class Window {​    /**     * Gets the largest timestamp that still belongs to this window.     *     * @return The largest timestamp that still belongs to this window.     */    public abstract long maxTimestamp();}

Window对象代表把无限流数据划分为有限buckets的集合,它有一个maxTimestamp,代表该窗口数据在该时间点内到达;它有两个子类,一个是GlobalWindow,一个是TimeWindowTimeWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic class TimeWindow extends Window {​    private final long start;    private final long end;​    public TimeWindow(long start, long end) {        this.start = start;        this.end = end;    }​    /**     * Gets the starting timestamp of the window. This is the first timestamp that belongs     * to this window.     *     * @return The starting timestamp of this window.     */    public long getStart() {        return start;    }​    /**     * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it     * is the first timestamp that does not belong to this window any more.     *     * @return The exclusive end timestamp of this window.     */    public long getEnd() {        return end;    }​    /**     * Gets the largest timestamp that still belongs to this window.     *     * 

This timestamp is identical to {@code getEnd() - 1}. * * @return The largest timestamp that still belongs to this window. * * @see #getEnd() */ @Override public long maxTimestamp() { return end - 1; }​ @Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; }​ TimeWindow window = (TimeWindow) o;​ return end == window.end && start == window.start; }​ @Override public int hashCode() { return MathUtils.longToIntWithBitMixing(start + end); }​ @Override public String toString() { return "TimeWindow{" + "start=" + start + ", end=" + end + '}'; }​ /** * Returns {@code true} if this window intersects the given window. */ public boolean intersects(TimeWindow other) { return this.start = other.start; }​ /** * Returns the minimal window covers both this window and the given window. */ public TimeWindow cover(TimeWindow other) { return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end)); }​ // ------------------------------------------------------------------------ // Serializer // ------------------------------------------------------------------------​ //......​ // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------​ /** * Merge overlapping {@link TimeWindow}s. For use by merging * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}. */ public static void mergeWindows(Collection windows, MergingWindowAssigner.MergeCallback c) {​ // sort the windows by the start time and then merge overlapping windows​ List sortedWindows = new ArrayList(windows);​ Collections.sort(sortedWindows, new Comparator() { @Override public int compare(TimeWindow o1, TimeWindow o2) { return Long.compare(o1.getStart(), o2.getStart()); } });​ List<Tuple2<TimeWindow, Set>> merged = new ArrayList(); Tuple2<TimeWindow, Set> currentMerge = null;​ for (TimeWindow candidate: sortedWindows) { if (currentMerge == null) { currentMerge = new Tuple2(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet(); currentMerge.f1.add(candidate); } else if (currentMerge.f0.intersects(candidate)) { currentMerge.f0 = currentMerge.f0.cover(candidate); currentMerge.f1.add(candidate); } else { merged.add(currentMerge); currentMerge = new Tuple2(); currentMerge.f0 = candidate; currentMerge.f1 = new HashSet(); currentMerge.f1.add(candidate); } }​ if (currentMerge != null) { merged.add(currentMerge); }​ for (Tuple2<TimeWindow, Set> m: merged) { if (m.f1.size() > 1) { c.merge(m.f1, m.f0); } } }​ /** * Method to get the window start for a timestamp. * * @param timestamp epoch millisecond to get the window start. * @param offset The offset which window start would be shifted by. * @param windowSize The size of the generated windows. * @return window start */ public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) { return timestamp - (timestamp - offset + windowSize) % windowSize; }}

TimeWindow有start及end属性,其中start为inclusive,而end为exclusive,所以maxTimestamp返回的是end-1;这里重写了equals及hashcode方法TimeWindow提供了intersects方法用于表示本窗口与指定窗口是否有交叉;而cover方法用于返回本窗口与指定窗口的重叠窗口TimeWindow还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window startTumblingEventTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

代码语言:javascript代码运行次数:0运行复制

@PublicEvolvingpublic class TumblingEventTimeWindows extends WindowAssigner {    private static final long serialVersionUID = 1L;​    private final long size;​    private final long offset;​    protected TumblingEventTimeWindows(long size, long offset) {        if (offset = size) {            throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");        }​        this.size = size;        this.offset = offset;    }​    @Override    public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {        if (timestamp > Long.MIN_VALUE) {            // Long.MIN_VALUE is currently assigned when no timestamp is present            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);            return Collections.singletonList(new TimeWindow(start, start + size));        } else {            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +                    "'DataStream.assignTimestampsAndWatermarks(...)'?");        }    }​    @Override    public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {        return EventTimeTrigger.create();    }​    @Override    public String toString() {        return "TumblingEventTimeWindows(" + size + ")";    }​    public static TumblingEventTimeWindows of(Time size) {        return new TumblingEventTimeWindows(size.toMilliseconds(), 0);    }​    public static TumblingEventTimeWindows of(Time size, Time offset) {        return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());    }​    @Override    public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {        return new TimeWindow.Serializer();    }​    @Override    public boolean isEventTime() {        return true;    }}

TumblingEventTimeWindows继承了Window,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetassignWindows方法获取的窗口为start及start+size,而start=TimeWindow.getWindowStartWithOffset(timestamp, offset, size);getDefaultTrigger方法返回的是EventTimeTrigger;getWindowSerializer方法返回的是TimeWindow.Serializer();isEventTime返回trueTumblingEventTimeWindows提供了of静态工厂方法,可以指定size及offset参数TumblingProcessingTimeWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

代码语言:javascript代码运行次数:0运行复制

public class TumblingProcessingTimeWindows extends WindowAssigner {    private static final long serialVersionUID = 1L;​    private final long size;​    private final long offset;​    private TumblingProcessingTimeWindows(long size, long offset) {        if (offset = size) {            throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");        }​        this.size = size;        this.offset = offset;    }​    @Override    public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {        final long now = context.getCurrentProcessingTime();        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);        return Collections.singletonList(new TimeWindow(start, start + size));    }​    public long getSize() {        return size;    }​    @Override    public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {        return ProcessingTimeTrigger.create();    }​    @Override    public String toString() {        return "TumblingProcessingTimeWindows(" + size + ")";    }​    public static TumblingProcessingTimeWindows of(Time size) {        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);    }​    public static TumblingProcessingTimeWindows of(Time size, Time offset) {        return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());    }​    @Override    public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {        return new TimeWindow.Serializer();    }​    @Override    public boolean isEventTime() {        return false;    }}

TumblingProcessingTimeWindows继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetassignWindows方法获取的窗口为start及start+size,而start=TimeWindow.getWindowStartWithOffset(now, offset, size),而now值则为context.getCurrentProcessingTime(),则是与TumblingEventTimeWindows的不同之处,TumblingProcessingTimeWindows不使用timestamp参数来计算,它使用now值替代;getDefaultTrigger方法返回的是ProcessingTimeTrigger,而isEventTime方法返回的为falseTumblingProcessingTimeWindows也提供了of静态工厂方法,可以指定size及offset参数小结flink的Tumbling Window分为TumblingEventTimeWindows及TumblingProcessingTimeWindows,它们都继承了WindowAssigner,其中元素类型为Object,而窗口类型为TimeWindow;它有两个参数,一个是size,一个是offset,其中offset必须大于等于0,size必须大于offsetWindowAssigner定义了assignWindows、getDefaultTrigger、getWindowSerializer、isEventTime这几个抽象方法,同时定义了抽象静态类WindowAssignerContext;它有两个泛型,其中T为元素类型,而W为窗口类型;TumblingEventTimeWindows及TumblingProcessingTimeWindows的窗口类型为TimeWindow,它有start及end属性,其中start为inclusive,而end为exclusive,maxTimestamp返回的是end-1,它还提供了mergeWindows及getWindowStartWithOffset静态方法;前者用于合并重叠的时间窗口,后者用于获取指定timestamp、offset、windowSize的window startTumblingEventTimeWindows及TumblingProcessingTimeWindows的不同在于assignWindows、getDefaultTrigger、isEventTime方法;前者assignWindows使用的是参数中的timestamp,而后者使用的是now值;前者的getDefaultTrigger返回的是EventTimeTrigger,而后者返回的是ProcessingTimeTrigger;前者isEventTime方法返回的为true,而后者返回的为falsedocTumbling Windows

以上就是聊聊flink的Tumbling Window的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/167838.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2025年11月1日 00:53:36
下一篇 2025年11月1日 00:54:24

相关推荐

  • 如何在PHPMyAdmin中监控数据库的健康状态

    要在phpmyadmin中监控数据库健康状态,首先应通过执行show global status查看关键指标如connections、slow_queries、bytes_received/sent;其次使用show processlist分析当前进程,识别sleep连接或长时间查询;接着用show…

    2025年12月11日 好文分享
    000
  • PHP怎样加速?OPcache配置优化

    opcache优化是php加速的核心,通过缓存编译后的opcode减少重复解析。1. 启用opcache(opcache.enable=1);2. 设置合理内存(如256mb);3. 调整字符串缓冲区(如16mb);4. 根据文件数量设置最大缓存数(如10000);5. 生产环境关闭时间戳验证(op…

    2025年12月11日 好文分享
    000
  • 解决PHPMyAdmin执行SQL语句时的锁等待问题

    解决phpmyadmin执行sql时的锁等待问题,需先定位锁源并针对性优化。1. 查看进程列表:通过show full processlist;识别长时间运行、状态为locked或waiting for table metadata lock等问题sql;2. 优化慢查询:使用explain分析未命…

    2025年12月11日 好文分享
    000
  • 如何优化PHPMyAdmin操作数据库的内存使用效率

    phpmyadmin操作大型数据库卡顿或崩溃的核心原因包括php内存限制过低、mysql/mariadb缓冲池配置不足、查询结果集过大及不良sql习惯。1. 提升php的memory_limit至512m或更高,调整max_execution_time、upload_max_filesize和pos…

    2025年12月11日 好文分享
    000
  • 解决PHPMyAdmin中用户登录权限不足的问题

    phpmyadmin登录权限不足问题通常由mysql用户权限配置不当引起,解决方法包括:1.检查phpmyadmin的config.inc.php文件中配置的用户名和密码是否正确;2.通过mysql命令行确认用户是否存在并重置密码;3.授予用户对目标数据库或所有数据库的足够权限,如select、in…

    2025年12月11日 好文分享
    000
  • 调整PhpStorm字体和字号以提升阅读体验

    调整 phpstorm 字体、字号和配色能有效缓解视觉疲劳,提升编码效率。1. 选择等宽字体如 jetbrains mono、fira code 或 source code pro,确保字符对齐;2. 设置字号在 14~16px 之间,根据屏幕分辨率微调;3. 调整行距至 1.3~1.5 倍,增强段…

    2025年12月11日 好文分享
    000
  • 目录怎样遍历?递归扫描文件方法

    递归是遍历目录的首选方法,因为它能自然映射文件系统的树形结构,代码简洁且可读性强;1. 递归通过函数自身调用实现层级深入,遇到文件处理,遇到目录继续递归;2. 优势包括逻辑清晰、无需预知目录深度、契合嵌套结构;3. 常见问题如权限不足、符号链接需额外处理,可通过异常捕获和判断跳过解决;4. 替代方案…

    2025年12月11日 好文分享
    000
  • PHP如何获取系统运行时长 3种获取系统uptime方案

    php获取系统运行时长有三种主要方案。1. 使用shell_exec执行uptime命令,简单直接但依赖权限和函数开启;2. 读取/proc/uptime文件,安全高效但仅适用于linux系统;3. 使用sys_getloadavg函数,需安装扩展且无法直接获取运行时间。若shell_exec被禁用…

    2025年12月11日 好文分享
    000
  • 解决PhpStorm搜索功能失效的常见原因

    phpstorm搜索功能失效通常由索引异常、文件排除、插件冲突或设置错误引起。1. 索引损坏或未完成构建会导致搜索失败,可通过清除缓存、重建索引解决;2. 被标记为“excluded”的目录不会参与搜索,需在项目结构设置中恢复并勾选“include non-project files”;3. 插件冲…

    2025年12月11日 好文分享
    000
  • 隐藏PhpStorm菜单栏以节省界面空间

    phpstorm 提供多种方法隐藏菜单栏以提升开发体验。1. 按 alt 键可临时隐藏主菜单栏,适合全屏编码时使用,但部分系统可能拦截该快捷键;2. 启用“无边模式”(ctrl+shift+f)可最大化编辑空间,同时隐藏工具栏和状态栏;3. 自定义关闭侧边栏、底部窗口和 minimap 等非必要界面…

    2025年12月11日 好文分享
    000
  • 配置PHPCMS的站群动态域名的详细步骤

    phpcms站群动态域名配置通过服务器重写规则与系统站点管理结合实现。1. 服务器配置:nginx中设置主站点与子站点的server块,利用泛域名或通配符匹配所有子站请求并转发至phpcms入口文件;2. phpcms后台配置:在“站点管理”中添加站点并绑定对应域名,配置站点信息后更新缓存确保生效;…

    2025年12月11日 好文分享
    000
  • 安装和配置PHPCMS的搜索引擎优化插件

    phpcms seo插件的安装与配置核心在于提升网站在搜索引擎中的可见性和优化效果,具体步骤包括:1. 下载适配当前phpcms版本的seo插件,来源可以是官方社区、开源仓库或第三方开发者;2. 解压后通过ftp或主机面板上传插件文件至指定目录,如phpcms/modules或phpcms/plug…

    2025年12月11日 好文分享
    000
  • PHP如何调用TSLint检测 TypeScript代码检测指南

    php 调用 tslint 检测 typescript 代码的方法是通过执行命令行调用 tslint cli 并解析其输出结果。1. 安装 node.js 和 npm;2. 安装 tslint 及相关规则集;3. 配置 tslint.json 文件;4. 使用 php 的 exec() 函数执行 t…

    2025年12月11日 好文分享
    000
  • 异常错误如何捕获处理?try-catch使用技巧

    使用 try-catch 处理异常需明确错误处理目的,避免盲目捕获。1. 基本结构是将可能出错的代码放入 try 块,catch 中处理并至少记录错误信息。2. 精准捕获错误类型,如仅处理 syntaxerror,其他错误重新抛出,避免吞掉未知错误。3. finally 用于执行清理工作,如关闭文件…

    2025年12月11日 好文分享
    000
  • PHPCMS和织梦CMS的安全性对比研究

    织梦cms安全性问题更突出,因为它已无人维护,漏洞无法修复,phpcms虽也停滞,但曾有积极维护。1.织梦因长期无更新,漏洞成风险敞口;2.phpcms代码结构相对规范,历史维护较好;3.两者均不适合新项目使用;4.对现有站点应强化服务器权限、部署waf、定期备份并逐步迁移。 在PHPCMS和织梦C…

    2025年12月11日 好文分享
    000
  • 邮件发送怎么实现?PHPMailer配置

    php邮件发送为何经常失败?常见问题与排查。1.smtp配置错误:smtp主机、端口、加密方式及用户名密码必须准确无误,密码常需使用授权码而非登录密码;2.网络或防火墙问题:服务器可能因防火墙或isp限制无法连接smtp端口,需检查并开放相应端口;3.认证失败:确认用户名为完整邮箱地址,密码为授权码…

    2025年12月11日 好文分享
    000
  • 解决PHPMyAdmin中用户账户被锁定的问题

    要解决 phpmyadmin 中用户账户被锁定的问题,首先应检查 mysql 错误日志以确定锁定原因。1. 使用 root 用户登录 phpmyadmin 或恢复 root 权限;2. 执行 sql 查询 update mysql.user set account\_locked = ‘…

    2025年12月11日 好文分享
    000
  • 用户登录系统如何开发?Session认证机制详细教程

    session 是用户登录系统开发中最常见的认证方式,适合中小型 web 应用。1. session 是服务器端记录用户状态的机制,通过生成唯一 session id 并存储在客户端 cookie 中实现用户识别;2. 实现流程包括用户提交信息、后端验证并创建 session、返回 session …

    2025年12月11日 好文分享
    000
  • MySQL表内容增加:PHP后端实现方法

    php后端实现mysql表内容增加的方法是通过构建并执行insert sql语句,主要步骤包括:1. 建立数据库连接;2. 获取用户输入数据;3. 构建sql插入语句;4. 执行sql并处理结果;5. 关闭数据库连接。为防止sql注入,推荐使用预处理语句或mysqli_real_escape_str…

    2025年12月11日 好文分享
    000
  • ZIP压缩怎么实现?文件打包与解压

    zip压缩能“变小”文件的核心在于使用了deflate算法,它结合lz77和霍夫曼编码有效消除数据冗余。①lz77通过滑动窗口查找重复数据并用引用替代,减少重复内容存储;②霍夫曼编码根据符号频率分配变长编码,高频符号用更短码表示,从而缩短整体编码长度。zip还通过本地文件头、中央目录等结构组织压缩数…

    2025年12月11日 好文分享
    000

发表回复

登录后才能评论
关注微信