2.2 认识RDD

Spark利用RDD实现更快、更有效的Map和Reduce操作,同时也解决了MapReduce操作效率不高的问题。在之后的内容中,统一将其称为RDD。本节将重点介绍RDD的概念和具体操作。

由于复制、序列化和磁盘读写,基于MapReduce框架的数据共享速度很慢。对于传统的大多数Hadoop应用程序,它们花费超过90%的时间进行HDFS读写操作。而基于Spark框架的大数据计算都是以RDD为基础的,支持在内存中处理计算。这意味着,Spark可以将对象的状态存储在内存中,并且对象的状态可以在作业之间共享,而在内存中实现数据共享比网络和磁盘快10~100倍。RDD是Spark的基础数据结构,是一个不可变的分布式对象集合。RDD中的每个数据集分为逻辑分区,可以分布在Spark集群不同节点的内存上,实现分布式计算。RDD可以包含任何类型的Python、Java或Scala对象,以及用户定义的类。通常,可以将RDD看作一个只读分区的记录集合,可以通过读取保存在磁盘的数据,或操作其他RDD中的数据创建新RDD,而原来的RDD是不能修改的。RDD按照分区划分,并且具有容错机制,总体来说,RDD是一个可以实现并行操作的容错集合。将RDD看作一个只读分区的记录集合如图2-6所示。

图2-6 将RDD看作一个只读分区的记录集合

RDD代表弹性分布式数据集,是Spark的基本数据结构,是对象的不可变集合,它们在集群的不同节点上进行计算。什么是弹性分布式数据集?弹性表示借助RDD谱系图(DAG)容错,因此能够重新计算由于节点故障而丢失或损坏的分区;由于数据驻留在多个节点上,因此是分布式的;数据集表示使用的数据记录,可以通过JDBC在外部加载数据集,这些数据集可以是JSON文件、CSV文件、文本文件或数据库,而无须特定的数据结构。因此,RDD中的每个数据集都在逻辑上跨许多服务器进行分区,因此可以在集群的不同节点上进行计算。RDD是容错的,即在故障情况下具有自我恢复的能力。

在Spark中创建RDD有几种方法,包括从稳定存储中的数据和其他RDD,以及并行化驱动程序中已经存在的集合创建。RDD也可以缓存并手动分区,当多次使用RDD时,缓存RDD可以提高运行的速度。手动分区对于正确平衡分区很重要。通常,较小的分区允许在更多执行程序之间更均匀地分配RDD数据,因此更少的分区使工作变得容易。用户还可以调用persist()方法指示他们希望在将来的操作中重用哪些RDD。默认情况下,Spark将持久化的RDD保留在内存中,但是如果没有足够的RAM,也可能会将其溢出到磁盘上。用户还可以请求其他持久化策略,例如将RDD仅存储在磁盘上或在计算机之间复制RDD。

Spark定义RDD概念的目的主要是迭代算法、交互式数据挖掘。分布式共享内存(Distributed Shared Memory,DSM)是一种非常通用的抽象,但是这种通用性使得在大量廉价的集群上以高效且容错的方式实现起来更加困难。另外,在分布式计算系统中,数据存储在中间稳定的分布式存储中,例如HDFS或Amazon S3。这使作业的计算变慢,因为它在此过程中涉及许多IO操作、复制和序列化。而如果将数据保留在内存中,可以将性能提高一个数量级。设计RDD的主要挑战是定义一个程序接口,以有效地提供容错能力。为了有效地实现容错能力,RDD提供了受限形式的共享内存,是基于粗粒度的转换,而不是基于细粒度的转换。Spark通过几种开发语言集成的API公开了RDD。在集成API中,每个数据集都表示为一个对象,并且使用这些对象的方法进行转换。Spark懒惰地评估RDD,只有在需要时才会被调用,这样可以节省大量时间并提高效率。一旦在RDD上执行动作,才会真正在RDD上执行数据转换。用户可以调用persist()方法声明他们希望在将来的操作中使用哪个RDD。

Spark生成初始的RDD有两种方法:一种是将驱动程序中的现有并行化集合,或者从外部存储系统中引用数据集,例如共享文件系统、HDFS、HBase,或者任何提供了Hadoop InputFormat的数据源。通过在驱动程序中的现有集合(对于Scala,此数据类型为Seq)上调用SparkContext的parallelize()方法创建并行化集合,集合中的元素被复制形成分布式数据集,可以进行并行操作。该方法用于学习Spark的初始阶段,因为它可以在交互界面中快速创建自己的RDD并对其执行操作。此方法很少在测试和原型制作中使用,因为如果数据量大,此方法无法在一台计算机上存储整个数据集。考虑以下sortByKey()的示例,要排序的数据通过并行化集合获取:

代码2-2

语法解释在Scala中,Seq特征代表序列。序列是Iterable类可迭代集合的特殊情况。与Iterable不同,序列总是具有被定义的元素顺序。序列提供了一种适用于索引的方法。指数的范围从0到序列的长度。序列支持多种方法查找元素或子序列的出现,包括segmentLength、prefixLength、indexWhere、indexOf、lastIndexWhere、lastIndexOf、startsWith、endsWith、indexOfSlice。

并行化集合中要注意的关键点是数据集切入的分区数。Spark将为集群的每个分区运行一个任务。对于集群中的每个CPU,需要2~4个分区。Spark根据集群设置分区数。但是,也可以手动设置分区数。这是通过将分区数作为第二个参数进行并行化实现的。例如sc.parallelize(data,10),这里手动给定分区数为10。再看一个示例,这里使用了并行化收集,并手动指定了分区数:

Spark可以从被Hadoop支持的任何存储源创建RDD,其中包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark支持文本文件、SequenceFiles和任何其他Hadoop的InputFormat。通过文本文件创建RDD,可以使用SparkContext的textFile()方法创建,该方法通过地址获取文件,地址可以指向本地路径、Hadoop集群存储和云计算存储等,并将文件转换成行的集合。