加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

发布时间:2019-01-18 07:16:22 所属栏目:教程 来源:孙金城
导读:一、说什么 JOIN 算子是数据处理的核心算子,前面我们在《Apache Flink 漫谈系列(09) - JOIN 算子》介绍了UnBounded的双流JOIN,在《Apache Flink 漫谈系列(10) - JOIN LATERAL》介绍了单流与UDTF的JOIN操作,在《Apache Flink 漫谈系列(11) - Temporal Ta
副标题[/!--empirenews.page--]

一、说什么

JOIN 算子是数据处理的核心算子,前面我们在《Apache Flink 漫谈系列(09) - JOIN 算子》介绍了UnBounded的双流JOIN,在《Apache Flink 漫谈系列(10) - JOIN LATERAL》介绍了单流与UDTF的JOIN操作,在《Apache Flink 漫谈系列(11) - Temporal Table JOIN》又介绍了单流与版本表的JOIN,本篇将介绍在UnBounded数据流上按时间维度进行数据划分进行JOIN操作 - Time Interval(Time-windowed)JOIN, 后面我们叫做Interval JOIN。

二、实际问题

前面章节我们介绍了Flink中对各种JOIN的支持,那么想想下面的查询需求之前介绍的JOIN能否满足?需求描述如下:

比如有一个订单表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假设我们要统计下单一小时内付款的订单信息。

1. 传统数据库解决方式

在传统刘数据库中完成上面的需求非常简单,查询sql如下::

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime < orderTime + 3600 // 秒 

上面查询可以完美的完成查询需求,那么在Apache Flink里面应该如何完成上面的需求呢?

2. Apache Flink解决方式

(1) UnBounded 双流 JOIN

上面查询需求我们很容易想到利用《Apache Flink 漫谈系列(09) - JOIN 算子》介绍了UnBounded的双流JOIN,SQL语句如下:

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND p.payTime >= orderTime AND p.payTime as timestamp < TIMESTAMPADD(SECOND, 3600, orderTime) 

UnBounded双流JOIN可以解决上面问题,这个示例和本篇要介绍的Interval JOIN有什么关系呢?

(2) 性能问题

虽然我们利用UnBounded的JOIN能解决上面的问题,但是仔细分析用户需求,会发现这个需求场景订单信息和付款信息并不需要长期存储,比如2018-12-27 14:22:22的订单只需要保持1小时,因为超过1个小时的订单如果没有被付款就是无效订单了。同样付款信息也不需要长期保持,2018-12-27 14:22:22的订单付款信息如果是2018-12-27 15:22:22以后到达的那么我们也没有必要保存到State中。 而对于UnBounded的双流JOIN我们会一直将数据保存到State中,如下示意图:

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

这样的底层实现,对于当前需求有不必要的性能损失。所以我们有必要开发一种新的可以清除State的JOIN方式(Interval JOIN)来高性能的完成上面的查询需求。

(3) 功能扩展

目前的UnBounded的双流JOIN是后面是没有办法再进行Event-Time的Window Aggregate的。也就是下面的语句在Apache Flink上面是无法支持的:

  1. SELECT COUNT(*) FROM ( 
  2. SELECT 
  3. ..., 
  4. payTime 
  5. FROM Orders AS o JOIN Payment AS p ON 
  6. o.orderId = p.orderId 
  7. ) GROUP BY TUMBLE(payTime, INTERVAL '15' MINUTE) 

因为在UnBounded的双流JOIN中无法保证payTime的值一定大于WaterMark(WaterMark相关可以查阅<>). Apache Flink的Interval JOIN之后可以进行Event-Time的Window Aggregate。

3. Interval JOIN

为了完成上面需求,并且解决性能和功能扩展的问题,Apache Flink在1.4开始开发了Time-windowed Join,也就是本文所说的Interval JOIN。接下来我们详细介绍Interval JOIN的语法,语义和实现原理。

三、什么是Interval JOIN

Interval JOIN 相对于UnBounded的双流JOIN来说是Bounded JOIN。就是每条流的每一条数据会与另一条流上的不同时间区域的数据进行JOIN。对应Apache Flink官方文档的 Time-windowed JOIN(release-1.7之前都叫Time-Windowed JOIN)。

1. Interval JOIN 语法

  1. SELECT ... FROM t1 JOIN t2 ON t1.key = t2.key AND TIMEBOUND_EXPRESSION 

TIMEBOUND_EXPRESSION 有两种写法,如下:

  • L.time between LowerBound(R.time) and UpperBound(R.time)
  • R.time between LowerBound(L.time) and UpperBound(L.time)
  • 带有时间属性(L.time/R.time)的比较表达式。

2. Interval JOIN 语义

Interval JOIN 的语义就是每条数据对应一个 Interval 的数据区间,比如有一个订单表Orders(orderId, productName, orderTime)和付款表Payment(orderId, payType, payTime)。 假设我们要统计在下单一小时内付款的订单信息。SQL查询如下:

  1. SELECT 
  2. o.orderId, 
  3. o.productName, 
  4. p.payType, 
  5. o.orderTime, 
  6. cast(payTime as timestamp) as payTime 
  7. FROM 
  8. Orders AS o JOIN Payment AS p ON 
  9. o.orderId = p.orderId AND 
  10. p.payTime BETWEEN orderTime AND 
  11. orderTime + INTERVAL '1' HOUR 
  • Orders订单数据

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

  • Payment付款数据

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

符合语义的预期结果是 订单id为003的信息不出现在结果表中,因为下单时间2018-12-26 04:53:24.0, 付款时间是 2018-12-26 05:53:30.0超过了1小时付款。

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读