加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(13) - Table API 概述

发布时间:2019-01-18 07:08:46 所属栏目:教程 来源:孙金城
导读:一、什么是Table API 在《Apache Flink 漫谈系列(08) - SQL概览》中我们概要的向大家介绍了什么是好SQL,SQL和Table API是Apache Flink中的同一层次的API抽象,如下图所示: Apache Flink 针对不同的用户场景提供了三层用户API,最下层ProcessFunction API

WordCount核心统计逻辑就是按照单词分组,然后计算每个单词的数量,统计逻辑如下:

  1. // 单词统计核心逻辑 
  2. val result = source 
  3. .groupBy('word) // 单词分组 
  4. .select('word, 'word.count) // 单词统计 

(4) 定义Sink

将WordCount的统计结果写入Sink中,代码如下:

  1. // 自定义Sink 
  2. val sink = new RetractSink // 自定义Sink(下面有完整代码) 
  3. // 计算结果写入sink 
  4. result.toRetractStream[(String, Long)].addSink(sink) 

(5) 完整的HelloWord代码

为了方便大家运行WordCount查询统计,将完整的代码分享大家(基于flink-1.7.0),如下:

  1. import org.apache.flink.api.scala._ 
  2. import org.apache.flink.configuration.Configuration 
  3. import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction} 
  4. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 
  5. import org.apache.flink.table.api.TableEnvironment 
  6. import org.apache.flink.table.api.scala._ 
  7.  
  8. import scala.collection.mutable 
  9.  
  10. object HelloWord { 
  11.  
  12. def main(args: Array[String]): Unit = { 
  13. // 测试数据 
  14. val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob") 
  15.  
  16. // Stream运行环境 
  17. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18. val tEnv = TableEnvironment.getTableEnvironment(env) 
  19. // 最简单的获取Source方式 
  20. val source = env.fromCollection(data).toTable(tEnv, 'word) 
  21.  
  22. // 单词统计核心逻辑 
  23. val result = source 
  24. .groupBy('word) // 单词分组 
  25. .select('word, 'word.count) // 单词统计 
  26.  
  27. // 自定义Sink 
  28. val sink = new RetractSink 
  29. // 计算结果写入sink 
  30. result.toRetractStream[(String, Long)].addSink(sink) 
  31.  
  32. env.execute 
  33.  
  34. class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] { 
  35. private var resultSet: mutable.Set[(String, Long)] = _ 
  36.  
  37. override def open(parameters: Configuration): Unit = { 
  38. // 初始化内存存储结构 
  39. resultSet = new mutable.HashSet[(String, Long)] 
  40.  
  41. override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = { 
  42. if (v._1) { 
  43. // 计算数据 
  44. resultSet.add(v._2) 
  45. else { 
  46. // 撤回数据 
  47. resultSet.remove(v._2) 
  48.  
  49. override def close(): Unit = { 
  50. // 打印写入sink的结果数据 
  51. resultSet.foreach(println) 

运行结果如下:

Apache Flink 漫谈系列(13) - Table API 概述

虽然上面用了较长的纸墨介绍简单的WordCount统计逻辑,但source和sink部分都是可以在学习后面算子中被复用的。本例核心的统计逻辑只有一行代码:

  1. source.groupBy('word).select('word, 'word.count) 

所以Table API开发技术任务非常的简洁高效。

四、Table API 算子

虽然Table API与SQL的算子语义一致,但在表达方式上面SQL以文本的方式展现,Table API是以java或者scala语言的方式进行开发。为了大家方便阅读,即便是在《Apache Flink 漫谈系列(08) - SQL概览》中介绍过的算子,在这里也会再次进行介绍,当然对于Table API和SQL不同的地方会进行详尽介绍。

1. 示例数据及测试类

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读