实验五 Spark Structured Streaming编程实践

发布于:2024-05-08 ⋅ 阅读:(28) ⋅ 点赞:(0)

一、编写程序

(1). 按照tag分组统计生成的日志数。

在新开的终端内输入 vi spark_exercise_testsyslog1.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3

from functools import partial

from pyspark.sql import SparkSession

from pyspark.sql.functions import *



if __name__ == "__main__":

    spark = SparkSession \

        .builder \

        .appName("Structuredcronlog") \

        .getOrCreate()



    lines = spark \

        .readStream \

        .format("socket") \

        .option("host", "localhost") \

        .option("port", 9988) \

        .load()



    # Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)

    # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段

    fields = partial(

        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"

    )



    words = lines.select(

        unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),

        fields(idx=2).alias("hadooplyf316"),

        fields(idx=3).alias("tag"),

        fields(idx=4).alias("content"),

    )



# (1).  按照tag分组统计日志数。

    windowedCounts1 = words \

        .groupBy("tag") \

        .count()

        



    # 开始运行查询并在控制台输出

    query = windowedCounts1 \

        .writeStream \

        .outputMode("append") \

        .format("console") \

        .option('truncate', 'false')\

        .trigger(processingTime="3 seconds") \

        .start()



query.awaitTermination()

(2).输出所有日志内容带spark的日志。

在新开的终端内输入 vi spark_exercise_testsyslog3.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3



from functools import partial



from pyspark.sql import SparkSession

from pyspark.sql.functions import *





if __name__ == "__main__":

    spark = SparkSession \

        .builder \

        .appName("Structuredcronlog") \

        .getOrCreate()



    lines = spark \

        .readStream \

        .format("socket") \

        .option("host", "localhost") \

        .option("port", 9988) \

        .load()



    # Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)

    # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段

    fields = partial(

        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"

    )



    words = lines.select(

        unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),

        fields(idx=2).alias("hostname"),

        fields(idx=3).alias("tag"),

        fields(idx=4).alias("content"),

    )



    # (3).  输出所有日志内容带spark的日志(根据自己模拟的日志内容进行筛选)。

    windowedCounts3 = words \

        .filter("content like '%spark%'")



    # 开始运行查询并在控制台输出

    query = windowedCounts3 \

        .writeStream \

        .outputMode("append") \

        .format("console") \

        .option('truncate', 'false')\

        .trigger(processingTime="3 seconds") \

        .start()



query.awaitTermination()