Spark实时流数据处理实例(SparkStreaming通话记录消息处理)

发布于:2025-05-28 ⋅ 阅读:(19) ⋅ 点赞:(0)

所用资源:

通过网盘分享的文件:spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar等4个文件
链接: https://pan.baidu.com/s/1zYHu29tLgDvS_L2Ud-22ZA?pwd=hnpg 提取码: hnpg

1.需求分析 :

假定有一个手机通信计费系统,用户通话时在基站交换机上临时保存了相关记录,由于交换机的容量 有限且分散在各地,因此需要及时将这些通话记录汇总到计费系统中进行长时间保存,以方便后续的 统计分析。

2.准备工作:

(1)确保Kafka服务已经启动,可在Linux终端窗体中使用jps命令查看具体的进程

spark@vm01:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[1] 2770

spark@vm01:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties &

[2] 3128

spark@vm01:/usr/local/kafka$ jps

2770 QuorumPeerMain

3128 Kafka

2104 Main

3529 Jps

(2)创建从130到139的十个主题,为简单起见,通过kafka附带的脚本命令来完成

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 130

Created topic 130.

查看:

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181

130

(3)启动HDFS服务,使用jps查看是否有相关进程在运行

spark@vm01:/usr/local/kafka$ start-dfs.sh

Starting namenodes on [localhost]

localhost: starting namenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-namenode-vm01.out

localhost: starting datanode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-datanode-vm01.out

Starting secondary namenodes [0.0.0.0]

0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-secondarynamenode-vm01.out

spark@vm01:/usr/local/kafka$ ^C

spark@vm01:/usr/local/kafka$ jps

4081 SecondaryNameNode

2770 QuorumPeerMain

4181 Jps

3128 Kafka

2104 Main

3710 NameNode

3887 DataNode

(4)在HDFS根目录中创建datas目录及日期的子目录,根据自己当前运行的程序时间进行创建即可

spark@vm01:/usr/local/kafka$ cd

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202505

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202506

(5)在python3.6环境中安装一个kafka-python库,以便程序能够正常访问kafka,后面需要填写一个专门的python程序来模拟基站交换机随机产生通话记录

spark@vm01:~$ sudo pip install kafka-python

[sudo] spark 的密码:

(6)启动PyCharm集成开发环境,在其中创建一个名为SparkKafkaBilling的项目,对应的Python解释器使用python3.6即可

点击file,newproject

文件名(位置):/home/spark/PycharmProjects/SparkKafkaBilling

编译:python3.6

点击创建

3.通话记录生产者模拟:

(1)在新建的项目SparkKafkaBilling中创建CallMsgProducer.py文件,然后输入代码,负责按照要求的记录格式模拟产生通话消息,并将其发送到Kafka的topic中。

from kafka import KafkaProducer

import random, datetime, time

# 产生一个以13开头的手机号字符串,共11位

def gen_phone_num():

    phone = '13'

    for x in range(9):

        phone = phone + str(random.randint(0, 9))

    return phone

(2)为了持续不断地生成新的通话记录信息,可以使用一个循环创建符合格式要求的通话记录信息字符串,且每产生一条消息后休眠随机的时长,然后继续生成下一条通话记录

# Kafka的消息生产者对象准备

producer = KafkaProducer(bootstrap_servers="localhost:9092")

working = True

tformat = '%Y-%m-%d %H:%M:%S'     #设置时间日志格式

while working:

    # 主叫号码,被叫号码,呼叫时间(模拟当前时间的前一天),接通时间,挂断时间

    src_phone = gen_phone_num()

    dst_phone = gen_phone_num()

    dail_time = datetime.datetime.now() + datetime.timedelta(days=-1)

    call_time = dail_time + datetime.timedelta(seconds=random.randint(0, 10))

    hangup_time = call_time + datetime.timedelta(seconds=random.randint(5, 600))

    # 将时间格式化为所需的字符串格式,类似2025-05-27 09:30:25

    s_dail_time = dail_time.strftime(tformat)

    s_call_time = call_time.strftime(tformat)

    s_hangup_time = hangup_time.strftime(tformat)

    # 生成通话记录消息字符串

    record = '%s,%s,%s,%s,%s' % (src_phone, dst_phone, s_dail_time, s_call_time, s_hangup_time)

    print('send : ', record)

    # 通话记录的主叫号码前三位为topic主题

    topic = src_phone[0:3]

    # 将通话记录字符串转换为字节数组

    msg = bytes(record, encoding='utf-8')

    # 调用send()将通话记录消息发送给Kafka

    producer.send(topic=topic, key=b"call",value=msg)

    # 休眠一个随机的时长,为一个0-1秒之间的随机小数

    time.sleep( random.random() )

producer.close()

4.消息接收者测试:

(1)在SparkKafkaBilling项目中创建CallMsgBilling.py文件,将Kafka中130~139这10个topic(主题)的消息接收并在屏幕上打印显示出来

如果报错 先执行(2),再重新运行

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

sc = SparkContext('local[2]','KafkaStreamBilling')

sc.setLogLevel("OFF")

ssc = StreamingContext(sc, 5)

streamRdd = KafkaUtils.createDirectStream(ssc,

               topics = ['130','131','132','133','134',

                         '135','136','137','138','139'],

               kafkaParams = {"metadata.broker.list":"localhost:9092"} )

streamRdd.pprint()

ssc.start()

ssc.awaitTermination()

(2)打开一个Linux终端窗体,在其中输入下面的命令,将消息接收者程序提交到Spark中运行,其中用到的spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar依赖库文件此前已下载放在~/streaming目录中,为避免每次提交应用程序时在命令行手动指定,可以将其复制到集群的各节点Spark安装目录中(位于/usr/local/spark/jars目录)

spark@vm01:~$ ls streaming

 FileStreamDemo.py

'IDLE (Python 3.7 64-bit).lnk'

 KafkaStreamDemo.py

 NetworkWordCountall.py

 NetworkWordCount.py

 NetworkWordCount.py.txt

 NetworkWordCountSave.py

 NetworkWordCountWindow.py

 spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar

spark@vm01:~$ cp streaming/*.jar /usr/local/spark/jars

spark@vm01:~$ cd PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit CallMsgBilling.py

(3)回到PyCharm集成开发环境中,运行CallMsgProducer.py程序,在底部会源源不断地显示模拟产生的通话记录消息

(4)再切换到运行消息接收者程序的Linux终端窗体,发现其不断地接收发送过来的消息

从输出结果可以清楚地看到,接收的Kafka消息是一系列(K,V)键值对形式的二元组,其中的K代表

CallMsgProducer.py程序中设定的"call"字符串,V代表消息内容。键(K)可以设置成任意字符串,当然

也可以不设置,实际使用的是二元组里面的值(V),即消息内容

5.Spark Streaming通话记录消息处理:将生成的通话记录消息进行简单的处理并保存在HDFS中

(1)在项目的main.py文件中将原有代码删除,并添加下面的代码

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.sql import SparkSession

from datetime import datetime

# 初始化sc、ssc、spark等几个核心变量

spark = SparkSession.builder \

    .master('local[2]') \

    .appName('KafkaStreamingBilling') \

    .getOrCreate()

sc = spark.sparkContext

sc.setLogLevel("OFF")  #关闭日志

ssc = StreamingContext(sc, 5)

spark = SparkSession(sc)

(2)定义process()和saveYmCallData()函数

# 定义一个处理消息的函数,返回一个通话记录的元组

# (主叫号码,呼叫时间,接通时间,挂断时间,通话时长,被叫号码,年月)

def process(x):

    v = x[1].split(',')

    tformat = '%Y-%m-%d %H:%M:%S'

    d1 = datetime.strptime(v[3], tformat)

    d2 = datetime.strptime(v[4], tformat)

    ym = '%d%02d' % (d1.year, d1.month)

    sec = (d2-d1).seconds

    minutes = sec//60 if sec%60==0 else sec//60+1

    return (v[0],v[2],v[3],v[4],str(minutes),v[1],ym)

# 根据参数row中的年月信息,获取相应的通话消息记录,并保存到HDFS

def saveYmCallData(row):

    year_month = row.ym

    path = "hdfs://localhost:9000/datas/" + year_month + "/"

    ymdf = spark.sql("select * from phonecall where ym='" + year_month +"'")

    ymdf.drop('ym').write.save(path, format="csv", mode="append")

(3)再定义一个save()函数,以实现DStream的通话记录消息保存

# 保存DStream的消息记录

def save(rdd):

    if not rdd.isEmpty():

        rdd2 = rdd.map(lambda x: process(x))

        print(rdd2.count())

        df0 = rdd2.toDF(['src_phone', 'dail_time', 'call_time', 'hangup_time',

                         'call_minutes', 'dst_phone', 'ym'])

        df0.createOrReplaceTempView("phonecall")

        df1 = spark.sql('select distinct ym from phonecall')

        if df1.count() == 1:

            print('ooooooooooo')

            year_month = df1.first().ym

            path = "hdfs://localhost:9000/datas/" + year_month + "/"

            df0.drop("ym").write.save(path, format="csv", mode="append")

        else:

            df1.foreach(saveYmCallData)

(4)通过Kafka数据源创建一个DSteam对象,并开始Spark Streaming应用程序的循环

# 从Kafka的多个topic中接收消息

streamRdd = KafkaUtils.createDirectStream(ssc,

               topics = ['130','131','132','133','134',

                         '135','136','137','138','139'],

               kafkaParams = {"metadata.broker.list":"localhost:9092"})

streamRdd.pprint()

streamRdd.foreachRDD(save)

ssc.start()

ssc.awaitTermination()

(5)功能代码编写完毕,现在可以切换到Linux终端窗体,启动main.py程序

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit main.py

(6)再打开一个新的Linux终端窗体,启动消息生产者程序CallMsgProducer.py

cd ~/PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ python CallMsgProducer.py

然后可以查看main.py程序所在终端窗体显示的通话记录消息

(7)最后,在HDFS上可以验证收到的通话记录消息是否被成功保存,注意应将下面目录路径中的年月改为实际的时间,这是因为数据是按照当前机器时间在运行的

spark@vm01:~$ hdfs dfs -cat /datas/202505/part-*

至此,我们就完成了整个通话记录处理功能的实现


网站公告

今日签到

点亮在社区的每一天
去签到