1.序篇

看了那么多的技术文,你能明白作者想让你在读完文章后学到什么吗?

大数据羊说的文章会让你明白

1. 博主会阐明博主期望本文能给小伙伴们带来什么帮助,让小伙伴萌能直观明白博主的心思

2. 博主会以实际的应用场景和案例入手,不只是知识点的简单堆砌

3. 博主会把重要的知识点的原理进行剖析,让小伙伴萌做到深入浅出

进入正文。

源码公众号后台回复1.13.2 sql join 的奇妙解析之路获取。

下面即是文章目录,也对应到本文的结论,小伙伴可以先看结论快速了解本文能给你带来什么帮助:

  1. 背景及应用场景介绍:join 作为离线数仓中最常见的场景,在实时数仓中也必然不可能缺少它,flink sql 提供的丰富的 join 方式(总结 6 种:regular join,维表 join,temporal join,interval join,array 拍平,table function 函数)对我们满足需求提供了强大的后盾
  2. 先来一个实战案例:以一个曝光日志 left join 点击日志为案例展开,介绍 flink sql join 的解决方案
  3. flink sql join 的解决方案以及存在问题的介绍:主要介绍 regular join 的在上述案例的运行结果及分析源码机制,它虽然简单,但是 left join,right join,full join 会存在着 retract 的问题,所以在使用前,你应该充分了解其运行机制,避免出现数据发重,发多的问题。
  4. 本文主要介绍 regular join retract 的问题,下节介绍怎么使用 interval join 来避免这种 retract 问题,并满足第 2 点的实战案例需求。

2.背景及应用场景介绍

在我们的日常场景中,应用最广的一种操作必然有 join 的一席之地,例如

  1. 计算曝光数据和点击数据的 CTR,需要通过唯一 id 进行 join 关联
  2. 事实数据关联维度数据获取维度,进而计算维度指标

上述场景,在离线数仓应用之广就不多说了。

那么,实时流之间的关联要怎么操作呢?

flink sql 为我们提供了四种强大的关联方式,帮助我们在流式场景中达到流关联的目的。如下图官网截图所示:

join

  1. regular join:即 left join,right join,full join,inner join
  2. 维表 lookup join:维表关联
  3. temporal join:快照表 join
  4. interval join:两条流在一段时间区间之内的 join
  5. array 炸开:列转行
  6. table function join:通过 table function 自定义函数实现 join(类似于列转行的效果,或者说类似于维表 join 的效果)

在实时数仓中,regular join 以及 interval join,以及两种 join 的结合使用是最常使用的。所以本文主要介绍这两种(太长的篇幅大家可能也不想看,所以之后的文章就以简洁,短为目标)。

3.先来一个实战案例

先来一个实际案例来看看在具体输入值的场景下,输出值应该长啥样。

场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。

来一波输入数据:

曝光数据:

点击数据:

预期输出数据如下:

熟悉离线 hive sql 的同学可能 10s 就写完上面这个 sql 了,如下 hive sql

INSERT INTO sink_table
SELECT
    show_log.log_id as log_id,
    show_log.timestamp as timestamp,
    show_log.show_params as show_params,
    click_log.click_params as click_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;

那么我们看看上述需求如果要以 flink sql 实现需要怎么做呢?

虽然不 flink sql 提供了 left join 的能力,但是在实际使用时,可能会出现预期之外的问题。下节详述。

4.flink sql join

4.1.flink sql

还是上面的案例,我们先实际跑一遍看看结果:

INSERT INTO sink_table
SELECT
    show_log.log_id as log_id,
    show_log.timestamp as timestamp,
    show_log.show_params as show_params,
    click_log.click_params as click_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;

flink web ui 算子图如下:

flink web ui

结果如下:

+[1 | 2021-11-01 00:01:03 | show_params | null]
-[1 | 2021-11-01 00:01:03 | show_params | null]
+[1 | 2021-11-01 00:01:03 | show_params | click_params]
+[2 | 2021-11-01 00:03:00 | show_params | click_params]
+[3 | 2021-11-01 00:05:00 | show_params | null]

从结果上看,其输出数据有 +-,代表其输出的数据是一个 retract 流的数据。分析原因发现是,由于第一条 show_log 先于 click_log 到达,
所以就先直接发出 +[1 | 2021-11-01 00:01:03 | show_params | null],后面 click_log 到达之后,将上一次未关联到的 show_log 撤回,
然后将关联到的 +[1 | 2021-11-01 00:01:03 | show_params | click_params] 下发。

但是 retract 流会导致写入到 kafka 的数据变多,这是不可被接受的。我们期望的结果应该是一个 append 数据流。

为什么 left join 会出现这种问题呢?那就要从 left join 的原理说起了。

来定位到具体的实现源码。先看一下 transformations。

transformations

可以看到 left join 的具体 operator 是 org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator

其核心逻辑就集中在 processElement 方法上面。并且源码对于 processElement 的处理逻辑有详细的注释说明,如下图所示。

StreamingJoinOperator#processElement

注释看起来逻辑比较复杂。我们这里按照 left join,inner join,right join,full join 分类给大家解释一下。

4.2.left join

首先是 left join,以上面的 show_log(左表) left join click_log(右表) 为例:

  1. 首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 相同 key 下,当 show_log 来一条数据,如果 click_log 有数据:则 show_log 与 click_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  3. 相同 key 下,当 show_log 来一条数据,如果 click_log 中没有数据:则 show_log 不会等待,直接输出[+(show_log,null)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  4. 相同 key 下,当 click_log 来一条数据,如果 show_log 有数据:则 click_log 对 show_log 中所有的数据进行遍历关联一遍。在输出数据前,会判断,如果被关联的这条 show_log 之前没有关联到过 click_log(即往下发过[+(show_log,null)]),则先发一条[-(show_log,null)],后发一条[+(show_log,click_log)]
    ,代表把之前的那条没有关联到 click_log 数据的 show_log 中间结果给撤回,把当前关联到的最新结果进行下发,并把 click_log 保存到右表的状态中(以供后续左表进行关联)。这也就解释了为什么输出流是一个 retract 流。
  5. 相同 key 下,当 click_log 来一条数据,如果 show_log 没有数据:把 click_log 保存到右表的状态中(以供后续左表进行关联)。

4.3.inner join

以上面的 show_log(左表) inner join click_log(右表) 为例:

  1. 首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 相同 key 下,当 show_log 来一条数据,如果 click_log 有数据:则 show_log 与 click_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  3. 相同 key 下,当 show_log 来一条数据,如果 click_log 中没有数据:则 show_log 不会输出数据,会把 show_log 保存到左表的状态中(以供后续 join 使用)。
  4. 相同 key 下,当 click_log 来一条数据,如果 show_log 有数据:则 click_log 与 show_log 中的所有数据进行遍历关联一遍输出[+(show_log,click_log)]数据,并且把 click_log 保存到右表的状态中(以供后续 join 使用)。
  5. 相同 key 下,当 click_log 来一条数据,如果 show_log 没有数据:则 click_log 不会输出数据,会把 click_log 保存到右表的状态中(以供后续 join 使用)。

4.4.right join

right join 和 left join 一样,只不过顺序反了,这里不再赘述。

4.5.full join

以上面的 show_log(左表) full join click_log(右表) 为例:

  1. 首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的,join 的 key 即 show_log.log_id,click_log.log_id,相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 相同 key 下,当 show_log 来一条数据,如果 click_log 有数据:则 show_log 对 click_log 中所有的数据进行遍历关联一遍。在输出数据前,会判断,如果被关联的这条 click_log 之前没有关联到过 show_log(即往下发过[+(null,click_log)]),则先发一条[-(null,click_log)],后发一条[+(show_log,click_log)]
    ,代表把之前的那条没有关联到 show_log 数据的 click_log 中间结果给撤回,把当前关联到的最新结果进行下发,并把 show_log 保存到左表的状态中(以供后续 join 使用)
  3. 相同 key 下,当 show_log 来一条数据,如果 click_log 中没有数据:则 show_log 不会等待,直接输出[+(show_log,null)]数据,并且把 show_log 保存到左表的状态中(以供后续 join 使用)。
  4. 相同 key 下,当 click_log 来一条数据,如果 show_log 有数据:则 click_log 对 show_log 中所有的数据进行遍历关联一遍。在输出数据前,会判断,如果被关联的这条 show_log 之前没有关联到过 click_log(即往下发过[+(show_log,null)]),则先发一条[-(show_log,null)],后发一条[+(show_log,click_log)]
    ,代表把之前的那条没有关联到 click_log 数据的 show_log 中间结果给撤回,把当前关联到的最新结果进行下发,并把 click_log 保存到右表的状态中(以供后续 join 使用)
  5. 相同 key 下,当 click_log 来一条数据,如果 show_log 中没有数据:则 click_log 不会等待,直接输出[+(null,click_log)]数据,并且把 click_log 保存到右表的状态中(以供后续 join 使用)。

4.6.regular join 的总结

总的来说上述四种 join 可以按照以下这么划分。

  1. inner join 会互相等,直到有数据才下发。
  2. left join,right join,full join 不会互相等,只要来了数据,会尝试关联,能关联到则下发的字段是全的,关联不到则另一边的字段为 null。后续数据来了之后,发现之前下发过为没有关联到的数据时,就会做回撤,把关联到的结果进行下发

4.7.怎样才能解决 retract 导致数据重复下发到 kafka 这个问题呢?

既然 flink sql 在 left join、right join、full join 实现上的原理就是以这种 retract 的方式去实现的,就不能通过这种方式来满足业务了。

我们来转变一下思路,上述 join 的特点就是不会相互等,那有没有一种 join 是可以相互等待的呢。以 left join 的思路为例,左表在关联不到右表的时候,可以选择等待一段时间,如果超过这段时间还等不到再下发 (show_log,null),如果等到了就下发(show_log,click_log)。

interval join 闪亮登场。关于 interval join 是如何实现上述场景,及其原理实现,本篇的(下)会详细介绍,敬请期待。

5.总结与展望

源码公众号后台回复1.13.2 sql join 的奇妙解析之路获取。

本文主要介绍了 flink sql regular 的在满足 join 场景时存在的问题,并通过解析其实现说明了运行原理,主要包含下面两部分:

  1. 背景及应用场景介绍:join 作为离线数仓中最常见的场景,在实时数仓中也必然不可能缺少它,flink sql 提供的丰富的 join 方式(总结 4 种:regular join,维表 join,temporal join,interval join)对我们满足需求提供了强大的后盾
  2. 先来一个实战案例:以一个曝光日志 left join 点击日志为案例展开,介绍 flink sql join 的解决方案
  3. flink sql join 的解决方案以及存在问题的介绍:主要介绍 regular join 的在上述案例的运行结果及分析源码机制,它虽然简单,但是 left join,right join,full join 会存在着 retract 的问题,所以在使用前,你应该充分了解其运行机制,避免出现数据发重,发多的问题。
  4. 本文主要介绍 regular join retract 的问题,下节介绍怎么使用 interval join 来避免这种 retract 问题,并满足第 2 点的实战案例需求。

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

  1. 背景及应用场景介绍:博主期望你能了解到,flink sql 提供的丰富的 join 方式(总结 6 种:regular join,维表 join,快照 join,interval join,array 拍平,table function)对我们满足需求提供了强大的后盾,
    这 6 种 join 中涉及到流与流的 join 最常用的是 regular join 以及 interval join,本节主要介绍 interval join
  2. 来一个实战案例:博主以上节说到的曝光日志流点击日志流为案例展开,主要是想告诉小伙伴 flink sql left join 数据不会互相等待,存在 retract 问题,会导致写入 kafka 的数据量变大,
    然后转变思路为使用 flink sql interval join 的方式可以使得数据互相等待一段时间进行 join,这种方式不会存在 retract 问题
  3. flink sql interval join 的解决方案以及原理的介绍:主要介绍 interval join 的在上述实战案例的运行结果及分析源码机制,博主期望你能了解到,interval join 的执行机制是会在你设置的 interval 区间之内互相等待一段时间,一旦时间推进(事件时间由 watermark 推进)到区间之外(即当前这条数据再也不可能被另一条流的数据 join 到时),outer join 会输出没有 join 到的数据,inner join 会从 state 中删除这条数据
  4. 总结及展望

2.背景及应用场景介绍

书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。

本文介绍怎么使用 flink sql interval join 解决这些问题。

3.来一个实战案例

flink sql 知其所以然(十二):流 join 很难嘛???(上)

看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。

场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。

来一波输入数据:

曝光数据:

点击数据:

预期输出数据如下:

上节的 flink sql regular join 解决方案如下:

INSERT INTO sink_table
SELECT
    show_log.log_id as log_id,
    show_log.timestamp as timestamp,
    show_log.show_params as show_params,
    click_log.click_params as click_params
FROM show_log
LEFT JOIN click_log ON show_log.log_id = click_log.log_id;

上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log)
,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。

对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。

当当当!!!

本文的 flink sql interval join 登场,它就能等。

4.flink sql interval join

4.1.interval join 定义

大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。

interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。

interval join

4.2.案例解决方案

来看看上述案例的 flink sql interval join sql 怎么写:

INSERT INTO sink_table
SELECT
    show_log.log_id as log_id,
    show_log.timestamp as timestamp,
    show_log.show_params as show_params,
    click_log.click_params as click_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time 
    BETWEEN click_log.row_time - INTERVAL '10' MINUTE 
    AND click_log.row_time + INTERVAL '10' MINUTE;

这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。

运行结果如下:

+[1 | 2021-11-01 00:01:03 | show_params | click_params]
+[2 | 2021-11-01 00:03:00 | show_params | click_params]
+[3 | 2021-11-01 00:05:00 | show_params | null]

如上就是我们期望的正确结果了。

flink web ui 算子图如下:

flink web ui

那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?

博主带你们来定位到具体的实现源码。先看一下 transformations。

transformations

可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay

其核心逻辑就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。RowTimeIntervalJoin 重要方法如下图所示。

TimeIntervalJoin

下面详细给大家解释一下。

4.3.TimeIntervalJoin 简版说明

join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。

举个例子,show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE
当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据,
那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]

Notes:
如果你设置了 allowLateness,join 不到的数据的输出和 state 的清理会多保留 allowLateness 时间

4.4.TimeIntervalJoin 详细实现说明

以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):

  1. 第一步,首先如果 join xxx on 中的条件是等式则代表 join 是在相同 key 下进行的(上述案例中 join 的 key 即 show_log.log_id,click_log.log_id),相同 key 的数据会被发送到一个并发中进行处理。如果 join xxx on 中的条件是不等式,则两个流的 source 算子向 join 算子下发数据是按照 global 的 partition 策略进行下发的,并且 join 算子并发会被设置为 1,所有的数据会被发送到这一个并发中处理。
  2. 第二步,相同 key 下,一条 show_log 的数据先到达,首先会计算出下面要使用的最重要的三类时间戳:
  • 根据 show_log 的时间戳(l_time)计算出能关联到的右流的时间区间下限(r_lower)、上限(r_upper)
  • 根据 show_log 目前的 watermark 计算出目前右流的数据能够过期做过期处理的时间的最小值(r_expire)
  • 获取左流的 l_watermark,右流的 r_watermark,这两个时间戳在事件语义的任务中都是 watermark
  1. 第三步,遍历所有同 key 下的 click_log 来做 join
  • 对于遍历的每一条 click_log,走如下步骤
  • 经过判断,如果 on 中的条件为 true,则和 click_log 关联,输出[+(show_log,click_log)]数据;如果 on 中的条件为 false,则啥也不干
  • 接着判断当前这条 click_log 的数据时间(r_time)是否小于右流的数据过期时间的最小值(r_expire)(即判断这条 click_log 是否永远不会再被 show_log join 到了)。如果小于,并且当前 click_log 这一侧是 outer join,则不用等直接输出[+(null,click_log)]),从状态删除这条 click_log;如果 click_log 这一侧不是 outer join,则直接从状态里删除这条 click_log。
  1. 第四步,判断右流的时间戳(r_watermark)是否小于能关联到的右流的时间区间上限(r_upper):
  • 如果是,则说明这条 show_log 还有可能被 click_log join 到,则 show_log 放到 state 中,并注册后面用于状态清除的 timer。
  • 如果否,则说明关联不到了,则输出[+(show_log,null)]
  1. 第五步,timer 触发时:
  • timer 触发时,根据当前 l_watermark,r_watermark 以及 state 中存储的 show_log,click_log 的 l_time,r_time 判断是否再也不会被对方 join 到,如果是,则根据是否为 outer join 对应输出[+(show_log,null)],[+(null,click_log)],并从状态中删除对应的 show_log,click_log。

上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2)。

4.5.使用注意事项

小伙伴萌在使用 interval join 需要注意的两点事项:

  1. interval join 的时间区间取决于日志的真实情况:设置大了容易造成任务的 state 太大,并且时效性也会变差。设置小了,join 不到,下发的数据在后续使用时,数据质量会存在问题。所以小伙伴萌在使用时建议先使用离线数据做一遍两条流的时间戳 diff 比较,来确定真实情况下的时间戳 diff 的分布是怎样的。举例:你通过离线数据 join 并做时间戳 diff 后发现 99% 的数据都能在时间戳相差 5min 以内 join 到,那么你就有依据去设置 interval 时间差为 5min。
  2. interval join 中的时间区间条件即支持事件时间,也支持处理时间。事件时间由 watermark 推进。

5.总结与展望

源码公众号后台回复1.13.2 sql interval join获取。

本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到:

  1. 背景及应用场景介绍:博主期望你能了解到,flink sql 提供的丰富的 join 方式(总结 6 种:regular join,维表 join,快照 join,interval join,array 拍平,table function)对我们满足需求提供了强大的后盾,
    这 6 种 join 中涉及到流与流的 join 最常用的是 regular join 以及 interval join,本节主要介绍 interval join
  2. 来一个实战案例:博主以上节说到的曝光日志流点击日志流为案例展开,主要是想告诉小伙伴 flink sql left join 数据不会互相等待,存在 retract 问题,会导致写入 kafka 的数据量变大,
    然后转变思路为使用 flink sql interval join 的方式可以使得数据互相等待一段时间进行 join,这种方式不会存在 retract 问题
  3. flink sql interval join 的解决方案以及原理的介绍:主要介绍 interval join 的在上述实战案例的运行结果及分析源码机制,博主期望你能了解到,interval join 的执行机制是会在你设置的 interval 区间之内互相等待一段时间,一旦时间推进(事件时间由 watermark 推进)到区间之外(即当前这条数据再也不可能被另一条流的数据 join 到时),outer join 会输出没有 join 到的数据,inner join 会从 state 中删除这条数据