在《0基础学习pyflink——个数滚动窗口(tumbling count windows)》一文中,我们了解到如果窗口内元素个数未达到设定窗口大小,计算个数的函数不会被触发。例如,下图中红色部分的元素(b,2)和(d,5)不会被计算:

为了让这些元素也能被计算,我们可以使用时间滚动窗口(Tumbling Time Windows)。这种窗口不依赖于元素的数量,而是基于时间进行触发。只要时间窗口到达,无论窗口内有多少元素,计算都会进行。
我们可以稍作修改《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的示例,将元素集中在“A”上。以下是修改后的代码:
豆包爱学
豆包旗下AI学习应用
674 查看详情
map代码语言:javascript
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]): def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]): print(*inputs, window) return [(key, len([e for e in inputs]))]word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]
def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
write all the data to one file
env.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0])reduce代码语言:javascript
# reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) # # define the sinkreduced.print()# submit for executionenv.execute()在这个例子中,我们使用了时间滚动窗口,窗口大小设置为2毫秒(
Time.milliseconds(2))。运行这段代码时,由于基于时间触发计算,每个元素都会被计算,输出结果可能会有所不同:
或
或
可以看出,结果并不稳定,但每条数据都会被计算,而不是像个数滚动窗口那样某些数据可能不会被触发。
完整代码如下:
from typing import Iterableimport timefrom pyflink.common import Types, Timefrom pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunctionfrom pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindows
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):print(*inputs, window)return [(key, len([e for e in inputs]))]
word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]
def word_count():env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
write all the data to one file
env.set_parallelism(1)source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource = env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyed=source.key_by(lambda i: i[0])# reducingreduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) # # define the sinkreduced.print()# submit for executionenv.execute()if name == 'main':word_count()
参考资料:https://www.php.cn/link/dc61c1317e2c1637f0f8d2de7fd8da9b
以上就是0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/434247.html
微信扫一扫
支付宝扫一扫