Skip to main content

详解spark开窗函数

1.什么是窗口函数

窗口函数(Window functions)又称分析函数或开窗函数,它允许你在不改变原始行的情况下,对一组相关的行(称为“窗口”)进行计算和分析。与普通的聚合函数(如SUM、AVG等)不同,窗口函数不会将多行合并为一行,而是为每一行返回一个计算结果,同时保留原始行的详细信息。通常写法为func()over(),详细语法如下:

window_function 
[ nulls_option ]
OVER
( [ { PARTITION | DISTRIBUTE } BY partition_col_name = partition_col_val ( [ , ... ] ) ]
{ ORDER | SORT } BY expression [ ASC | DESC ]
[ NULLS { FIRST | LAST } ]
[ , ... ]
[ window_frame ] )

上面函数对应下面几个部分

  • 函数:指具体使用什么函数,支持那些函数见【函数列表】
  • 空值选项(可选)
  • over:代表开窗,固定格式;
  • 分组方式(可选)
  • 排序方式(可选)(上面语法来源于spark官方文档,语法表述为必选项,实际应用为可选)
  • 空值选项(可选)
  • 窗口框架(可选):指明窗口的范围,从什么地方开始到什么地方结束

2.函数列表

支持开窗的函数列表,支持开窗函数分为:排序函数(Ranking Functions)、分析函数(Analytic Functions)、聚合函数(Aggregate Functions)

2.1 排序函数

排序函数描述函数具体介绍
RANK计算一组值中某个值的排名。结果是在分区排序中,当前行之前或等于当前行的行数加一。该值将在序列中产生间隔。https://sparkfunctions.com/rank
DENSE_RANK计算一组值中某个值的排名。结果是先前分配的排名值加一。与 rank 函数不同,dense_rank 不会在排名序列中产生间隔。https://sparkfunctions.com/dense_rank
PERCENT_RANK计算一个值在一组值中的百分比排名https://sparkfunctions.com/percent_rank
NTILE将每个窗口分区的行分成 n 个桶,范围从 1 到最多 n。https://sparkfunctions.com/ntile
ROW_NUMBER根据窗口分区内行的排序,为每一行分配一个唯一的、连续的数字,从一开始。https://sparkfunctions.com/row_number

2.2 分析函数

分析函数描述具体使用方式
CUME_DIST计算一个值在分区中相对于所有值的位置https://sparkfunctions.com/cume_dist
LAGlag(input[, offset[, default]]) - 返回当前行之前第 offset 行的 input 值https://sparkfunctions.com/lag
LEADlead(input[, offset[, default]]) - 返回窗口中当前行之后第 offset 行的 input 值https://sparkfunctions.com/lead
NTH_VALUEnth_value(input[, offset]) - 返回窗口帧从开始处的第 offset 行的 input 值https://sparkfunctions.com/nth_value
FIRST_VALUEfirst_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值https://sparkfunctions.com/first_value
LAST_VALUElast_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值https://sparkfunctions.com/last_value

2.3 聚合函数

聚合函数描述具体介绍
anyany(expr) - 如果 expr 中至少有一个值为真,则返回真https://sparkfunctions.com/any
any_valueany_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的某个值https://sparkfunctions.com/any_value
approx_count_distinctapprox_count_distinct(expr[, relativeSD]) - 通过 HyperLogLog++ 返回估计的基数https://sparkfunctions.com/approx_count_distinct
approx_percentileapprox_percentile(col, percentage [, accuracy]) - 返回数值或 ANSI 间隔列 col 的近似百分位数https://sparkfunctions.com/approx_percentile
array_aggarray_agg(expr) - 收集并返回一个非唯一元素的列表https://sparkfunctions.com/array_agg
bit_andbit_and(expr) - 返回所有非空输入值的按位与(AND),如果没有非空值则返回 nullhttps://sparkfunctions.com/bit_and
bit_orbit_or(expr) - 返回所有非空输入值的按位或(OR),如果没有非空值则返回 nullhttps://sparkfunctions.com/bit_or
bit_xorbit_xor(expr) - 返回所有非空输入值的按位异或(XOR),如果没有非空值则返回 nullhttps://sparkfunctions.com/bit_xor
bitmap_construct_aggbitmap_construct_agg(child) - 返回一个位图,其中设置了来自子表达式所有值的位位置。子表达式很可能是 bitmap_bit_position()。https://sparkfunctions.com/bitmap_construct_agg
bitmap_or_aggbitmap_or_agg(child) - 返回一个位图,它是子表达式中所有位图的按位或(OR)结果。输入应该是从 bitmap_construct_agg() 创建的位图https://sparkfunctions.com/bitmap_or_agg
bool_andbool_and(expr) - 如果 expr 的所有值都为真,则返回真https://sparkfunctions.com/bool_and
bool_orbool_or(expr) - 如果 expr 中至少有一个值为真,则返回真https://sparkfunctions.com/bool_or
collect_listcollect_list(expr) - 收集并返回一个非唯一元素的列表https://sparkfunctions.com/collect_list
collect_setcollect_set(expr) - 收集并返回一个唯一元素的集合。https://sparkfunctions.com/collect_set
corrcorr(expr1, expr2) - 返回一组数字对之间的皮尔逊相关系数https://sparkfunctions.com/corr
count(*)返回检索到的总行数,包括包含 null 的行。https://sparkfunctions.com/count
count(expr[, expr...])返回表达式列表中所有表达式都不为 null 的行数https://sparkfunctions.com/count
count(DISTINCT expr[, expr...])返回表达式列表中唯一且非 null 的行数https://sparkfunctions.com/count
count_ifcount_if(expr) - 返回表达式中 TRUE 值的数量https://sparkfunctions.com/count_if
count_min_sketchcount_min_sketch(col, eps, confidence, seed) - 返回给定列的计数-最小草图,使用指定的误差界限(eps)、置信度(confidence)和种子(seed)https://sparkfunctions.com/count_min_sketch
covar_popcovar_pop(expr1, expr2) - 返回一组数字对的总体协方差。https://sparkfunctions.com/covar_pop
covar_sampcovar_samp(expr1, expr2) - 返回一组数字对的样本协方差https://sparkfunctions.com/covar_samp
everyevery(expr) - 如果 expr 的所有值都为真,则返回真https://sparkfunctions.com/every
firstfirst(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值。如果 isIgnoreNull 为真,则只返回非空值https://sparkfunctions.com/first
first_valuefirst_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值。如果 isIgnoreNull 设置为真,则只返回非空值https://sparkfunctions.com/first_value
groupinggrouping(col) - 表示在 GROUP BY 中指定的列是否被聚合,返回值 1 表示已聚合,返回值 0 表示未聚合https://sparkfunctions.com/grouping
grouping_idgrouping_id([col1[, col2 ..]]) - 返回分组的级别,等同于 (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn),其中 n 是 GROUP BY 子句中列的数量,grouping 函数返回的是 0 或 1,表示相应列是否被聚合https://sparkfunctions.com/grouping_id
histogram_numerichistogram_numeric(expr, nb) - 使用 nb 个箱位对数值 'expr' 进行直方图计算https://sparkfunctions.com/histogram_numeric
hll_sketch_agghll_sketch_agg(expr, lgConfigK) - 返回 HllSketch 的可更新二进制表示https://sparkfunctions.com/hll_sketch_agg
hll_union_agghll_union_agg(expr, allowDifferentLgConfigK) - 返回估计的独特值数量https://sparkfunctions.com/hll_union_agg
kurtosiskurtosis(expr) - 根据一组值计算并返回峰态值https://sparkfunctions.com/kurtosis
lastlast(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值。如果 isIgnoreNull 设置为真,则只返回非空值https://sparkfunctions.com/last
last_valuelast_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值。如果 isIgnoreNull 设置为真,则只返回非空值https://sparkfunctions.com/last_value
maxmax(expr) - 返回 expr 的最大值https://sparkfunctions.com/max
max_bymax_by(x, y) - 返回与 y 的最大值相关联的 x 值https://sparkfunctions.com/max_by
meanmean(expr) - 根据一组值计算并返回平均值https://sparkfunctions.com/mean
medianmedian(col) - 返回数值或 ANSI interval col 的中位数https://sparkfunctions.com/median
minmin(expr) - 返回 expr 的最小值https://sparkfunctions.com/min
min_bymin_by(x, y) - 返回与 y 的最小值相关联的 x 值https://sparkfunctions.com/min_by
modemode(col) - 返回列 col 中出现频率最高的值。忽略 NULL 值。如果所有值都是 NULL,或者没有行,则返回 NULLhttps://sparkfunctions.com/mode
percentile(col, percentage [, frequency])percentile(col, percentage [, frequency]) - 返回数值列 col 或 ANSI 间隔列 col 在给定百分比的确切百分位数值https://sparkfunctions.com/percentile
percentile(col, array(percentage1 [, percentage2]...) [, frequency])返回数值列 col 在给定的百分比(或多个百分比)的确切百分位数值数组。百分比数组中的每个值都必须在 0.0 和 1.0 之间。frequency 的值应该是正整数。https://sparkfunctions.com/percentile
percentile_approxpercentile_approx(col, percentage [, accuracy]) - 返回数值列或 ANSI 间隔列 col 的近似百分位数,这是 col 值中排序后(从最小到最大)的最小值,使得不超过 percentage 指定的比例的 col 值小于或等于该值https://sparkfunctions.com/percentile_approx
regr_avgx(y, x)regr_avgx(y, x) - 返回组中非空值对的自变量的平均值,其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_avgx
regr_avgy(y, x)regr_avgy(y, x) - 返回组中非空值对的因变量的平均值,其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_avgy
regr_count(y, x)regr_count(y, x) - 返回组中非空数值对的数量,其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_count
regr_intercept(y, x)regr_intercept(y, x) - 返回在组中非空值对的单变量线性回归线的截距,其中 y 是因变量,x 是自变量。https://sparkfunctions.com/regr_intercept
regr_r2(y, x)regr_r2(y, x) - 返回组中非空对的确定系数,其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_r2
regr_slope(y, x)regr_slope(y, x) - 返回组中非空值对的线性回归线的斜率,其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_slope
regr_sxx(y, x)regr_sxx(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * VAR_POP(x),其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_sxx
regr_sxy(y, x)regr_sxy(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * COVAR_POP(y, x),其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_sxy
regr_syy(y, x)regr_syy(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * VAR_POP(y),其中 y 是因变量,x 是自变量https://sparkfunctions.com/regr_syy
skewnessskewness(expr) - 返回根据一个组中的值计算出的偏度值https://sparkfunctions.com/skewness
somesome(expr) - 如果expr中至少有一个值为真,则返回真https://sparkfunctions.com/some
stdstd(expr) - 返回根据一个组中的值计算出的样本标准差。https://sparkfunctions.com/std
stddevstddev(expr) - 返回根据一个组中的值计算出的样本标准差https://sparkfunctions.com/stddev
stddev_popstddev_pop(expr) - 返回根据一个组中的值计算出的总体标准差https://sparkfunctions.com/stddev_pop
stddev_samp(expr)stddev_samp(expr) - 返回根据一个组中的值计算出的样本标准差https://sparkfunctions.com/stddev_samp
sumsum(expr) - 返回根据一个组中的值计算出的总和https://sparkfunctions.com/sum
try_avgtry_avg(expr) - 从一组值中计算平均值,如果发生溢出,则结果为nullhttps://sparkfunctions.com/try_avg
try_sumtry_sum(expr) - 从一组值中计算总和,如果发生溢出,则结果为nullhttps://sparkfunctions.com/try_sum
var_popvar_pop(expr) - 返回根据一个组中的值计算出的总体方差https://sparkfunctions.com/var_pop
var_sampvar_samp(expr) - 返回根据一个组中的值计算出的样本方差https://sparkfunctions.com/var_samp
variancevariance(expr) - 返回根据一个组中的值计算出的样本方差。https://sparkfunctions.com/variance

3.空值选项

[ nulls_option ] 指定在评估窗口函数时是否跳过空值。

  • RESPECT NULLS 表示不跳过空值
  • IGNORE NULLS 表示跳过空值。 如果未指定,默认值为 RESPECT NULLS。 仅 LAG | LEAD | NTH_VALUE | FIRST_VALUE | LAST_VALUE函数可以使用IGNORE NULLS

4.排序方式

  • 排序函数(Ranking Functions)、分析函数(Analytic Functions)开窗时必须要进行排序;
  • 聚合函数(Aggregate Functions)根据需要进行排序。

4.1聚合函数开窗的排序

聚合函数开窗可以排序也可以不排序。

  • 不排序则窗口框架范围,计算范围为整个分区;
  • 排序不指定窗口框架范围,计算范围为分区开始行到当前行;
  • 排序制定窗口框架范围,按照制定范围聚合

举例: 样例数据

--建表语句
CREATE TABLE t_employees
(
name STRING,
dept STRING,
salary BIGINT,
age BIGINT
);
--插入数据
INSERT INTO t_employees
VALUES ("Lisa", "Sales", 10000, 35),
("Evan", "Sales", 32000, 38),
("Fred", "Engineering", 21000, 28),
("Alex", "Sales", 30000, 33),
("Tom", "Engineering", 23000, 33),
("Jane", "Marketing", 29000, 28),
("Helen", "Marketing", 29000, 40),
("Jeff", "Marketing", 35000, 38),
("Paul", "Engineering", 29000, 23),
("Chloe", "Engineering", 23000, 25);

--样例数据
+--------+--------------+---------+------+
| name | dept | salary | age |
+--------+--------------+---------+------+
| Lisa | Sales | 10000 | 35 |
| Evan | Sales | 32000 | 38 |
| Fred | Engineering | 21000 | 28 |
| Alex | Sales | 30000 | 33 |
| Tom | Engineering | 23000 | 33 |
| Jane | Marketing | 29000 | 28 |
| Helen | Marketing | 29000 | 40 |
| Jeff | Marketing | 35000 | 38 |
| Paul | Engineering | 29000 | 23 |
| Chloe | Engineering | 23000 | 25 |
+--------+--------------+---------+------+

sum()函数举例: 1.计算每个员工所在部门的部门总薪水

select
name,
dept,
salary,
age,
sum(salary)over(partition by dept) as dept_total_salary
from t_employees;
--执行结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 96000 |
| Tom | Engineering | 23000 | 33 | 96000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Chloe | Engineering | 23000 | 25 | 96000 |
| Jane | Marketing | 29000 | 28 | 93000 |
| Helen | Marketing | 29000 | 40 | 93000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 72000 |
| Evan | Sales | 32000 | 38 | 72000 |
| Alex | Sales | 30000 | 33 | 72000 |
+--------+--------------+---------+------+--------------------+

注意: 这里dept_total_salayr列得出的结果接统计了部门全部员工的总薪资。

2.按照员工薪资排序,从低到高,每个部门截止到该员工的累积薪水是多少;(累积求和)

--不限定窗口框架
select
name,
dept,
salary,
age,
sum(salary)over(partition by dept order by salary asc) as dept_total_salary
from t_employees;
--查询结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 21000 |
| Tom | Engineering | 23000 | 33 | 67000 |
| Chloe | Engineering | 23000 | 25 | 67000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Jane | Marketing | 29000 | 28 | 58000 |
| Helen | Marketing | 29000 | 40 | 58000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 10000 |
| Alex | Sales | 30000 | 33 | 40000 |
| Evan | Sales | 32000 | 38 | 72000 |
+--------+--------------+---------+------+--------------------+

--使用rows限定窗口框架
select name,
dept,
salary,
age,
sum(salary)
over (partition by dept order by salary asc rows BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;
--查询结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 21000 |
| Tom | Engineering | 23000 | 33 | 44000 |
| Chloe | Engineering | 23000 | 25 | 67000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Jane | Marketing | 29000 | 28 | 29000 |
| Helen | Marketing | 29000 | 40 | 58000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 10000 |
| Alex | Sales | 30000 | 33 | 40000 |
| Evan | Sales | 32000 | 38 | 72000 |
+--------+--------------+---------+------+--------------------+

--使用range限定窗口框架
select name,
dept,
salary,
age,
sum(salary)
over (partition by dept order by salary asc range BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;

--查询结果
+--------+--------------+---------+------+--------------------+
| name | dept | salary | age | dept_total_salary |
+--------+--------------+---------+------+--------------------+
| Fred | Engineering | 21000 | 28 | 21000 |
| Tom | Engineering | 23000 | 33 | 67000 |
| Chloe | Engineering | 23000 | 25 | 67000 |
| Paul | Engineering | 29000 | 23 | 96000 |
| Jane | Marketing | 29000 | 28 | 58000 |
| Helen | Marketing | 29000 | 40 | 58000 |
| Jeff | Marketing | 35000 | 38 | 93000 |
| Lisa | Sales | 10000 | 35 | 10000 |
| Alex | Sales | 30000 | 33 | 40000 |
| Evan | Sales | 32000 | 38 | 72000 |
+--------+--------------+---------+------+--------------------+

注意:

  1. 使用sum开窗后增加order by子句,sum的结果不在是整个部门,而是截止到当前值的结果。
  2. 如果省略了窗口框架内容,则相当于使用了range,限定截止到当前行的,关注Tom所在行的结果,聚合了包含Chloe的薪水;
  3. 使用rows的窗口框架,则仅包含到Tom的薪水,不含Chloe的薪水;

4.2 排序不唯一带来的问题

要求: 取出每个部门薪水最低的员工记录,要求每个部门仅取出一行记录

分析: 为了保证每个部门仅取出一行记录,我们使用row_number函数来进行处理,具体语句和执行结果如下:

--执行SQL
select name,
dept,
salary,
age
from (select name,
dept,
salary,
age,
row_number() over (partition by dept order by salary asc) as rn
from t_employees) t
where rn = 1

--执行结果
+-------+--------------+---------+------+
| name | dept | salary | age |
+-------+--------------+---------+------+
| Fred | Engineering | 21000 | 28 |
| Jane | Marketing | 29000 | 28 |
| Lisa | Sales | 10000 | 35 |
+-------+--------------+---------+------+

注意: 关注Marketing部门的记录,取出的是Jane的记录。Jane确实是最低的,但是同时Hellen的薪资也是一样的。虽然当前满足了需求内容,但在实际生产中,发生流程重跑,则数据内容可能发生变化,数据校验出现前后不一致,较难排查。还会影响下游使用,例如使用结果数据计算最低薪水员工的平均年龄,数据重跑之后平均年龄发生变化。所以保证排序唯一十分重要

4.3 排序中的空值

可以在排序时指定空值是排在最前面还是最后面,测试数据中没有空值,仅写SQL了

--样例SQL
select name,
dept,
salary,
age,
sum(salary)
over (partition by dept order by salary asc nulls first range BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;

5.窗口框架

窗口框架指定窗口从哪一行开始以及在哪里结束 语法

{ RANGE | ROWS } { frame_start | BETWEEN frame_start AND frame_end }

框架指定方式分为range方式和rows方式,如果不指定默认为range方式,

frame_startframe_end可以为以下内容

UNBOUNDED PRECEDING: 从分区的第一行开始

offset PRECEDING:从当前行之前的第 offset 行开始

CURRENT ROW:当前行

offset FOLLOWING:到当前行之后的第 offset 行结束

UNBOUNDED FOLLOWING:到分区的最后一行结束

如果未指定 frame_end,则默认值为 CURRENT ROW