action行动算子操作
1)foreach算子:
foreach主要功能,就是用来遍历RDD中的每一条纪录,其实现就是将map或者flatMap中的返回值变为Unit即可,即foreach(A => Unit)。
2)count算子:统计该rdd中元素的个数。
val count = rdd.count()
println("rdd的count个数为:" + count)
3)collect算子:字面意思就是收集,拉取的意思,该算子的含义就是将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。
val arr = rdd.filter(_._2 > 2).collect()
arr.foreach(println)
4)take&first:返回该rdd中的前N个元素,如果该rdd的数据是有序的,那么take(n)就是TopN;而first是take(n)中比较特殊的一个take(1)。
val arr:Array[(String, Int)] = rdd.take(2)
arr.foreach(println)
val ret:(String, Int) = rdd.first()
println(ret)
5)takeOrdered(n):返回前几个的排序。
val arr:Array[(String, Int)] = rdd.takeOrdered(2)
arr.foreach(println)
6)reduce算子:需要清楚的是,reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。
//例子1
val rdd: RDD[Int] = sc.parallelize(1 to 5,2)
//聚合
val result: Int = rdd.reduce(_+_)
//打印输出
println(result)
//例子2
val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
//聚合
val result1= rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
//打印输出
println(result1)
需要注意一点的是,聚合前后的数据类型保持一致。
7)countByKey算子:统计key出现的次数。
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
//统计相同key出现的次数
val result: collection.Map[Int, Long] = rdd.countByKey()
//打印输出
println(result)
8)saveAsXxx算子:
//saveAsTextFile
rdd.saveAsTextFile("file:/E:/data/out/")
//saveAsNewAPIHadoopFile
val path = "file:/E:/data/out1"
rr.saveAsNewAPIHadoopFile(path,
classOf[Text],
classOf[IntWritable],
classOf[TextOutputFormat[Text, IntWritable]])
9)foreachPartition算子:foreach写入写入数据库,但是极其不友好。
def saveInfoMySQL2(rdd: RDD[(String, Int)]): Unit = {
rdd.foreach{case (word, count) => {
Class.forName("com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/test"
val connection = DriverManager.getConnection(url, "mark", "sorry")
val sql =
"""
|insert into wordcounts(word, `count`) Values(?, ?)
|""".stripMargin
val ps = connection.prepareStatement(sql)
ps.setString(1, word)
ps.setInt(2, count)
ps.execute()
ps.close()
connection.close()
}}
}
val rdd: RDD[(String, Int)] = sc.parallelize(List(("hadoop",2)))
save(rdd)
高效写入数据库:
def saveInfoMySQLByForeachPartition(rdd: RDD[(String, Int)]): Unit = {
rdd.foreachPartition(partition => {
//这是在partition内部,属于该partition的本地
Class.forName("com.mysql.jdbc.Driver")
val url = "jdbc:mysql://localhost:3306/test"
val connection = DriverManager.getConnection(url, "mark", "sorry")
val sql =
"""
|insert into wordcounts(word, `count`) Values(?, ?)
|""".stripMargin
val ps = connection.prepareStatement(sql)
partition.foreach{case (word, count) => {
ps.setString(1, word)
ps.setInt(2, count)
ps.execute()
}}
ps.close()
connection.close()
})
}