Spark学习(python版本)

发布于:2025-09-10 ⋅ 阅读:(20) ⋅ 点赞:(0)

Spark编程基础Python

前言:大数据技术

技术层面 功能
数据采集 利用ETL工具将分布的、异构数据源中的数据如关系数据、平面数据文件等,抽取到临时中间层后进行清洗、转换、集成,最后加载到数据仓库或数据集市中,成为联机分析处理、数据挖掘的基础;或者也可以把实时采集的数据作为流计算系统的输入,进行实时处理分析
数据存储和管理 利用分布式文件系统、数据仓库、关系数据库、NoSQL数据库、云数据库等,实现对结构化、半结构化和非结构化海量数据的存储和管理
数据处理与分析 利用分布式并行编程模型和计算框架,结合机器学习和数据挖掘算法,实现对海量数据的处理和分析;对分析结果进行可视化呈现,帮助人们更好地理解数据、分析数据
数据隐私和安全 在从大数据中挖掘潜在的巨大商业价值和学术价值的同时,构建隐私数据保护体系和数据安全体系,有效保护个人隐私和数据安全

在这里插入图片描述

大数据计算模式 解决问题 代表产品
批处理计算 针对大规模数据的批量处理 MapReduce、Spark等
流计算 针对流数据的实时计算 Flink、Strom、S4、Streams等
图计算 针对大规模图结构数据的处理 Pregel、Graphx、PowerGraph、Hama等
查询分析计算 大规模数据的存储管理和查询分析 Dremel、Hive等

1. Spark简介

spark架构图:信息查询,流式计算,机器学习,图计算
在这里插入图片描述

Spark与Hadoop的对比

Spark在借鉴Hadoop MapReduce优点的同时,很好地解决了MapReduce所面临的问题

相比于Hadoop MapReduce,Spark主要具有如下优点:

  • Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比Hadoop MapReduce更灵活

  • Spark提供了内存计算,可将中间结果放到内存中,对于迭代运算效率更高

  • Spark基于DAG的任务调度执行机制,要优于Hadoop MapReduce的迭代执行机制

结论:Spark更适合与迭代次数较多的,比如机器学习,数据挖掘

注意:Spark只是一个计算框架与MapReduce对等,可能会取代Map Reduce,但是不会取代Hadoop整体

Flink和Spark还存在一些明显的区别,具体如下:
(1)Spark的技术理念是基于批来模拟流的计算。而Flink则完全相反,它采用的是基于流计算来模拟批计算。从技术发展方向看,用批处理来模拟流计算有一定的技术局限性,并且这个局限性可能很难突破。而Flink基于流计算来模拟批处理,在技术上有更好的扩展性。
(2)Flink和Spark都支持流计算,二者的区别在于,Flink是一条一条地处理数据,而Spark是基于RDD的小批量处理,所以,Spark在流式处理方面,不可避免地会增加一些延时,实时性没有Flink好。Flink的流计算性能和Storm差不多,可以支持毫秒级的响应,而Spark则只能支持秒级响应。
(3)当全部运行在Hadoop YARN之上时,Flink的性能要略好于Spark,因为,Flink支持增量迭代,具有对迭代进行自动优化的功能。

总结:Spark在生态上更加完善,然后Flink在流计算有绝对的优势

Spark生态系统
应用场景 时间跨度 其他框架 Spark生态系统中的组件
复杂的批量数据处理 小时级 MapReduce, Hive Spark
基于历史数据的交互式查询 分钟级、秒 级 Impala, Dremel. Drill Spark SQL
基于实时数据流的数据处理 毫秒、秒级 Storm, S4 Spark Streaming 、Structured Streaming
基于历史数据的数据挖掘 - Mahout MLlib
图结构数据的处理 - Pregel, Hama GraphX
  • RDD:是Resillient Distributed Dataset(弹性分布式数据集)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型
  • DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系
  • Executor:是运行在工作节点(WorkerNode)的一个进程,负责运行Task
  • 应用(Application):用户编写的Spark应用程序
  • 任务(Task):运行在Executor上的工作单元
  • 作业(Job):一个作业包含多个RDD及作用于相应RDD上的各种操作
  • 阶段(Stage):是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为阶段,或者也被称为任务集合,代表了一组关联的、相互之间,没有Shuffle依赖关系的任务组成的任务集
Spark运行基本流程

在这里插入图片描述

spark流水线优化:窄依赖和宽依赖

在这里插入图片描述

窄依赖可以实现“流水线”优化

宽依赖无法实现“流水线”优化

如上图所示,平行线都是可以优化的在一个Stage中,因为他们不用shuffle,可以在一个Stage中表达,如果需要shuffle就需要划分

RDD在Spark架构中的运行过程

在这里插入图片描述

Spark的运行模式

./bin/pyspark --master 会进入spark交互界面

在这里插入图片描述

Spark on Yarn:在这种情况下,一般是hadoop集群了,所以无需部署Spark集群,只需要找一台服务器,充当Spark客户端,即可提交任务到Yarn集群中运行

Client模式一般使用于学习和测试的场景,Cluster模式适用于生产环境

Cluster模式 Client模式(默认)
Driver运行位置 YARN容器内部 客户端进程内
通讯效率 低于Cluster模式
日志查看 日志输出在容器内,不方便查看 日志输出在客户端的标准输出流中,方便查看
生产环境是否使用 推荐使用 不推荐使用
稳定性 稳定 受到客户端进程影响

2. spark配置安装

spark安装 local单节点
分布式环境下spark配置
  1. 在所有节点上完成包括安装Anaconda3、设置国内源、创建pyspark环境等。验证pyspark虚拟环境中python的版本。

  2. Hadoop02,Hadoop03中修改配置 ~/.bashrc

    export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_371
    export JRE_HOME= J A V A H O M E / j r e e x p o r t C L A S S P A T H = . : {JAVA_HOME}/jre export CLASSPATH=.: JAVAHOME/jreexportCLASSPATH=.:{JAVA_HOME}/lib: J R E H O M E / l i b e x p o r t H A D O O P H O M E = / u s r / l o c a l / h a d o o p e x p o r t S P A R K H O M E = / u s r / l o c a l / s p a r k e x p o r t H A D O O P C O N F D I R = {JRE_HOME}/lib export HADOOP_HOME=/usr/local/hadoop export SPARK_HOME=/usr/local/spark export HADOOP_CONF_DIR= JREHOME/libexportHADOOPHOME=/usr/local/hadoopexportSPARKHOME=/usr/local/sparkexportHADOOPCONFDIR=HADOOP_HOME/etc/hadoop
    export PYSPARK_PYTHON=/home/hadoop/miniconda3/env/pysaprk/bin/python3.8
    export PATH= P A T H : PATH: PATH:{JAVA_HOME}/bin: H A D O O P H O M E / b i n : HADOOP_HOME/bin: HADOOPHOME/bin:SPARK_HOME/bin

  3. Spark配置

    1. 配置works文件

      在hadoop01节点上执行如下命令将workers.template改名为workers:

      $ cd /usr/local/spark/
      $ sudo mv ./conf/workers.template ./conf/workers
      

      在workers文件中设置Spark集群的Worker节点。编辑workers文件的内容,把默认内容localhost替换成如下内容:

      hadoop01
      hadoop02
      hadoop03

      注意不要有多余的空格之类的

    2. 配置spark-env.sh文件

      #PART1
      export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_371
      export HADOOP_HOME=/usr/local/hadoop
      export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
      export YARN_CONF_DIR==/usr/local/hadoop/etc/hadoop

      #PART2
      export SPARK_MASTER_HOST=hadoop01
      export SPARK_MASTER_PORT=7077
      export SPARK_MASTER_WEBUI_PORT=8081
      export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
      export SPARK_HISTORY_OPTS=“-Dspark.history.fs.logDirectory=hdfs://hadoop01:9000/sparklog -Dspark.history.fs.cleaner.enabled=true”

      #PART3
      export SPARK_WORKER_CORES=1
      export SPARK_WORKER_MEMORY=1G
      export SPARK_EXECUTOR_CORES=1
      export SPARK_EXECUTOR_MEMORY=1G
      export SPARK_DRIVER_MEMORY=1G
      export SPARK_WORKER_PORT=7078
      export SPARK_WORKER_WEBUI_PORT=8082

    3. 创建历史服务器日志目录

      1.  # 在Hadoop01上启动hdfs
         cd /usr/local/hadoop
         ./sbin/start-dfs.sh
         
         cd /usr/local/hadoop
         ./bin/hdfs dfs -mkdir /sparklog
         ./bin/hdfs dfs -chmod 777 /sparklog #将这个hdfs文件的权限赋予hadoop01用户
        
    4. 配置spark-defaults.conf文件

      # 在 haoop01上执行如下命令
      cd /usr/local/hadoop
      sudo mv spark-defaults.conf.template spark-defaults.conf
      vim spark-defaults.conf
      
      #修改内容如下
      spark.eventLog.enabled true  # 开启Spark的日志记录功能
      spark.eventLog.dir hdfs://hadoop01:9000/sparklog  #日志记录的保存路径
      spark.eventLog.compress true # 是否启动压缩
      
    5. 配置Worker节点

      在hadoop01节点上执行如下命令,将Master节点(hadoop01节点)上的/usr/local/spark文件夹复制到各个Worker节点上(即hadoop02和hadoop03节点):

      $ cd /usr/local/
      $ tar -zcf ~/spark.master.tar.gz ./spark #将./spark打包为tar.gz
      $ cd ~
      $ scp ./spark.master.tar.gz hadoop02:/home/hadoop
      $ scp ./spark.master.tar.gz hadoop03:/home/hadoop
      

      在hadoop02和hadoop03节点上分别执行如下操作:

      $ cd ~
      $ sudo rm -rf /usr/local/spark/
      $ sudo tar -zxf ~/spark.master.tar.gz -C /usr/local
      $ sudo chown -R hadoop /usr/local/spark
      
  4. 启动集群服务

    1. 启动Hadoop集群 ./sbin/start-all.sh

      1. 启动历史服务器 maser节点上 spark/sbin/start-history-server.sh
    2. 启动maser节点,在Hadoop01节点上 spark/sbin/start-master.sh

    3. 启动所有Worker节点Hadoop01节点上 spark/sbin/start-workers.sh

    4. 查看进程情况

      5409 DataNode
      6434 ResourceManager
      7156 Master
      7380 Worker
      7418 Jps
      5228 NameNode
      7022 HistoryServer
      6607 NodeManager

  5. 查看spark集群信息

    http://hadoop01:8081

  6. 关闭集群/usr/local/spark

    1. 先关闭master ./sbin/stop-master.sh
    2. 关闭workers ./sbin/stop-workers.sh
    3. 在/usr/local/hadoop关闭Hadoop集群 ./sbin/stop-all.sh
spark代码常见的运行方式
  1. 启动python版本的Spark交互式执行环境

​ 启动pyspark ./bin/pyspark即可进入spark交互页面

  1. 使用spark-submit命令提交应用程序

    注意:在集群模式使用standalone模式需要先启动 sbin/start-master.sh sbin/start-workers.sh

    spark-submit命令提交应用程序的,命令格式如下

    ./bin/spark-submit

    ​ --master #上图中可以加的参数

    ​ --deploy-mode

    ​ … #其他参数

    ​ --jar xxx #添加其他的依赖包

    ​ #python代码文件

    ​ [application-arguments] #传递给主类的主方法参数

  2. 开发Spark独立应用程序

    1. 编写程序

    2. 在命令行直接运行.py文件

      1. 进入虚拟环境
      2. 运行命令 Python wordCount.py
    3. 或者是通过spark-submit运行应用程序

      1. 进入虚拟环境

      2.  示例:
         ./bin/spark-submit \
         --master local[*] \ 
         /usr/local/spark/examples/src/main/python/pi.py | grep "Pi is"
         
         ./bin/spark-submit \
         --master spark://hadoop01:7077 \ 
         /usr/local/spark/mycode/python/wordCount.py
         
         注意:这里使用standalone模式需要先启动master和workers,如果不是分布式环境直接local[*]就可以了
        
总结

所有操作都在pyspark虚拟环境下操作,

​ 使用 spark/bin/pyspark交互式进行编写

​ 或者是使用 submit 提交

3. RDD创建

  1. 本地数据加载,在usr/local/spark文件夹下:./bin/pyspark 这个类库中执行
>>> lines = sc.textFile("file:///usr/local/spark/mycode/word.text")
>>> lines.foreach(print)

注意这里的sc 是直接可用的,由pyspark创建的

  1. 在pycharm中创建程序
#coding:utf-8
from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("My app")
sc = SparkContext(conf)

网站公告

今日签到

点亮在社区的每一天
去签到