3.5 案例分析

表3-1显示了/data/sfpd.csv文件中的字段和说明,并且示范了保存在字段中的数据。该数据集是从2013年1月至2015年7月期间某市公安局的报案记录信息。本节将探讨分析这些数据,回答诸如哪个区域的报案记录最多,以及哪个报案类别数量最多等问题。

表3-1 案例数据项说明

通过案例分析,熟悉怎样使用Spark键值对RDD的操作,Spark为键值对的RDD提供特殊操作。键值对RDD在许多程序中是一个有用的构建块,因为它们允许并行地对每个键执行操作,或者通过网络重新组合数据。例如,键值对RDD具有reduceByKey()方法,可以分别为每个键聚合汇总数据,以及用一个join()方法可以通过使用相同的键对元素进行分组将两个RDD合并在一起。可以从原始数据中提取,例如事件时间、客户ID或其他标识符,作为键值对RDD中的键。

3.5.1 检查事件数据

首先,快速回顾第2章讲的内容,使用Spark交互界面加载数据,创建RDD并应用转换和操作。首先通过定义变量映射输入字段:

代码3-49

使用SparkContext的textFile()方法加载CSV文件,应用map()进行转换,同时使用split()分割每行数据中的字段。

代码3-50

使用第2章学到的方法,检查sfpd中的数据。

· sfpd中的数据是什么样子的?

代码3-51

· 报案记录总数是多少?

代码3-52

· 报案记录的类别是什么?

代码3-53

在下面的操作中,定义代码创建bayviewRDD,直到添加一个动作才会计算出来。filter转换用于过滤sfpd中包含“BAYVIEW”字符的所有元素。转换的其他示例包括map()、filter()和distinct()。

代码3-54

语法说明contains()方法返回true或false,判断集合中是否包含输入参数。

代码3-55

当运行一个动作命令时,Spark将加载数据创建输入RDD,然后计算任何其他定义RDD的转换和动作。在这个例子中,调用count()动作将导致数据被加载到sfpd中,应用filter()转换,然后计数。

代码3-56

3.5.2 reduceByKey和groupByKey

从现有常规RDD创建键值对RDD有许多方法,最常见的方法是使用map()转换。创建键值对RDD的方式在不同的语言中是不同的。在Python和Scala中,需要返回一个包含元组的RDD。

代码3-57

在这个例子中,通过应用map()操作将sfpd转变为一个名为incByCat的键值对RDD,得到的RDD包含如上面代码所示的元组。现在,可以使用sfpd数据集得到一些问题的答案。

· 哪三个地区(或类别,或地址)的报案记录数量最多?

数据集sfpd中的每一行记录一个事件相关信息,想计算每个区域的报案记录数量,即数据集中这个地区出现多少次,第一步是从sfpd上创建一个键值对RDD。

代码3-58

如上,可以通过在sfpd上应用map()转换实现。map()转换导致sfpd变换成为由(PdDistrict,1)元组组成的键值对RDD,其中每个元素表示在一个地区发生的一个报案记录。当创建一个键值对RDD时,Spark会自动添加一些特定用于键值对RDD的其他方法,如reduceByKey()、groupByKey()和combineByKey()等,这些转换最常见的特征是分布式洗牌操作,需要按键进行分组或聚合。另外,还可以在键值对RDD之间进行转换,如join()、cogroup()和subtractByKey()等。键值对RDD也是一种RDD,因此也支持与其他RDD相同的操作,如filter()和map()。

通常,想要查找具有相同键的元素之间的统计信息,reduceByKey()就是这样一个转换的例子,它运行多个并行聚合操作,每个并行操作对应数据集中的一个键,其中每个操作聚合具有相同键的值。reduceByKey()返回一个由每个键和对应计算值聚合组成的新RDD。

代码3-59

在上面的代码中,reduceByKey()转换在由元组(PdDistrict,1)组成的键值对RDD上应用了一个简单的求和操作(x,y)=>x+y,它返回一个新的RDD,其包含每个区域和这个区域的报案记录总和。可以在键值对RDD进行排序,前提条件是在键值对RDD上定义了一个排序操作sortByKey(),表示按照键排序。一旦在键值对RDD上进行排序,任何将来在其上的collect()或save()动作调用将导致对数据进行排序。sortByKey()函数接受一个名为ascending的参数,如果其值为true(默认值),则表示结果按升序排列。

代码3-60

在代码3-60的示例中,将reduceByKey()的结果应用到另一个map(x=>(x._2,x._1))操作。这个map()操作是切换每个元组中元素之间的前后顺序,即互换地区和事件总和的放置顺序,得到一个元组包含总计和地区(sum,district)的数据集,然后将sortByKey()应用于此数据集对每个区域的报案记录数量进行排序。由于想要前3名,因此需要的结果是降序。将值false传递给sortByKey()的ascending参数,要返回前3个元素,则使用take(3),这将会给出前3个元素。也可以使用另外一种方法实现。

代码3-61

在RDD上使用top()方法。top()方法的第一个参数为3,表示前3个元素;两个参数为隐式参数,定义按照元组的第二个值进行排序。Ordering.by(_._2)表示使用元组中的第二个元素排序。

· def top(numInt)(implicit ordOrdering[T]):Array[T]

按照默认(降序)或者根据指定的隐式Ordering[T]定义,返回前num个元素,其排序方式与takeOrdered相反。

语法解释scala.math.Ordering的伴随对象定义了许多隐式对象,以处理AnyVal(如Int、Double)、String和其他类型的子类型。要按一个或多个成员变量对实例进行排序,可以使用内置排序Ordering.by和Ordering.on。

代码3-62

在Ordering.by[(String,Int,Int),Int](_._2)中,[(String,Int,Int),Int]分别表示三元组的类型(String,Int,Int)和返回值的类型Int;(_._2)表示使用第二个值进行排序。在Ordering[(Int,String)].on(x=>(x._3,x._1))中,(Int,String)表示返回值的类型;(x=>(x._3,x._1))表示先使用第三个值排序,然后使用第二个值排序。通过指定compare(a:T,b:T)实现Ordering[T],用来决定如何对两个实例a和b进行排序。scala.util.Sorting类可以使用Ordering[T]的实例对Array[T]的集合进行排序,例如:

scala.math.Ordering和scala.math.Ordered都提供了相同的功能,但是方式不同。可以通过扩展Ordered给T类型一种单独的排序方式。使用Ordering可以用许多其他方式对同一类型进行排序。Ordered和Ordering都提供隐式值,它们可以互换使用。可以导入scala.math.Ordering.Implicits访问其他隐式排序。

现在用另一种答案回答上面的问题。报案事件次数最多的三个地区是哪里?可以使用groupByKey(),而不是reduceByKey()。groupByKey()将具有相同键的所有对应值分成一组,它不需要任何参数,其结果返回键值对数据集,其中值是迭代列表。

代码3-63

上面的代码将groupByKey()应用于由元组(PdDistrict,1)组成的数据集上,然后将另一个map()转换应用于groupByKey()转换的结果上,并返回区域键和迭代列表的长度。也可以使用如下代码完成相同的功能:

代码3-64

同时,使用groupByKey()和reduceByKey()实现了相同的功能,但是它们的内部实现是有区别的,接下来将看到使用groupByKey()和使用reduceByKey()的区别。在上面的代码中,第一个map的转换结果是一个键值对RDD,由(PdDistrict,1)组成。groupByKey()将具有相同键的所有值组合成列表,从而产生一个新的键值对,每个元素由键和列表组成。实际上,groupByKey()将具有相同键的所有对应值分组到Spark集群上的同一个节点上。在处理大型数据集时,groupByKey()这种分组操作会导致网络上大量不必要的数据传输。当被分组到单个工作节点上的数据不能全部装载到此节点的内存中时,Spark会将数据传输到磁盘上。但是,Spark只能一次处理一个键的数据,因此,如果单个键对应的数据大大超出内存容量,则将存在内存不足的异常。

groupByKey()是在全部的节点上聚合数据,而reduceByKey()首先在数据所在的本地节点上自动聚合数据,然后洗牌后数据会再次汇总。reduceByKey()可以被认为是一种结合,就是同时实现每个键对于值的聚合和汇总,这比使用groupByKey()更有效率。洗牌操作需要通过网络发送数据,reduceByKey()在每个分区中将每个键的对应聚合结果作为输出,以减少数据量,从而获得更好的性能。一般来说,reduceByKey()对于大型数据集尤其有效。为了方便理解,下面通过另一个简单的例子说明。

代码3-65

虽然两个函数都能得出正确的结果,但reduceByKey()函数更适合使用在大数据集上。

3.5.3 三种连接转换

使用键值对的一些最有用的操作是将其与其他类似的数据一起使用,最常见的操作之一是连接两键值对RDD,可以进行左右外连接(LEFT AND RIGHT OUTER)、交叉连接(CROSS)和内连接(INNER),如图3-4所示。

join是内连接,表示在两个键值对RDD中同时存在的键才会出现在输出结果中。leftOuterJoin是左连接,输出结果中的每个键来自源键值对RDD,即连接操作的左边部分;rightOuterJoin类似于leftOuterJoin,是右连接,输出结果中的每个键来自连接操作的右边部分。在这个例子中,PdDists是键值对RDD,其中键为PdDistrict,值为Address。

图3-4 几种join转换

代码3-66

CatRes是另一键值对RDD,其中键为PdDistrict,值为由Category和Resolution组成的元组。

代码3-67

由于键PdDistrict存在于两个RDD中,join转换的输出格式如下所示,其输出结果也是一个键值对RDD,由(PdDistrict,(Address,(Category,Resolution)))组成。

代码3-68

leftOuterJoin返回另一个键值对RDD,其中的键全部来自PdDists。

代码3-69

rightOuterJoin返回另一个键值对RDD,其中的键全部来自CatRes。

代码3-70

正如看到的,除表示方式上,这三种情况下的结果是相同的,因为PdDists和CatRes具有相同的键集合。在另一示例中,PdDists保持不变,定义另一键值对IncCatRes,是以IncidntNum作为键,值是由Category、Descript和Resolution组成的元组。

代码3-71

使用join转换的结果是返回一个空集合,因为两个键值对没有任何共同的键。

代码3-72

3.5.4 执行几个动作

所有动作都可用于键值对RDD,但是以下几个动作仅适用于键值对RDD。

 def countByKey():Map[K,Long]

将统计每个键的元素总数,仅当预期的结果Map集合较小时,才使用此方法,因为整个操作都已加载到驱动程序的内存中。要处理非常大的结果,请考虑使用rdd.mapValues(_=>1L).reduceByKey(_+_),它返回RDD[T,Long],而不是Map集合。

 def collectAsMap():Map[K,V]

将结果作为Map集合以提供简单的查找,将此RDD中的键值对作为Map集合返回给驱动程序,如果同一键有多个值,则每个键中仅保留一个值。仅当预期结果数据较小时,才使用此方法,因为所有数据均加载到驱动程序的内存中。

 def lookup(key:K):Seq[V]

返回RDD中键值的列表。如果RDD有一个可知的分区器,仅通过搜索键映射到的分区就可以有效地完成此操作。

例如,如果只想按区域返回报案记录总数,可以在由元组(PdDistrict,1)组成的键值对RDD上执行countByKey()。

代码3-73

仅当结果数据集大小足够小以适应内存时,才能使用此操作。

3.5.5 跨节点分区

本节将使用RDD的分区学习如何控制跨节点的数据洗牌。在分布式环境中,如何布置数据会影响性能。最小化网络流量的数据布局可以显著提高性能。Spark RDD中的数据分为几个分区。可以在Spark程序中控制RDD的分区,以提高性能。分区在所有应用程序中不一定有帮助。如果有多次重复使用的数据集,则分区是有用的,但是,如果数据集只是被扫描一次,则不需要对数据集的分区进行特别设置。创建具有特定分区的RDD有两种方法:

(1)在RDD上调用partitionBy(),提供一个显式的分区器。

(2)在转换中指定分区器,这将返回进行了分区的RDD。

partitionBy()是一个转换操作,并使用指定的分区器创建RDD。要创建RangePartitioner,需要指定所需的分区数,并提供一键值对RDD。

代码3-74

HashPartitioner需要传递一个定义分区数的参数。

代码3-75

使用键的哈希值将用于分配分区。如果相同的键比较多,其哈希值相同,则数据会大量集中在某个分区中,会出现数据分布不均匀的现象,可能遇到部分集群空闲的情况。使用哈希分区的键值对必须可以是哈希的。使用分区时,会创建一个洗牌作业,应该保留partitionBy()的结果,以防止每次使用分区RDD时进行重新安排。

在键值对RDD上的大量操作一般会接受附加分区参数,如分区数、类型或分区。RDD上的一些操作自动导致具有已知分区器的RDD。例如,默认情况下,当使用sortByKey时,使用RangePartitioner,并且groupByKey使用的默认分区器是HashPartitioner。要在聚合和分组操作之外更改分区,可以使用repartition()或coalesce()函数。

 def repartition(numPartitions:Int)(implicit ord:Ordering[T]=null):RDD[T]

返回具有numPartitions个分区的新RDD,可以增加或减少此RDD中的并行度。使用洗牌机制重新分配数据。如果要减少此RDD中的分区数,请考虑使用coalesce(),这样可以避免执行洗牌操作。

 def coalesce(numPartitions:Int,shuffle:Boolean=false,partitionCoalescer:Option[PartitionCoalescer]=Option.empty)(implicit ord:Ordering[T]=null):RDD[T]

这导致狭窄的依赖性,例如,从1000个分区变成100个分区,则不会进行洗牌,而是100个新分区中的每一个将分配当前分区中的10个。如果请求更多的分区,将保持当前的分区数量。

重新分区数据的代价可能相当昂贵,因为重新分区将数据通过全局洗牌生成新的分区。需要确保在使用合并时指定较少的分区数。首先使用partition.size确定当前的分区数。在这个例子中,将reduceByKey()应用于RDD,指定分区数作为参数。

要找出分区数,可使用rdd.partitions.size。在实例中,dists通过reduceByKey传递分区数参数为5,然后应用coalesce()指定减少分区数,在这种情况下为3。

在Scala和Java中,可以使用RDD上的partitioner属性查看RDD的分区方式。回看之前使用的示例,当运行命令partrdd1.partitioner时,它返回分区类型,在这种情况下为RangePartitioner。

代码3-76

由于对RDD上的许多操作需要通过网络传输数据,通过按键进行洗牌操作,因此分区可以对许多操作提高性能。对于诸如reduceByKey()的操作,当进行预分区时,这些值将在本地节点上进行计算,然后将每个节点的最终结果发送回驱动程序。如果对两个键值对RDD进行操作,当进行预分区时,至少有一个RDD带有已知分区器,不会被重新洗牌,如果两个RDD都具有相同的分区器,则不会在网络上进行洗牌操作。为输出RDD设置分区器的操作包括cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort(),而所有其他操作将产生没有分区器的结果。如果父RDD具有分区器,则mapValues()、flatMapValues()和filter()等操作将导致在输出RDD上设置分区器。如果分区的数量太少,即太少的并行性,Spark将使资源闲置。如果分区太多或并行性太高,则可能影响性能,因为每个分区的开销可以相加,产生总开销。使用repartition()随机地将现有的RDD进行洗牌,以重新分区,可以使用coalesce()代替repartition()完成减少分区数量的作业,因为其可以避免洗牌。对于coalesce(N),如果N大于当前的分区,则需要传递shuffle=true给coalesce();如果N小于当前的分区(即缩小分区),则不要设置shuffle=true,否则会导致额外的、不必要的洗牌。