加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - SQL概览

发布时间:2018-11-15 03:51:36 所属栏目:教程 来源:孙金城
导读:一、SQL简述 SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Query Lang

我们创建一个SqlOverviewITCase.scala 用于接下来介绍Flink SQL算子的功能体验。代码如下:

  1. import org.apache.flink.api.scala._ 
  2. import org.apache.flink.runtime.state.StateBackend 
  3. import org.apache.flink.runtime.state.memory.MemoryStateBackend 
  4. import org.apache.flink.streaming.api.TimeCharacteristic 
  5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction 
  6. import org.apache.flink.streaming.api.functions.source.SourceFunction 
  7. import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext 
  8. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  9. import org.apache.flink.streaming.api.watermark.Watermark 
  10. import org.apache.flink.table.api.TableEnvironment 
  11. import org.apache.flink.table.api.scala._ 
  12. import org.apache.flink.types.Row 
  13. import org.junit.rules.TemporaryFolder 
  14. import org.junit.{Rule, Test} 
  15.  
  16. import scala.collection.mutable 
  17. import scala.collection.mutable.ArrayBuffer 
  18.  
  19. class SqlOverviewITCase { 
  20. val _tempFolder = new TemporaryFolder 
  21.  
  22. @Rule 
  23. def tempFolder: TemporaryFolder = _tempFolder 
  24.  
  25. def getStateBackend: StateBackend = { 
  26. new MemoryStateBackend() 
  27.  
  28. // 客户表数据 
  29. val customer_data = new mutable.MutableList[(String, String, String)] 
  30. customer_data.+=(("c_001", "Kevin", "from JinLin")) 
  31. customer_data.+=(("c_002", "Sunny", "from JinLin")) 
  32. customer_data.+=(("c_003", "JinCheng", "from HeBei")) 
  33.  
  34.  
  35. // 订单表数据 
  36. val order_data = new mutable.MutableList[(String, String, String, String)] 
  37. order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone")) 
  38. order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad")) 
  39. order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book")) 
  40.  
  41. // 商品销售表数据 
  42. val item_data = Seq( 
  43. Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))), 
  44. Right((1510365660000L)), 
  45. Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))), 
  46. Right((1510365720000L)), 
  47. Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))), 
  48. Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))), 
  49. Right((1510365780000L)), 
  50. Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))), 
  51. Right((1510365900000L)), 
  52. Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))), 
  53. Right((1510365960000L)), 
  54. Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))), 
  55. Right((1510366020000L)), 
  56. Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))), 
  57. Right((151036608000L))) 
  58.  
  59. // 页面访问表数据 
  60. val pageAccess_data = Seq( 
  61. Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))), 
  62. Right((1510365660000L)), 
  63. Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))), 
  64. Right((1510365660000L)), 
  65. Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))), 
  66. Right((1510366200000L)), 
  67. Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))), 
  68. Right((1510366260000L)), 
  69. Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))), 
  70. Right((1510373400000L))) 
  71.  
  72. // 页面访问量表数据2 
  73. val pageAccessCount_data = Seq( 
  74. Left((1510365660000L, (1510365660000L, "ShangHai", 100))), 
  75. Right((1510365660000L)), 
  76. Left((1510365660000L, (1510365660000L, "BeiJing", 86))), 
  77. Right((1510365660000L)), 
  78. Left((1510365960000L, (1510365960000L, "BeiJing", 210))), 
  79. Right((1510366200000L)), 
  80. Left((1510366200000L, (1510366200000L, "BeiJing", 33))), 
  81. Right((1510366200000L)), 
  82. Left((1510373400000L, (1510373400000L, "ShangHai", 129))), 
  83. Right((1510373400000L))) 
  84.  
  85. // 页面访问表数据3 
  86. val pageAccessSession_data = Seq( 
  87. Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))), 
  88. Right((1510365660000L)), 
  89. Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))), 
  90. Right((1510365720000L)), 
  91. Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))), 
  92. Right((1510365720000L)), 
  93. Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))), 
  94. Right((1510365900000L)), 
  95. Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))), 
  96. Right((1510366200000L)), 
  97. Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))), 
  98. Right((1510366200000L)), 
  99. Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))), 
  100. Right((1510366260000L)), 
  101. Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))), 
  102. Right((1510373760000L))) 
  103.  
  104. def procTimePrint(sql: String): Unit = { 
  105. // Streaming 环境 
  106. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  107. val tEnv = TableEnvironment.getTableEnvironment(env) 
  108.  
  109. // 将order_tab, customer_tab 注册到catalog 
  110. val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc) 
  111. val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc) 
  112.  
  113. tEnv.registerTable("order_tab", order) 
  114. tEnv.registerTable("customer_tab", customer) 
  115.  
  116. val result = tEnv.sqlQuery(sql).toRetractStream[Row] 
  117. val sink = new RetractingSink 
  118. result.addSink(sink) 
  119. env.execute() 
  120.  
  121. def rowTimePrint(sql: String): Unit = { 
  122. // Streaming 环境 
  123. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  124. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  125. env.setStateBackend(getStateBackend) 
  126. env.setParallelism(1) 
  127. val tEnv = TableEnvironment.getTableEnvironment(env) 
  128.  
  129. // 将item_tab, pageAccess_tab 注册到catalog 
  130. val item = 
  131. env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data)) 
  132. .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime) 
  133.  
  134. val pageAccess = 
  135. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data)) 
  136. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime) 
  137.  
  138. val pageAccessCount = 
  139. env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data)) 
  140. .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime) 
  141.  
  142. val pageAccessSession = 
  143. env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data)) 
  144. .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime) 
  145.  
  146. tEnv.registerTable("item_tab", item) 
  147. tEnv.registerTable("pageAccess_tab", pageAccess) 
  148. tEnv.registerTable("pageAccessCount_tab", pageAccessCount) 
  149. tEnv.registerTable("pageAccessSession_tab", pageAccessSession) 
  150.  
  151. val result = tEnv.sqlQuery(sql).toRetractStream[Row] 
  152. val sink = new RetractingSink 
  153. result.addSink(sink) 
  154. env.execute() 
  155.  
  156.  
  157. @Test 
  158. def testSelect(): Unit = { 
  159. val sql = "替换想要测试的SQL" 
  160. // 非window 相关用 procTimePrint(sql) 
  161. // Window 相关用 rowTimePrint(sql) 
  162.  
  163.  
  164. // 自定义Sink 
  165. final class RetractingSink extends RichSinkFunction[(Boolean, Row)] { 
  166. var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String] 
  167.  
  168. def invoke(v: (Boolean, Row)) { 
  169. retractedResults.synchronized { 
  170. val vvalue = v._2.toString 
  171. if (v._1) { 
  172. retractedResults += value 
  173. } else { 
  174. val idx = retractedResults.indexOf(value) 
  175. if (idx >= 0) { 
  176. retractedResults.remove(idx) 
  177. } else { 
  178. throw new RuntimeException("Tried to retract a value that wasn't added first. " + 
  179. "This is probably an incorrectly implemented test. " + 
  180. "Try to set the parallelism of the sink to 1.") 
  181. retractedResults.sorted.foreach(println(_)) 
  182.  
  183. // Water mark 生成器 
  184. class EventTimeSourceFunction[T]( 
  185. dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] { 
  186. override def run(ctx: SourceContext[T]): Unit = { 
  187. dataWithTimestampList.foreach { 
  188. case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
  189. case Right(w) => ctx.emitWatermark(new Watermark(w)) 
  190.  
  191. override def cancel(): Unit = ??? 

五、Select

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读