接下来我们以图示的方式直观说明Interval JOIN的语义,我们对上面的示例需求稍微变化一下: 订单可以预付款(不管是否合理,我们只是为了说明语义)也就是订单 前后 1小时的付款都是有效的。SQL语句如下:
- SELECT
- ...
- FROM
- Orders AS o JOIN Payment AS p ON
- o.orderId = p.orderId AND
- p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
- orderTime + INTERVAL '1' HOUR
这样的查询语义示意图如下:
上图有几个关键点,如下:
- 数据JOIN的区间 - 比如Order时间为3的订单会在付款时间为[2, 4]区间进行JOIN。
- WaterMark - 比如图示Order最后一条数据时间是3,Payment最后一条数据时间是5,那么WaterMark是根据实际最小值减去UpperBound生成,即:Min(3,5)-1 = 2
- 过期数据 - 出于性能和存储的考虑,要将过期数据清除,如图当WaterMark是2的时候时间为2以前的数据过期了,可以被清除。
3. Interval JOIN 实现原理
由于Interval JOIN和双流JOIN类似都要存储左右两边的数据,所以底层实现中仍然是利用State进行数据的存储。流计算的特点是数据不停的流入,我们可以不停的进行增量计算,也就是我们每条数据流入都可以进行JOIN计算。我们还是以具体示例和图示来说明内部计算逻辑,如下图:
简单解释一下每条记录的处理逻辑如下:
实际的内部逻辑会比描述的复杂的多,大家可以根据如上简述理解内部原理即可。
四、示例代码
我们还是以订单和付款示例,将完整代码分享给大家,具体如下(代码基于flink-1.7.0):
- import java.sql.Timestamp
-
- import org.apache.flink.api.scala._
- import org.apache.flink.streaming.api.TimeCharacteristic
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.streaming.api.windowing.time.Time
- import org.apache.flink.table.api.TableEnvironment
- import org.apache.flink.table.api.scala._
- import org.apache.flink.types.Row
-
- import scala.collection.mutable
-
- object SimpleTimeIntervalJoin {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- // 构造订单数据
- val ordersData = new mutable.MutableList[(String, String, Timestamp)]
- ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
- ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
- ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
- ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))
-
- // 构造付款表
- val paymentData = new mutable.MutableList[(String, String, Timestamp)]
- paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
- paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
- paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
- paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))
- val orders = env
- .fromCollection(ordersData)
- .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
- .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
- val ratesHistory = env
- .fromCollection(paymentData)
- .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
- .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)
-
- tEnv.registerTable("Orders", orders)
- tEnv.registerTable("Payment", ratesHistory)
-
- var sqlQuery =
- """
- |SELECT
- | o.orderId,
- | o.productName,
- | p.payType,
- | o.orderTime,
- | cast(payTime as timestamp) as payTime
- |FROM
- | Orders AS o JOIN Payment AS p ON o.orderId = p.orderId AND
- | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
- |""".stripMargin
- tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
-
- val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
- result.print()
- env.execute()
- }
-
- }
-
- class TimestampExtractor[T1, T2]
- extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
- override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
- element._3.getTime
- }
- }
(编辑:核心网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|