Recommended Hot Topics for CS Big Data Projects: A Big-Data-Based Air Pollution Data Analysis and Visualization System for China [Hadoop, Spark, Python, Dashboard]

Published: 2025-08-11


💖🔥 Author homepage: 计算机毕设木哥 🔥💖

1. Project Introduction

In recent years, air pollution in China has remained a serious problem. According to the 2022 Report on the State of the Ecology and Environment in China released by the Ministry of Ecology and Environment, only 63.3% of the 338 cities at prefecture level and above met the national air quality standard, the annual average PM2.5 concentration was 29 μg/m³, and pollutant levels in some regions still exceeded the World Health Organization's recommended limits several times over. Faced with such a large and complex body of air quality data, traditional processing methods can no longer support efficient analysis and decision-making. With the rapid development of big data technology, distributed computing frameworks such as Hadoop and Spark offer new solutions for storing and processing massive environmental datasets. China's 14th Five-Year Plan explicitly calls for "strengthening zoned control of the ecological environment" and "improving environmental monitoring and early-warning capabilities", which requires smarter and more efficient air pollution analysis systems. Against this background, building a Spark-based air pollution data analysis and visualization system for China is of real value for integrating multi-source data, mining pollution patterns, and supporting environmental decision-making.

Developing such a system has practical significance on several levels. In practical terms, the system exploits Spark's distributed computing to dramatically speed up the processing of massive air quality monitoring datasets, cutting analysis tasks that used to take hours or even days down to minutes and giving environmental agencies timely decision support. Technically, it combines Python with the Django framework to connect big data processing to a web application, and uses Vue with Echarts to build an intuitive visualization layer that makes complex pollution data easy to read and interpret. In terms of social value, the system helps the public understand local air quality and its trends, raises environmental awareness, and lets people adjust daily activities to reduce exposure to pollution. For environmental regulators, its multi-dimensional analysis results can support pollution source identification, evaluation of control measures, and targeted policy, driving continuous improvement in air quality and contributing to the "Beautiful China" goal.
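To make the Spark-to-Web handoff described above concrete, here is a minimal sketch of how a Django view could read one of the CSV result sets written by the Spark jobs in Section 5 and return it as JSON for an Echarts chart. The view name, URL layout, and result directory are hypothetical, not taken from the project itself:

import glob

import pandas as pd
from django.http import JsonResponse

RESULT_DIR = "/data/airquality/results"  # hypothetical output root


def yearly_trend_api(request):
    # coalesce(1) still writes a part-*.csv file inside the target folder,
    # so locate it with a glob rather than a fixed file name
    part_file = glob.glob(f"{RESULT_DIR}/yearly_trend/part-*.csv")[0]
    # The Spark job writes no header row, so supply the column names here
    df = pd.read_csv(part_file, header=None,
                     names=["year", "avg_pm25", "avg_pm10", "avg_so2",
                            "avg_no2", "avg_co", "avg_o3"])
    # Echarts charts typically consume parallel arrays: one x-axis, one per series
    return JsonResponse({
        "years": df["year"].tolist(),
        "pm25": df["avg_pm25"].tolist(),
        "pm10": df["avg_pm10"].tolist(),
    })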

2. Development Environment

  • Big data stack: Hadoop, Spark, Hive
  • Development: Python, Django framework, Vue
  • Software tools: PyCharm, DataGrip, Anaconda
  • Visualization: Echarts
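
The analysis functions in Section 5 expect a ready-made SparkSession. A minimal sketch of how one might be created for this stack (the app name and master URL are placeholders to adapt to the actual cluster):

from pyspark.sql import SparkSession

# Hypothetical session setup; Hive support matches the Hive entry above
spark = (SparkSession.builder
         .appName("AirPollutionAnalysis")
         .master("local[*]")   # replace with the YARN/standalone master in production
         .enableHiveSupport()
         .getOrCreate())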

3. Video Demo

Recommended Hot Topics for CS Big Data Projects: A Big-Data-Based Air Pollution Data Analysis and Visualization System for China [Hadoop, Spark, Python, Dashboard]

4. Project Showcase

Login module: (screenshot)

Visualization and analysis module: (screenshots)

5. Code Showcase

# Shared imports for the three analysis functions below
from pyspark.sql.functions import col, to_date, to_timestamp


# Core feature 1: Spark-based time-series analysis of air pollution
def analyze_pollution_time_series(spark, data_path, output_path):
    # Load the air pollution data
    pollution_df = spark.read.csv(data_path, header=True, inferSchema=True)
    
    # Preprocessing: derive a date column and fill missing pollutant readings.
    # to_date() without an explicit pattern also accepts full timestamp
    # strings, which the hourly analysis below relies on.
    pollution_df = pollution_df.withColumn("date", to_date(col("timestamp")))
    pollution_df = pollution_df.na.fill({"pm25": 0, "pm10": 0, "so2": 0, "no2": 0, "co": 0, "o3": 0})
    
    # Register a temp view for SQL queries
    pollution_df.createOrReplaceTempView("pollution_data")
    
    # Yearly trend of average pollutant concentrations
    yearly_trend = spark.sql("""
        SELECT 
            YEAR(date) as year, 
            AVG(pm25) as avg_pm25, 
            AVG(pm10) as avg_pm10,
            AVG(so2) as avg_so2,
            AVG(no2) as avg_no2,
            AVG(co) as avg_co,
            AVG(o3) as avg_o3
        FROM pollution_data
        GROUP BY YEAR(date)
        ORDER BY year
    """)
    
    # Seasonal pattern, mapping months to meteorological seasons
    seasonal_pattern = spark.sql("""
        SELECT 
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END as season,
            AVG(aqi) as avg_aqi,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10
        FROM pollution_data
        GROUP BY 
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END
    """)
    
    # Intra-day pattern: how pollutant levels vary by hour
    hourly_pattern = spark.sql("""
        SELECT 
            HOUR(timestamp) as hour,
            AVG(pm25) as avg_pm25,
            AVG(no2) as avg_no2,
            AVG(o3) as avg_o3
        FROM pollution_data
        GROUP BY HOUR(timestamp)
        ORDER BY hour
    """)
    
    # Persist each result as a single CSV file for the visualization layer
    yearly_trend.coalesce(1).write.mode("overwrite").csv(f"{output_path}/yearly_trend")
    seasonal_pattern.coalesce(1).write.mode("overwrite").csv(f"{output_path}/seasonal_pattern")
    hourly_pattern.coalesce(1).write.mode("overwrite").csv(f"{output_path}/hourly_pattern")
    
    return {
        "yearly_trend": yearly_trend.collect(),
        "seasonal_pattern": seasonal_pattern.collect(),
        "hourly_pattern": hourly_pattern.collect()
    }


# Core feature 2: cross-city air quality comparison
def compare_cities_air_quality(spark, data_path, output_path):
    # Load the multi-city pollution data
    cities_df = spark.read.csv(data_path, header=True, inferSchema=True)
    
    # Preprocessing: derive a date column and keep only the five target cities
    cities_df = cities_df.withColumn("date", to_date(col("timestamp")))
    cities_df = cities_df.filter(col("city").isNotNull())
    cities_df = cities_df.filter(cities_df.city.isin(["Beijing", "Shanghai", "Guangzhou", "Chengdu", "Shenzhen"]))
    
    # Register a temp view
    cities_df.createOrReplaceTempView("cities_pollution")
    
    # Average AQI and major pollutant concentrations per city, ranked by AQI
    city_comparison = spark.sql("""
        SELECT 
            city,
            AVG(aqi) as avg_aqi,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10,
            RANK() OVER (ORDER BY AVG(aqi) DESC) as aqi_rank
        FROM cities_pollution
        GROUP BY city
        ORDER BY avg_aqi DESC
    """)
    
    # Pollutant composition per city, with a heuristic dominant-source label
    pollutant_composition = spark.sql("""
        SELECT 
            city,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10,
            AVG(so2) as avg_so2,
            AVG(no2) as avg_no2,
            AVG(co) as avg_co,
            AVG(o3) as avg_o3,
            CASE
                WHEN AVG(so2) > AVG(no2) AND AVG(so2) > AVG(o3) THEN 'Industrial'
                WHEN AVG(no2) > AVG(so2) AND AVG(no2) > AVG(o3) THEN 'Traffic'
                WHEN AVG(o3) > AVG(so2) AND AVG(o3) > AVG(no2) THEN 'Photochemical'
                ELSE 'Mixed'
            END as pollution_type
        FROM cities_pollution
        GROUP BY city
    """)
    
    # Share of "good or moderate" days (daily AQI <= 100) per city
    good_days_ratio = spark.sql("""
        SELECT 
            city,
            COUNT(*) as total_days,
            SUM(CASE WHEN aqi <= 100 THEN 1 ELSE 0 END) as good_days,
            ROUND(SUM(CASE WHEN aqi <= 100 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as good_days_percentage
        FROM (
            SELECT 
                city, 
                date, 
                AVG(aqi) as aqi
            FROM cities_pollution
            GROUP BY city, date
        ) daily_aqi
        GROUP BY city
        ORDER BY good_days_percentage DESC
    """)
    
    # Seasonal PM2.5 differences across cities (mean and spread)
    seasonal_city_diff = spark.sql("""
        SELECT 
            city,
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END as season,
            AVG(pm25) as avg_pm25,
            STDDEV(pm25) as std_pm25
        FROM cities_pollution
        GROUP BY 
            city,
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END
        ORDER BY city, season
    """)
    
    # Persist the results
    city_comparison.coalesce(1).write.mode("overwrite").csv(f"{output_path}/city_comparison")
    pollutant_composition.coalesce(1).write.mode("overwrite").csv(f"{output_path}/pollutant_composition")
    good_days_ratio.coalesce(1).write.mode("overwrite").csv(f"{output_path}/good_days_ratio")
    seasonal_city_diff.coalesce(1).write.mode("overwrite").csv(f"{output_path}/seasonal_city_diff")
    
    return {
        "city_comparison": city_comparison.collect(),
        "pollutant_composition": pollutant_composition.collect(),
        "good_days_ratio": good_days_ratio.collect(),
        "seasonal_city_diff": seasonal_city_diff.collect()
    }


# Core feature 3: correlation analysis between weather factors and air pollution
def analyze_weather_pollution_correlation(spark, pollution_path, weather_path, output_path):
    # Load the pollution and weather datasets
    pollution_df = spark.read.csv(pollution_path, header=True, inferSchema=True)
    weather_df = spark.read.csv(weather_path, header=True, inferSchema=True)
    
    # Preprocessing: parse full timestamps on both sides; drop the raw string
    # column from the weather side to avoid duplicate column names after the join
    pollution_df = pollution_df.withColumn("date_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
    weather_df = weather_df.withColumn("date_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")).drop("timestamp")
    
    # Inner-join the two datasets on timestamp and city; joining on column
    # names keeps a single copy of each join key, so later references to
    # "city" in SQL are not ambiguous
    joined_df = pollution_df.join(weather_df, ["date_time", "city"], "inner")
    
    # Register a temp view
    joined_df.createOrReplaceTempView("weather_pollution_data")
    
    # Air quality under different weather conditions
    weather_condition_analysis = spark.sql("""
        SELECT 
            weather_condition,
            AVG(aqi) as avg_aqi,
            AVG(pm25) as avg_pm25,
            COUNT(*) as sample_count
        FROM weather_pollution_data
        WHERE weather_condition IS NOT NULL
        GROUP BY weather_condition
        ORDER BY avg_aqi DESC
    """)
    
    # Relationship between wind speed bands and particulate concentrations
    wind_speed_analysis = spark.sql("""
        SELECT 
            CASE
                WHEN wind_speed < 2 THEN 'Calm (0-2 m/s)'
                WHEN wind_speed >= 2 AND wind_speed < 5 THEN 'Light (2-5 m/s)'
                WHEN wind_speed >= 5 AND wind_speed < 10 THEN 'Moderate (5-10 m/s)'
                ELSE 'Strong (>10 m/s)'
            END as wind_speed_category,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10,
            COUNT(*) as sample_count
        FROM weather_pollution_data
        WHERE wind_speed IS NOT NULL
        GROUP BY 
            CASE
                WHEN wind_speed < 2 THEN 'Calm (0-2 m/s)'
                WHEN wind_speed >= 2 AND wind_speed < 5 THEN 'Light (2-5 m/s)'
                WHEN wind_speed >= 5 AND wind_speed < 10 THEN 'Moderate (5-10 m/s)'
                ELSE 'Strong (>10 m/s)'
            END
        ORDER BY avg_pm25 DESC
    """)
    
    # Effect of wind direction (eight compass sectors) on PM2.5 per city
    wind_direction_analysis = spark.sql("""
        SELECT 
            city,
            CASE
                WHEN wind_direction >= 0 AND wind_direction < 45 THEN 'N'
                WHEN wind_direction >= 45 AND wind_direction < 90 THEN 'NE'
                WHEN wind_direction >= 90 AND wind_direction < 135 THEN 'E'
                WHEN wind_direction >= 135 AND wind_direction < 180 THEN 'SE'
                WHEN wind_direction >= 180 AND wind_direction < 225 THEN 'S'
                WHEN wind_direction >= 225 AND wind_direction < 270 THEN 'SW'
                WHEN wind_direction >= 270 AND wind_direction < 315 THEN 'W'
                ELSE 'NW'
            END as wind_direction_category,
            AVG(pm25) as avg_pm25
        FROM weather_pollution_data
        WHERE wind_direction IS NOT NULL
        GROUP BY 
            city,
            CASE
                WHEN wind_direction >= 0 AND wind_direction < 45 THEN 'N'
                WHEN wind_direction >= 45 AND wind_direction < 90 THEN 'NE'
                WHEN wind_direction >= 90 AND wind_direction < 135 THEN 'E'
                WHEN wind_direction >= 135 AND wind_direction < 180 THEN 'SE'
                WHEN wind_direction >= 180 AND wind_direction < 225 THEN 'S'
                WHEN wind_direction >= 225 AND wind_direction < 270 THEN 'SW'
                WHEN wind_direction >= 270 AND wind_direction < 315 THEN 'W'
                ELSE 'NW'
            END
        ORDER BY city, avg_pm25 DESC
    """)
    
    # Pearson correlation between temperature/humidity and pollutant levels,
    # computed with Spark ML's Correlation utility
    from pyspark.ml.stat import Correlation
    from pyspark.ml.feature import VectorAssembler
    
    # Assemble the numeric columns into a feature vector; drop rows with nulls
    # first, since Correlation.corr cannot handle missing values
    columns = ["temperature", "humidity", "pm25", "o3"]
    assembler = VectorAssembler(inputCols=columns, outputCol="features")
    vector_df = assembler.transform(joined_df.select(*columns).na.drop())
    
    # Collect the Pearson correlation matrix (a DenseMatrix)
    correlation_matrix = Correlation.corr(vector_df, "features").collect()[0][0]
    
    # Flatten the matrix into (variable1, variable2, correlation) rows
    correlation_data = []
    for i, col1 in enumerate(columns):
        for j, col2 in enumerate(columns):
            correlation_data.append((col1, col2, float(correlation_matrix[i, j])))
    
    correlation_df = spark.createDataFrame(correlation_data, ["variable1", "variable2", "correlation"])
    
    # Persist the results
    weather_condition_analysis.coalesce(1).write.mode("overwrite").csv(f"{output_path}/weather_condition_analysis")
    wind_speed_analysis.coalesce(1).write.mode("overwrite").csv(f"{output_path}/wind_speed_analysis")
    wind_direction_analysis.coalesce(1).write.mode("overwrite").csv(f"{output_path}/wind_direction_analysis")
    correlation_df.coalesce(1).write.mode("overwrite").csv(f"{output_path}/temp_humidity_correlation")
    
    return {
        "weather_condition_analysis": weather_condition_analysis.collect(),
        "wind_speed_analysis": wind_speed_analysis.collect(),
        "wind_direction_analysis": wind_direction_analysis.collect(),
        "correlation_data": correlation_df.collect()
    }
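

For completeness, a hedged sketch of driving the three functions above end to end; the HDFS paths below are placeholders, not the project's actual data locations:

if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("AirPollutionAnalysis").getOrCreate()

    # Hypothetical input and output locations
    pollution_csv = "hdfs:///airquality/pollution.csv"
    weather_csv = "hdfs:///airquality/weather.csv"
    out_dir = "hdfs:///airquality/results"

    analyze_pollution_time_series(spark, pollution_csv, out_dir)
    compare_cities_air_quality(spark, pollution_csv, out_dir)
    analyze_weather_pollution_correlation(spark, pollution_csv, weather_csv, out_dir)

    spark.stop()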


6. Project Documentation

(screenshot)

7. Summary

This article designed and implemented a Spark-based air pollution data analysis and visualization system for China that uses big data technology to process massive air quality monitoring datasets and provide a scientific basis for environmental decision-making. The system uses Hadoop and Spark as its core computing framework, with distributed computing markedly improving processing efficiency; the backend is built with Python and Django and manages the full pipeline of data collection, cleaning, and analysis; the frontend is developed with Vue and presents rich, intuitive charts through the Echarts library. Its core features are time-series analysis of air pollution, cross-city air quality comparison, and correlation analysis between meteorological factors and pollutants; together these reveal the spatiotemporal distribution of pollutant concentrations, differences in pollution characteristics across cities, and how weather conditions influence air quality. Experiments show the system can efficiently process gigabytes of historical monitoring data and turn complex analysis tasks into visual charts and reports that environmental agencies and the public can readily use. Future work will incorporate machine learning to add pollution forecasting and extend the system to mobile clients to improve accessibility and practical value. Overall, the system offers solid technical support for air pollution prevention and control and contributes to the application of environmental big data and to better environmental governance.

If you found this helpful, please like, bookmark, follow, and comment 👇🏻👇🏻👇🏻

💖🔥 Author homepage: 计算机毕设木哥 🔥💖

