7.7晚自习作业

发布于:2025-07-08 ⋅ 阅读:(23) ⋅ 点赞:(0)

实操作业02:Spark核心开发

作业说明

  • 请严格按照步骤操作,并将最终结果文件(命名为:sparkcore_result.txt)于20点前上传
  • 结果文件需包含每一步的关键命令执行结果文本输出。

一、数据读取与转换操作

  1. 上传账户数据$DATA_EXERCISE/accounts到HDFS的/dw/accounts目录,从HDFS路径/dw/accounts读取accounts数据文件
hadoop fs -mkdir -p /dw/accounts
hadoop fs -put $DATA_EXERCISE/accounts /dw/accounts/

  1. 将每行数据按逗号分割成字段数组
  2. 以邮政编码字段(第9个字段)作为key,创建键值对RDD
  3. 查看转换后的数据结构,显示前2条记录

step1. 创建 RDD(读取所有 part 文件)

// 使用通配符 * 读取目录下所有 part 文件
val accountsRDD = sc.textFile("hdfs://master:8020/dw/accounts/accounts/part-*")

// 验证数据加载
accountsRDD.take(2).foreach(println)

step2:数据转换

// 一、数据转换操作
val zipKeyValueRDD = accountsRDD.map(line => {
  val fields = line.split(",")  // 按逗号分割每行数据
  val zipCode = fields(8)      // 提取第9个字段(索引8)作为邮政编码
  (zipCode, fields)            // 返回(邮编, 字段数组)的键值对
})

// 显示前2条
zipKeyValueRDD.take(2).foreach { 
  case (zip, fields) => println(s"Key: $zip | Value: ${fields.mkString(",")}")
}

为什么可以确定line是一个数组属性的变量呢

解析:
line: 是一个 String 类型的变量,表示CSV文件的某一行(例如:"1,John,Doe,10001")
.split(","):
是String类的方法,按指定的分隔符(这里是逗号 ,)切割字符串
返回一个 Array[String](字符串数组)
val fields:
定义一个 不可变变量(Scala中用val声明)
类型由编译器自动推断为 Array[String]
存储切割后的字段数组

可以通过Spark-shell直接验证类型:
// 示例数据
val line = "1,John,Doe,10001"
// 执行切割
val fields = line.split(",")  // 得到:Array("1", "John", "Doe", "10001")
// 查看类型
println(fields.getClass)  // 输出:class [Ljava.lang.String; (表示String数组)

这三行代码也只能输出一行数据的转换结果,但是这个是一整个文件的转换结果,应该是有循环在里面的,循环体现在哪呢

这里的关键在于 Spark的分布式计算模型 和 RDD的转换操作 的运作机制。让我们深入解析这个"隐式循环"的实现原理:

1. 表面现象 vs 底层实现

你看到的代码(看似单行处理)

scala

val zipKeyValueRDD = accountsRDD.map(line => {
  val fields = line.split(",")  // 似乎只处理一行
  val zipCode = fields(8)
  (zipCode, fields)
})

实际发生的处理(分布式并行处理)

python

# 伪代码:Spark集群的实际执行逻辑
for partition in accountsRDD.partitions:    # 遍历所有数据分区
  for line in partition:                   # 分区内逐行处理(真正的循环)
    fields = line.split(",")
    emit((fields[8], fields))             # 输出键值对

2. 循环的隐藏位置

Spark通过以下机制实现"隐式循环":

组件 作用 类比传统编程
RDD.map() 定义转换逻辑 相当于循环体
Partitions(分区) 数据分片 相当于外层循环
Tasks(任务) 并行处理单元 相当于多线程循环

3. 完整数据处理流程

  1. 输入阶段

    • 假设原始文件有100万行,被Spark自动划分为N个分区(如HDFS的block数)

  2. 并行执行阶段

    • 每个分区的数据会被不同的Executor进程处理

    • 每个Executor对分配到的分区数据 逐行执行map函数

  3. 输出阶段

    • 所有节点的处理结果合并成新的RDD

4. 关键问题解答

Q: 为什么代码没有显式循环却能处理整个文件?

  • Spark的RDD操作是声明式的:你只需要定义"做什么"(业务逻辑),Spark框架自动处理"怎么做"(分布式循环)

  • 数据分区和并行计算

    scala

    // 假设文件被分为3个分区
    accountsRDD.partitions.size  // 返回分区数

    每个分区会触发一个Task,每个Task内部会循环处理其分配到的数据

Q: 如何验证确实处理了所有数据?

scala

// 查看处理后的记录数
zipKeyValueRDD.count() 

// 查看分区处理情况(需在Spark UI观察)
zipKeyValueRDD.toDebugString


5. 与传统编程的对比

特性 Spark RDD 传统Java/Python
循环方式 隐式(由框架管理) 显式(for/while)
并行度 自动分区并行 需手动线程/进程管理
数据规模 支持TB/PB级 通常单机内存限制
代码示例 rdd.map(x => f(x)) for x in list: f(x)
  • 循环确实存在:但被Spark框架隐藏在分布式运行时中

  • 你的代码是"循环体"map中的逻辑会被应用到每一行数据

  • 优势:开发者无需关心并行化和数据分发,专注业务逻辑

二、数据聚合操作

  1. 对上述RDD提取每个记录的姓名字段:
  2. 提取第5个字段(first_name)和第4个字段(last_name)
  3. 将姓和名用逗号连接
val nameByZipRDD = zipKeyValueRDD.mapValues(fields => 
  s"${fields(4)},${fields(3)}"  // 格式化为"姓,名"
)

    在Scala中,s"${fields(4)},${fields(3)}" 是一种称为 字符串插值(String Interpolation) 的语法

    1. 字符串插值的组成

    部分 含义 示例
    开头的s 表示启用字符串插值 s"..."
    ${} 插入变量/表达式的语法 ${fields(4)}
    引号内内容 固定字符串+动态变量组合 "姓,名"

    2. 具体到代码

    scala

    s"${fields(4)},${fields(3)}"
    • 等效的普通写法

      scala

      fields(4) + "," + fields(3)  // 直接字符串拼接

    • 执行过程

      1. 取出数组fields的第5个元素(索引4)

      2. 取出第4个元素(索引3)

      3. 用逗号连接两者

    3. 对比其他语言

    语言 类似语法 示例
    Scala s"${var}" s"Hello, ${name}"
    Python f-string f"Hello, {name}"
    JavaScript 模板字符串 `Hello, ${name}`

    1. map vs mapValues 的本质区别

    操作 函数签名 输入 → 输出 在你的代码中的应用
    map (T) => U 整个元素 → 新元素 line => (zipCode, fields)
    mapValues (V) => U 仅值部分 → 新值(键不变) fields => "姓,名"

    2.代码中两个阶段的解析

    (1)第一阶段:数据转换 (map)

    scala

    val zipKeyValueRDD = accountsRDD.map(line => {
      val fields = line.split(",")       // String → Array[String]
      val zipCode = fields(8)           // 提取key
      (zipCode, fields)                 // 返回: (String, Array[String])
    })
    • line => 的含义:

      • 输入:原始字符串(如 "1,John,Doe,10001"

      • 输出:完全新建的键值对 (String, Array[String])

    • 数据流

      text

      "1,John,Doe,10001" 
        → split → ["1","John","Doe","10001"] 
        → 取fields(8)作为key 
        → 输出 ("10001", ["1","John","Doe","10001",...])
    (2)第二阶段:聚合 (mapValues)

    scala

    val nameByZipRDD = zipKeyValueRDD.mapValues(fields => 
      s"${fields(4)},${fields(3)}"  // 仅修改value部分
    )
    • fields => 的含义:

      • 输入:已有键值对的值部分(即之前的 Array[String]

      • 输出:仅更新值(键 zipCode 保持不变)

    • 数据流

      text

      输入: ("10001", ["1","John","Doe","10001",...])
        → 提取fields(4)和fields(3) 
        → 输出 ("10001", "Doe,John")  // 键未改变!

    3. => 的本质

    • => 是Scala中的函数定义符号,表示:

      scala

      val func: InputType => OutputType = (input) => { 
        // 处理input 
        output 
      }
    • 在代码中:

      • line => ...:定义了一个从 String 到 (String, Array[String]) 的函数

      • fields => ...:定义了一个从 Array[String] 到 String 的函数

    1. 按邮政编码分组
    2. 查看聚合结果,显示前2条记录
    val groupedByNameRDD = nameByZipRDD.groupByKey()
    
    // 显示前2组
    groupedByNameRDD.take(2).foreach {
      case (zip, names) => println(s"$zip -> ${names.mkString("; ")}")
    }

    三、数据排序与展示

    1. 对分组后的RDD按邮政编码进行升序排列
    2. 取前5条记录进行展示
    3. 对每条记录,先打印邮政编码,然后打印该邮政编码下的所有姓名列表
    groupedByNameRDD.sortByKey().take(5).foreach {
      case (zip, names) =>
        println(s"\n=== 邮政编码: $zip ===")
        names.foreach(println)
    }


    四、提交要求

    1. 代码和结果文件:将代码及其执行后的输出结果保存到sparkcore_result.txt文件中

    2. 结果文件应包含

    3. 数据读取与转换操作的代码和输出结果
    4. 数据聚合操作的代码和输出结果
    5. 数据排序与展示的代码和输出结果


    网站公告

    今日签到

    点亮在社区的每一天
    去签到