2.5 案例分析

在Spark中,RDD是一个基本的数据抽象层,是分布式部署在集群中节点上的对象集合,一旦创建,它们是不可变的。可以在RDD上执行转换和动作两种类型的数据操作。转换是惰性的计算,即它们不会在被定义的时候立即计算,只有在RDD上执行动作时,转换才会执行。还可以在内存或磁盘上持久化或缓存RDD,同时RDD是容错的,如果某个节点或任务失败,则可以在其他节点上自动重建RDD,并且完成作业。本节内容将使用在线拍卖系统的数据。这些数据保存在本地文件系统中的CSV文件中,数据包含三种类型的产品:Xbox、Cartier和Palm。此文件中的每一行代表一个单独的出价。每个出价都有拍卖ID(AuctionID)、投标金额(Bid)、从拍卖开始的投标时间(Bid Time)、投标人(Bidder)、投标人评级(Bidder Rate)、拍卖开标价(Open Bid)、最终售价(Price)、产品类型(Item Type)和拍卖的天数(Days To Live,DTL)。每个拍卖都有一个拍卖代码相关联,可以拥有多个出价,每一行代表出价。对于每个出价,包含的信息见表2-2。

表2-2 案例数据说明

其中的数据如下所示:

2.5.1 启动交换界面

本节将学习实际操作怎样加载和检查数据,可以从现有集合或外部数据源创建RDD。当数据被加载到Spark中时,就创建一个RDD。通常可以将创建的第一个RDD称为输入RDD或基础RDD,意思是这是之后RDD转换的基础,将来的转换操作是在其之上进行的。我们使用Spark交互界面将拍卖数据加载到Spark中。Spark交互式界面(REPL)允许以交互方式写入和输出,其支持的语言分别为Scala、Python和R。当输入代码时,交互式界面提供即时反馈;当界面启动时,SparkContext和SparkSession对象被初始化,然后分别使用变量sc和spark定义这两个对象。

· REPL

REPL是指交互式解释器环境,其缩写分别表示R(read)、E(evaluate)、P(print)、L(loop),输入值,交互式解释器会读取输入内容并对它求值,再返回结果,并重复此过程。

2.5.2 SparkContext和SparkSession

用户可能会注意到当前的Spark交互界面中使用了SparkSession和SparkContext。在这里回顾一下Spark的历史,了解一下这两个对象的由来是很有必要的,因为将会在一段时间内经常使用这两个连接对象。在Spark 2.0之前,Spark Context是任何Spark应用程序的入口,用于访问所有的Spark功能,并且需要具有所有集群配置和参数的SparkConf创建SparkContext对象。我们可以使用Spark Context仅创建RDD,并且必须为任何其他Spark交互创建特定的Spark上下文。从Spark 2.0开始,SparkSession充当所有Spark功能的入口点。SparkContext提供的所有功能也都可以通过SparkSession获得。但是,如果有人喜欢使用SparkContext,还可以继续使用。HiveContext是SQLContext的超集,它可以做SQLContext可以做的事情以及其他许多事情,其包括使用更完整的HiveQL解析器编写查询,访问Hive UDF以及从Hive表读取数据的功能。SQLContext允许连接到不同的数据源,以从中写入或读取数据,但是它有局限性,当Spark程序结束或Spark Shell关闭时,所有到数据源的链接就都消失了,在下一个会话中将不可用。

从图2-8中看到,SparkContext是访问所有Spark功能的渠道;每个JVM只存在一个SparkContext。Spark驱动程序(Driver Program)使用它连接到集群管理器(Cluster Manager)进行通信,提交Spark作业,并知道使用什么资源管理器(YARN、Mesos或Standalone)进行通信。驱动程序允许配置Spark配置参数,通过SparkContext驱动程序可以访问其他上下文对象,如SQLContext、HiveContext和StreamingContext。但是,从Spark 2.0开始,SparkSession可以通过单一、统一的入口访问所有上述Spark的功能。除了简化访问DataFrame和Dataset API外,它还包含了用来操作数据的底层上下文对象。

图2-8 SparkContext和Driver Program、Cluster Manager之间的关系

总而言之,以前通过SparkContext、SQLContext或HiveContext调用的所有功能现在可以通过SparkSession获得。本质上,SparkSession是使用Spark处理数据的单一、统一入口。这样可以减少代码的复杂度,需要实现的编程架构变少,犯错误的可能就会小得多,并且代码可能也不会那么混乱。所以,使用SparkSession会简化Spark编程。

2.5.3 加载数据

首先启动交互式界面,运行spark-shell来完成。一旦启动界面,接下来使用SparkContext.textFile()方法将数据加载到Spark中。注意,代码中的sc是指SparkContext对象。

代码2-28

textFile()方法返回一个RDD,每行包含一条记录。除了文本文件,Spark的API还支持其他几种数据格式的读取,其中包括:wholeTextFiles()方法读一个目录中包含的多个小文本文件,每一行的数据返回为(文件名,内容);使用SparkContext的sequenceFile[K,V]方法,K和V是文件中的键和值的类型;通过InputFormat,能够使用SparkContext.hadoopRDD()方法,它采用任意JobConf并输入格式类、键类和值类,这种设置与使用输入源的Hadoop作业相同。textFiler()方法为延迟计算的,只是定义了RDD加载的过程,并定义了将要使用的数据,但是RDD尚未实际创建,仅当RDD遇到动作时,才会按照定义创建并加载数据,然后返回结果。

2.5.4 应用操作

可以使用first()方法返回auctionRDD中第一行的数据。

代码2-29

这一行数据表示Xbox产品的一次出价,其中的数据项使用逗号分隔。当然,auctionRDD中还包括其他产品的数据,如Cartier、Palm等。如果只想查看Xbox上的出价,可以使用转换操作仅获得所有Xbox的出价信息:

代码2-30

为了做到这一点,上面的代码对auctionRDD中的每个元素进行filter()转换,auctionRDD中的每一个元素就是CSV文件中的每一行,所以filter()转换应用到RDD的每一行数据上。filter()转换是基于指定的条件进行过滤。当将filter()转换应用于auctionRDD上时,其中条件检查用以查看该行是否包含单词Xbox。如果条件为真,则将该行添加到生成的新RDD,即上面代码中的xboxRDD。在filter()转换中使用了匿名函数(line=>line.contains("xbox"))。

filter()转换是将匿名函数应用于RDD的每个元素,而这个元素是CSV文件中的一行。这是Scala匿名函数的语法,它是一个没有命名的函数,其中“=>”匿名函数操作符,其左边部分表示输入变量。在语句中出现匿名函数意味着正在应用这个函数在上面的代码中,输入变量是line,代表auctionRDD中的一行数据。filter()将line变量的值传输到匿名函数操作符右边的输出。在这个例子中,匿名函数的输出是调用条件函数line.contains(),并得到布尔类型的结果值,判断line中是否包含xbox。

由于Spark的转换操作是惰性计算的,上面的代码执行后,xboxRDD和auctionRDD并没有真正在内存中实现,但是当在xboxRDD调用动作count()时,Spark会在内存中物理生成xboxRDD。同时,在定义xboxRDD的过程中包括了auctionRDD,所以auctionRDD也被实际生成。其执行顺序是,当运行该动作时,Spark将从文本文件中读取并创建auctionRDD,然后将filter()转换应用于auctionRDD,以产生xboxRDD。此时,auctionRDD和xboxRDD都会加载在内存中。然后在xboxRDD上运行count(),将RDD中的元素总数发送到驱动程序。但是,一旦动作运行完成,auctionRDD和xboxRDD数据将从内存中释放。现在已经看了怎样定义转换和动作,将它们应用到auctionRDD上,以检查拍卖数据,希望找到一些问题的答案,例如:

· 有多少个产品被卖出?

· 每个产品有多少个出价?

· 有多少个不同种类的产品?

· 最小的出价数是多少?

· 最大的出价数是多少?

· 平均出价数是多少?

这些问题会在实验内容中找到解决的方法。首先,定义代码中需要引用的变量,根据每行的拍卖数据映射输入位置,就是将CVS数据以二维表的格式加载到RDD,每一个列用一个变量表示,这样可以更容易地引用每个列,代码如下。

代码2-31

从本地文件系统中,使用SparkContext的textFile()方法加载.csv格式的数据,在map()转换中将split函数应用到每行上,并使用“,”符号分割每行,将每行数据转换成一个数组Array。

代码2-32

在上面的代码中,map()转换中使用了匿名函数的语法,但是,利用Scala的占位符语法可以得到更简短的代码。

语法解释如果想让函数文本更简洁,可以把下画线当作一个或更多参数的占位符,只要每个参数在函数文本内仅出现一次。例如下面代码,_>0对于检查值是否大于零的函数来说就是非常短的标注。

代码2-33

可以把下画线看作表达式里需要被填入的“空白”。这个空白在每次函数被调用的时候用函数的参数填入。例如,上面代码中,filter()方法会把_>0里的下画线首先用-11替换,如-11>0,然后用-10替换,如-10>0,然后用-5,如-5>0,这样直到List的最后一个值。因此,函数文本_>0与稍微冗长一点儿的x=>x>0相同,代码如下:

代码2-34

当把下画线当作参数的占位符时,编译器有可能没有足够的信息推断缺失的参数类型。例如,假设只是写_+_:

代码2-35

这种情况下,可以使用冒号指定类型,如下:

代码2-36

请注意_+_将扩展成带两个参数的函数,这也是仅当每个参数在函数中最多出现一次的情况下才能使用这种短格式的原因,多个下画线指代多个参数,而不是单个参数的重复使用,第一个下画线代表第一个参数,第二个下画线代表第二个参数,以此类推。现在回答第一个问题。

· 有多少个产品被卖出?

代码2-37

每个auctionid代表一个要出售的商品,对该商品的每个出价将是auctionRDD数据集中的一个完整的行。一个商品可能有多次出价,因此在数据中代表商品唯一编号的auctionid可能在多个行中出现。在上面的代码中,map()转换将返回一个每行只包含auctionid的新数据集,然后在新数据集上调用distinct()转换,并返回另一个新的数据集,其中包含所有不重复的auctionid,最后的count动作在第二个新数据集上运行,并返回不同auctionid的数量。

· 每个产品类型有多少出价?

在数据中,商品类型itemtype可以是xbox、cartier或palm,每个商品类型可能有多个商品。

代码2-38

这时auctionRDD中的每个行数据都是一个数组,就可以使用数组取值的方式bid(itemtype)获得每行数据的产品类型。所以,map()转换将auctionRDD的每行数组bid映射到由itemtype和1组成的二维元组中。如果想看一看map()转换后的数据,则使用take(1)动作返回一行数据,代码为

代码2-39

在这个例子中,reduceByKey()运行在map()转换的键值对RDD(itemtype,1)上,并且基于其中的匿名函数((x,y)=>x+y)对键itemtype进行聚合操作,如果键相同,就累加值。在reduceByKey()转换中定义的匿名函数是求值的总和。reduceByKey()也返回键值对(itemtype,value),其中键itemtype还是代表产品类型,值value为每个类型的总和。最后的collect()为动作,收集结果并将其发送给驱动程序。之后可看到上面显示的结果,即每个项目类型的总量。

2.5.5 缓存处理

正如前面提到的,RDD的转换是惰性计算,这意味着定义auctionRDD的时候,还没有在内存中实际生成,直到在其上调用动作之后,auctionRDD才会在内存中生成,而使用完之后,auctionRDD会从内存中释放。每次在RDD上调用动作时,Spark会重新计算RDD和其所有的依赖。例如,当计算出价的总计次数时,数据将被加载到auctionRDD中,然后应用count动作,代码如下。

代码2-40

运行完count动作之后,数据不在内存中。同样,如果计算xbox的总计出价次数时,将会按照定义RDD谱系图的先后顺序,先计算auctionRDD,后计算xboxRDD,然后在xboxRDD上应用count动作。运行完后,auctionRDD和xboxRDD都会从内存中释放,代码如下。

代码2-41

· RDD谱系(RDD Lineage)

当从现有RDD创建新RDD时,新的RDD包含指向父RDD的指针。类似地,RDD之间的所有依赖关系将被记录在关系图中,而此时的RDD还没有在内存中生成实际的数据。该关系图被称为谱系图。

另外,当需要知道每个商品auctionid的总出价次数和出价最大次数时,则需要定义另一个RDD,代码如下。

代码2-42

上面的代码中定义了bidAuctionRDD,在这个RDD上分别调用了collect()和reduce()动作获得每个商品auctionid的总出价次数和出价最大次数的结果。当调用这两个动作时,数据将被加载到auctionRDD中,然后计算bidAuctionRDD,执行相应的动作对RDD进行运算并返回结果。上面的代码中,当调用count()、collect()或reduce()时,每次都是从auctionRDD开始重新计算(见图2-9)。

图2-9 RDD的依赖关系

上面代码中的每个动作(如count()、collect()等)都被独立地调用和从auctionRDD开始计算最后的结果。在上一个示例中,当调用reduce或collect时,每次处理完成后,数据会从内存中删除。当再次调用其中一个操作时,该过程从头开始,将数据加载到内存中。每次运行一个动作时,这些重复生成相同RDD的过程,会产生额外代价高的计算,特别是对于迭代算法来说。

可以从图2-8中看到RDD之间的关系,这相当于一个RDD定义的谱系结构。当谱系中有分支,并且多次使用相同的RDD时,建议使用cache()或persist()方法,将RDD中的数据缓存到存储介质中,例如内存或磁盘。如果RDD已经被计算,并且数据已经被缓存,可以重用此RDD,而不需要使用任何额外的计算或内存资源。

但是需要小心,不要缓存所有东西,只有那些需要被重复使用和迭代的RDD才被缓存。缓存行为取决于有多少可以使用的内存,如果当前内存的容量不能满足缓存数据大小,可以将其缓存到容量很大的磁盘中。下面看一看缓存数据的过程:

第一步定义了关于如何创建RDD的说明,此时文件尚未读取,如auctionRDD。

第二步通过转换定义需要重复使用的RDD,如bidAuctionRDD。

第三步在RDD上使用cache()方法。

代码2-43

执行第三步后,实际上还没有操作对RDD进行计算和缓存,只是定义了RDD以及RDD的转换。当在bidAuctionRDD上执行collect动作时,开始读取auctiondata.csv文件,然后执行转换、缓存和收集数据。下一个应用在bidAuctionRDD上的动作,如count()或reduce()等,只需使用缓存中的数据,而不是重新加载文件,并执行第二步的转换。