3.2 分区和洗牌

我们已经了解了Apache Spark如何比Hadoop更好地处理分布式计算,还看到了内部工作原理主要是称为弹性分布式数据集的基本数据结构。RDD是代表数据集的不可变集合,并具有可靠性和故障恢复的内在能力。实际上,RDD对数据的操作不是基于整个数据块,数据分布于整个集群的分区中,通过RDD的抽象层管理和操作数据。因此,数据分区的概念对于Apache Spark作业的正常运行至关重要,并且会对性能产生很大影响,决定了资源的利用情况。本节将深入讨论分区和洗牌的概念。

RDD由数据分区组成,基于RDD的所有操作都在数据分区上执行,诸如转换之类的几种操作是在执行器上运行的函数,特定的数据分区也在此执行器上。但是,并非所有操作过程都可以仅由所在的执行器包含的数据分区孤立完成,像聚合这样的操作要求将数据跨节点移到整个集群中。在下面的简单整数RDD的操作中,SparkContext的parallelize()函数根据整数序列创建RDD,然后使用getNumPartitions()函数可以获得该RDD的分区数。

分区的数量很重要,因为该数量直接影响将要运行RDD转换的任务数量。如果分区的数量太少,那么大量数据可能仅使用几个CPU内核,从而降低性能,并使集群利用率不足。另一方面,如果分区的数量太大,那么将使用比实际需要更多的资源,并且在多用户的环境中,可能导致正在运行的其他作业的资源匮乏。如果要查看CPU的核数,可以使用下面的命令:

3.2.1 分区

Spark的分区是存储在集群中节点上的原始数据块,即逻辑划分。RDD是这种分区的集合,通过RDD的抽象概念隐藏了正在处理的分段数据。这种分区结构可帮助Spark实现并行化分布式数据处理,并以最小的网络流量在执行程序之间发送数据。

分区的数量对于一个良好的集群性能来说非常重要。如果有很少的分区,将不能充分利用集群中的内存和CPU资源,因为某些资源可能处于空闲状态。例如,假设有一个Spark集群具有10个CPU内核,一般来说,一个CPU内核负责一个分区的计算,在这种情况下如果有少于10个分区,那么一些CPU内核将处于空闲状态,所以会浪费资源。此外,由于分区较少,每个分区中就会有更多的数据,这样会造成集群中某些节点内存增加的压力。另一方面,如果有太多的分区,那么每个分区可能具有太少的数据或根本没有数据,也可能降低性能,因为集群中的数据分区可能是跨节点的,从多个节点上汇总分区中的数据需要更多的计算和传输时间。因此,根据Spark集群配置情况设置合适的分区非常重要。Spark只能一次为RDD的每个分区分配运行一个并发任务,一次最多的并发任务为集群中的最大CPU内核数。所以,如果有一个10核CPU的集群,那么至少要为RDD定义10个分区,分区总数一般为内核的2~4倍。默认情况下,Spark会创建等于集群中CPU内核数的分区数,也可以随时更改分区数。下面的例子创建了具有指定分区数的RDD。

代码3-42

正如代码中看到的,regularRDD的默认分区数量等于24,这是由于当前环境是通过本地模式启动的spark-shell,本地模式是在具有24核CPU的Docker虚拟实验环境中运行。如果在创建RDD时指定了分区数48,regularRDD的分区就变成48。在创建RDD时,第二个参数定义了为该RDD创建的分区数。一个分区从不跨越多台机器,即同一分区中的所有元组都保证在同一台机器上。集群中的每个工作节点都可以包含一个或多个RDD的分区。分区总数是可配置的,默认情况下,它等于所有执行器节点上的内核总数。

Spark提供了两个内置分区器,分别是哈希分区器和范围分区器。创建RDD时,可以通过两种方式指定特定的分区器:一种方式是通过在RDD上调用partitionBy()方法提供显式指定的分区器;另一种方式是通过转换操作返回新创建的RDD,其使用转换操作特定的分区器。带有分区器的转换操作有join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、cogroup()、foldByKey()、combineByKey()、sort()、partitionBy()、groupWith();另外,mapValues()、flatMapValues()和filter()的分区方式与父级RDD有关。而像map()这样的操作会导致新创建的RDD忘记父分区信息,因为像这样的操作理论上可以修改每个记录的键,所以,在这种情况下如果操作在结果RDD中保留了分区器,则不再有任何意义,因为现在的键都是不同的。所以,Spark提供像mapValues()和flatMapValues()这样的操作,如果不想改变键,可以使用这些操作,从而保留分区器。partitionBy()是一个转化操作,因此它的返回值总是一个新的RDD,但它不会改变原来的RDD。RDD一旦创建,就无法修改,因此应该对partitionBy()的结果进行持久化。如果没有将partitionBy()转化操作的结果持久化,那么后面每次用到这个RDD时,都会重复对数据进行分区操作。不进行持久化会导致整个RDD谱系图重新求值。那样的话,partitionBy()带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的情况十分相似。

哈希分区是Spark中的默认分区程序,通过计算RDD元组中每个键的哈希值工作,具有相同哈希码的元素最终都位于相同的分区中,如以下代码片段所示。

如果键相同,则其hashcode的结果相同,其对应的值保存在相同的分区上。哈希分区是Spark的默认分区器。如果没有提到任何分区器,那么Spark将使用哈希分区器对数据进行分区。下面的例子便于更好地理解以上内容。

代码3-43

 def glom():RDD[Array[T]]

glom()方法将分区中的数据封装为数组,并将这些分区数组嵌入一个数组中。每个返回的数组都包含一个分区的内容,(4,16)和(4,64)的键都是4,所以在同一个分区中;(1,1)和(2,4)的键分别为1和2,所以在不同的分区中。

如果RDD的键是可排序的,则范围分区器可以基于键的范围进行划分。由于范围分区器必须知道任何分区的开始键和结束键,因此在使用范围分区器之前,需要先对RDD进行排序。范围分区器首先需要基于RDD为分区分配合理的边界,然后创建一个从键到元素所属分区索引的函数,最后需要根据范围分区器重新划分RDD,以根据确定的范围正确分配RDD元素。看下面的例子:

代码3-44

哈希分区器已经能够满足绝大部分的情况,但是,由于键的数量分布可能不均匀,所以也会造成分区中的数据分布不均。如果键可以进行排序,则可以采用范围分区器,这样能保证各个分区之间的键是有序的,并且各个分区之间数据量差不多,但是不保证单个分区内键的有序性。范围分区器会将键切分成多段,每段对应一个分区,简单地说,就是将一定范围内的键映射到某一个分区内,划分各个分区键的范围采用的方法为水塘抽样算法。

虽然Spark提供的哈希分区器与范围分区已经能够满足大多数用例,但Spark还是允许通过提供一个自定义的分区对象控制RDD的分区方式,这可以通过在Spark中扩展默认的分区类定制需要的分区数量,以及定义存储在这些分区中的数据。要实现自定义的分区器,需要继承org.apache.spark.Partitioner类并实现下面三个方法:

(1)numPartitions:Int,定义需要创建的分区数。

(2)getPartition(key:Any):Int,输入参数为特定的键,返回特定的分区编号,从0到numPartitions-1。

(3)equals(),判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法检查分区器对象是否和其他分区器实例相同,这样,Spark才可以判断两个RDD的分区方式是否相同。

现在实现一个自定义分区器。

(1)定义分区器MyPartitioner。

(2)测试自定义分区器。自定义分区器可以从/data/code/MyPartitioner.txt中加载。

代码3-45

在这个自定义分区器MyPartitioner中,简单地将键为2的数据放到分区编号为0的分区中,其他数据放到分区编号为1的分区中。这个自定义分区器可以按哈希分区器和范围分区器的方式使用,只需要创建一个对象,并将其传递给partitionBy()方法。下面是作用在分区上的其他方法。

 def mapPartitions[U](f:(Iterator[T])⇒Iterator[U],preservesPartitioning:Boolean=false)(implicit arg0:ClassTag[U]):RDD[U]

这是一个专门的map()操作,每个分区函数f仅被调用一次,可以通过输入参数Iterarator[T]将各个分区的全部内容作为值的顺序流使用。自定义函数必须返回另一个Iterator[U]。合并的结果是迭代器将自动转换为新的RDD。reservesPartitioning指示输入函数是否保留分区器,除非这是一对RDD并且输入函数不修改键,否则应为false。

代码3-46

myfunc函数的作用是将分区中数据作为输入,例如分区(1,2,3),然后按顺序进行配对输出(2,3)和(1,2),由于myfunc函数以分区数据作为输入,所以最终的输出结果中缺少元组(3,4)和(6,7),如果查看a的分区数据,3和6分别为两个分区中的最后一个值,所以无法产生(3,4)和(6,7)元组。

 defmapPartitionsWithIndex[U](f:(Int,Iterator[T])⇒Iterator[U],preservesPartitioning:Boolean=false)(implicit arg0:ClassTag[U]):RDD[U]

与mapPartitions()类似,但是f函数需要参数(Int,Iterator[T]),第一个参数是分区编号,第二个参数是该分区中所有数据的迭代器,在应用f函数进行转换之后,输出是一个包含数据列表的迭代器Iterator[U]。

代码3-47

 foreachPartition(f:(Iterator[(K,C)])⇒Unit):Unit

为每个分区执行f函数,通过(Iterator[(K,C)])参数提供分区中的数据项,f函数没有返回值。

代码3-48

foreachPartition()方法属于动作操作,而mapPartitions()是转换操作。此外,在应用场景上的区别是mapPartitions()可以获取返回值,可以继续在返回的RDD上做其他操作,而foreachPartition因为没有返回值并且是动作操作,所以一般都用于将数据保存到存储系统中。

3.2.2 洗牌

Spark中的某些操作会触发一个洗牌事件,也称为Shuffle,这是Spark重新分配数据的机制,以便在不同分区之间进行数据分组。分布式系统的数据查找和交换非常占用系统的计算和带宽资源,所以,合理对数据进行布局可以最小化网络流量,大大提高性能。如果数据是键值对,则分区变得非常必要,因为在RDD的转换中,整个网络中数据的洗牌是相当大的。如果相同的键或键范围存储在相同的分区中,则可以将洗牌最小化,并且处理实质上会变得很快。这可能导致洗牌的操作包括重新分区,如repartition()和coalesce();带有ByKey的操作,如groupByKey()和reduceByKey(),但是不包括countByKey();以及连接操作,如cogroup()和join()。

实际上,洗牌是消耗资源的操作,因为它涉及磁盘的读写、数据序列化和网络传输。为了组织和汇总数据,Spark生成一组任务,包括映射任务重新分配数据,以及一组聚合任务来汇总数据。对内部机制来说,来自单个映射任务的结果会保存在内存中,直到内存不足为止。然后,根据目标分区进行排序并写入单个文件中,而聚合任务读取相关的排序块。这通常涉及在执行器和机器之间复制数据,使得洗牌成为复杂而耗费系统资源的操作。为了理解在洗牌过程中会发生什么,可以考虑执行一个reduceByKey()方法的操作过程。reduceByKey()操作需要生成一个新的RDD,其中的数据是进行了聚合操作的键值对元组,其中的值是将每个键对应的所有值进行聚合计算产生的结果。这个过程面临的挑战是:并非每个键的所有值都同时位于同一个分区,甚至是不在同一台计算机上,而这种分布可能是随机的,但必须在整个Spark集群中收集所有这些值,然后进行聚合计算。所以,洗牌的作用是将原来随机存储在分区中的数据根据聚合的要求重新存放,保证在聚合计算时具有相同键的元组可以在同一分区,减少聚合计算时查找和传输元组需要的计算成本和带宽。

在Spark中,通常不会因为特定操作将数据跨分区分布在一个指定的位置。在计算过程中,Spark需要执行全部操作并且将其分成多个任务,单个任务将在单个分区上运行,因此Spark要负责组织reduceByKey()执行单个聚合任务的所有数据,必须从所有分区中读取,以找到键对应的所有值,然后将各分区中的值汇总,以计算每个键的最终聚合结果,这个移动数据的过程称为洗牌。虽然执行新的洗牌后,每个分区中的元素集合都是确定性的,而且分区本身的排序也是确定性的,但是分区中的元素排序是不确定的,如果希望洗牌后数据可以按照预设的顺序排序,那么可以使用mapPartitions()对每个分区进行排序,例如使用.sorted;使用repartitionAndSortWithinPartitions()在进行重新分区的同时,有效地对分区进行分类;或者使用sortBy()对全局RDD进行排序。

正如Spark API中所写,repartitionAndSortWithinPartitions()比先调用repartition()然后在每个分区内进行排序有效,因为它可以将排序过程推入洗牌的机制中。可以看到,repartitionAndSortWithinPartitions()主要通过给定的分区器将相同键的元组发送到指定分区,并根据键进行排序,我们可以按照自定义的排序规则进行二次排序。二次排序模式是指先按照键分组,然后按照特定的顺序遍历数。

某些洗牌操作会消耗大量堆内存,因为它们在传输前后使用内存中的数据结构组织记录。具体而言,reduceByKey()和aggregateByKey()在进行映射转换时创建数据的结构,而在进行聚合动作时产生数据。当数据不适合存储在内存中时,Spark会将这些数据溢出到磁盘中,从而增加了磁盘I/O的额外开销和垃圾回收。洗牌还会在磁盘上生成大量的中间文件。从Spark 1.3开始,这些文件将被保留,直到相应的RDD不再使用并被垃圾收集为止。这样做是为了在重新计算定义RDD的谱系时不需要重新创建洗牌文件。如果应用程序保留对这些RDD的引用或者垃圾回收未频繁引入,垃圾收集可能在很长一段时间后才会发生。这意味着,长时间运行的Spark作业可能消耗大量的磁盘空间。在配置Spark上下文时,临时存储目录spark.local.dir由配置参数指定,这部分内容在性能优化的章节会继续说明。