所用资源:
通过网盘分享的文件:spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar等4个文件
链接: https://pan.baidu.com/s/1zYHu29tLgDvS_L2Ud-22ZA?pwd=hnpg 提取码: hnpg
1.需求分析 :
假定有一个手机通信计费系统,用户通话时在基站交换机上临时保存了相关记录,由于交换机的容量 有限且分散在各地,因此需要及时将这些通话记录汇总到计费系统中进行长时间保存,以方便后续的 统计分析。
2.准备工作:
(1)确保Kafka服务已经启动,可在Linux终端窗体中使用jps命令查看具体的进程
spark@vm01:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 2770
spark@vm01:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties &
[2] 3128
spark@vm01:/usr/local/kafka$ jps
2770 QuorumPeerMain
3128 Kafka
2104 Main
3529 Jps
(2)创建从130到139的十个主题,为简单起见,通过kafka附带的脚本命令来完成
spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 130
Created topic 130.
查看:
spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181
130
(3)启动HDFS服务,使用jps查看是否有相关进程在运行
spark@vm01:/usr/local/kafka$ start-dfs.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-namenode-vm01.out
localhost: starting datanode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-datanode-vm01.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-secondarynamenode-vm01.out
spark@vm01:/usr/local/kafka$ ^C
spark@vm01:/usr/local/kafka$ jps
4081 SecondaryNameNode
2770 QuorumPeerMain
4181 Jps
3128 Kafka
2104 Main
3710 NameNode
3887 DataNode
(4)在HDFS根目录中创建datas目录及日期的子目录,根据自己当前运行的程序时间进行创建即可
spark@vm01:/usr/local/kafka$ cd
spark@vm01:~$ hdfs dfs -mkdir -p /datas/202505
spark@vm01:~$ hdfs dfs -mkdir -p /datas/202506
(5)在python3.6环境中安装一个kafka-python库,以便程序能够正常访问kafka,后面需要填写一个专门的python程序来模拟基站交换机随机产生通话记录
spark@vm01:~$ sudo pip install kafka-python
[sudo] spark 的密码:
(6)启动PyCharm集成开发环境,在其中创建一个名为SparkKafkaBilling的项目,对应的Python解释器使用python3.6即可
点击file,newproject
文件名(位置):/home/spark/PycharmProjects/SparkKafkaBilling
编译:python3.6
点击创建
3.通话记录生产者模拟:
(1)在新建的项目SparkKafkaBilling中创建CallMsgProducer.py文件,然后输入代码,负责按照要求的记录格式模拟产生通话消息,并将其发送到Kafka的topic中。
from kafka import KafkaProducer
import random, datetime, time
# 产生一个以13开头的手机号字符串,共11位
def gen_phone_num():
phone = '13'
for x in range(9):
phone = phone + str(random.randint(0, 9))
return phone
(2)为了持续不断地生成新的通话记录信息,可以使用一个循环创建符合格式要求的通话记录信息字符串,且每产生一条消息后休眠随机的时长,然后继续生成下一条通话记录
# Kafka的消息生产者对象准备
producer = KafkaProducer(bootstrap_servers="localhost:9092")
working = True
tformat = '%Y-%m-%d %H:%M:%S' #设置时间日志格式
while working:
# 主叫号码,被叫号码,呼叫时间(模拟当前时间的前一天),接通时间,挂断时间
src_phone = gen_phone_num()
dst_phone = gen_phone_num()
dail_time = datetime.datetime.now() + datetime.timedelta(days=-1)
call_time = dail_time + datetime.timedelta(seconds=random.randint(0, 10))
hangup_time = call_time + datetime.timedelta(seconds=random.randint(5, 600))
# 将时间格式化为所需的字符串格式,类似2025-05-27 09:30:25
s_dail_time = dail_time.strftime(tformat)
s_call_time = call_time.strftime(tformat)
s_hangup_time = hangup_time.strftime(tformat)
# 生成通话记录消息字符串
record = '%s,%s,%s,%s,%s' % (src_phone, dst_phone, s_dail_time, s_call_time, s_hangup_time)
print('send : ', record)
# 通话记录的主叫号码前三位为topic主题
topic = src_phone[0:3]
# 将通话记录字符串转换为字节数组
msg = bytes(record, encoding='utf-8')
# 调用send()将通话记录消息发送给Kafka
producer.send(topic=topic, key=b"call",value=msg)
# 休眠一个随机的时长,为一个0-1秒之间的随机小数
time.sleep( random.random() )
producer.close()
4.消息接收者测试:
(1)在SparkKafkaBilling项目中创建CallMsgBilling.py文件,将Kafka中130~139这10个topic(主题)的消息接收并在屏幕上打印显示出来
如果报错 先执行(2),再重新运行
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext('local[2]','KafkaStreamBilling')
sc.setLogLevel("OFF")
ssc = StreamingContext(sc, 5)
streamRdd = KafkaUtils.createDirectStream(ssc,
topics = ['130','131','132','133','134',
'135','136','137','138','139'],
kafkaParams = {"metadata.broker.list":"localhost:9092"} )
streamRdd.pprint()
ssc.start()
ssc.awaitTermination()
(2)打开一个Linux终端窗体,在其中输入下面的命令,将消息接收者程序提交到Spark中运行,其中用到的spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar依赖库文件此前已下载放在~/streaming目录中,为避免每次提交应用程序时在命令行手动指定,可以将其复制到集群的各节点Spark安装目录中(位于/usr/local/spark/jars目录)
spark@vm01:~$ ls streaming
FileStreamDemo.py
'IDLE (Python 3.7 64-bit).lnk'
KafkaStreamDemo.py
NetworkWordCountall.py
NetworkWordCount.py
NetworkWordCount.py.txt
NetworkWordCountSave.py
NetworkWordCountWindow.py
spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar
spark@vm01:~$ cp streaming/*.jar /usr/local/spark/jars
spark@vm01:~$ cd PycharmProjects/SparkKafkaBilling
spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit CallMsgBilling.py
(3)回到PyCharm集成开发环境中,运行CallMsgProducer.py程序,在底部会源源不断地显示模拟产生的通话记录消息
(4)再切换到运行消息接收者程序的Linux终端窗体,发现其不断地接收发送过来的消息
从输出结果可以清楚地看到,接收的Kafka消息是一系列(K,V)键值对形式的二元组,其中的K代表
CallMsgProducer.py程序中设定的"call"字符串,V代表消息内容。键(K)可以设置成任意字符串,当然
也可以不设置,实际使用的是二元组里面的值(V),即消息内容
5.Spark Streaming通话记录消息处理:将生成的通话记录消息进行简单的处理并保存在HDFS中
(1)在项目的main.py文件中将原有代码删除,并添加下面的代码
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from datetime import datetime
# 初始化sc、ssc、spark等几个核心变量
spark = SparkSession.builder \
.master('local[2]') \
.appName('KafkaStreamingBilling') \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("OFF") #关闭日志
ssc = StreamingContext(sc, 5)
spark = SparkSession(sc)
(2)定义process()和saveYmCallData()函数
# 定义一个处理消息的函数,返回一个通话记录的元组
# (主叫号码,呼叫时间,接通时间,挂断时间,通话时长,被叫号码,年月)
def process(x):
v = x[1].split(',')
tformat = '%Y-%m-%d %H:%M:%S'
d1 = datetime.strptime(v[3], tformat)
d2 = datetime.strptime(v[4], tformat)
ym = '%d%02d' % (d1.year, d1.month)
sec = (d2-d1).seconds
minutes = sec//60 if sec%60==0 else sec//60+1
return (v[0],v[2],v[3],v[4],str(minutes),v[1],ym)
# 根据参数row中的年月信息,获取相应的通话消息记录,并保存到HDFS
def saveYmCallData(row):
year_month = row.ym
path = "hdfs://localhost:9000/datas/" + year_month + "/"
ymdf = spark.sql("select * from phonecall where ym='" + year_month +"'")
ymdf.drop('ym').write.save(path, format="csv", mode="append")
(3)再定义一个save()函数,以实现DStream的通话记录消息保存
# 保存DStream的消息记录
def save(rdd):
if not rdd.isEmpty():
rdd2 = rdd.map(lambda x: process(x))
print(rdd2.count())
df0 = rdd2.toDF(['src_phone', 'dail_time', 'call_time', 'hangup_time',
'call_minutes', 'dst_phone', 'ym'])
df0.createOrReplaceTempView("phonecall")
df1 = spark.sql('select distinct ym from phonecall')
if df1.count() == 1:
print('ooooooooooo')
year_month = df1.first().ym
path = "hdfs://localhost:9000/datas/" + year_month + "/"
df0.drop("ym").write.save(path, format="csv", mode="append")
else:
df1.foreach(saveYmCallData)
(4)通过Kafka数据源创建一个DSteam对象,并开始Spark Streaming应用程序的循环
# 从Kafka的多个topic中接收消息
streamRdd = KafkaUtils.createDirectStream(ssc,
topics = ['130','131','132','133','134',
'135','136','137','138','139'],
kafkaParams = {"metadata.broker.list":"localhost:9092"})
streamRdd.pprint()
streamRdd.foreachRDD(save)
ssc.start()
ssc.awaitTermination()
(5)功能代码编写完毕,现在可以切换到Linux终端窗体,启动main.py程序
spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit main.py
(6)再打开一个新的Linux终端窗体,启动消息生产者程序CallMsgProducer.py
cd ~/PycharmProjects/SparkKafkaBilling
spark@vm01:~/PycharmProjects/SparkKafkaBilling$ python CallMsgProducer.py
然后可以查看main.py程序所在终端窗体显示的通话记录消息
(7)最后,在HDFS上可以验证收到的通话记录消息是否被成功保存,注意应将下面目录路径中的年月改为实际的时间,这是因为数据是按照当前机器时间在运行的
spark@vm01:~$ hdfs dfs -cat /datas/202505/part-*
至此,我们就完成了整个通话记录处理功能的实现