详解开窗函数

1.什么是窗口函数

窗口函数(Window functions)又称分析函数或开窗函数,它允许你在不改变原始行的情况下,对一组相关的行(称为“窗口”)进行计算和分析。与普通的聚合函数(如SUM、AVG等)不同,窗口函数不会将多行合并为一行,而是为每一行返回一个计算结果,同时保留原始行的详细信息。通常写法为func()over(),详细语法如下:

window_function 
[ nulls_option ]
OVER
( [  { PARTITION | DISTRIBUTE } BY partition_col_name = partition_col_val ( [ , ... ] ) ]
  { ORDER | SORT } BY expression [ ASC | DESC ] 
 [ NULLS { FIRST | LAST } ] 
 [ , ... ]
 [ window_frame ] )

上面函数对应下面几个部分

  • 函数:指具体使用什么函数,支持那些函数见【函数列表】
  • 空值选项(可选)
  • over:代表开窗,固定格式;
  • 分组方式(可选)
  • 排序方式(可选)(上面语法来源于spark官方文档,语法表述为必选项,实际应用为可选)
  • 空值选项(可选)
  • 窗口框架(可选):指明窗口的范围,从什么地方开始到什么地方结束

2.函数列表

支持开窗的函数列表,支持开窗函数分为:排序函数(Ranking Functions)、分析函数(Analytic Functions)、聚合函数(Aggregate Functions)

2.1 排序函数

排序函数 描述 函数具体介绍
RANK 计算一组值中某个值的排名。结果是在分区排序中,当前行之前或等于当前行的行数加一。该值将在序列中产生间隔。 https://sparkfunctions.com/rank
DENSE_RANK 计算一组值中某个值的排名。结果是先前分配的排名值加一。与 rank 函数不同,dense_rank 不会在排名序列中产生间隔。 https://sparkfunctions.com/dense_rank
PERCENT_RANK 计算一个值在一组值中的百分比排名 https://sparkfunctions.com/percent_rank
NTILE 将每个窗口分区的行分成 n 个桶,范围从 1 到最多 n。 https://sparkfunctions.com/ntile
ROW_NUMBER 根据窗口分区内行的排序,为每一行分配一个唯一的、连续的数字,从一开始。 https://sparkfunctions.com/row_number

2.2 分析函数

分析函数 描述 具体使用方式
CUME_DIST 计算一个值在分区中相对于所有值的位置 https://sparkfunctions.com/cume_dist
LAG lag(input[, offset[, default]]) - 返回当前行之前第 offset 行的 input 值 https://sparkfunctions.com/lag
LEAD lead(input[, offset[, default]]) - 返回窗口中当前行之后第 offset 行的 input 值 https://sparkfunctions.com/lead
NTH_VALUE nth_value(input[, offset]) - 返回窗口帧从开始处的第 offset 行的 input 值 https://sparkfunctions.com/nth_value
FIRST_VALUE first_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值 https://sparkfunctions.com/first_value
LAST_VALUE last_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值 https://sparkfunctions.com/last_value

2.3 聚合函数

聚合函数 描述 具体介绍
any any(expr) - 如果 expr 中至少有一个值为真,则返回真 https://sparkfunctions.com/any
any_value any_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的某个值 https://sparkfunctions.com/any_value
approx_count_distinct approx_count_distinct(expr[, relativeSD]) - 通过 HyperLogLog++ 返回估计的基数 https://sparkfunctions.com/approx_count_distinct
approx_percentile approx_percentile(col, percentage [, accuracy]) - 返回数值或 ANSI 间隔列 col 的近似百分位数 https://sparkfunctions.com/approx_percentile
array_agg array_agg(expr) - 收集并返回一个非唯一元素的列表 https://sparkfunctions.com/array_agg
bit_and bit_and(expr) - 返回所有非空输入值的按位与(AND),如果没有非空值则返回 null https://sparkfunctions.com/bit_and
bit_or bit_or(expr) - 返回所有非空输入值的按位或(OR),如果没有非空值则返回 null https://sparkfunctions.com/bit_or
bit_xor bit_xor(expr) - 返回所有非空输入值的按位异或(XOR),如果没有非空值则返回 null https://sparkfunctions.com/bit_xor
bitmap_construct_agg bitmap_construct_agg(child) - 返回一个位图,其中设置了来自子表达式所有值的位位置。子表达式很可能是 bitmap_bit_position()。 https://sparkfunctions.com/bitmap_construct_agg
bitmap_or_agg bitmap_or_agg(child) - 返回一个位图,它是子表达式中所有位图的按位或(OR)结果。输入应该是从 bitmap_construct_agg() 创建的位图 https://sparkfunctions.com/bitmap_or_agg
bool_and bool_and(expr) - 如果 expr 的所有值都为真,则返回真 https://sparkfunctions.com/bool_and
bool_or bool_or(expr) - 如果 expr 中至少有一个值为真,则返回真 https://sparkfunctions.com/bool_or
collect_list collect_list(expr) - 收集并返回一个非唯一元素的列表 https://sparkfunctions.com/collect_list
collect_set collect_set(expr) - 收集并返回一个唯一元素的集合。 https://sparkfunctions.com/collect_set
corr corr(expr1, expr2) - 返回一组数字对之间的皮尔逊相关系数 https://sparkfunctions.com/corr
count(*) 返回检索到的总行数,包括包含 null 的行。 https://sparkfunctions.com/count
count(expr[, expr...]) 返回表达式列表中所有表达式都不为 null 的行数 https://sparkfunctions.com/count
count(DISTINCT expr[, expr...]) 返回表达式列表中唯一且非 null 的行数 https://sparkfunctions.com/count
count_if count_if(expr) - 返回表达式中 TRUE 值的数量 https://sparkfunctions.com/count_if
count_min_sketch count_min_sketch(col, eps, confidence, seed) - 返回给定列的计数-最小草图,使用指定的误差界限(eps)、置信度(confidence)和种子(seed) https://sparkfunctions.com/count_min_sketch
covar_pop covar_pop(expr1, expr2) - 返回一组数字对的总体协方差。 https://sparkfunctions.com/covar_pop
covar_samp covar_samp(expr1, expr2) - 返回一组数字对的样本协方差 https://sparkfunctions.com/covar_samp
every every(expr) - 如果 expr 的所有值都为真,则返回真 https://sparkfunctions.com/every
first first(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值。如果 isIgnoreNull 为真,则只返回非空值 https://sparkfunctions.com/first
first_value first_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的第一个值。如果 isIgnoreNull 设置为真,则只返回非空值 https://sparkfunctions.com/first_value
grouping grouping(col) - 表示在 GROUP BY 中指定的列是否被聚合,返回值 1 表示已聚合,返回值 0 表示未聚合 https://sparkfunctions.com/grouping
grouping_id grouping_id([col1[, col2 ..]]) - 返回分组的级别,等同于 (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn),其中 n 是 GROUP BY 子句中列的数量,grouping 函数返回的是 0 或 1,表示相应列是否被聚合 https://sparkfunctions.com/grouping_id
histogram_numeric histogram_numeric(expr, nb) - 使用 nb 个箱位对数值 'expr' 进行直方图计算 https://sparkfunctions.com/histogram_numeric
hll_sketch_agg hll_sketch_agg(expr, lgConfigK) - 返回 HllSketch 的可更新二进制表示 https://sparkfunctions.com/hll_sketch_agg
hll_union_agg hll_union_agg(expr, allowDifferentLgConfigK) - 返回估计的独特值数量 https://sparkfunctions.com/hll_union_agg
kurtosis kurtosis(expr) - 根据一组值计算并返回峰态值 https://sparkfunctions.com/kurtosis
last last(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值。如果 isIgnoreNull 设置为真,则只返回非空值 https://sparkfunctions.com/last
last_value last_value(expr[, isIgnoreNull]) - 对于一组行,返回 expr 的最后一个值。如果 isIgnoreNull 设置为真,则只返回非空值 https://sparkfunctions.com/last_value
max max(expr) - 返回 expr 的最大值 https://sparkfunctions.com/max
max_by max_by(x, y) - 返回与 y 的最大值相关联的 x 值 https://sparkfunctions.com/max_by
mean mean(expr) - 根据一组值计算并返回平均值 https://sparkfunctions.com/mean
median median(col) - 返回数值或 ANSI interval col 的中位数 https://sparkfunctions.com/median
min min(expr) - 返回 expr 的最小值 https://sparkfunctions.com/min
min_by min_by(x, y) - 返回与 y 的最小值相关联的 x 值 https://sparkfunctions.com/min_by
mode mode(col) - 返回列 col 中出现频率最高的值。忽略 NULL 值。如果所有值都是 NULL,或者没有行,则返回 NULL https://sparkfunctions.com/mode
percentile(col, percentage [, frequency]) percentile(col, percentage [, frequency]) - 返回数值列 col 或 ANSI 间隔列 col 在给定百分比的确切百分位数值 https://sparkfunctions.com/percentile
percentile(col, array(percentage1 [, percentage2]...) [, frequency]) 返回数值列 col 在给定的百分比(或多个百分比)的确切百分位数值数组。百分比数组中的每个值都必须在 0.0 和 1.0 之间。frequency 的值应该是正整数。 https://sparkfunctions.com/percentile
percentile_approx percentile_approx(col, percentage [, accuracy]) - 返回数值列或 ANSI 间隔列 col 的近似百分位数,这是 col 值中排序后(从最小到最大)的最小值,使得不超过 percentage 指定的比例的 col 值小于或等于该值 https://sparkfunctions.com/percentile_approx
regr_avgx(y, x) regr_avgx(y, x) - 返回组中非空值对的自变量的平均值,其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_avgx
regr_avgy(y, x) regr_avgy(y, x) - 返回组中非空值对的因变量的平均值,其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_avgy
regr_count(y, x) regr_count(y, x) - 返回组中非空数值对的数量,其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_count
regr_intercept(y, x) regr_intercept(y, x) - 返回在组中非空值对的单变量线性回归线的截距,其中 y 是因变量,x 是自变量。 https://sparkfunctions.com/regr_intercept
regr_r2(y, x) regr_r2(y, x) - 返回组中非空对的确定系数,其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_r2
regr_slope(y, x) regr_slope(y, x) - 返回组中非空值对的线性回归线的斜率,其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_slope
regr_sxx(y, x) regr_sxx(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * VAR_POP(x),其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_sxx
regr_sxy(y, x) regr_sxy(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * COVAR_POP(y, x),其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_sxy
regr_syy(y, x) regr_syy(y, x) - 对于组中的非空值对,返回 REGR_COUNT(y, x) * VAR_POP(y),其中 y 是因变量,x 是自变量 https://sparkfunctions.com/regr_syy
skewness skewness(expr) - 返回根据一个组中的值计算出的偏度值 https://sparkfunctions.com/skewness
some some(expr) - 如果expr中至少有一个值为真,则返回真 https://sparkfunctions.com/some
std std(expr) - 返回根据一个组中的值计算出的样本标准差。 https://sparkfunctions.com/std
stddev stddev(expr) - 返回根据一个组中的值计算出的样本标准差 https://sparkfunctions.com/stddev
stddev_pop stddev_pop(expr) - 返回根据一个组中的值计算出的总体标准差 https://sparkfunctions.com/stddev_pop
stddev_samp(expr) stddev_samp(expr) - 返回根据一个组中的值计算出的样本标准差 https://sparkfunctions.com/stddev_samp
sum sum(expr) - 返回根据一个组中的值计算出的总和 https://sparkfunctions.com/sum
try_avg try_avg(expr) - 从一组值中计算平均值,如果发生溢出,则结果为null https://sparkfunctions.com/try_avg
try_sum try_sum(expr) - 从一组值中计算总和,如果发生溢出,则结果为null https://sparkfunctions.com/try_sum
var_pop var_pop(expr) - 返回根据一个组中的值计算出的总体方差 https://sparkfunctions.com/var_pop
var_samp var_samp(expr) - 返回根据一个组中的值计算出的样本方差 https://sparkfunctions.com/var_samp
variance variance(expr) - 返回根据一个组中的值计算出的样本方差。 https://sparkfunctions.com/variance

3.空值选项

[ nulls_option ] 指定在评估窗口函数时是否跳过空值。

  • RESPECT NULLS 表示不跳过空值
  • IGNORE NULLS 表示跳过空值。 如果未指定,默认值为 RESPECT NULLS。 仅 LAG | LEAD | NTH_VALUE | FIRST_VALUE | LAST_VALUE函数可以使用IGNORE NULLS

4.排序方式

  • 排序函数(Ranking Functions)、分析函数(Analytic Functions)开窗时必须要进行排序;
  • 聚合函数(Aggregate Functions)根据需要进行排序。

4.1聚合函数开窗的排序

聚合函数开窗可以排序也可以不排序。

  • 不排序则窗口框架范围,计算范围为整个分区;
  • 排序不指定窗口框架范围,计算范围为分区开始行到当前行;
  • 排序制定窗口框架范围,按照制定范围聚合

举例: 样例数据

--建表语句
CREATE TABLE t_employees
(
    name   STRING,
    dept   STRING,
    salary BIGINT,
    age    BIGINT
);
--插入数据
INSERT INTO t_employees
VALUES ("Lisa", "Sales", 10000, 35),
       ("Evan", "Sales", 32000, 38),
       ("Fred", "Engineering", 21000, 28),
       ("Alex", "Sales", 30000, 33),
       ("Tom", "Engineering", 23000, 33),
       ("Jane", "Marketing", 29000, 28),
       ("Helen", "Marketing", 29000, 40),
       ("Jeff", "Marketing", 35000, 38),
       ("Paul", "Engineering", 29000, 23),
       ("Chloe", "Engineering", 23000, 25);

--样例数据
+--------+--------------+---------+------+
|  name  |     dept     | salary  | age  |
+--------+--------------+---------+------+
| Lisa   | Sales        | 10000   | 35   |
| Evan   | Sales        | 32000   | 38   |
| Fred   | Engineering  | 21000   | 28   |
| Alex   | Sales        | 30000   | 33   |
| Tom    | Engineering  | 23000   | 33   |
| Jane   | Marketing    | 29000   | 28   |
| Helen  | Marketing    | 29000   | 40   |
| Jeff   | Marketing    | 35000   | 38   |
| Paul   | Engineering  | 29000   | 23   |
| Chloe  | Engineering  | 23000   | 25   |
+--------+--------------+---------+------+

sum()函数举例: 1.计算每个员工所在部门的部门总薪水

select
    name,
    dept,
    salary,
    age,
    sum(salary)over(partition by dept) as dept_total_salary
from t_employees;
--执行结果
+--------+--------------+---------+------+--------------------+
|  name  |     dept     | salary  | age  | dept_total_salary  |
+--------+--------------+---------+------+--------------------+
| Fred   | Engineering  | 21000   | 28   | 96000              |
| Tom    | Engineering  | 23000   | 33   | 96000              |
| Paul   | Engineering  | 29000   | 23   | 96000              |
| Chloe  | Engineering  | 23000   | 25   | 96000              |
| Jane   | Marketing    | 29000   | 28   | 93000              |
| Helen  | Marketing    | 29000   | 40   | 93000              |
| Jeff   | Marketing    | 35000   | 38   | 93000              |
| Lisa   | Sales        | 10000   | 35   | 72000              |
| Evan   | Sales        | 32000   | 38   | 72000              |
| Alex   | Sales        | 30000   | 33   | 72000              |
+--------+--------------+---------+------+--------------------+

注意: 这里dept_total_salayr列得出的结果接统计了部门全部员工的总薪资。

2.按照员工薪资排序,从低到高,每个部门截止到该员工的累积薪水是多少;(累积求和)

--不限定窗口框架
select
    name,
    dept,
    salary,
    age,
    sum(salary)over(partition by dept order by salary asc) as dept_total_salary
from t_employees;
--查询结果
+--------+--------------+---------+------+--------------------+
|  name  |     dept     | salary  | age  | dept_total_salary  |
+--------+--------------+---------+------+--------------------+
| Fred   | Engineering  | 21000   | 28   | 21000              |
| Tom    | Engineering  | 23000   | 33   | 67000              |
| Chloe  | Engineering  | 23000   | 25   | 67000              |
| Paul   | Engineering  | 29000   | 23   | 96000              |
| Jane   | Marketing    | 29000   | 28   | 58000              |
| Helen  | Marketing    | 29000   | 40   | 58000              |
| Jeff   | Marketing    | 35000   | 38   | 93000              |
| Lisa   | Sales        | 10000   | 35   | 10000              |
| Alex   | Sales        | 30000   | 33   | 40000              |
| Evan   | Sales        | 32000   | 38   | 72000              |
+--------+--------------+---------+------+--------------------+

--使用rows限定窗口框架
select name,
       dept,
       salary,
       age,
       sum(salary)
           over (partition by dept order by salary asc rows BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;
--查询结果
+--------+--------------+---------+------+--------------------+
|  name  |     dept     | salary  | age  | dept_total_salary  |
+--------+--------------+---------+------+--------------------+
| Fred   | Engineering  | 21000   | 28   | 21000              |
| Tom    | Engineering  | 23000   | 33   | 44000              |
| Chloe  | Engineering  | 23000   | 25   | 67000              |
| Paul   | Engineering  | 29000   | 23   | 96000              |
| Jane   | Marketing    | 29000   | 28   | 29000              |
| Helen  | Marketing    | 29000   | 40   | 58000              |
| Jeff   | Marketing    | 35000   | 38   | 93000              |
| Lisa   | Sales        | 10000   | 35   | 10000              |
| Alex   | Sales        | 30000   | 33   | 40000              |
| Evan   | Sales        | 32000   | 38   | 72000              |
+--------+--------------+---------+------+--------------------+

--使用range限定窗口框架
select name,
       dept,
       salary,
       age,
       sum(salary)
           over (partition by dept order by salary asc range BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;

--查询结果
+--------+--------------+---------+------+--------------------+
|  name  |     dept     | salary  | age  | dept_total_salary  |
+--------+--------------+---------+------+--------------------+
| Fred   | Engineering  | 21000   | 28   | 21000              |
| Tom    | Engineering  | 23000   | 33   | 67000              |
| Chloe  | Engineering  | 23000   | 25   | 67000              |
| Paul   | Engineering  | 29000   | 23   | 96000              |
| Jane   | Marketing    | 29000   | 28   | 58000              |
| Helen  | Marketing    | 29000   | 40   | 58000              |
| Jeff   | Marketing    | 35000   | 38   | 93000              |
| Lisa   | Sales        | 10000   | 35   | 10000              |
| Alex   | Sales        | 30000   | 33   | 40000              |
| Evan   | Sales        | 32000   | 38   | 72000              |
+--------+--------------+---------+------+--------------------+

注意:

  1. 使用sum开窗后增加order by子句,sum的结果不在是整个部门,而是截止到当前值的结果。
  2. 如果省略了窗口框架内容,则相当于使用了range,限定截止到当前行的,关注Tom所在行的结果,聚合了包含Chloe的薪水;
  3. 使用rows的窗口框架,则仅包含到Tom的薪水,不含Chloe的薪水;

4.2 排序不唯一带来的问题

要求: 取出每个部门薪水最低的员工记录,要求每个部门仅取出一行记录

分析: 为了保证每个部门仅取出一行记录,我们使用row_number函数来进行处理,具体语句和执行结果如下:

--执行SQL
select name,
       dept,
       salary,
       age
from (select name,
             dept,
             salary,
             age,
             row_number() over (partition by dept order by salary asc) as rn
      from t_employees) t
where rn = 1

--执行结果
+-------+--------------+---------+------+
| name  |     dept     | salary  | age  |
+-------+--------------+---------+------+
| Fred  | Engineering  | 21000   | 28   |
| Jane  | Marketing    | 29000   | 28   |
| Lisa  | Sales        | 10000   | 35   |
+-------+--------------+---------+------+

注意: 关注Marketing部门的记录,取出的是Jane的记录。Jane确实是最低的,但是同时Hellen的薪资也是一样的。虽然当前满足了需求内容,但在实际生产中,发生流程重跑,则数据内容可能发生变化,数据校验出现前后不一致,较难排查。还会影响下游使用,例如使用结果数据计算最低薪水员工的平均年龄,数据重跑之后平均年龄发生变化。所以保证排序唯一十分重要

4.3 排序中的空值

可以在排序时指定空值是排在最前面还是最后面,测试数据中没有空值,仅写SQL了

--样例SQL
select name,
       dept,
       salary,
       age,
       sum(salary)
           over (partition by dept order by salary asc nulls first range BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dept_total_salary
from t_employees;

5.窗口框架

窗口框架指定窗口从哪一行开始以及在哪里结束 语法 { RANGE | ROWS } { frame_start | BETWEEN frame_start AND frame_end } 框架指定方式分为range方式和rows方式,如果不指定默认为range方式,

frame_startframe_end可以为以下内容 UNBOUNDED PRECEDING: 从分区的第一行开始 offset PRECEDING:从当前行之前的第 offset 行开始。 CURRENT ROW:当前行。 offset FOLLOWING:到当前行之后的第 offset 行结束。 UNBOUNDED FOLLOWING:到分区的最后一行结束。

如果未指定 frame_end,则默认值为 CURRENT ROW