- Spark大数据处理与分析
- 雷擎编著
- 1914字
- 2021-03-27 00:15:41
3.3 共享变量
共享变量是许多函数和方法必须并行使用的变量,可以在并行操作中使用。通常,当Spark操作(如map()或reduce())在远程集群节点上执行时,函数被传递到操作中,而且函数需要的变量会在每个节点任务上复制副本,所以Spark操作可以独立执行。这些变量被复制,并且远程计算机上的变量更新都不会传回驱动程序。如果在各个任务之间支持通用的读写共享变量,则效率很低。但是,Spark实现了两种常用模式,提供了有限类型的共享变量:广播变量和累加器。
3.3.1 广播变量
广播变量是所有执行程序之间的共享变量,是在驱动程序中创建的,然后在执行程序上是只读的。广播变量使Spark的操作可以在每台计算机上保留一个只读变量,而不用将其副本与任务一起发送,可以使用它们以有效的方式为每个节点提供大型输入数据集的副本。
可以在Spark集群中广播整个数据集,以便执行器可以访问广播的数据。执行器中运行的所有任务都可以访问广播变量。广播使用各种优化的方法使广播数据可供所有执行器访问。因为广播的数据集的尺寸可能很大,这是要解决的重要挑战。执行器通过HTTP连接和最新的组件提取数据,类似于BitTorrent,数据集本身像洪流一样快速地分布到集群中。这使扩展性更强的方法可以将广播变量分发给所有执行程序,而不是让每个执行器一个接一个地从驱动程序中提取数据,当有很多执行器时,这可能导致驱动程序发生故障。
Spark的动作是通过一组阶段执行的,这些阶段被分布式洗牌操作分割。Spark自动广播每个阶段中任务所需的通用数据。以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前反序列化。这意味着,仅当跨多个阶段的任务需要相同数据,或以反序列化形式缓存数据非常重要时,显式创建广播变量才有用。广播变量是通过调用SparkContext.broadcast(v)方法,从变量v创建的。广播变量是变量v的包装,可以通过调用value()方法访问其值。让我们看看如何广播一个Integer变量,然后在执行器上执行的转换操作中使用广播变量。
广播变量也可以不仅在原始数据类型上创建,如下面的示例所示,将从驱动程序广播HashMap。
广播变量确实会占用所有执行器的内存,而且取决于广播变量中包含的数据大小,这有时可能导致资源问题。有一种方法可以从所有执行程序的内存中删除广播变量。在广播变量上调用unpersist()会从所有执行器的缓存中删除广播变量的数据,以释放资源。如果再次使用该变量,则数据将重新传输给执行器,以便再次使用。下面是如何在广播变量上调用unpersist()的示例。调用unpersist()后,如果再次访问广播变量,将在后台照常工作,执行器再次为该变量提取数据。
还可以销毁广播变量,将其从所有执行器中完全删除,并且驱动程序也无法访问它们。这对于在整个集群中最佳地管理资源非常有帮助。在广播变量上调用destroy()会破坏与指定广播变量相关的所有数据和元数据。广播变量一旦销毁,将无法再次使用,必须重新创建。以下是销毁广播变量的示例。
3.3.2 累加器
累加器是执行器之间的共享变量,通常用于向Spark程序添加计数器。累加器是仅通过关联和交换操作进行累加的变量,因此可以有效地被并行操作支持,可用于实现计数器(如MapReduce中的计数器)或求和。Spark本机支持数字类型的累加器,也可以添加对新类型的支持。我们可以创建命名或未命名的累加器。一个已命名的累加器将在Web UI中显示。以下是使用Spark Context和longAccumulator函数创建和使用整数累加器的示例,以将新创建的累加器变量初始化为零。随着累加器在map()转换中的使用,累加器也会增加。操作结束时,累加器的值为351。
内置的累加器可用于许多用例,其中包括:
(1)LongAccumulator:用于计算64位整数的和、计数和平均值。
(2)DoubleAccumulator:用于计算双精度浮点数的总和、计数和平均值。
(3)CollectionAccumulator[T]:用于收集元素列表。
尽管上面的例子使用了内置支持的整数类型累加器,我们也可以通过将AccumulatorV2子类化创建自己的类型。AccumulatorV2抽象类具有几种必须重写的方法:reset()用于将累加器重置为零;add()用于将另一个值添加到累加器;merge()将另一个相同类型的累加器合并到该方法中。API文档中包含其他必须重写的方法。接下来,看一个自定义累加器的实际示例。同样,我们将为此使用statesPopulation CSV文件。我们的目标是在自定义累加器中累加年份和人口总数。
步骤1:导入包含AccumulatorV2类的软件包。
步骤2:包含年份和人口的案例类别。
步骤3:StateAccumulator类扩展了AccumulatorV2。
步骤4:创建一个新的StateAccumulator并将其注册到SparkContext。
步骤5:将statesPopulation.csv阅读为RDD。
步骤6:使用StateAccumulator。
步骤7:现在可以检查StateAccumulator的值。
上面的步骤是自定义累加器的分步骤讲解,其中包括数据的提取、累加器的定义、执行和结果输出,可以将上面每一步的代码汇总到一个文件中。在本教程的虚拟实验环境中集成的上述代码,可以在/data/code/AccumulatorsExample.txt中找到。可以通过Scala的交互界面调用和执行这个文件中的代码,查看运行结果。
本节研究了累加器以及如何构建自定义累加器。因此,使用前面的示例,可以创建复杂的累加器,以满足需求。