加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

发布时间:2019-01-18 07:16:22 所属栏目:教程 来源:孙金城
导读:一、说什么 JOIN 算子是数据处理的核心算子,前面我们在《Apache Flink 漫谈系列(09) - JOIN 算子》介绍了UnBounded的双流JOIN,在《Apache Flink 漫谈系列(10) - JOIN LATERAL》介绍了单流与UDTF的JOIN操作,在《Apache Flink 漫谈系列(11) - Temporal Ta

接下来我们以图示的方式直观说明Interval JOIN的语义,我们对上面的示例需求稍微变化一下: 订单可以预付款(不管是否合理,我们只是为了说明语义)也就是订单 前后 1小时的付款都是有效的。SQL语句如下:

  1. SELECT 
  2. ... 
  3. FROM 
  4. Orders AS o JOIN Payment AS p ON 
  5. o.orderId = p.orderId AND 
  6. p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND 
  7. orderTime + INTERVAL '1' HOUR 

这样的查询语义示意图如下:

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

上图有几个关键点,如下:

  • 数据JOIN的区间 - 比如Order时间为3的订单会在付款时间为[2, 4]区间进行JOIN。
  • WaterMark - 比如图示Order最后一条数据时间是3,Payment最后一条数据时间是5,那么WaterMark是根据实际最小值减去UpperBound生成,即:Min(3,5)-1 = 2
  • 过期数据 - 出于性能和存储的考虑,要将过期数据清除,如图当WaterMark是2的时候时间为2以前的数据过期了,可以被清除。

3. Interval JOIN 实现原理

由于Interval JOIN和双流JOIN类似都要存储左右两边的数据,所以底层实现中仍然是利用State进行数据的存储。流计算的特点是数据不停的流入,我们可以不停的进行增量计算,也就是我们每条数据流入都可以进行JOIN计算。我们还是以具体示例和图示来说明内部计算逻辑,如下图:

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

简单解释一下每条记录的处理逻辑如下:

Apache Flink 漫谈系列(12) - Time Interval(Time-windowed) JOIN

实际的内部逻辑会比描述的复杂的多,大家可以根据如上简述理解内部原理即可。

四、示例代码

我们还是以订单和付款示例,将完整代码分享给大家,具体如下(代码基于flink-1.7.0):

  1. import java.sql.Timestamp 
  2.  
  3. import org.apache.flink.api.scala._ 
  4. import org.apache.flink.streaming.api.TimeCharacteristic 
  5. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor 
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  7. import org.apache.flink.streaming.api.windowing.time.Time 
  8. import org.apache.flink.table.api.TableEnvironment 
  9. import org.apache.flink.table.api.scala._ 
  10. import org.apache.flink.types.Row 
  11.  
  12. import scala.collection.mutable 
  13.  
  14. object SimpleTimeIntervalJoin { 
  15. def main(args: Array[String]): Unit = { 
  16. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  17. val tEnv = TableEnvironment.getTableEnvironment(env) 
  18. env.setParallelism(1) 
  19. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  20. // 构造订单数据 
  21. val ordersData = new mutable.MutableList[(String, String, Timestamp)] 
  22. ordersData.+=(("001", "iphone", new Timestamp(1545800002000L))) 
  23. ordersData.+=(("002", "mac", new Timestamp(1545800003000L))) 
  24. ordersData.+=(("003", "book", new Timestamp(1545800004000L))) 
  25. ordersData.+=(("004", "cup", new Timestamp(1545800018000L))) 
  26.  
  27. // 构造付款表 
  28. val paymentData = new mutable.MutableList[(String, String, Timestamp)] 
  29. paymentData.+=(("001", "alipay", new Timestamp(1545803501000L))) 
  30. paymentData.+=(("002", "card", new Timestamp(1545803602000L))) 
  31. paymentData.+=(("003", "card", new Timestamp(1545803610000L))) 
  32. paymentData.+=(("004", "alipay", new Timestamp(1545803611000L))) 
  33. val orders = env 
  34. .fromCollection(ordersData) 
  35. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  36. .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime) 
  37. val ratesHistory = env 
  38. .fromCollection(paymentData) 
  39. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]()) 
  40. .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime) 
  41.  
  42. tEnv.registerTable("Orders", orders) 
  43. tEnv.registerTable("Payment", ratesHistory) 
  44.  
  45. var sqlQuery = 
  46. """ 
  47. |SELECT 
  48. | o.orderId, 
  49. | o.productName, 
  50. | p.payType, 
  51. | o.orderTime, 
  52. | cast(payTime as timestamp) as payTime 
  53. |FROM 
  54. | Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND 
  55. | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR 
  56. |""".stripMargin 
  57. tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery)) 
  58.  
  59. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row] 
  60. result.print() 
  61. env.execute() 
  62.  
  63.  
  64. class TimestampExtractor[T1, T2] 
  65. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) { 
  66. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = { 
  67. element._3.getTime 

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读