spark之RDD操作

1. RDD的基本概念

RDD是弹性分布式数据集,是spark中最基本的数据抽象,它代表一个可变,可分区的数据集合。在spark中对数据的所有操作包括创建 RDD、转化已有RDD 以及调用 RDD 操作进行求值。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

RDD的属性

  1. Partitions:一组分片,即数据集的基本组成单位.每个分片会被一个任务运行
  2. Compute:分区计算函数
  3. Dependencies:RDD的每次转换操作都会生成依赖关系,在部分分区数据丢失时,spark可以根据依赖关系恢复数据
  4. Partitioner:RDD的分区方式,默认的是HashPartitoner,可以实现Partitioner自定义分区方式
  5. preferred location:存储每个Partition位置的列表。

RDD的弹性

  1. 自动进行内存和磁盘数据存储的切换

    Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换

  2. 基于血统的高效容错机制

    在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。

  3. Task失败进行特定次数的重试

    RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。

  4. Stage如果失败会自动进行特定次数的重试

    如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次

  5. Checkpoint和Persist可主动或被动触发

2.RDD编程

RDD创建

  1. 从集合中创建
1
2
3
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
这种方式默认分区个数使用的是 taskScheduler.defaultParallelism
val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8)) // 调用的是上面的函数
  1. 从文件中创建
1
2
3
 val rdd1  = sc.textFile("files/test")//math.min(defaultParallelism, 2)默认分区数
// 获取的是每个文件的内容 key为文件名 value为文件内容
val rdd2: RDD[(String, String)] = sc.wholeTextFiles("files/test")

RDD的转换

  1. mapPartitions:类似map,但是是基于分区执行的,有几个分区执行几次
1
2
3
sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"))
// mapPartition需要接受一个函数:f: Iterator[(String, String)] => Iterator[U]
res10.mapPartitions(iter =>Iterator(iter.mkString("|"))).collect
  1. glom:将每个分区中中的数据形成一个数组,形成一个数组类型的RDD
1
2
3
4
5
6
7
8
scala> val rdd1 = sc.parallelize(1 to 10,4)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.glom.collect
res0: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5), Array(6, 7), Array(8, 9, 10))

scala> rdd1.glom
res1: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[2] at glom at <console>:26
  1. mapPartitionsWithIndex:比mapPartitions多了一个分区号
1
2
3
4
5
rdd1.mapPartitionsWithIndex((x,y)=>Iterator(x+":"+y.mkString("|")))
res4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at mapPartitionsWithIndex at <console>:26

scala> res4.collect
res5: Array[String] = Array(0:(kpop,female), 1:(zorro,male)|(mobin,male))
  1. sample(withReplacement: Boolean, fraction: Double,seed: Long = Utils.random.nextLong)

    withReplacement表示抽出数据是否返回,fraction比例,seed随机的种子

1
2
3
4
5
scala> rdd.sample(true,0.1,4).collect
res10: Array[Int] = Array(4, 6)

scala> rdd.sample(true,0.2,4).collect
res11: Array[Int] = Array(3, 5, 6)
  1. partitionBy:分区,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD。
1
2
3
4
5
6
7
8
val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
scala> rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
res0: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[1] at partitionBy at <console>:26

scala> res0.collect
res1: Array[(Int, String)] = Array((2,bbb), (4,ddd), (1,aaa), (3,ccc))

可以看到产生了ShuffledRDD,并且有四个分区变为两个
  1. coalesce:根据分区数,重新进行分区,默认不进行shuffle
1
2
3
4
5
6
val rdd = sc.parallelize(1 to 16,4)
rdd.coalesce(3)

scala> res3.partitions.size
res6: Int = 3
分区个数变为三
  1. repartition:底层调用coalesce(numPartitions, shuffle = true),重分区使用shuffle
  2. sortBy:先用函数对数据进行处理,再排序
1
2
3
4
5
6
scala>  val rdd = sc.parallelize(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd.sortBy(x=>x%2)collect
warning: there was one feature warning; re-run with -feature for details
res0: Array[Int] = Array(2, 4, 1, 3)
  1. join:在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
1
2
3
4
5
6
7
8
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24

scala> rdd.join(rdd1).collect()
res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
  1. cogroup:在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
1
2
3
4
5
6
7
8
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[37] at parallelize at <console>:24

scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> rdd.cogroup(rdd1).collect()
res14: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))
  1. reduceByKey

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

1
2
3
val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd.reduceByKey(_+_).collect
res6: Array[(String, Int)] = Array((female,6), (male,7))

12.groupByKey

1
rdd.groupByKey() // 生成一个RDD[k,Iterator[t]]
  1. combineByKey( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) createCombiner:分区内第一次碰到key,创建key对应C的初始值

    mergeValue:第二次碰到key,将当前值与C中的值进行合并,分区内合并

    mergeCombiners:由于每个分区是独立的,需要合并每个分区

1
2
3
4
5
val scores: Array[(String, Int)] = Array(("Fred",88), ("Fred",95), ("Fred",91), ("Wilma",93), ("Wilma",95), ("Wilma",98))
val input = sc.parallelize(scores)
input.combineByKey(v=>(v,1),(a:(Int,Int),b)=>(a._1+b,a._2+1),(x:(Int,Int),y:(Int,Int))=>(x._1+y._1,x._2+y._2))

结果为 Array((Wilma,(286,3)), (Fred,(274,3)))
  1. aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

    zeroValue:分区内的每个key对应的初始值U

    seqOp:分区内的U的合并操作

    combOp:每个分区的合并操作

1
2
3
val scores: Array[(String, Int)] = Array(("Fred",88), ("Fred",95), ("Fred",91), ("Wilma",93), ("Wilma",95), ("Wilma",98))
val input = sc.parallelize(scores)
input.aggregateByKey((0,0))((a:(Int,Int),v:Int)=>(a._1+v,a._2+1),(x,y)=>(x._1+y._1,x._2+x._2))
  1. foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
1
2
3
4
5
val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
dd.foldByKey(0)(_+_)
// 结果
res0.collect
res1: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
  1. sortByKey:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
1
2
val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd.sortByKey()
  1. mapValues:针对于(K,V)形式的类型只对V进行操作
1
2
3
val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd3.mapValues(a => a*2).collect
结果: Array((1,aa), (1,dd), (2,bb), (3,cc))

RDD执行操作

  1. reduce(func)
1
2
val rdd1 = sc.makeRDD(1 to 10,2)
rdd1.reduce(_+_)
  1. collect():在驱动程序中,以数组的形式返回数据集的所有元素
  2. count():返回RDD的元素个数
  3. first():返回RDD的第一个元素(类似于take(1))
  4. take(n):返回一个由数据集的前n个元素组成的数组
  5. takeOrdered(n):返回前几个的排序
  6. aggregate(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a + b}
| )
res56: Int = 58

scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x * y},
| {(a : Int,b : Int) => a + b}
| )
res57: Int = 30361
  1. folder(num)(func):折叠操作,aggregate的简化操作,seqop和combop一样。
1
2
3
4
5
6
7
8
9
10
11
scala> var rdd1 = sc.makeRDD(1 to 4,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at makeRDD at <console>:24

scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a + b}
| )
res59: Int = 13

scala> rdd1.fold(1)(_+_)
res60: Int = 13
  1. saveAsTextFile(path)

    将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

  2. saveAsSequenceFile(path)

    将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统

  3. saveAsObjectFile(path)

    用于将RDD中的元素序列化成对象,存储到文件中。

  4. countByKey:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

1
2
3
4
5
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

scala> rdd.countByKey()
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
  1. foreach

RDD的文件操作

  • saveAsObject File:底层调用其实是saveAsSequenceFile
1
2
3
4
5
val rdd =  sc.parallelize(Array(("zs",18),("lisi",20),("ww",17),("zl",30)))

rdd.saveAsObjectFile("write_dile")

val rdd2: RDD[(String, Int)] = sc.objectFile[(String,Int)]("write_dile")
  • saveAsSequenceFile
1
2
3
4
5
al rdd =  sc.parallelize(Array(("zs",18),("lisi",20),("ww",17),("zl",30)))

rdd.saveAsSequenceFile("write_dile")

val rdd2: RDD[(String, Int)] = sc.sequenceFile[String,Int]("write_dile",classOf[String],classOf[Int])
  • hdfs
1
2
3
4
val rdd =  sc.parallelize(Array(("zs",18),("lisi",20),("ww",17),("zl",30)))
rdd.saveAsNewAPIHadoopFile("write_dile",classOf[Text],classOf[Text],classOf[TextOutputFormat[Text,Text]])

val rdd2 = sc.newAPIHadoopFile[Text,Text,KeyValueTextInputFormat]("write_dile")
文章目录
  1. 1. 1. RDD的基本概念
    1. 1.1. RDD的属性
    2. 1.2. RDD的弹性
  2. 2. 2.RDD编程
    1. 2.1. RDD创建
    2. 2.2. RDD的转换
    3. 2.3. RDD执行操作
    4. 2.4. RDD的文件操作
|
载入天数...载入时分秒...