Spark-Scala语言实战(15)

发布于:2024-04-16 ⋅ 阅读:(25) ⋅ 点赞:(0)

在之前的文章中,我们学习了如何在spark中使用键值对中的学习键值对方法中的lookup,cogroup两种方法。想了解的朋友可以查看这篇文章。同时,希望我的文章能帮助到你,如果觉得我的文章写的不错,请留下你宝贵的点赞,谢谢。

Spark-Scala语言实战(14)-CSDN博客文章浏览阅读1.5k次,点赞33次,收藏25次。今天开始的文章,我会带给大家如何在spark的中使用我们的键值对方法,今天学习键值对方法中的lookup,cogroup两种方法。希望我的文章能帮助到大家,也欢迎大家来我的文章下交流讨论,共同进步。https://blog.csdn.net/qq_49513817/article/details/137441090

今天的文章开始,我会带着大家来做三道任务,运用之前学到的方法,温故知新,举一反三,将知识紧紧掌握,希望你能在我的文章中有所收获。 

目录

一、知识回顾

二、任务实现

1.使用Spark完成单词去重

2.使用Spark统计133 136 139开头的总流量

3.完成统计相同字母组成的单词


一、知识回顾

上一篇文章中我们学习了RDD键值对的两种方法,分别是lookup,cogroup。

lookup是我们的查找方法,它用于返回我们指定键所对应的值。

我们创建了两个RDD一个名为p包含了我们的键值对,一个名为pp包含了我们需要查找的键。然后使用 map来实现我们的lookoup方法。 

运行代码它就会返回我们需要的键所对应的值,没有就会输出None

然后就是我们 cogroup方法。它是一种常见的组合操作,用于合并两个或多个数据组中具有相同键的数据。

可以看到代码我们通过 cogroup方法将p1,p2组合到一起了,那么收集结果打印出来会是什么样子呢?

它将我们值通过键全部合并在一起了

复习完毕,现在开始今天的学习吧~

二、任务实现

1.使用Spark完成单词去重

 现在我们有一个名为text01的txt文件

它里面的数据如下:

java php hello word
php hi exe java 
python hello kitty
php happy abc java

现在,我们需要用到之前所学的知识将它进行去重操作

解题思路:

首先我们肯定要将文件的内容读取出来

val p = sc.textFile("C:\\IDEA\\P1\\p1\\text01.txt")

然后就是切分我们文件的内容,将它里面的单词转换成一个数组啊,列表啊或者其他,根据需求转化。

    def pp(line: String): Array[String] = {
      line.split("\\s+")//'\s+中\ 是转义字符。s 是代表空白字符的元字符。+ 表示前面的字符或组(在这里是 \s)可以出现一次或多次。'
    }

这里我们使用了一个\\s+,它的意思注释的很清楚,主要作用还是设置我们的切分。

现在准备工作做完了就可以开始去重了

    val ppp= p.flatMap(pp)
    val pppp = ppp.distinct().collect()
    pppp.foreach(println)

 使用我们的distinct()方法进行去重操作,最后收集RDD数据并打印

完整代码:

import org.apache.spark.{SparkConf, SparkContext}

object p1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ppp")
    val sc = new SparkContext(conf)
    val p = sc.textFile("C:\\IDEA\\P1\\p1\\text01.txt")

    def pp(line: String): Array[String] = {
      line.split("\\s+")//'\s+中\ 是转义字符。s 是代表空白字符的元字符。+ 表示前面的字符或组(在这里是 \s)可以出现一次或多次。'
    }
    val ppp= p.flatMap(pp)
    val pppp = ppp.distinct().collect()
    pppp.foreach(println)
  }
}

 运行代码:

 

可以看到成功完成任务需求,读取外部文件并去重。 

2.使用Spark统计133 136 139开头的总流量

我们这里有两对数据,分别是手机号和使用的流量

13326293050 81
13626594101  50
13326435696  30
13926265119  40
13326564191  2106
13626544191  1432
13919199499  300

我们需要将它通过手机号前三位区分,然后统计133 136 139开头的总流量

解题思路:

在这里,我想到的方法是创建一个键值对,将手机号和流量进行一个对应

    val p = sc.parallelize(Array(
      (13326293050L, 81),
      (13626594101L, 50),
      (13326435696L, 30),
      (13926265119L, 40),
      (13326564191L, 2106),
      (13626544191L, 1432),
      (13919199499L, 300)//int超出存储限制,添加L变为long
    ))

 然后进行取前手机号三位并且使用groupByKey方法分组的方法

    val pp = p.map { case (phone, value) =>
      val prefix = (phone / 1000000).toString.take(3)
      (prefix, value)}.groupByKey()//取出前三位并分组

 最后将我们三个组里的数据进行一个sum求和并打印

    val ppp=pp.mapValues(_.sum)//值相加
    ppp.foreach(println)

 完整代码:

import org.apache.spark.{SparkConf, SparkContext}

object p2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ppp")
    val sc = new SparkContext(conf)
    val p = sc.parallelize(Array(
      (13326293050L, 81),
      (13626594101L, 50),
      (13326435696L, 30),
      (13926265119L, 40),
      (13326564191L, 2106),
      (13626544191L, 1432),
      (13919199499L, 300)//int超出存储限制,添加L变为long
    ))
    val pp = p.map { case (phone, value) =>
      val prefix = (phone / 1000000).toString.take(3)
      (prefix, value)}.groupByKey()//取出前三位并分组
    val ppp=pp.mapValues(_.sum)//值相加
    ppp.foreach(println)
  }
}

 运行代码:

 完成任务,分组并统计流量

3.完成统计相同字母组成的单词

现在我们有一个名为text02的txt文件

它里面的数据如下:

abc acb java
avaj bac
cba abc
jvaa php hpp
pph python thonpy

现在,我们需要用到之前所学的知识将它统计相同字母组成的单词出现的次数。

解题思路:

首先,肯定还是要读取文件

    val p = sc.textFile("C:\\IDEA\\P1\\p1\\text02.txt")

 然后对单词中的字母排序

    def sortLetters(word: String): String = {
      word.toLowerCase().replaceAll("\\s+", "").sorted}

最后,分割单词,进行排序与reduceByKey方法统计 

    val pp = p
      .flatMap(_.split("\\s+")) // 分割每行文本为单词
      .map(word => (sortLetters(word), 1)) // 对单词中的字母进行排序,并映射到计数1
      .reduceByKey(_ + _) // 统计相同字母组成的单词数量

收集数据并打印

    pp.collect().foreach(println)

 完整代码:

import org.apache.spark.{SparkConf, SparkContext}

object p3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("ppp")
    val sc = new SparkContext(conf)
    val p = sc.textFile("C:\\IDEA\\P1\\p1\\text02.txt")

    // 用于对单词中的字母进行排序
    def sortLetters(word: String): String = {
      word.toLowerCase().replaceAll("\\s+", "").sorted}
    // 对每一行文本进行处理,统计具有相同字母组成的单词数量
    val pp = p
      .flatMap(_.split("\\s+")) // 分割每行文本为单词
      .map(word => (sortLetters(word), 1)) // 对单词中的字母进行排序,并映射到计数1
      .reduceByKey(_ + _) // 统计相同字母组成的单词数量
    pp.collect().foreach(println)
  }
}

  运行代码:

  完成任务,统计相同字母组成的单词次数

 最后,代码的可变性很多,不同的写法不同的方法有时候也能完成相同的任务。我的解题思路可以当作一种参考,期待大家能用自己不同的方式完成任务。