一、手动直接转换（元组 RDD 直接调用 toDF 并指定列名）
/**
 * Builds a DataFrame from an RDD of (name, age) tuples by calling `toDF`
 * with explicit column names, then prints it to stdout.
 *
 * The `toDF` method on the tuple RDD is brought into scope by
 * `import spark.implicits._`. `getOrCreate()` may return an already-running
 * session rather than creating a new one.
 */
def rddToDf1(): Unit = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("hello world")
  val spark = SparkSession.builder().config(conf).getOrCreate()
  import spark.implicits._
  val sc = spark.sparkContext
  val rdd: RDD[(String, Int)] = sc.parallelize(Array(("李四", 10), ("zs", 20), ("王无", 21)))
  // `show` is a side-effecting method declared with `()`; invoke it with the
  // explicit empty argument list (auto-application is deprecated in Scala 2.13
  // and is a compile error in Scala 3).
  rdd.toDF("name", "age").show()
}
二、使用样例类转换（RDD[样例类] 调用 toDF，列名取自样例类的字段名）
/**
 * Converts an RDD of (name, age) tuples into an RDD of `User` case-class
 * instances, then turns that into a DataFrame via `toDF()` and prints it.
 * With this route the column names are derived from the case-class fields.
 */
def rddToDf2(): Unit = {
  val sparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("hello world")
  val session = SparkSession.builder().config(sparkConf).getOrCreate()
  // Provides the `toDF` extension on RDD[User].
  import session.implicits._

  val people = Array(("李四", 10), ("zs", 20), ("王无", 21))
  val tuples: RDD[(String, Int)] = session.sparkContext.parallelize(people)
  // Destructure each tuple directly instead of reaching for _1 / _2.
  val users: RDD[User] = tuples.map { case (name, age) => User(name, age) }
  users.toDF().show()
}
三、通过 API 转换（RDD[Row] 配合 StructType 定义的 schema，调用 spark.createDataFrame）
/**
 * Builds a DataFrame programmatically. Each (name, age) tuple is wrapped in a
 * generic `Row`, and an explicit `StructType` schema describes the columns;
 * `spark.createDataFrame` combines the two and the result is printed.
 * This route does not use `spark.implicits._`.
 */
def rddToDf3(): Unit = {
  val spark = SparkSession.builder()
    .config(new SparkConf().setMaster("local[*]").setAppName("hello world"))
    .getOrCreate()
  val source: RDD[(String, Int)] =
    spark.sparkContext.parallelize(Array(("李四", 10), ("zs", 20), ("王无", 21)))
  val rows: RDD[Row] = source.map { case (name, age) => Row(name, age) }
  // Field order must match the position of each value inside the Row.
  val schema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)
  ))
  spark.createDataFrame(rows, schema).show()
}