spark sql

spark sql的概念

spark sql是 是spark用来处理结构化数据的一个模块,提供了DataFrame和DataSet两个新的抽象概念。与RDD类似,DataSet和DataFrame都是分布式数据容器,DataFrame更像穿透数据库二维表格,除了记录数据之外,还记录数据的结构信息。DataFrame是Row对象的集合,而DataSet是DataFrame的一个扩展,DataSet每一个record存储的是一个强类型值而不是一个Row。RDD,DataSet和DataFram的共同点:三者都是分布式的弹性数据集,三者都有惰性机制,都有partition机制,有许多共同函数。三者可以相互转化

DataFrame的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val spark = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
//1. 从文件中读取
val df = spark.read.json("people.json")

//2. createDataFrame[A <: Product : TypeTag](data: Seq[A])
import spark.implicits
// A <: Product :case class 或者 元组
val df = spark.createDataFrame[Persion](Seq(Persion("zs",18),Persion("lisi",20)))
// 需要一个样例类
case class Persion(name:String,age:Int)

//3.createDataFrame(rowRDD: RDD[Row], schema: StructType)
val rdd: RDD[Row] = sc.parallelize(Seq(Row("zs",19),Row("lisi",14)))
val schemal = StructType(Seq(StructField("name",StringType),StructField("age",IntegerType)))
val df = spark.createDataFrame(rdd,schemal)

RDD,DataSet和DatDaFram互相转换

  • RDD =》DataFrame
1
2
3
4
5
6
7
8
9
10
11
case class People(var name:String,var age:Int)

val rdd = spark.sparkContext.textFile("people.txt")
val rdd2: RDD[People] = rdd.map(line => (People(line.split(",")(0),line.split(",")(1).trim.toInt)))
val rdd3:RDD[(String,Int)] = rdd.map(line => ((line.split(",")(0),line.split(",")(1).trim.toInt)))
// rdd2 => df 样例类
val dataf: DataFrame = rdd2.toDF()
dataf.foreach(row => println(row.getString(0)+" "+row.getInt(1)))
// rdd3 => df 元组
val dataf: DataFrame = rdd3.toDF()
dataf.foreach(row => println(row.getString(0)+" "+row.getInt(1)))
  • RDD =》DataSet
1
2
3
4
5
6
// 样例类 可以看出DataSet是一种强类型
val dataS: Dataset[People] = rdd2.toDS()
dataS.foreach(peo => println(peo.name +" "+peo.age))
// 元组
val dataS: Dataset[(String, Int)] = rdd3.toDS()
dataS.foreach(line=>(println(line._1+" "+line._2)))
  • DataFrame =》 RDD
1
2
val dfRdd: RDD[Row] =  .rdd
dfRdd.foreach(row => println(row.getString(0)+": "+row.getInt(1)))
  • DataFrame =》 DataSet
1
2
3
4
// 注意:DF转DS的时候,转换到DF的类型需要与DS的类型一致,如果这里的DF是由RDD[People]转来的
// 所以Dataset只能为Dataset[People]
val dfDs: Dataset[People] = dataf.as[People]
dfDs.foreach(peo => println(peo.name+": "+peo.age))
  • DataSet =》 RDD
1
2
val dsRdd: RDD[People] = dataS.rdd
dsRdd.foreach(peo => println(peo.name+": "+peo.age))
  • DataSet =》 DataFrame
1
2
val dsDF: DataFrame = dataS.toDF()
dsDF.foreach(row => println(row.getString(0)+" "+row.getInt(1)))

udf与udaf

  • udf:用户自定义函数,单行函数
1
2
3
spark.udf.register("addName",(x:String)=>("name:"+x))

spark.sql("select addName(name) as name from persion").show()
  • udaf:用户自定义聚集函数,多行函数,多对一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class MyUdaf extends UserDefinedAggregateFunction{
//输入值的类型
override def inputSchema: StructType = StructType(Seq(StructField("input",DoubleType)))
// 缓存的值的类型
override def bufferSchema: StructType = StructType(Seq(StructField("sum",DoubleType),StructField("count",IntegerType)))
//返回值的类型
override def dataType: DataType = DoubleType

override def deterministic: Boolean = true
//初始值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0.0
buffer(1) = 0
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getDouble(0)+input.getDouble(0)
buffer(1) = buffer.getInt(1)+1
}
// 分区合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getDouble(0)+buffer2.getDouble(0)
buffer1(1) = buffer1.getInt(1)+buffer2.getInt(1)
}
// 结果
override def evaluate(buffer: Row): Double = {
buffer.getDouble(0)/buffer.getInt(1)
}
}


spark.udf.register("mu",new MyUdaf)
spark.sql("select mu(age) from persion").show()

JDBC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 @Test
def test1(): Unit ={

val spark = SparkSession.builder().appName("jdbc").master("local[*]").getOrCreate()
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","husaijian123")
val jdbcDF= spark.read.jdbc("jdbc:mysql://localhost:3306/interview","student",properties)
jdbcDF.show()

val jdbcDF2 = spark.read.format("jdbc").
option("url","jdbc:mysql://localhost:3306/interview").option("dbtable","student").option("user","root").option("password","husaijian123").load()
jdbcDF2.show()
}

Spark Sql运行架构

  1. 在解析Sql之前会创建SparkSession,SparkSession会将元数据保存到SessionCatelog,包括表名,字段名,字段类型。
  2. 使用SparkSqlParser进行解析SQL,构建语法树
  3. 使用分析器Analysiser绑定逻辑计划
  4. 使用优化器Optimizer优化逻辑计划
  5. 私用SparkPlanner生成物理计划
  6. 使用QueryExecution执行物理计划

窗口函数

窗口函数计算的一组行,被称为Frame。每一个被处理的行都有一个唯一的frame相关联。

窗口函数分为三类,ranking(排名),analystic(分析),aggregate(聚合)

Function Type SQL DataFrame API Description
Ranking rank rank rank值可能是不连续的
Ranking dense_rank denseRank rank值一定是连续的
Ranking percent_rank percentRank 相同的分组中 (rank -1) / ( count(score) - 1 )
Ranking ntile ntile 将同一组数据循环的往n个桶中放,返回对应的桶的index,index从1开始
Ranking row_number rowNumber 很单纯的行号,类似excel的行号
Analytic cume_dist cumeDist
Analytic first_value firstValue 相同的分组中第一个值
Analytic last_value lastValue 相同的分组中最后一个值
Analytic lag lag 取前n行数据
Analytic lead lead 取后n行数据
Aggregate min min 最小值
Aggregate max max 最大值
Aggregate sum sum 求和
Aggregate avg avg 求平均

用法:函数名 over(分区 排序 范围)

当一个函数被窗口函数使用时,需要为该窗口函数定义相关的窗口规范,窗口规范包括三个部分

分区(PARTITION BY)

排序(order by)

帧规范:指定哪些行会被当前输入行的帧包括,通过其他行对于当前行的相对位置实现。定义帧规范需要定义帧的类型,开始边界,结束边界。一共有五种边界:UNBOUNDED PRECEDING(分区第一行),UNBOUNDED FOLLOWING(分区最后一行),CURRENT ROW,n PRECEDING(当前行的前n行),n FOLLOWING(当前行后n行)。有两种帧类型:ROW帧和RANGE帧。

ROW帧是基于当前输入行的位置的物理偏移量,如:BETWEEN 1 PRECEDING AND 1 FOLLOWING表示一个包括当前行、当前行之前1行和之后1行的帧

RANGE帧是基于当前行位置的逻辑偏移。逻辑偏移为当前输入行的排序表达式的值和帧边界行的排序表达式的值之差。如:RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING,则边界为[current revenue value - 2000, current revenue value + 1000]。

  1. rank,dense_rank,percent_rank,row_number
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
## rank 排名时有重复,排序不连续,但是最大名次不变
select id,name,classId,score, dense_rank() over(partition by classId order by score)rank from scores

+---+-----+-------+-----+----+
| id| name|classId|score|rank|
+---+-----+-------+-----+----+
| 3|张三3| 1| 79.0| 1|
| 8|张三8| 1| 87.0| 2|
| 1|张三1| 1| 88.0| 3|
| 6|张三6| 1| 89.0| 4|
| 4|张三4| 2| 78.0| 1|
| 5|张三5| 2| 85.0| 2|
| 2|张三2| 2| 85.0| 2|
| 7|张三7| 2| 90.0| 4|
+---+-----+-------+-----+----+

## dense_rank 排名时有重复,排序连续,最大值减小
select id,name,classId,score, dense_rank() over(partition by classId order by score)rank from scores
+---+-----+-------+-----+----+
| id| name|classId|score|rank|
+---+-----+-------+-----+----+
| 3|张三3| 1| 79.0| 1|
| 8|张三8| 1| 87.0| 2|
| 1|张三1| 1| 88.0| 3|
| 6|张三6| 1| 89.0| 4|
| 4|张三4| 2| 78.0| 1|
| 5|张三5| 2| 85.0| 2|
| 2|张三2| 2| 85.0| 2|
| 7|张三7| 2| 90.0| 3|
+---+-----+-------+-----+----+

## percent_rank (rank -1) / ( count(score) - 1 )
select id,name,classId,score, percent_rank() over(partition by classId order by score)rank from scores
+---+-----+-------+-----+------------------+
| id| name|classId|score| rank|
+---+-----+-------+-----+------------------+
| 3|张三3| 1| 79.0| 0.0|
| 8|张三8| 1| 87.0|0.3333333333333333|
| 1|张三1| 1| 88.0|0.6666666666666666|
| 6|张三6| 1| 89.0| 1.0|
| 4|张三4| 2| 78.0| 0.0|
| 5|张三5| 2| 85.0|0.3333333333333333|
| 2|张三2| 2| 85.0|0.3333333333333333|
| 7|张三7| 2| 90.0| 1.0|
+---+-----+-------+-----+------------------+
## row_number 按顺序排序 无重复排名
select id,name,classId,score, row_number() over(partition by classId order by score)rank from scores
---+-----+-------+-----+----+
| id| name|classId|score|rank|
+---+-----+-------+-----+----+
| 3|张三3| 1| 79.0| 1|
| 8|张三8| 1| 87.0| 2|
| 1|张三1| 1| 88.0| 3|
| 6|张三6| 1| 89.0| 4|
| 4|张三4| 2| 78.0| 1|
| 5|张三5| 2| 85.0| 2|
| 2|张三2| 2| 85.0| 3|
| 7|张三7| 2| 90.0| 4|
+---+-----+-------+-----+----+
  1. aggregate函数统计的结果在排序后的结果是不一样的
1
2
3
4
avg(score) over(partition by `) avg # 每个分组加起来求平均值
avg(score) over(partition by classId order by score) # r1=r1 r2=(r1+r2)/2 ...
avg(score) over(partition by classId rows between 1 preceding and 1 following) avg_row #当前行的前一行+当前行+后一行的平均值
avg(score) over(partition by classId order by score range between 1 preceding and 1 following # score+(是否存在score+或-1)的平均值
文章目录
  1. 1. spark sql的概念
  2. 2. DataFrame的创建
  3. 3. RDD,DataSet和DatDaFram互相转换
  4. 4. udf与udaf
  5. 5. JDBC
  6. 6. Spark Sql运行架构
  7. 7. 窗口函数
|
载入天数...载入时分秒...