RDD算子使用--action行动算子操作

发布于:2024-04-28 ⋅ 阅读:(24) ⋅ 点赞:(0)

action行动算子操作

1)foreach算子:

foreach主要功能,就是用来遍历RDD中的每一条纪录,其实现就是将map或者flatMap中的返回值变为Unit即可,即foreach(A => Unit)。

2)count算子:统计该rdd中元素的个数。

val count = rdd.count()

println("rdd的count个数为:" + count)

3)collect算子:字面意思就是收集,拉取的意思,该算子的含义就是将分布在集群中的各个partition中的数据拉回到driver中,进行统一的处理;但是这个算子有很大的风险存在,第一,driver内存压力很大,第二数据在网络中大规模的传输,效率很低;所以一般不建议使用,如果非要用,请先执行filter。

val arr = rdd.filter(_._2 > 2).collect()

arr.foreach(println)

4)take&first:返回该rdd中的前N个元素,如果该rdd的数据是有序的,那么take(n)就是TopN;而first是take(n)中比较特殊的一个take(1)。

val arr:Array[(String, Int)] = rdd.take(2)

arr.foreach(println)



val ret:(String, Int) = rdd.first()

println(ret)

5)takeOrdered(n):返回前几个的排序。

val arr:Array[(String, Int)] = rdd.takeOrdered(2)

arr.foreach(println)

6)reduce算子:需要清楚的是,reduce是一个action操作,reduceByKey是一个transformation。reduce对一个rdd执行聚合操作,并返回结果,结果是一个值。

//例子1
val rdd: RDD[Int] = sc.parallelize(1 to 5,2)
//聚合
val result: Int = rdd.reduce(_+_)
//打印输出
println(result)
//例子2
val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
//聚合
val result1= rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
//打印输出
println(result1)

需要注意一点的是,聚合前后的数据类型保持一致。

7)countByKey算子:统计key出现的次数。

val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
//统计相同key出现的次数
val result: collection.Map[Int, Long] = rdd.countByKey()
//打印输出
println(result)

8)saveAsXxx算子:

//saveAsTextFile

rdd.saveAsTextFile("file:/E:/data/out/")

//saveAsNewAPIHadoopFile

val path = "file:/E:/data/out1"

rr.saveAsNewAPIHadoopFile(path,

classOf[Text],

classOf[IntWritable],

classOf[TextOutputFormat[Text, IntWritable]])

9)foreachPartition算子:foreach写入写入数据库,但是极其不友好。

def saveInfoMySQL2(rdd: RDD[(String, Int)]): Unit = {

rdd.foreach{case (word, count) => {

Class.forName("com.mysql.jdbc.Driver")

val url = "jdbc:mysql://localhost:3306/test"

val connection = DriverManager.getConnection(url, "mark", "sorry")

val sql =

"""

|insert into wordcounts(word, `count`) Values(?, ?)

|""".stripMargin

val ps = connection.prepareStatement(sql)

ps.setString(1, word)

ps.setInt(2, count)

ps.execute()

ps.close()

connection.close()

}}

}

val rdd: RDD[(String, Int)] = sc.parallelize(List(("hadoop",2)))

save(rdd)

高效写入数据库:

def saveInfoMySQLByForeachPartition(rdd: RDD[(String, Int)]): Unit = {

rdd.foreachPartition(partition => {

//这是在partition内部,属于该partition的本地

Class.forName("com.mysql.jdbc.Driver")

val url = "jdbc:mysql://localhost:3306/test"

val connection = DriverManager.getConnection(url, "mark", "sorry")

val sql =

"""

|insert into wordcounts(word, `count`) Values(?, ?)

|""".stripMargin

val ps = connection.prepareStatement(sql)

partition.foreach{case (word, count) => {

ps.setString(1, word)

ps.setInt(2, count)

ps.execute()

}}

ps.close()

connection.close()

})

}