利用scala书写spark程序实现wordCount

发布于:2024-03-29 ⋅ 阅读:(17) ⋅ 点赞:(0)

实验环境:虚拟机(centos)上创建了三台集群,部署了Hadoop,words文档放在HDFS上的目录下
所用版本如下:
<hadoop.version>2.7.7</hadoop.version>
<spark.version>2.4.5</spark.version>
<scala.version>2.12.10</scala.version>

步骤

代码详解

方法一:

object readData {
  def main(args: Array[String]): Unit = {
  	// 创建一个本地运行的 Spark 应用程序,并且设置了应用程序的名称为 "readData"
    val spark=SparkSession.builder().appName("readData").master("local[*]").getOrCreate();
    // HDFS目录路径
    val hdfsPath = "hdfs://你的节点ip:9000/路径/文件名";
    // 读取文本文件
    val lines = spark.read.textFile(hdfsPath).rdd
    // 单词计数
    val wordCounts = lines
      .flatMap(line => line.split(" ")) // 根据空格切分单词
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // 输出结果
    wordCounts.collect().foreach(println)
    // 停止 SparkSession
    spark.stop();
  }
}

方法二:

object readData {
  def main(args: Array[String]): Unit = {
    // HDFS目录路径
    val hdfsPath = "hdfs://你的节点ip:9000/路径/文件名";
	//1. 创建 conf 对象
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    //2. 创建 SparkContext 对象:提交应用的入口
    val sc = new SparkContext(conf)
    //3. 执行单词统计
    val res = sc.textFile(hdfsPath).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect;
    //4. 遍历输出结果
    res.foreach(println);
    // 停止 SparkSession
    sc.stop()
  }
}

说明:

  • 使用 SparkContext 和 SparkConf 是传统的方式,适用于 Spark 1.x 版本。它们提供了基本的 Spark 功能,但使用起来可能相对复杂,需要更多的配置和管理。
  • 使用 SparkSession 是 Spark 2.x 版本中推荐的方式。它集成了 Spark SQL,使得你可以更方便地使用 DataFrame 和 Dataset API 来处理结构化数据,而不需要额外导入其他的 API。此外,SparkSession 也可以自动管理 SparkContext,使得整个应用程序的管理更加简单。
  • flatMap 是一个转换操作,主要用于将结果扁平化(这里是将切分后的文本转换为多个单词作为输出)
  • map 是一个转换操作,它将输入RDD中的每个元素映射为一个新的元素(这里是映射为(key,1)键值对的形式)
  • reduceByKey 是一个转换操作,它将具有相同键的元素聚合在一起,并对它们的值进行合并
本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到