传递给Spark的函数,如map()或者filter()的判断条件函数,能够利用定义在函数之外的变量,
但是集群中的每一个task都会得到变量的一个副本,并且task对变量进行的更新则不会被返回给driver.
而Spark的两种共享变量:累加器(accumulator)和广播变量(broadcast variable).
累加器
累加器可以很简便地对各个worker返回给driver的值进行聚合(有些类似aggregate);
累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数.
======累加器原理======
Driver端:
Driver端构建Accumulator并初始化;
同时完成了Accumulator注册:Accumulators.register(this);
同时Accumulator会在序列化后发送到Executor端;
Driver接收到ResultTask完成的状态更新后,会去更新Value的值;
然后在Action操作执行后就可以获取到Accumulator的值了.
Executor端:
Executor端接收到Task之后会进行反序列化操作,反序列化得到RDD和function;
同时在反序列化Task的同时也去反序列化Accumulator(在readObject方法中完成);
同时也会向TaskContext完成注册,完成任务计算之后,随着Task结果一起返回给Driver端.
广播变量
广播变量通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,
每次操作,driver都要把变量发送给worker上的executor中的所有task一次;
如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低;
使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输.
累加器、广播变量的应用
package rdd
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object SparkBroadCastDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkBroadCastDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val ints = ListBuffer(1,2,3,4,5)
var num = 0
val par: RDD[Int] = sc.parallelize(ints)
val br: Broadcast[ListBuffer[Int]] = sc.broadcast(ints)
val acc: Accumulator[Int] = sc.accumulator(0)
val acc1 = sc.longAccumulator
-----------------------------------------------------------------------------------------------------------------------------------------------------------
val mapRDD: RDD[Int] = par.map(f => {
num += 1
acc.add(1)
println(s"MapBroadCast:${br.value}")
f
})
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
mapRDD.filter(f => {
acc1.add(1L)
println(s"fileBroadCast:${br.value}")
true
}).foreach(println)
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
mapRDD.count()
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
println(num)
println(s"acc:${acc}")
}
}
val map = sc.textFile("/test.txt").map(line => {
val arr = line.split(",")
(arr(0), arr(2).toInt)
}).distinct
var mapBC = sc.broadcast(map.take(10).toMap)
mapBC.unpersist
mapBC = sc.broadcast(map.take(2).toMap)