加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(13) - Table API 概述

发布时间:2019-01-18 07:08:46 所属栏目:教程 来源:孙金城
导读:一、什么是Table API 在《Apache Flink 漫谈系列(08) - SQL概览》中我们概要的向大家介绍了什么是好SQL,SQL和Table API是Apache Flink中的同一层次的API抽象,如下图所示: Apache Flink 针对不同的用户场景提供了三层用户API,最下层ProcessFunction API

我们创建一个TableAPIOverviewITCase.scala 用于接下来介绍Flink Table API算子的功能体验。代码如下:

  1. import org.apache.flink.api.scala._ 
  2. import org.apache.flink.runtime.state.StateBackend 
  3. import org.apache.flink.runtime.state.memory.MemoryStateBackend 
  4. import org.apache.flink.streaming.api.TimeCharacteristic 
  5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction 
  6. import org.apache.flink.streaming.api.functions.source.SourceFunction 
  7. import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext 
  8. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  9. import org.apache.flink.streaming.api.watermark.Watermark 
  10. import org.apache.flink.table.api.{Table, TableEnvironment} 
  11. import org.apache.flink.table.api.scala._ 
  12. import org.apache.flink.types.Row 
  13. import org.junit.rules.TemporaryFolder 
  14. import org.junit.{Rule, Test} 
  15.  
  16. import scala.collection.mutable 
  17. import scala.collection.mutable.ArrayBuffer 
  18.  
  19. class Table APIOverviewITCase { 
  20.  
  21. // 客户表数据 
  22. val customer_data = new mutable.MutableList[(String, String, String)] 
  23. customer_data.+=(("c_001", "Kevin", "from JinLin")) 
  24. customer_data.+=(("c_002", "Sunny", "from JinLin")) 
  25. customer_data.+=(("c_003", "JinCheng", "from HeBei")) 
  26.  
  27.  
  28. // 订单表数据 
  29. val order_data = new mutable.MutableList[(String, String, String, String)] 
  30. order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone")) 
  31. order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad")) 
  32. order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book")) 
  33.  
  34. // 商品销售表数据 
  35. val item_data = Seq( 
  36. Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))), 
  37. Right((1510365660000L)), 
  38. Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))), 
  39. Right((1510365720000L)), 
  40. Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))), 
  41. Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))), 
  42. Right((1510365780000L)), 
  43. Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))), 
  44. Right((1510365900000L)), 
  45. Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))), 
  46. Right((1510365960000L)), 
  47. Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))), 
  48. Right((1510366020000L)), 
  49. Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))), 
  50. Right((151036608000L))) 
  51.  
  52. // 页面访问表数据 
  53. val pageAccess_data = Seq( 
  54. Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))), 
  55. Right((1510365660000L)), 
  56. Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))), 
  57. Right((1510365660000L)), 
  58. Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))), 
  59. Right((1510366200000L)), 
  60. Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))), 
  61. Right((1510366260000L)), 
  62. Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))), 
  63. Right((1510373400000L))) 
  64.  
  65. // 页面访问量表数据2 
  66. val pageAccessCount_data = Seq( 
  67. Left((1510365660000L, (1510365660000L, "ShangHai", 100))), 
  68. Right((1510365660000L)), 
  69. Left((1510365660000L, (1510365660000L, "BeiJing", 86))), 
  70. Right((1510365660000L)), 
  71. Left((1510365960000L, (1510365960000L, "BeiJing", 210))), 
  72. Right((1510366200000L)), 
  73. Left((1510366200000L, (1510366200000L, "BeiJing", 33))), 
  74. Right((1510366200000L)), 
  75. Left((1510373400000L, (1510373400000L, "ShangHai", 129))), 
  76. Right((1510373400000L))) 
  77.  
  78. // 页面访问表数据3 
  79. val pageAccessSession_data = Seq( 
  80. Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))), 
  81. Right((1510365660000L)), 
  82. Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))), 
  83. Right((1510365720000L)), 
  84. Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))), 
  85. Right((1510365720000L)), 
  86. Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))), 
  87. Right((1510365900000L)), 
  88. Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))), 
  89. Right((1510366200000L)), 
  90. Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))), 
  91. Right((1510366200000L)), 
  92. Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))), 
  93. Right((1510366260000L)), 
  94. Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))), 
  95. Right((1510373760000L))) 
  96.  
  97. val _tempFolder = new TemporaryFolder 
  98.  
  99. // Streaming 环境 
  100. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  101. val tEnv = TableEnvironment.getTableEnvironment(env) 
  102. env.setParallelism(1) 
  103. env.setStateBackend(getStateBackend) 
  104.  
  105. def getProcTimeTables(): (Table, Table) = { 
  106. // 将order_tab, customer_tab 注册到catalog 
  107. val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc) 
  108. val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc) 
  109. (customer, order) 
  110.  
  111.  
  112. def getEventTimeTables(): (Table, Table, Table, Table) = { 
  113. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  114. // 将item_tab, pageAccess_tab 注册到catalog 
  115. val item = 
  116. env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data)) 
  117. .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime) 
  118.  
  119. val pageAccess = 
  120. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data)) 
  121. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime) 
  122.  
  123. val pageAccessCount = 
  124. env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data)) 
  125. .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime) 
  126.  
  127. val pageAccessSession = 
  128. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data)) 
  129. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime) 
  130.  
  131. (item, pageAccess, pageAccessCount, pageAccessSession) 
  132.  
  133.  
  134. @Rule 
  135. def tempFolder: TemporaryFolder = _tempFolder 
  136.  
  137. def getStateBackend: StateBackend = { 
  138. new MemoryStateBackend() 
  139.  
  140. def procTimePrint(result: Table): Unit = { 
  141. val sink = new RetractingSink 
  142. result.toRetractStream[Row].addSink(sink) 
  143. env.execute() 
  144.  
  145. def rowTimePrint(result: Table): Unit = { 
  146. val sink = new RetractingSink 
  147. result.toRetractStream[Row].addSink(sink) 
  148. env.execute() 
  149.  
  150. @Test 
  151. def testProc(): Unit = { 
  152. val (customer, order) = getProcTimeTables() 
  153. val result = ...// 测试的查询逻辑 
  154. procTimePrint(result) 
  155.  
  156. @Test 
  157. def testEvent(): Unit = { 
  158. val (item, pageAccess, pageAccessCount, pageAccessSession) = getEventTimeTables() 
  159. val result = ...// 测试的查询逻辑 
  160. procTimePrint(result) 
  161.  
  162.  
  163. // 自定义Sink 
  164. final class RetractingSink extends RichSinkFunction[(Boolean, Row)] { 
  165. var retractedResults: ArrayBuffer[String] = null 
  166.  
  167.  
  168. override def open(parameters: Configuration): Unit = { 
  169. super.open(parameters) 
  170. retractedResults = mutable.ArrayBuffer.empty[String] 
  171.  
  172. def invoke(v: (Boolean, Row)) { 
  173. retractedResults.synchronized { 
  174. val vvalue = v._2.toString 
  175. if (v._1) { 
  176. retractedResults += value 
  177. } else { 
  178. val idx = retractedResults.indexOf(value) 
  179. if (idx >= 0) { 
  180. retractedResults.remove(idx) 
  181. } else { 
  182. throw new RuntimeException("Tried to retract a value that wasn't added first. " + 
  183. "This is probably an incorrectly implemented test. " + 
  184. "Try to set the parallelism of the sink to 1.") 
  185.  
  186.  
  187. override def close(): Unit = { 
  188. super.close() 
  189. retractedResults.sorted.foreach(println(_)) 
  190.  
  191. // Water mark 生成器 
  192. class EventTimeSourceFunction[T]( 
  193. dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] { 
  194. override def run(ctx: SourceContext[T]): Unit = { 
  195. dataWithTimestampList.foreach { 
  196. case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
  197. case Right(w) => ctx.emitWatermark(new Watermark(w)) 
  198.  
  199. override def cancel(): Unit = ??? 

2. SELECT

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读