初识Structured Streaming

设想我们要设计一个交易数据展示系统,实时呈现比特币最近1s钟的成交均价。

我们可以通过交易数据接口以非常低的延迟获得全球各个比特币交易市场的每一笔比特币的成交价,成交额,交易时间。

由于比特币交易事件一直在发生,所以交易事件触发的交易数据会像流水一样源源不断地通过交易接口传给我们。

如何对这种流式数据进行实时的计算呢?我们需要使用流计算工具,在数据到达的时候就立即对其进行计算。

市面上主流的开源流计算工具主要有 Storm, Flink 和 Spark。

其中Storm的延迟最低,一般为几毫秒到几十毫秒,但数据吞吐量较低,每秒能够处理的事件在几十万左右,建设成本高。

Flink是目前国内互联网厂商主要使用的流计算工具,延迟一般在几十到几百毫秒,数据吞吐量非常高,每秒能处理的事件可以达到几百上千万,建设成本低。

Spark通过Spark Streaming或Spark Structured Streaming支持流计算。但Spark的流计算是将流数据按照时间分割成一个一个的小批次(mini-batch)进行处理的,其延迟一般在1秒左右。吞吐量和Flink相当。值得注意的是Spark Structured Streaming 现在也支持了Continous Streaming 模式,即在数据到达时就进行计算,不过目前还处于测试阶段,不是特别成熟。

虽然从目前来看,在流计算方面,Flink比Spark更具性能优势,是当之无愧的王者。但由于Spark拥有比Flink更加活跃的社区,其流计算功能也在不断地完善和发展,未来在流计算领域或许足以挑战Flink的王者地位。

代码语言:javascript代码运行次数:0运行复制

import pysparkfrom pyspark.sql import SparkSessionfrom pyspark.sql import types as Tfrom pyspark.sql import functions as F import time,os,random#本文主要用小数据测试,设置较小的分区数可以获得更高性能spark = SparkSession.builder         .appName("structured streaming")         .config("spark.sql.shuffle.partitions","8")         .config("spark.default.parallelism","8")         .config("master","local[4]")         .enableHiveSupport()         .getOrCreate()sc = spark.sparkContext

一,Structured Streaming 基本概念

流计算(Streaming)和批计算(Batch):

批计算或批处理是处理离线数据。单个处理数据量大,处理速度比较慢。

流计算是处理在线实时产生的数据。单次处理的数据量小,但处理速度更快。

Spark Streaming 和 Spark Structured Streaming:

Spark在2.0之前,主要使用的Spark Streaming来支持流计算,其数据结构模型为DStream,其实就是一个个小批次数据构成的RDD队列。

目前,Spark主要推荐的流计算模块是Structured Streaming,其数据结构模型是Unbounded DataFrame,即没有边界的数据表。

相比于 Spark Streaming 建立在 RDD数据结构上面,Structured Streaming 是建立在 SparkSQL基础上,DataFrame的绝大部分API也能够用在流计算上,实现了流计算和批处理的一体化,并且由于SparkSQL的优化,具有更好的性能,容错性也更好。

source 和 sink:

source即流数据从何而来。在Spark Structured Streaming 中,主要可以从以下方式接入流数据。

1, Kafka Source。当消息生产者发送的消息到达某个topic的消息队列时,将触发计算。这是structured Streaming 最常用的流数据来源。

2, File Source。当路径下有文件被更新时,将触发计算。这种方式通常要求文件到达路径是原子性(瞬间到达,不是慢慢写入)的,以确保读取到数据的完整性。在大部分文件系统中,可以通过move操作实现这个特性。

3, Socket Source。需要制定host地址和port端口号。这种方式一般只用来测试代码。linux环境下可以用nc命令来开启网络通信端口发送消息测试。

sink即流数据被处理后从何而去。在Spark Structured Streaming 中,主要可以用以下方式输出流数据计算结果。

1, Kafka Sink。将处理后的流数据输出到kafka某个或某些topic中。

2, File Sink。将处理后的流数据写入到文件系统中。

3, ForeachBatch Sink。对于每一个micro-batch的流数据处理后的结果,用户可以编写函数实现自定义处理逻辑。例如写入到多个文件中,或者写入到文件并打印。

4, Foreach Sink。一般在Continuous触发模式下使用,用户编写函数实现每一行的处理处理。

5,Console Sink。打印到Driver端控制台,如果日志量大,谨慎使用。一般供调试使用。

6,Memory Sink。输出到内存中,供调试使用。

append mode, complete mode 和 update mode:

这些是流数据输出到sink中的方式,叫做 output mode。

append mode 是默认方式,将新流过来的数据的计算结果添加到sink中。

complete mode 一般适用于有aggregation查询的情况。流计算启动开始到目前为止接收到的全部数据的计算结果添加到sink中。

update mode 只有本次结果中和之前结果不一样的记录才会添加到sink中。

operation 和 query:

在SparkSQL批处理中,算子被分为Transformation算子和Action算子。Spark Structured Streaming 有所不同,所有针对流数据的算子都是懒惰执行的,叫做operation。

DataFrame的Action算子(例如show,count,reduce)都不可以在Spark Structured Streaming中使用,而大部分Transformation算子都可以在Structured Streaming中使用(例如select,where,groupBy,agg)。

但也有些操作不可以(例如sort, distinct,某些类型的join操作,以及连续的agg操作等)。

如果要触发执行,需要通过writeStream启动一个query,指定sink,output mode,以及触发器trigger类型。

从一定意义上,可以将writeStream理解成Structured Streaming 唯一的 Action 算子。

Spark Structured Streaming支持的触发器trigger类型主要有以下一些。

1,unspecified。不指定trigger类型,以micro-batch方式触发,当上一个micro-batch执行完成后,将中间收到的数据作为下一个micro-batch的数据。

2,fixed interval micro-batches。指定时间间隔的micro-batch。如果上一个micro-batch在间隔时间内完成,需要等待指定间隔时间。如果上一个micro-batch在间隔时间后才完成,那么会在上一个micro-batch执行完成后立即执行。

3,one-time micro-batch。只触发一次,以micro-batch方式触发。一种在流计算模式下执行批处理的方法。

4,continuous with fixed checkpoint interval。每个事件触发一次,真正的流计算,这种模式目前还处于实验阶段。

event time, processing time 和 watermarking:

event time 是流数据的发生时间,一般嵌入到流数据中作为一个字段。

processing time 是指数据被处理的时间。

Spark Structured Streaming 一般 使用 event time作为 Windows切分的依据,例如每秒钟的成交均价,是取event time中每秒钟的数据进行处理。

考虑到数据存在延迟,如果一个数据到达时,其对应的时间批次已经被计算过了,那么会重新计算这个时间批次的数据并更新之前的计算结果。但是如果这个数据延迟太久,那么可以设置watermarking(水位线)来允许丢弃 processing time和event time相差太久的数据,即延迟过久的数据。注意这种丢弃是或许会发生的,不是一定会丢弃。

at-most once,at-least once 和 exactly once:

怪兽AI知识库 怪兽AI知识库

企业知识库大模型 + 智能的AI问答机器人

怪兽AI知识库 51 查看详情 怪兽AI知识库

这是分布式流计算系统在某些机器发生发生故障时,对结果一致性(无论机器是否发生故障,结果都一样)的保证水平。反应了分布式流计算系统的容错能力。

at-most once,最多一次。每个数据或事件最多被程序中的所有算子处理一次。这本质上是一种尽力而为的方法,只要机器发生故障,就会丢弃一些数据。这是比较低水平的一致性保证。

at-least once,至少一次。每个数据或事件至少被程序中的所有算子处理一次。这意味着当机器发生故障时,数据会从某个位置开始重传。但有些数据可能在发生故障前被所有算子处理了一次,在发生故障后重传时又被所有算子处理了一次,甚至重传时又有机器发生了故障,然后再次重传,然后又被所有算子处理了一次。因此是至少被处理一次。这是一种中间水平的一致性保证。

exactly once,恰好一次。从计算结果看,每个数据或事件都恰好被程序中的所有算子处理一次。这是一种最高水平的一致性保证。

spark structured streaming 在micro-batch触发器类型下,sink是File情况下,可以保证为exactly once的一致性水平。

但是在continuou触发器类型下,只能保证是at-least once的一致性水平。

详情参考如下文章:《谈谈流计算中的『Exactly Once』特性》

https://segmentfault.com/a/1190000019353382

二,word count 基本范例

下面范例中,我们将用Python代码在一个目录下不断生成一些简单句子组成的文件。

然后用pyspark读取文件流,并进行词频统计,并将结果打印。

下面是生成文件流的代码。并通过subprocess.Popen调用它异步执行。

代码语言:javascript代码运行次数:0运行复制

%%writefile make_streamming_data.pyimport random import os import time import shutilsentences = ["eat tensorflow2 in 30 days","eat pytorch in 20 days","eat pyspark in 10 days"]data_path = "./data/streamming_data"if os.path.exists(data_path):    shutil.rmtree(data_path)    os.makedirs(data_path)for i in range(20):    line = random.choice(sentences)    tmp_file = str(i)+".txt"    with open(tmp_file,"w") as f:        f.write(line)        f.flush()    shutil.move(tmp_file,os.path.join(data_path,tmp_file))    time.sleep(1)    

代码语言:javascript代码运行次数:0运行复制

# 在后台异步生成文件流import subprocesscmd = ["python", "make_streamming_data.py"]process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)#process.wait() #等待结束

代码语言:javascript代码运行次数:0运行复制

#通过 readStream 创建streaming dataframeschema = T.StructType().add("value", "string")data_path = "./data/streamming_data"dflines = spark     .readStream     .option("sep", ".")     .schema(schema)     .csv(data_path)dflines.printSchema() print(dflines.isStreaming) 

代码语言:javascript代码运行次数:0运行复制

root |-- value: string (nullable = true)True

代码语言:javascript代码运行次数:0运行复制

#实施operator转换dfwords = dflines.select(F.explode(F.split(dflines.value, " ")).alias("word"))dfwordCounts = dfwords.groupBy("word").count()

代码语言:javascript代码运行次数:0运行复制

#执行query, 注意是异步方式执行, 相当于是开启了后台进程def foreach_batch_function(df, epoch_id):    print("Batch: ",epoch_id)    df.show()query = dfwordCounts     .writeStream     .outputMode("complete")    .foreachBatch(foreach_batch_function)     .start()#query.awaitTermination() #阻塞当前进程直到query发生异常或者被stopprint(query.isActive)#60s后主动停止querytime.sleep(30)query.stop()print(query.isActive)

代码语言:javascript代码运行次数:0运行复制

TrueBatch:  0+-----------+-----+|       word|count|+-----------+-----+|        eat|   10||       days|   10||         20|    4||tensorflow2|    3||         30|    3||         10|    3||    pyspark|    3||         in|   10||    pytorch|    4|+-----------+-----+Batch:  1+-----------+-----+|       word|count|+-----------+-----+|        eat|   13||       days|   13||         20|    4||tensorflow2|    5||         30|    5||         10|    4||    pyspark|    4||         in|   13||    pytorch|    4|+-----------+-----+Batch:  2+-----------+-----+|       word|count|+-----------+-----+|        eat|   15||       days|   15||         20|    5||tensorflow2|    5||         30|    5||         10|    5||    pyspark|    5||         in|   15||    pytorch|    5|+-----------+-----+Batch:  3+-----------+-----+|       word|count|+-----------+-----+|        eat|   18||       days|   18||         20|    6||tensorflow2|    5||         30|    5||         10|    7||    pyspark|    7||         in|   18||    pytorch|    6|+-----------+-----+Batch:  4+-----------+-----+|       word|count|+-----------+-----+|        eat|   20||       days|   20||         20|    7||tensorflow2|    5||         30|    5||         10|    8||    pyspark|    8||         in|   20||    pytorch|    7|+-----------+-----+False

三,创建Streaming DataFrame

可以从Kafka Source,File Source 以及 Socket Source 中创建 Streaming DataFrame。

1,从Kafka Source 创建

需要安装kafka,并加载其jar包到依赖中。

详细参考:http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

以下代码仅供示范,运行需要配置相关kafka环境。

代码语言:javascript代码运行次数:0运行复制

df = spark   .read   .format("kafka")   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")   .option("subscribe", "topic1")   .load()

2,从File Source 创建

支持读取parquet文件,csv文件,json文件,txt文件目录。需要指定schema。

代码语言:javascript代码运行次数:0运行复制

schema = T.StructType().add("name","string").add("age","integer").add("score","double")dfstudents = spark.readStream.schema(schema).json("./data/students_json")dfstudents.printSchema() 

代码语言:javascript代码运行次数:0运行复制

query = dfstudents.writeStream     .outputMode("append")    .format("parquet")     .option("checkpointLocation", "./data/checkpoint/")     .option("path", "./data/students_parquet/")     .start()#query.awaitTermination()

3,从Socket Source创建

在bash中输入nc -lk 9999 开启socket网络通信端口,然后在其中输入一些句子,如:

代码语言:javascript代码运行次数:0运行复制

hello worldhello Chinahello Beijing

代码语言:javascript代码运行次数:0运行复制

dflines = spark     .readStream     .format("socket")     .option("host", "localhost")     .option("port", 9999)     .load()

四,使用operator转换

可以在Streaming DataFrame上使用Static DataFrame大部分常规Transformation算子。

还可以针对event time进行滑动窗口(window)操作,可以通过设置水位线(watermarking)来丢弃延迟过久的数据。

不仅如此,可以对Streaming DataFrame和 Static DataFrame 进行表连接 join操作。

甚至两个Streaming DataFrame之前也是可以join的。

1,Basic Operators

一些常用的Transformation算子都可以在Unbounded DataFrame上使用,例如select,selectExpr, where, groupBy, agg等等。

也可以像批处理中的静态的DataFrame那样,注册临时视图,然后在视图上使用SQL语法。

代码语言:javascript代码运行次数:0运行复制

schema = T.StructType().add("name","string").add("age","integer").add("score","double")dfstudents = spark.readStream.schema(schema).json("./data/students_json")dfstudents.printSchema() dfstudents.createOrReplaceTempView("students")dfstudents_old = spark.sql("select * from students where age >25")print(dfstudents_old.isStreaming)

2, Window Operations on Event Time

基于事件时间滑动窗上的聚合操作和其它列的goupBy操作非常相似,落在同一个时间窗的记录就好像具有相同的key,它们将进行聚合。

下面我们通过一个虚拟的比特币交易价格的例子来展示基于事件时间滑动窗上的聚合操作。

代码语言:javascript代码运行次数:0运行复制

%%writefile make_trading_data.pyimport random import os import time import datetime import json import shutildata_path = "./data/trading_data"if os.path.exists(data_path):    shutil.rmtree(data_path)    os.makedirs(data_path)for i in range(20):    now =  datetime.datetime.now()    now_str =  now.strftime("%Y-%m-%d %H:%M:%S.%f")        #构造延迟数据, 延迟20min左右    right_now = now - datetime.timedelta(minutes = 20)    right_now_str = right_now.strftime("%Y-%m-%d %H:%M:%S.%f")        if i%2==0:        dic = {"dt": now_str, "amount": 100, "price": 10000.0+random.choice(range(5))}    else:        dic = {"dt": right_now_str, "amount": 100 ,"price": 100.0-random.choice(range(5))}            tmp_file = str(i)+".json"    with open(tmp_file,"w") as f:        json.dump(dic,f)    shutil.move(tmp_file,os.path.join(data_path,tmp_file))    time.sleep(10)

代码语言:javascript代码运行次数:0运行复制

# 在后台异步生成文件流import subprocesscmd = ["python", "make_trading_data.py"]process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)#process.wait() #等待结束

代码语言:javascript代码运行次数:0运行复制

data_path = "./data/trading_data"schema = T.StructType().add("dt","string").add("amount","integer").add("price","double")dfprice_raw = spark.readStream.schema(schema).json("./data/trading_data")dfprice_raw.printSchema() 

代码语言:javascript代码运行次数:0运行复制

dfprice = dfprice_raw.selectExpr("cast(dt as timestamp) as dt","amount","price", "amount*price as volume")dfprice.printSchema() 

代码语言:javascript代码运行次数:0运行复制

# 控制台方式输出,可能需要在jupyter 的log界面查看输出日志query = dfprice.writeStream     .outputMode("append")    .format("console")     .start()time.sleep(20)query.stop()#query.awaitTermination()

代码语言:javascript代码运行次数:0运行复制

-------------------------------------------Batch: 0-------------------------------------------+--------------------+------+-------+---------+|                  dt|amount|  price|   volume|+--------------------+------+-------+---------+|2020-12-21 08:11:...|   100|10004.0|1000400.0|+--------------------+------+-------+---------+-------------------------------------------Batch: 1-------------------------------------------+--------------------+------+-----+------+|                  dt|amount|price|volume|+--------------------+------+-----+------+|2020-12-21 07:51:...|   100| 99.0|9900.0|+--------------------+------+-----+------+-------------------------------------------Batch: 2-------------------------------------------+--------------------+------+-------+---------+|                  dt|amount|  price|   volume|+--------------------+------+-------+---------+|2020-12-21 08:12:...|   100|10003.0|1000300.0|+--------------------+------+-------+---------+

代码语言:javascript代码运行次数:0运行复制

#下面我们将dfprice按照时间分窗,窗口范围为10min,滑动周期为5min,并统计滑动窗口内的平均交易价格dfprice_avg = dfprice.groupBy(F.window(dfprice.dt, "10 minutes", "5 minutes"))    .agg(F.sum("amount").alias("amount"), F.sum("volume").alias("volume"))    .selectExpr("window","window.start","window.end","volume/amount as avg_price")dfprice_avg.printSchema() 

代码语言:javascript代码运行次数:0运行复制

query = dfprice_avg.writeStream     .outputMode("complete")    .format("console")     .start()time.sleep(60)query.stop()#query.awaitTermination()

代码语言:javascript代码运行次数:0运行复制

-------------------------------------------Batch: 0-------------------------------------------+--------------------+-------------------+-------------------+---------+|              window|              start|                end|avg_price|+--------------------+-------------------+-------------------+---------+|[2020-12-21 08:25...|2020-12-21 08:25:00|2020-12-21 08:35:00|  10004.0||[2020-12-21 08:00...|2020-12-21 08:00:00|2020-12-21 08:10:00|     97.0||[2020-12-21 08:05...|2020-12-21 08:05:00|2020-12-21 08:15:00|     97.0||[2020-12-21 08:20...|2020-12-21 08:20:00|2020-12-21 08:30:00|  10004.0|+--------------------+-------------------+-------------------+---------+-------------------------------------------Batch: 1-------------------------------------------+--------------------+-------------------+-------------------+---------+|              window|              start|                end|avg_price|+--------------------+-------------------+-------------------+---------+|[2020-12-21 08:25...|2020-12-21 08:25:00|2020-12-21 08:35:00|  10002.0||[2020-12-21 08:00...|2020-12-21 08:00:00|2020-12-21 08:10:00|     97.0||[2020-12-21 08:05...|2020-12-21 08:05:00|2020-12-21 08:15:00|     97.0||[2020-12-21 08:20...|2020-12-21 08:20:00|2020-12-21 08:30:00|  10002.0|+--------------------+-------------------+-------------------+---------+-------------------------------------------Batch: 2-------------------------------------------+--------------------+-------------------+-------------------+---------+|              window|              start|                end|avg_price|+--------------------+-------------------+-------------------+---------+|[2020-12-21 08:25...|2020-12-21 08:25:00|2020-12-21 08:35:00|  10002.0||[2020-12-21 08:00...|2020-12-21 08:00:00|2020-12-21 08:10:00|     98.5||[2020-12-21 08:05...|2020-12-21 08:05:00|2020-12-21 08:15:00|     98.5||[2020-12-21 08:20...|2020-12-21 08:20:00|2020-12-21 08:30:00|  10002.0|+--------------------+-------------------+-------------------+---------+

代码语言:javascript代码运行次数:0运行复制

#进一步地,我们设置watermarking(水位线)为20分钟, 则超出水位线的数据将允许被丢弃(但不一定被丢弃)dfprice_avg = dfprice.withWatermark("dt", "20 minutes")    .groupBy(F.window(dfprice.dt, "10 minutes", "5 minutes"))    .agg(F.sum("amount").alias("amount"), F.sum("volume").alias("volume"))    .selectExpr("window","window.start","window.end","volume/amount as avg_price")dfprice_avg.printSchema() 

代码语言:javascript代码运行次数:0运行复制

#设置水位线后, outputMode必须是append或者updatequery = dfprice_avg.writeStream     .outputMode("update")    .format("console")     .start()time.sleep(60)query.stop()#query.awaitTermination()

代码语言:javascript代码运行次数:0运行复制

-------------------------------------------Batch: 0-------------------------------------------+--------------------+-------------------+-------------------+---------+|              window|              start|                end|avg_price|+--------------------+-------------------+-------------------+---------+|[2020-12-21 08:00...|2020-12-21 08:00:00|2020-12-21 08:10:00|     96.0||[2020-12-21 08:15...|2020-12-21 08:15:00|2020-12-21 08:25:00|  10001.0||[2020-12-21 08:20...|2020-12-21 08:20:00|2020-12-21 08:30:00|  10001.0||[2020-12-21 07:55...|2020-12-21 07:55:00|2020-12-21 08:05:00|     96.0|+--------------------+-------------------+-------------------+---------+-------------------------------------------Batch: 1-------------------------------------------+------+-----+---+---------+|window|start|end|avg_price|+------+-----+---+---------++------+-----+---+---------+-------------------------------------------Batch: 2-------------------------------------------+--------------------+-------------------+-------------------+---------+|              window|              start|                end|avg_price|+--------------------+-------------------+-------------------+---------+|[2020-12-21 08:00...|2020-12-21 08:00:00|2020-12-21 08:10:00|     97.5||[2020-12-21 07:55...|2020-12-21 07:55:00|2020-12-21 08:05:00|     97.5|+--------------------+-------------------+-------------------+---------+-------------------------------------------Batch: 3-------------------------------------------+--------------------+-------------------+-------------------+------------------+|              window|              start|                end|         avg_price|+--------------------+-------------------+-------------------+------------------+|[2020-12-21 08:15...|2020-12-21 08:15:00|2020-12-21 08:25:00|10000.666666666666||[2020-12-21 08:20...|2020-12-21 08:20:00|2020-12-21 08:30:00|10000.666666666666|+--------------------+-------------------+-------------------+------------------+-------------------------------------------Batch: 4-------------------------------------------+------+-----+---+---------+|window|start|end|avg_price|+------+-----+---+---------++------+-----+---+---------+-------------------------------------------Batch: 5-------------------------------------------+--------------------+-------------------+-------------------+---------+|              window|              start|                end|avg_price|+--------------------+-------------------+-------------------+---------+|[2020-12-21 08:00...|2020-12-21 08:00:00|2020-12-21 08:10:00|     98.0||[2020-12-21 07:55...|2020-12-21 07:55:00|2020-12-21 08:05:00|     98.0|+--------------------+-------------------+-------------------+---------+

3, Join Operations

Streaming DataFrame 可以和 Static DataFrame 进行 Inner 或者 Left Outer 连接操作。join后的结果依然是一个 Streaming DataFrame。

此外 Streaming DataFrame 也可以和 Streaming DataFrame 进行 Inner join.

这种join机制是通过追溯被join的 Streaming DataFrame 已经接收到的流数据和主动 join的 Streaming DataFrame的当前批次进行key的配对,为了避免追溯过去太久的数据造成性能瓶颈,可以通过设置 watermark 来清空过去太久的历史数据的State,数据被清空State后将允许不被配对查询。

代码语言:javascript代码运行次数:0运行复制

schema = T.StructType().add("name","string").add("age","integer").add("score","double")dfstudents = spark.readStream.schema(schema).json("./data/students_json")dfstudents.printSchema() 

下面是Streaming DataFrame 和 Static DataFrame 进行 join的示范。

代码语言:javascript代码运行次数:0运行复制

dfclasses = spark.createDataFrame([("LiLei","class1"),("Hanmeimei","class2"),("Lily","class3")]).toDF("name","class")dfclasses.printSchema() 

代码语言:javascript代码运行次数:0运行复制

# 示范 Streaming DataFrame  inner join Static DataFramedfjoin_inner = dfstudents.join(dfclasses, "name", "inner")dfjoin_inner.printSchema()print(dfjoin_inner.isStreaming)

代码语言:javascript代码运行次数:0运行复制

root |-- name: string (nullable = true) |-- age: integer (nullable = true) |-- score: double (nullable = true) |-- class: string (nullable = true)True

代码语言:javascript代码运行次数:0运行复制

query = dfjoin_inner.writeStream     .outputMode("append")    .format("console")     .start()time.sleep(10)query.stop()#query.awaitTermination()

代码语言:javascript代码运行次数:0运行复制

-------------------------------------------Batch: 0-------------------------------------------+---------+---+-----+------+|     name|age|score| class|+---------+---+-----+------+|    LiLei| 12| 75.5|class1||Hanmeimei| 16| 90.0|class2||     Lily| 15| 68.0|class3|+---------+---+-----+------+

代码语言:javascript代码运行次数:0运行复制

# 示范 Streaming DataFrame  left join Static DataFramedfjoin_left = dfstudents.join(dfclasses, "name", "left")dfjoin_left.printSchema()print(dfjoin_left.isStreaming)

代码语言:javascript代码运行次数:0运行复制

query = dfjoin_left.writeStream     .outputMode("append")    .format("console")     .start()time.sleep(10)query.stop()#query.awaitTermination()

代码语言:javascript代码运行次数:0运行复制

-------------------------------------------Batch: 0-------------------------------------------+---------+---+-----+------+|     name|age|score| class|+---------+---+-----+------+|    LiLei| 12| 75.5|class1||   Justin| 19| 87.0|  null||     Lily| 15| 68.0|class3||     Andy| 17| 80.0|  null||Hanmeimei| 16| 90.0|class2||  Michael| 20| 70.5|  null|+---------+---+-----+------+

下面是一个简单的Streaming DataFrame inner join Streaming DataFrame示范。

代码语言:javascript代码运行次数:0运行复制

dfhometown = dfstudents.selectExpr("name","if(rand()>0.5,'China','USA') as hometown")dfhometown.printSchema()print(dfhometown.isStreaming)

代码语言:javascript代码运行次数:0运行复制

root |-- name: string (nullable = true) |-- hometown: string (nullable = false)True

代码语言:javascript代码运行次数:0运行复制

dfjoin_streaming = dfstudents.join(dfhometown,"name","inner")dfjoin_streaming.printSchema()print(dfjoin_streaming.isStreaming)

代码语言:javascript代码运行次数:0运行复制

query = dfjoin_streaming.writeStream     .outputMode("append")    .format("console")     .start()time.sleep(10)query.stop()#query.awaitTermination()

代码语言:javascript代码运行次数:0运行复制

-------------------------------------------Batch: 0-------------------------------------------+---------+---+-----+--------+|     name|age|score|hometown|+---------+---+-----+--------+|    LiLei| 12| 75.5|     USA||   Justin| 19| 87.0|   China||     Lily| 15| 68.0|   China||Hanmeimei| 16| 90.0|   China||     Andy| 17| 80.0|     USA||  Michael| 20| 70.5|   China|+---------+---+-----+--------+

五,输出 Structured Streaming 的结果

Streaming DataFrame 支持以下类型的结果输出:

Kafka Sink。将处理后的流数据输出到kafka某个或某些topic中。File Sink。将处理后的流数据写入到文件系统中。ForeachBatch Sink。对于每一个micro-batch的流数据处理后的结果,用户可以编写函数实现自定义处理逻辑。例如写入到多个文件中,或者写入到文件并打印。Foreach Sink。一般在Continuous触发模式下使用,用户编写函数实现每一行的处理。Console Sink。打印到Driver端控制台,如果日志量大,谨慎使用。一般供调试使用。Memory Sink。输出到内存中,供调试使用。

1,输出到Kafka Sink

示范代码如下,注意,df 应当具备以下列:topic, key 和 value.

代码语言:javascript代码运行次数:0运行复制

query = df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")   .writeStream()  .format("kafka")  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")  .start()

2,输出到File Sink

代码语言:javascript代码运行次数:0运行复制

schema = T.StructType().add("name","string").add("age","integer").add("score","double")dfstudents = spark.readStream.schema(schema).json("./data/students_json")query = dfstudents     .writeStream    .format("csv")     .option("checkpointLocation", "./data/checkpoint")     .option("path", "./data/students_csv")     .start()time.sleep(5)query.stop()

3, 输出到ForeachBatch Sink

对于每一个Batch,可以当做一个Static DataFrame 进行处理。

代码语言:javascript代码运行次数:0运行复制

schema = T.StructType().add("name","string").add("age","integer").add("score","double")dfstudents = spark.readStream.schema(schema).json("./data/students_json")def foreach_batch_function(df, epoch_id):    print("epoch_id = ",epoch_id)    df.show()    print("rows = ",df.count())    query = dfstudents.writeStream.foreachBatch(foreach_batch_function).start()  time.sleep(3)query.stop()

代码语言:javascript代码运行次数:0运行复制

epoch_id =  0+---------+---+-----+|     name|age|score|+---------+---+-----+|    LiLei| 12| 75.5||Hanmeimei| 16| 90.0||     Lily| 15| 68.0||  Michael| 20| 70.5||     Andy| 17| 80.0||   Justin| 19| 87.0|+---------+---+-----+rows =  6

4, 输出到Console Sink

将结果输出到终端,对于jupyter 环境调试,可能需要在jupyter 的 log 日志中去查看。

代码语言:javascript代码运行次数:0运行复制

schema = T.StructType().add("name","string").add("age","integer").add("score","double")dfstudents = spark.readStream.schema(schema).json("./data/students_json")dfstudents.writeStream   .format("console")   .trigger(processingTime='2 seconds')   .start()

代码语言:javascript代码运行次数:0运行复制

-------------------------------------------Batch: 0-------------------------------------------+---------+---+-----+|     name|age|score|+---------+---+-----+|    LiLei| 12| 75.5||Hanmeimei| 16| 90.0||     Lily| 15| 68.0||  Michael| 20| 70.5||     Andy| 17| 80.0||   Justin| 19| 87.0|+---------+---+-----+

5, 输出到Memory Sink

代码语言:javascript代码运行次数:0运行复制

schema = T.StructType().add("name","string").add("age","integer").add("score","double")dfstudents = spark.readStream.schema(schema).json("./data/students_json")#设置的queryName 将成为需要查询的表的名称query = dfstudents     .writeStream     .queryName("dfstudents")     .outputMode("append")     .format("memory")     .start()time.sleep(3)query.stop()dfstudents_static = spark.sql("select * from dfstudents")dfstudents_static.show() 

代码语言:javascript代码运行次数:0运行复制

+---------+---+-----+|     name|age|score|+---------+---+-----+|    LiLei| 12| 75.5||Hanmeimei| 16| 90.0||     Lily| 15| 68.0||  Michael| 20| 70.5||     Andy| 17| 80.0||   Justin| 19| 87.0|+---------+---+-----+
初识Structured Streaming

如果本书对你有所帮助,想鼓励一下作者,记得给本项目加一颗星星star⭐️,并分享给你的朋友们喔?!

公众号后台回复关键词:pyspark,获取本项目github地址。

以上就是初识Structured Streaming的详细内容,更多请关注创想鸟其它相关文章!

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/415666.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
漫蛙2网页版入口跳转页面 漫蛙2网页版登录访问地址
上一篇 2025年11月7日 01:06:06
为什么iPhone没有增加RAM容量?
下一篇 2025年11月7日 01:06:13

相关推荐

  • 修复Django电商项目中AJAX过滤产品列表图片不显示问题

    在Django电商项目中,当使用AJAX动态加载过滤后的产品列表时,常遇到图片无法正常显示的问题。这通常是由于前端模板中图片加载方式(如data-setbg属性结合JavaScript库)与AJAX动态内容更新机制不兼容所致。解决方案是直接在AJAX返回的HTML中使用标准的标签来渲染图片,确保浏览…

    2026年5月10日
    000
  • 开源免费PHP工具 PHP开发效率提升利器

    推荐开源免费PHP开发工具以提升效率:VS Code、Sublime Text轻量高效,PhpStorm专业强大;调试用Xdebug、Kint、Ray;依赖管理选Composer;代码质量工具包括PHPStan、Psalm、PHP_CodeSniffer;数据库管理可用%ignore_a_1%MyA…

    2026年5月10日
    000
  • Matplotlib 地图中多类型图例的创建与优化

    Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化Matplotlib 地图中多类型图例的创建与优化

    本教程旨在解决matplotlib地图可视化中,如何在一个图例中同时展示颜色块(如区域分类)和自定义标记(如特定兴趣点)的问题。文章详细介绍了当传统`patch`对象无法正确显示标记时,如何利用`matplotlib.lines.line2d`创建标记图例句柄,并将其与颜色块图例句柄合并,从而生成一…

    2026年5月10日 用户投稿
    100
  • Golang JSON序列化:控制敏感字段暴露的最佳实践

    本教程探讨golang中如何高效控制结构体字段在json序列化时的可见性。当需要将包含敏感信息的结构体数组转换为json响应时,通过利用`encoding/json`包提供的结构体标签,特别是`json:”-“`,可以轻松实现对特定字段的忽略,从而避免敏感数据泄露,确保api…

    2026年5月10日
    000
  • 利用海象运算符简化条件赋值:Python教程与最佳实践

    本文旨在探讨Python中海象运算符(:=)在条件赋值场景下的应用。通过对比传统if/else语句与海象运算符,以及条件表达式,分析海象运算符在简化代码、提高可读性方面的优势与局限性。并通过具体示例,展示如何在列表推导式等场景下合理使用海象运算符,同时强调其潜在的复杂性及替代方案,帮助开发者更好地掌…

    2026年5月10日
    100
  • 怎么在PHP代码中实现图片上传功能_PHP图片上传功能实现与安全处理教程

    首先创建含enctype的HTML表单,再用PHP接收文件,检查目录、移动临时文件,验证类型与大小,生成唯一文件名,并调整php.ini限制以确保上传成功。 如果您尝试在PHP项目中添加图片上传功能,但服务器无法正确接收或保存文件,则可能是由于表单配置、文件处理逻辑或安全限制的问题。以下是实现该功能…

    2026年5月10日
    100
  • HTML如何隐藏滚动条或去除滚动条

    滚动条可以存在也可以不存在,本文主要介绍了html 隐藏滚动条和去除滚动条的方法的相关资料,大家一起来学习一下html隐藏滚动条或去除滚动条的方法吧。 1. html 标签加属性 XML/HTML Code复制内容到剪贴板 2.body中加入以下代码 立即学习“前端免费学习笔记(深入)”; html…

    用户投稿 2026年5月10日
    000
  • vscode上怎么运行html_vscode上运行html步骤【指南】

    首先保存文件为.html格式,再通过浏览器或Live Server插件打开预览;推荐安装Live Server实现本地服务器运行与实时刷新,提升开发体验。 在 VS Code 上运行 HTML 文件并不需要复杂的配置,只需几个简单步骤即可预览页面效果。VS Code 本身是一个代码编辑器,不直接运行…

    2026年5月10日
    100
  • RichHandler与Rich Progress集成:解决显示冲突的教程

    在使用rich库的`richhandler`进行日志输出并同时使用`progress`组件时,可能会遇到显示错乱或溢出问题。这通常是由于为`richhandler`和`progress`分别创建了独立的`console`实例导致的。解决方案是确保日志处理器和进度条组件共享同一个`console`实例…

    2026年5月10日
    000
  • 修复点击时按钮抖动:CSS垂直对齐实践

    本文探讨了在Web开发中,交互式按钮(如播放/暂停按钮)在点击时发生意外垂直位移的问题。通过分析CSS样式变化对元素布局的影响,我们发现这是由于按钮不同状态下的边框样式和内边距改变,以及默认的垂直对齐行为共同作用所致。核心解决方案是利用CSS的vertical-align属性,将其设置为middle…

    2026年5月10日
    100
  • 页面中文本域的值怎么设置

    标签定义多行的文本输入控件。 文本区中可容纳无限数量的文本,其中的文本的默认字体是等宽字体(通常是 Courier)。 可以通过 cols 和 rows 属性来规定 textarea 的尺寸,不过更好的办法是使用 CSS 的 height 和 width 属性。 注释:在文本输入区内的文本行间,用 …

    2026年5月10日
    000
  • 使用 Jupyter Notebook 进行探索性数据分析

    Jupyter Notebook通过单元格实现代码与Markdown结合,支持数据导入(pandas)、清洗(fillna)、探索(matplotlib/seaborn可视化)、统计分析(describe/corr)和特征工程,便于记录与分享分析过程。 Jupyter Notebook 是进行探索性…

    2026年5月10日
    000
  • 《魔兽世界》将于6月11日开启国服回归技术测试

    《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试《魔兽世界》将于6月11日开启国服回归技术测试

    《%ign%ignore_a_1%re_a_1%》官方宣布,将于6月11日开启国服回归技术测试,时间为7天,并称可以在6月内正式开服,玩家们可以访问官网下载战网客户端并预下载“巫妖王之怒”客户端,技术测试详情见下图。 WordAi WordAI是一个AI驱动的内容重写平台 53 查看详情 以上就是《…

    2026年5月10日 用户投稿
    200
  • 如何在HTML中插入表单元素_HTML表单控件与输入类型使用指南

    HTML表单通过标签构建,包含action和method属性定义数据提交目标与方式,常用input类型如text、password、email等适配不同输入需求,配合label、required、placeholder提升可用性,结合textarea、select、button等控件实现完整交互,是…

    2026年5月10日
    100
  • 前端缓存策略与JavaScript存储管理

    根据数据特性选择合适的存储方式并制定清晰的读写与清理逻辑,能显著提升前端性能;合理运用Cookie、localStorage、sessionStorage、IndexedDB及Cache API,结合缓存策略与定期清理机制,可在保证用户体验的同时避免安全与性能隐患。 前端缓存和JavaScript存…

    2026年5月10日
    200
  • HTML5网页如何实现手势操作 HTML5网页移动端交互的处理技巧

    首先利用原生touch事件实现滑动判断,再通过preventDefault解决滚动冲突,接着引入Hammer.js处理复杂手势,最后通过优化点击区域、避免事件冲突和增加视觉反馈提升体验。 在移动端浏览器中,HTML5网页可以通过触摸事件实现手势操作,提升用户体验。虽然原生JavaScript提供了基…

    2026年5月10日
    000
  • 深入理解 Express.js 中 next() 参数的作用与中间件机制

    本文深入探讨 express.js 中间件函数中的 `next()` 参数。它负责将控制权传递给请求-响应周期中的下一个中间件或路由处理程序。文章将详细解释 `next()` 的工作原理、中间件的注册与执行顺序,以及不正确使用 `next()` 可能导致请求挂起的风险,并通过代码示例和实际应用场景,…

    2026年5月10日
    000
  • Python命令怎样使用profile分析脚本性能 Python命令性能分析的基础教程

    使用Python的cProfile模块分析脚本性能最直接的方式是通过命令行执行python -m cProfile your_script.py,它会输出每个函数的调用次数、总耗时、累积耗时等关键指标,帮助定位性能瓶颈;为进一步分析,可将结果保存为文件python -m cProfile -o ou…

    2026年5月10日
    000
  • PHP动态生成表单输入与POST数据获取实践指南

    本教程详细阐述了如何在php中根据动态数据源(如数据库值)生成多个表单输入框,并演示了如何通过post方法准确无误地获取这些动态生成的输入值。文章强调了正确的输入框命名策略,避免了常见的命名误区,并提供了完整的代码示例,确保开发者能够高效处理动态表单数据。 动态生成表单输入 在Web开发中,我们经常…

    2026年5月10日
    000
  • Python递归函数追踪与性能考量:以序列打印为例

    本文深入探讨了Python中一种递归打印序列元素的方法,并着重演示了如何通过引入缩进参数来有效追踪递归函数的执行流程和参数变化。通过实际代码示例,文章揭示了递归调用可能带来的潜在性能开销,特别是对调用栈空间的需求,以及Python默认递归深度限制可能导致的错误,为读者提供了理解和优化递归算法的实用见…

    2026年5月10日
    000

发表回复

登录后才能评论
关注微信