加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(11) - Temporal Table JOIN

发布时间:2018-12-12 18:13:35 所属栏目:教程 来源:孙金城
导读:一、什么是Temporal Table 在《Apache Flink 漫谈系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这

执行之后当前表信息会更新并在历史表里面产生一条历史信息,如下:

Apache Flink 漫谈系列(11) - Temporal Table JOIN

注意当前表的SysStartTime意见发生了变化,历史表产生了一条记录,SyStartTIme是原当前表记录的SysStartTime,SysEndTime是当前表记录的SystemStartTime。我们再更新一次:

  1. UPDATE [dbo].[Department] SET [ManagerID] = 201 WHERE [DeptID] = 10 

Apache Flink 漫谈系列(11) - Temporal Table JOIN

到这里我们了解到SQLServer里面关于Temporal Table的逻辑是有当前表和历史表来存储数据,并且数据库内部以StartTime和EndTime的方式管理数据的版本。

(3) SELECT

  1. SELECT [DeptID], [DeptName], [SysStartTime],[SysEndTime] 
  2. FROM [dbo].[Department] 
  3. FOR SYSTEM_TIME AS OF '2018-06-06 05:50:21.0000000' ; 

Apache Flink 漫谈系列(11) - Temporal Table JOIN

SELECT语句查询的是Department的表,实际返回的数据是从历史表里面查询出来的,查询的底层逻辑就是 SysStartTime <= '2018-06-06 05:50:21.0000000' and SysEndTime > '2018-06-06 05:50:21.0000000' 。

四、Apache Flink Temporal Table

我们不止一次的提到Apache Flink遵循ANSI-SQL标准,Apache Flink中Temporal Table的概念也源于ANSI-2011的标准语义,但目前的实现在语法层面和ANSI-SQL略有差别,上面看到ANSI-2011中使用FOR SYSTEM_TIME AS OF的语法,目前Apache Flink中使用 LATERAL TABLE(TemporalTableFunction)的语法。这一点后续需要推动社区进行改进。

1. 为啥需要 Temporal Table

我们以具体的查询示例来说明为啥需要Temporal Table,假设我们有一张实时变化的汇率表(RatesHistory),如下:

Apache Flink 漫谈系列(11) - Temporal Table JOIN

RatesHistory代表了Yen汇率(Yen汇率为1),是不断变化的Append only的汇率表。例如,Euro兑Yen汇率从09:00至10:45的汇率为114。从10点45分到11点15分是116。

假设我们想在10:58输出所有当前汇率,我们需要以下SQL查询来计算结果表:

  1. SELECT * 
  2. FROM RatesHistory AS r 
  3. WHERE r.rowtime = ( 
  4. SELECT MAX(rowtime) 
  5. FROM RatesHistory AS r2 
  6. WHERE rr2.currency = r.currency 
  7. AND r2.rowtime <= '10:58'); 

相应Flink代码如下:

  • 定义数据源-genRatesHistorySource
    1. def genRatesHistorySource: CsvTableSource = { 
    2.  
    3. val csvRecords = Seq( 
    4. "rowtime ,currency ,rate", 
    5. "09:00:00 ,US Dollar , 102", 
    6. "09:00:00 ,Euro , 114", 
    7. "09:00:00 ,Yen , 1", 
    8. "10:45:00 ,Euro , 116", 
    9. "11:15:00 ,Euro , 119", 
    10. "11:49:00 ,Pounds , 108" 
    11. // 测试数据写入临时文件 
    12. val tempFilePath = 
    13. writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp") 
    14.  
    15. // 创建Source connector 
    16. new CsvTableSource( 
    17. tempFilePath, 
    18. Array("rowtime","currency","rate"), 
    19. Array( 
    20. Types.STRING,Types.STRING,Types.STRING 
    21. ), 
    22. fieldDelim = ",", 
    23. rowDelim = "$", 
    24. ignoreFirstLine = true, 
    25. ignoreComments = "%" 
    26. def writeToTempFile( 
    27. contents: String, 
    28. filePrefix: String, 
    29. fileSuffix: String, 
    30. charset: String = "UTF-8"): String = { 
    31. val tempFile = File.createTempFile(filePrefix, fileSuffix) 
    32. val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), charset) 
    33. tmpWriter.write(contents) 
    34. tmpWriter.close() 
    35. tempFile.getAbsolutePath} 
  • 主程序代码
    1. def main(args: Array[String]): Unit = { 
    2. // Streaming 环境 
    3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
    4. val tEnv = TableEnvironment.getTableEnvironment(env) 
    5.  
    6. //方便我们查出输出数据 
    7. env.setParallelism(1) 
    8.  
    9. val sourceTableName = "RatesHistory" 
    10. // 创建CSV source数据结构 
    11. val tableSource = CsvTableSourceUtils.genRatesHistorySource 
    12. // 注册source 
    13. tEnv.registerTableSource(sourceTableName, tableSource) 
    14.  
    15. // 注册retract sink 
    16. val sinkTableName = "retractSink" 
    17. val fieldNames = Array("rowtime", "currency", "rate") 
    18. val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.STRING, Types.STRING) 
    19.  
    20. tEnv.registerTableSink( 
    21. sinkTableName, 
    22. fieldNames, 
    23. fieldTypes, 
    24. new MemoryRetractSink) 
    25.  
    26. val SQL = 
    27. """ 
    28. |SELECT * 
    29. |FROM RatesHistory AS r 
    30. |WHERE r.rowtime = ( 
    31. | SELECT MAX(rowtime) 
    32. | FROM RatesHistory AS r2 
    33. | WHERE rr2.currency = r.currency 
    34. | AND r2.rowtime <= '10:58:00' ) 
    35. """.stripMargin 
    36.  
    37. // 执行查询 
    38. val result = tEnv.SQLQuery(SQL) 
    39.  
    40. // 将结果插入sink 
    41. result.insertInto(sinkTableName) 
    42. env.execute() 
  • 执行结果如下图:
  • Apache Flink 漫谈系列(11) - Temporal Table JOIN

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读