【Spark征服之路-2.5-Spark-Core编程(一)】

发布于:2025-06-11 ⋅ 阅读:(42) ⋅ 点赞:(0)

环境准备

  1. Jdk1.8版本
  2. Scala2.12版本
  3. Idea集成开发环境中需要安装scala插件

环境配置

· 添加 Scala 插件

Spark 由 Scala 语言开发的,所以接下来的开发所使用的语言也为 Scala,当前使用的 Spark 版本为 3.0.0,默认采用的 Scala 编译版本为 2.12,所以后续开发时,我们依然采用2.12的scala版本。开发前请保证 IDEA 开发工具中含有 Scala 开发插件

File—Settings—plugins

创建spark实现的WordCount程序

1.创建Maven项目

2.在pom.xml中添加依赖

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.1.0</version>
    </dependency>
</dependencies>

<build>
<plugins>
<!-- 该插件用于将 Scala 代码编译成 class 文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
    <!-- 声明绑定到 maven  compile 阶段 -->
    <goals>
        <goal>testCompile</goal>
    </goals>
</execution>
</executions>
</plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.1.0</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>
</build>

保存之后重新加载。

3.创建Spark-core子模块

4.将spark-core当中的java文件夹重命名为scala。

5.在scala文件夹中创建Scala的object程序。

6.编写wordCount的spark程序

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    // 创建 Spark 上下文环境对象(连接对象)
    val sc : SparkContext = new SparkContext(sparkConf)
    // 读取文件数据
    val fileRDD: RDD[String] = sc.

textFile("Spark-core/input/word.txt")
    // 将文件中的数据进行分词
    val wordRDD: RDD[String] = fileRDD.flatMap( _.split(" ") )
    // 转换数据结构 word => (word, 1)
    val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_,1))
    // 将转换结构后的数据按照相同的单词进行分组聚合
    val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_+_)
    // 将数据聚合结果采集到内存中
    val word2Count: Array[(String, Int)] = word2CountRDD.collect()
    // 打印结果
    word2Count.foreach(println)
    //关闭 Spark 连接
    sc.stop()

  }
}

7.在Spark-core中创建名为input的文件夹,在此文件夹中创建word.txt文件,并在文件中添加需要进行统计的语句。

测试文本:

Spark is a unified analytics engine for large-scale data processing

It provides high-level APIs in Scala Java Python and R and an optimized engine that

supports general computation graphs for data analysis

It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames

MLlib for machine learning GraphX for graph processing and Structured Streaming for stream processing

8.运行编写好的WordCount程序

9.配置日志文件

执行过程中,会产生大量的执行日志,如果为了能够更好的查看程序的执行结果,可以在项

目的 resources 目录中创建 log4j.properties 文件,并添加日志配置信息:

log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/ddHH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell,the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

10.配置完成后重新执行代码

11.常见问题解决

如果本机操作系统是 Windows,在程序中使用了 Hadoop 相关的东西,比如写入文件到 HDFS,则会遇到如下异常:

出现这个问题的原因,并不是程序的错误,而是 windows 系统用到了 hadoop 相关的服 务,解决办法是通过配置关联到 windows 的系统依赖就可以了

首先确保在本地磁盘中有hadoop的相关内容且配置了环境变量

然后在Idea中配置 Run Configuration,添加 HADOOP_HOME 变量

创建RDD

在 Spark 中创建 RDD 的创建方式可以分为四种:

1. 从集合(内存)中创建 RDD

从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

val sparkConf =
  new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)

val rdd1 = sparkContext.parallelize(
  List(1,2,3,4)
)
val rdd2 = sparkContext.makeRDD(
  List(1,2,3,4)
)
rdd1.collect().foreach(println)
rdd2.collect().foreach(println)


sparkContext.stop()

从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

def makeRDD[T: ClassTag](

 seq: Seq[T],

 numSlices: Int = defaultParallelism): RDD[T] = withScope {

 parallelize(seq, numSlices)

}

2. 从外部存储(文件)创建 RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集, 比如 HDFS、HBase 等。

val fileRDD: RDD[String] = sparkContext.textFile("spark-core/input")
fileRDD.collect().foreach(println)

3. 从其他 RDD 创建

主要是通过一个 RDD 运算完后,再产生新的 RDD。

4. 直接创建 RDD(new)

使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

RDD并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务(Task)后,发送给 Executor 节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。记住,这里的并行执行的任务数量,并不是指的切分任务的数量

val dataRDD: RDD[Int] =
  sparkContext.makeRDD(
    List(1,2,3,4),
    4)
val fileRDD: RDD[String] =
  sparkContext.textFile(
    "spark-core/input",
    2)
fileRDD.collect().foreach(println)

读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的

Spark 核心源码如下:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {

 (0 until numSlices).iterator.map { i =>

 val start = ((i * length) / numSlices).toInt

 val end = (((i + 1) * length) / numSlices).toInt

 (start, end)

 }

}

读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体 Spark 核心源码如下

public InputSplit[] getSplits(JobConf job, int numSplits)

 throws IOException {

 long totalSize = 0; // compute total size

 for (FileStatus file: files) { // check we have valid files

 if (file.isDirectory()) {

 throw new IOException("Not a file: "+ file.getPath());

 }

 totalSize += file.getLen();

 }

 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.

 FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

 ...

 for (FileStatus file: files) {

 ...

 if (isSplitable(fs, path)) {

 long blockSize = file.getBlockSize();

 long splitSize = computeSplitSize(goalSize, minSize, blockSize);

 ...

 }

 protected long computeSplitSize(long goalSize, long minSize,

 long blockSize) {

 return Math.max(minSize, Math.min(goalSize, blockSize));

 }


网站公告

今日签到

点亮在社区的每一天
去签到