7.7日 实验03-Spark批处理开发(1)

发布于:2025-07-08 ⋅ 阅读:(12) ⋅ 点赞:(0)

使用Spark Shell探索RDD

启动并使用Scala Spark Shell

在终端窗口,启动Scala Spark shell:

spark-shell

查看对象:

scala> sc
scala> spark

输入spark.[TAB]然后可以看到所有可用的方法。

读并显示文本文件

查看文本$DATA_EXERCISE/frostroad.txt

读取本地文件来创建RDD。Spark并没有读文件,直到你执行了action操作,比如统计数据集行数。

  • mydata: - 这是你定义的变量名

val mydata = sc.textFile("/root/dataExercise/frostroad.txt")

尝试执行collect操作来显示RDD的所有数据。

输入mydata.[TAB]可以看到所有可用的转换操作。

输入exit退出。

使用RDD来转换数据集

探索Web日志文件(之前的实验1导入过)

日志文件/dw/weblogs示例:

22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /KBDOC-00150.html HTTP/1.0" 200 19203 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F11L"
22.252.151.164 - 23251 [15/Mar/2014:00:00:30 +0100] "GET /theme.css HTTP/1.0" 200 10684 "http://www.loudacre.com"  "Loudacre Mobile Browser Sorrento F11L"

从数据文件创建RDD。

val logsRDD = sc.textFile("/dw/weblogs")

创建只包含请求图片JPG文件的RDD。

val jpgRDD = logsRDD.filter(line => line.contains("GET") && 
             (line.toLowerCase.contains(".jpg") || line.toLowerCase.contains(".jpeg")))
val jpgRDD

定义一个新的不可变变量(val),用来存储过滤后的结果RDD

logsRDD.filter(...)

在原始的logsRDD上调用filter转换操作

filter会逐行检查日志,只保留满足条件的记录

过滤条件分解:

line.contains("GET")
检查该行是否包含"GET"字符串(表示HTTP GET请求)

line.toLowerCase.contains(".jpg")
将行转换为小写后检查是否包含".jpg"(避免大小写问题)

line.toLowerCase.contains(".jpeg")
同上,检查".jpeg"扩展名

line => 是函数定义的开始:

line 是输入参数(这里代表日志文件的每一行)

=> 是 Scala 中分隔「参数」和「函数体」的符号

使用take查看前10行数据。

jpgRDD.take(10).foreach(println)

在日志中返回每行的长度。

val lineLengths = logsRDD.map(_.length)
lineLengths.take(5)

把每一行映射成一个数组,查看前5条。

val splitLines = logsRDD.map(_.split("\\s+"))
splitLines.take(5).foreach(arr => println(arr.mkString("[", ", ", "]")))

定义新的RDD,日志文件的每一行只包含IP地址。

val ipRDD = logsRDD.map(_.split("\\s+")(0))

最后,保存IP地址列表到/dw/iplist

ipRDD.saveAsTextFile("/dw/iplist")

在终端窗口或Hue文件浏览器,列出/dw/iplist目录内容。你可以看到多个part-xxxxx文件。查看文件内容确认结果是正确的。

hdfs dfs -ls /dw/iplist
hdfs dfs -cat /dw/iplist/part-00000 | head


网站公告

今日签到

点亮在社区的每一天
去签到