1. Project Introduction
In recent years, air pollution has remained a serious problem in China. According to the 2022 Report on the State of the Ecology and Environment in China released by the Ministry of Ecology and Environment, only 63.3% of the 338 cities at prefecture level and above met the national air quality standard, the annual mean PM2.5 concentration was 29 µg/m³, and pollutant concentrations in some regions still exceeded the levels recommended by the World Health Organization several times over. Faced with air quality data of this volume and complexity, traditional data processing approaches can no longer support efficient analysis and decision-making. With the rapid development of big data technology, distributed computing frameworks such as Hadoop and Spark offer a new way to store and process massive volumes of environmental data. China's 14th Five-Year Plan explicitly calls for "strengthening zoned management and control of the ecological environment" and "improving environmental monitoring and early-warning capabilities", which requires smarter and more efficient systems for analyzing air pollution data. Against this background, building a Spark-based analysis and visualization system for China's air pollution data is of real significance for integrating multi-source data, uncovering pollution patterns, and supporting environmental decision-making.
Developing a Spark-based analysis and visualization system for China's air pollution data has practical value on several levels. In practical terms, the system exploits Spark's distributed computing to greatly improve the efficiency of processing massive air quality monitoring datasets, cutting analysis tasks that used to take hours or even days down to minutes and giving environmental agencies timely decision support. Technically, the system combines Python with the Django framework to bridge big data processing and the web application, and uses Vue with Echarts to build an intuitive visualization layer that makes complex pollution data easy to read and interpret. Socially, the system helps the public understand air quality conditions and trends in their own region, raises environmental awareness, and lets people adjust daily activities to reduce exposure to pollution. For environmental authorities, the multi-dimensional analysis results can support pollution source identification, evaluation of control measures, and targeted policy-making, driving continuous improvement in air quality and contributing to the goal of a Beautiful China.
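As a minimal sketch of the Django-to-Echarts hand-off described above (the endpoint, file path, and field names are assumptions for illustration, not the project's actual code), a Django view might read a CSV result written by Spark and return it as JSON for the Vue/Echarts frontend:

# views.py -- hypothetical Django view that serves a precomputed Spark result
# (a coalesced CSV file) as JSON for the Vue/Echarts frontend.
import csv
import glob
from django.http import JsonResponse

def yearly_trend(request):
    # Spark writes part files; pick up the single coalesced CSV (assumed path)
    path = glob.glob("output/yearly_trend/part-*.csv")[0]
    with open(path, newline="") as f:
        rows = list(csv.DictReader(f))
    # Echarts line charts typically take parallel arrays: x-axis plus one per series
    return JsonResponse({
        "years": [r["year"] for r in rows],
        "avg_pm25": [float(r["avg_pm25"]) for r in rows],
    })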
2. Development Environment
- Big data stack: Hadoop, Spark, Hive (a minimal session setup sketch follows this list)
- Development: Python, Django framework, Vue
- Software tools: PyCharm, DataGrip, Anaconda
- Visualization: Echarts
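As a sketch of how this stack might be initialized (the application name and warehouse directory below are assumptions, not the project's actual configuration), a SparkSession with Hive support could be created as follows:

# Hypothetical SparkSession setup for the Hadoop/Spark/Hive stack above.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("AirPollutionAnalysis")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .enableHiveSupport()  # allow Spark SQL to read tables in the Hive metastore
    .getOrCreate()
)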
3. Video Demonstration
Recommended hot topic for computer science big-data projects: a big-data-based analysis and visualization system for air pollution in China [Hadoop, Spark, Python, dashboard]
4. Project Showcase
Login module:
Visualization and analysis module:
5. Code Showcase
# Core feature 1: time-series analysis of air pollution with Spark
from pyspark.sql.functions import col, to_date

def analyze_pollution_time_series(spark, data_path, output_path):
    # Load the air pollution data
    pollution_df = spark.read.csv(data_path, header=True, inferSchema=True)
    # Preprocessing: parse the date column and fill missing pollutant readings
    pollution_df = pollution_df.withColumn("date", to_date(col("timestamp"), "yyyy-MM-dd"))
    pollution_df = pollution_df.na.fill({"pm25": 0, "pm10": 0, "so2": 0, "no2": 0, "co": 0, "o3": 0})
    # Register a temporary view for SQL queries
    pollution_df.createOrReplaceTempView("pollution_data")
    # Yearly trend of average pollutant concentrations
    yearly_trend = spark.sql("""
        SELECT
            YEAR(date) as year,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10,
            AVG(so2) as avg_so2,
            AVG(no2) as avg_no2,
            AVG(co) as avg_co,
            AVG(o3) as avg_o3
        FROM pollution_data
        GROUP BY YEAR(date)
        ORDER BY year
    """)
    # Seasonal pattern of AQI and particulate matter
    seasonal_pattern = spark.sql("""
        SELECT
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END as season,
            AVG(aqi) as avg_aqi,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10
        FROM pollution_data
        GROUP BY
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END
    """)
    # Intra-day (hourly) variation pattern
    hourly_pattern = spark.sql("""
        SELECT
            HOUR(timestamp) as hour,
            AVG(pm25) as avg_pm25,
            AVG(no2) as avg_no2,
            AVG(o3) as avg_o3
        FROM pollution_data
        GROUP BY HOUR(timestamp)
        ORDER BY hour
    """)
    # Persist each result as a single CSV file with a header row
    yearly_trend.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/yearly_trend")
    seasonal_pattern.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/seasonal_pattern")
    hourly_pattern.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/hourly_pattern")
    return {
        "yearly_trend": yearly_trend.collect(),
        "seasonal_pattern": seasonal_pattern.collect(),
        "hourly_pattern": hourly_pattern.collect()
    }
# Core feature 2: cross-city air quality comparison
def compare_cities_air_quality(spark, data_path, output_path):
    # Load air pollution data covering multiple cities
    cities_df = spark.read.csv(data_path, header=True, inferSchema=True)
    # Preprocessing: parse the date and keep only the five target cities
    cities_df = cities_df.withColumn("date", to_date(col("timestamp"), "yyyy-MM-dd"))
    cities_df = cities_df.filter(col("city").isNotNull())
    cities_df = cities_df.filter(cities_df.city.isin(["Beijing", "Shanghai", "Guangzhou", "Chengdu", "Shenzhen"]))
    # Register a temporary view
    cities_df.createOrReplaceTempView("cities_pollution")
    # Average AQI and main pollutant concentrations per city, ranked by AQI
    city_comparison = spark.sql("""
        SELECT
            city,
            AVG(aqi) as avg_aqi,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10,
            RANK() OVER (ORDER BY AVG(aqi) DESC) as aqi_rank
        FROM cities_pollution
        GROUP BY city
        ORDER BY avg_aqi DESC
    """)
    # Pollutant composition per city, with a coarse pollution-type label
    pollutant_composition = spark.sql("""
        SELECT
            city,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10,
            AVG(so2) as avg_so2,
            AVG(no2) as avg_no2,
            AVG(co) as avg_co,
            AVG(o3) as avg_o3,
            CASE
                WHEN AVG(so2) > AVG(no2) AND AVG(so2) > AVG(o3) THEN 'Industrial'
                WHEN AVG(no2) > AVG(so2) AND AVG(no2) > AVG(o3) THEN 'Traffic'
                WHEN AVG(o3) > AVG(so2) AND AVG(o3) > AVG(no2) THEN 'Photochemical'
                ELSE 'Mixed'
            END as pollution_type
        FROM cities_pollution
        GROUP BY city
    """)
    # Share of days with good or moderate air quality (AQI <= 100) per city
    good_days_ratio = spark.sql("""
        SELECT
            city,
            COUNT(*) as total_days,
            SUM(CASE WHEN aqi <= 100 THEN 1 ELSE 0 END) as good_days,
            ROUND(SUM(CASE WHEN aqi <= 100 THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as good_days_percentage
        FROM (
            SELECT
                city,
                date,
                AVG(aqi) as aqi
            FROM cities_pollution
            GROUP BY city, date
        ) daily
        GROUP BY city
        ORDER BY good_days_percentage DESC
    """)
    # Seasonal differences in PM2.5 across cities
    seasonal_city_diff = spark.sql("""
        SELECT
            city,
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END as season,
            AVG(pm25) as avg_pm25,
            STDDEV(pm25) as std_pm25
        FROM cities_pollution
        GROUP BY
            city,
            CASE
                WHEN MONTH(date) IN (3, 4, 5) THEN 'Spring'
                WHEN MONTH(date) IN (6, 7, 8) THEN 'Summer'
                WHEN MONTH(date) IN (9, 10, 11) THEN 'Autumn'
                ELSE 'Winter'
            END
        ORDER BY city, season
    """)
    # Persist the results
    city_comparison.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/city_comparison")
    pollutant_composition.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/pollutant_composition")
    good_days_ratio.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/good_days_ratio")
    seasonal_city_diff.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/seasonal_city_diff")
    return {
        "city_comparison": city_comparison.collect(),
        "pollutant_composition": pollutant_composition.collect(),
        "good_days_ratio": good_days_ratio.collect(),
        "seasonal_city_diff": seasonal_city_diff.collect()
    }
# Core feature 3: correlation analysis between weather factors and air pollution
from pyspark.sql.functions import to_timestamp
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

def analyze_weather_pollution_correlation(spark, pollution_path, weather_path, output_path):
    # Load the air pollution data and the weather data
    pollution_df = spark.read.csv(pollution_path, header=True, inferSchema=True)
    weather_df = spark.read.csv(weather_path, header=True, inferSchema=True)
    # Preprocessing: parse timestamps in both datasets; drop the raw string
    # timestamp on the weather side so the joined view has no duplicate column
    pollution_df = pollution_df.withColumn("date_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
    weather_df = weather_df.withColumn("date_time", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")).drop("timestamp")
    # Join on time and city; the column-name form deduplicates the join keys,
    # so later SQL references to "city" are unambiguous
    joined_df = pollution_df.join(weather_df, ["date_time", "city"], "inner")
    # Register a temporary view
    joined_df.createOrReplaceTempView("weather_pollution_data")
    # Air quality under different weather conditions
    weather_condition_analysis = spark.sql("""
        SELECT
            weather_condition,
            AVG(aqi) as avg_aqi,
            AVG(pm25) as avg_pm25,
            COUNT(*) as sample_count
        FROM weather_pollution_data
        WHERE weather_condition IS NOT NULL
        GROUP BY weather_condition
        ORDER BY avg_aqi DESC
    """)
    # Relationship between wind speed and pollutant concentrations
    wind_speed_analysis = spark.sql("""
        SELECT
            CASE
                WHEN wind_speed < 2 THEN 'Calm (0-2 m/s)'
                WHEN wind_speed >= 2 AND wind_speed < 5 THEN 'Light (2-5 m/s)'
                WHEN wind_speed >= 5 AND wind_speed < 10 THEN 'Moderate (5-10 m/s)'
                ELSE 'Strong (>10 m/s)'
            END as wind_speed_category,
            AVG(pm25) as avg_pm25,
            AVG(pm10) as avg_pm10,
            COUNT(*) as sample_count
        FROM weather_pollution_data
        WHERE wind_speed IS NOT NULL
        GROUP BY
            CASE
                WHEN wind_speed < 2 THEN 'Calm (0-2 m/s)'
                WHEN wind_speed >= 2 AND wind_speed < 5 THEN 'Light (2-5 m/s)'
                WHEN wind_speed >= 5 AND wind_speed < 10 THEN 'Moderate (5-10 m/s)'
                ELSE 'Strong (>10 m/s)'
            END
        ORDER BY avg_pm25 DESC
    """)
    # Effect of wind direction on pollution transport for each city
    wind_direction_analysis = spark.sql("""
        SELECT
            city,
            CASE
                WHEN wind_direction >= 0 AND wind_direction < 45 THEN 'N'
                WHEN wind_direction >= 45 AND wind_direction < 90 THEN 'NE'
                WHEN wind_direction >= 90 AND wind_direction < 135 THEN 'E'
                WHEN wind_direction >= 135 AND wind_direction < 180 THEN 'SE'
                WHEN wind_direction >= 180 AND wind_direction < 225 THEN 'S'
                WHEN wind_direction >= 225 AND wind_direction < 270 THEN 'SW'
                WHEN wind_direction >= 270 AND wind_direction < 315 THEN 'W'
                ELSE 'NW'
            END as wind_direction_category,
            AVG(pm25) as avg_pm25
        FROM weather_pollution_data
        WHERE wind_direction IS NOT NULL
        GROUP BY
            city,
            CASE
                WHEN wind_direction >= 0 AND wind_direction < 45 THEN 'N'
                WHEN wind_direction >= 45 AND wind_direction < 90 THEN 'NE'
                WHEN wind_direction >= 90 AND wind_direction < 135 THEN 'E'
                WHEN wind_direction >= 135 AND wind_direction < 180 THEN 'SE'
                WHEN wind_direction >= 180 AND wind_direction < 225 THEN 'S'
                WHEN wind_direction >= 225 AND wind_direction < 270 THEN 'SW'
                WHEN wind_direction >= 270 AND wind_direction < 315 THEN 'W'
                ELSE 'NW'
            END
        ORDER BY city, avg_pm25 DESC
    """)
    # Pearson correlation between temperature/humidity and pollutants,
    # computed with Spark ML's Correlation utility
    columns = ["temperature", "humidity", "pm25", "o3"]
    assembler = VectorAssembler(inputCols=columns, outputCol="features")
    # Drop rows with nulls first: VectorAssembler cannot handle missing values
    vector_df = assembler.transform(joined_df.select(*columns).na.drop())
    correlation_matrix = Correlation.corr(vector_df, "features").collect()[0][0]
    # Flatten the matrix into (variable1, variable2, correlation) rows
    correlation_data = []
    for i, col1 in enumerate(columns):
        for j, col2 in enumerate(columns):
            correlation_data.append((col1, col2, float(correlation_matrix[i, j])))
    correlation_df = spark.createDataFrame(correlation_data, ["variable1", "variable2", "correlation"])
    # Persist the results
    weather_condition_analysis.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/weather_condition_analysis")
    wind_speed_analysis.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/wind_speed_analysis")
    wind_direction_analysis.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/wind_direction_analysis")
    correlation_df.coalesce(1).write.mode("overwrite").option("header", True).csv(f"{output_path}/temp_humidity_correlation")
    return {
        "weather_condition_analysis": weather_condition_analysis.collect(),
        "wind_speed_analysis": wind_speed_analysis.collect(),
        "wind_direction_analysis": wind_direction_analysis.collect(),
        "correlation_data": correlation_df.collect()
    }
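The three functions above are self-contained; as a minimal usage sketch (the application name and the data/output paths below are assumptions for illustration, not the project's actual layout), a driver script might run all three analyses in sequence:

# Hypothetical driver script; assumes the three functions above are defined
# in the same module. Paths and the app name are illustrative.
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("AirPollutionAnalysis").getOrCreate()
    analyze_pollution_time_series(spark, "data/pollution.csv", "output")
    compare_cities_air_quality(spark, "data/cities_pollution.csv", "output")
    analyze_weather_pollution_correlation(spark, "data/pollution.csv", "data/weather.csv", "output")
    spark.stop()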
6. Project Documentation Showcase
7. Summary
This article designed and implemented a Spark-based analysis and visualization system for China's air pollution data, which uses big data technology to process massive air quality monitoring datasets and provide a scientific basis for environmental decision-making. The system uses Hadoop and Spark as its core computing frameworks, with distributed computing significantly improving processing efficiency; the backend is built with Python and the Django framework and manages the full pipeline of data collection, cleaning, and analysis; the frontend is developed with Vue and renders rich, intuitive charts through the Echarts visualization library. The core features include time-series analysis of air pollution, cross-city air quality comparison, and correlation analysis between weather factors and pollutants; together these analyses reveal the spatiotemporal distribution of pollutant concentrations, differences in pollution characteristics between cities, and how meteorological conditions affect air quality. Experiments show that the system can efficiently process gigabytes of historical monitoring data and turn complex analysis tasks into visual charts and reports that both environmental agencies and the public can readily understand and use. Future work will incorporate machine learning algorithms to strengthen the system's pollution forecasting ability and extend it to mobile clients to improve accessibility and practical value. Overall, the system provides solid technical support for air pollution prevention and control, and contributes to the application of environmental big data and the improvement of environmental governance.