关于 Apache Spark 中大数据处理的某些方面,第 2 部分:有用的设计模式

发布于:2022-12-20 ⋅ 阅读:(396) ⋅ 点赞:(0)

在我之前的文章中,我演示了 Spark 如何创建和序列化任务。在这篇文章中,我将展示如何利用这些知识以可维护和可升级的方式构建 Spark 应用程序,同时避免“任务不可序列化”异常。

当我参与一个大数据项目时,我需要对 Spark 应用程序进行编程,以便在关系和分布式数据库(如Apache Hive )之间移动和转换数据。我发现这样的应用程序有许多陷阱,所以所有“难以阅读的代码”、“方法太大而无法放入单个屏幕”等问题都需要避免,我们才能专注于更深层次的问题。此外,Spark 作业也类似:数据从单个或多个数据库加载、转换,然后保存到单个或多个数据库。因此,尝试使用GoF模式来编写 Spark 应用程序似乎是合理的。 

这篇文章的组织如下:

  • 第 1 节描述了我的系统。 
  • 第 2 节讨论了在这种情况下哪些 GoF 模式可能有用。 
  • 在第 3 节中,我提出了所描述问题的解决方案(GitHub 上完全可行的代码)。

我们走吧。 

1.问题

一个典型的 Spark 应用程序如下(图 1)。有多个数据源(数据库、文件等):DBi、i=1:N 和 Java Spark 服务器。我们需要从数据源加载数据,对数据进行转换,并将转换后的数据保存到不同或相同的数据源。每个应用程序可能具有不同的输入和输出数据源组合。 

 

图 1:一个典型的 Spark 应用程序

为了使我的代码简洁且开箱即用,我简化了问题:假设有 2 个 JSON 文件作为数据源(图 2)。然后,数据进行转换:转换是一个平凡的单位转换或两个数据集的联合。最后,有两个储蓄者。一个返回数据集模式,另一个显示数据集内容。我拿了“Master Apache Spark - Hands On!”的代码。Imtiaz Ahmad 课程作为入门代码。尽管它很简单,但代码足以说明如何在 Spark 应用程序中使用基本设计模式。

图 2:我们的简化系统

让我们研究 2 个 Spark 应用程序(图 3)。第一个是简单的加载-转换-保存作业 (A)。第二种情况更复杂,将两个数据集合并为一个,然后保存(B)。我们如何以最少的代码重复对这些 Spark 应用程序进行编程?

 

图 3:一个基本的 spark 应用程序(A);具有两个源、一个保护程序和一个联合转换的 Spark 应用程序 (B)

2.模式

为了解决这个问题,让我们从“Design Patterns: Elements of Reusable Object-Oriented Software”中选择设计模式。此外,我们需要我们的解决方案是可序列化的。这里可能会想到以下 GoF 模式:

  1. Builder:该模式由 Product、Builder 和 Director 组成。导演指示建造者制造产品。该模式可以有效地构建不同的产品;这就是产品分两步构建的原因。第一步配置 Builder。第二步,Director 指示 Builder 构建产品。然而,这种模式在我们的案例中并不是特别有用,因为 Spark 应用程序有一个共同的结构。
  2. 模板方法:该模式由一个 AbstractClass(带有原始操作,加上链接这些操作的骨架方法)和一个 ConcreteClass(实现了原始操作)组成。在我们的案例中,这对于定义加载-转换-保存应用程序的框架肯定很有用。
  3. Factory:该模式由 Product(接口)、ConcreteProducts、Creator(接口)和 ConcreteCreators 组成。ConcreteCreator 有条件地创建一个 ConcreteProduct。这看起来对我们很有用。
  4. 抽象工厂:该模式由 AbstractFactory(接口或超类)、ConcreteFactories、AbstractProduct(接口或超类)、ConcreteProducts 和客户端组成。客户端配置 ConcreteFactories(共享相同的 AbstractFactory 父级),以构建 ConcreteProducts(共享相同的 AbstractProduct 父级)。根据D. Banas 的演讲“抽象工厂设计模式”,如果与策略模式结合使用,这种模式最有用。这正是我们所需要的!我们既可以使用如何运行作业的相似性,也可以使用如何构建这些作业的相似性。
  5. Strategy:该模式由一个 Strategy 接口和实现该接口的 ConcreteStrategies 组成。这允许用户轻松切换功能的特定实现。在我们的案例中,对不同的加载器、转换器和保存器进行编程很有用。  
  6. 装饰器:该模式由一个组件(接口)、具体组件、一个装饰器(抽象类)、具体装饰器组成。ConcreteDecorators 为 ConcreteComponents 添加功能。在我们的例子中,为我们的方法添加额外的功能也很有用。例如,我们可以在将数据写入其中之前截断关系数据库表。让我们结合这些模式来获得解决方案。

3.解决方案

我针对这个问题提出了以下解决方案(图 4)。该解决方案基于抽象工厂模式。SimpleJobFactory并且BasicJobPlan分别是 ConcreteFactory 和 ConcreteProduct。IFactory并且IJobPlan是 AbstractFactory 和 AbstractProduct 相应的接口。 

图 4:用于编程 Spark 作业的抽象工厂、模板方法和策略模式

类的实现细节如图 5 所示。请注意,我们将可序列化SparkSession对象保存在SparkSessionContainer. 使用 SparkSession 的所有其他类都扩展了 SparkSessionContainer。 

图 5:在我们的案例中如何实现具体的工作和具体的工厂

另请注意,SimpleJobFactory 和 BasicJobPlan 已经包含用于构建作业 ( SimpleJobFactory.make()) 和运行作业 ( BasicJobPlan.run()) 的框架方法。这些类充当模板方法模式的抽象类。更精细的工作工厂和工作计划扩展了这些课程。

实际上,要创建工作 (B) 计划,我们需要在工作 (A) 计划中添加第二个加载器、第二个保存器和另一个转换器(联合)。另外,要创建作业(B)具体工厂,我们需要“装饰”SimpleJobFactory.make()方法以添加第二个加载器,添加联合转换器,并将基本保存器替换为空的,以通过新的(第二个)保存器保存数据. 我们可以在每个步骤中更改作业计划保存程序、转换器和加载程序,因为我们可以在方法的每个步骤中访问作业ConcreteFactory.make()。如何做到这一点如图 5 所示。 

请注意,我们的 SimpleJobFactory 依赖于一个I extends IJobPlan通用接口来利用 SimpleJobFactory 的方法和字段来制定更详细的工作计划。这些更复杂的作业计划接口依赖于加载器、转换器和保存器接口。Loaders、transformers 和 savers 实现了一个 Strategy 模式,所以我们更容易切换不同的实现。 

为了避免“任务不可序列化”异常,我们应该只将可序列化对象作为类字段。不可序列化的对象应该被创建为方法内部的局部变量。详情见我之前的帖子。 

要运行这些作业,我们需要创建一个 Spark 驱动程序(请参阅第 1 部分以查看 Spark 术语),其中我们创建一个 SparkSession 实例、设置数据源的配置、创建具体工厂、创建具体作业并运行这些作业。

爪哇
1
公共  JsonDriver {  JsonDriver {
2
    公共 静态 无效 主要(字符串[]参数){公共 静态 无效 主要(字符串[]参数){
3
          最终 SparkSession 火花 =  SparkSession。建设者()      最终 SparkSession 火花 =  SparkSession。建设者()
4
                    . appName ( "JSON 行到数据框" )                . appName ( "JSON 行到数据框" )
5
                    . 大师(“本地”                . 大师(“本地”
6
                    . 获取或创建();                . 获取或创建();
7
          最终 字符串 路径 =  "src/main/resources/multiline.json" ;      最终 字符串 路径 =  "src/main/resources/multiline.json" ;
8
          final  String  path2  =  "src/main/resources/multiline2.json" ;      final  String  path2  =  "src/main/resources/multiline2.json" ;
9
          映射<字符串字符串> 选项 =  哈希映射< >();      映射<字符串字符串> 选项 =  哈希映射< >();
10
          选项。放(“路径”,路径);      选项。放(“路径”,路径);
11
          选项。放(“路径2”,路径2);      选项。放(“路径2”,路径2);
12
          选项。放(“jobName”“Job_1”);      选项。放(“jobName”“Job_1”);
13
          
14
          IFactory  factory  =  new  TwoJobFactory ( spark , options );      IFactory  factory  =  new  TwoJobFactory ( spark , options );
15
          工厂。制作();      工厂。制作();
16
          工厂。获取工作()。运行();           工厂。获取工作()。运行();     
17
​​​
18
    }}

在这里,Spark Driver 充当客户端,以抽象工厂模式配置具体工厂。整个应用程序只有一个 SparkSession。会话提供给 SparkSessionContainer 以供容器的子级访问。应用程序在本地模式下运行并打印两个数据集的并集。

结论

在这篇文章中,我演示了如何使用 GoF 抽象工厂、模板方法和策略模式来使 Spark 应用程序代码模块化、可读、可升级和可序列化。我希望这对您的 Spark 项目有所帮助。


网站公告

今日签到

点亮在社区的每一天
去签到