[Spark] 详解 outputMode

发布于:2024-07-31 ⋅ 阅读:(123) ⋅ 点赞:(0)

在 Spark Structured Streaming 中,outputMode 用于指定将结果表中的数据写入接收器(Sink)的方式。主要有以下三种模式:

  1. append 模式:这是默认的输出模式。仅将自上次触发以来添加到结果表中的新行输出到接收器。适用于不包含聚合操作的查询,或者包含聚合操作且使用了水印(watermark)的情况。在有聚合操作和水印时,水印用于删除旧的聚合状态,并仅在水印超过窗口结束时间时,输出一次最终结果到结果表并写入接收器。如果是没有聚合的操作(如selectwheremapflatMapfilterjoin等),则直接输出结果表中新增的数据。
  2. update 模式:只将自上次触发以来结果表中更新的行(新增或修改)输出到接收器。如果没有聚合操作,其行为类似于 append 模式,即输出结果表中新增的数据;如果有聚合操作,则输出聚合结果改变的数据。
  3. complete 模式:每次触发后,会将整个结果表输出到接收器。此模式仅适用于包含聚合操作的查询。由于它会输出表的所有内容,因此只有在此模式下可以进行全局排序。但需要注意,使用此模式时,结果表数据会一直存储在内存中,所以要谨慎使用,确保数据量不会过大导致内存溢出。

下面是使用不同 outputMode 的示例代码:

// 无聚合操作,使用 append 模式
val noAggDf = deviceDataDf.select("device").where("signal > 10")
noAggDf
 .writeStream
 .format("console") 
 .start() 

noAggDf
 .writeStream
 .parquet("path/to/destination/directory") 
 .start() 

// 有聚合操作,使用 complete 模式
val aggDf = df.groupBy("device").count()
aggDf
 .writeStream
 .outputMode("complete") 
 .format("console") 
 .start() 

aggDf
 .writeStream
 .queryName("aggregates") // 此查询名称将成为表名
 .outputMode("complete") 
 .format("memory") 
 .start() 
spark.sql("select * from aggregates").show() // 交互查询内存表

在实际应用中,选择哪种 outputMode 取决于具体的业务需求和数据特点。如果数据量较大且不需要全局排序,或者没有聚合操作,通常使用 append 模式以减少输出的数据量。如果需要输出完整的聚合结果或进行全局排序,则使用 complete 模式,但要注意内存限制。而 update 模式适用于只关心数据更新部分的情况。

此外,还需要注意的是,使用某些接收器(如FileSink)时,其支持的 outputMode 可能会受到限制。例如,在 Spark 2.0 中,FileSink 仅支持 Parquet 文件和 append 模式。同时,为了确保数据的一致性和可靠性,还可以设置检查点(checkpoint)来保存一些元数据信息,以便在出现故障时能够恢复。


网站公告

今日签到

点亮在社区的每一天
去签到