WordCount核心统计逻辑就是按照单词分组,然后计算每个单词的数量,统计逻辑如下:
- // 单词统计核心逻辑
- val result = source
- .groupBy('word) // 单词分组
- .select('word, 'word.count) // 单词统计
(4) 定义Sink
将WordCount的统计结果写入Sink中,代码如下:
- // 自定义Sink
- val sink = new RetractSink // 自定义Sink(下面有完整代码)
- // 计算结果写入sink
- result.toRetractStream[(String, Long)].addSink(sink)
(5) 完整的HelloWord代码
为了方便大家运行WordCount查询统计,将完整的代码分享大家(基于flink-1.7.0),如下:
- import org.apache.flink.api.scala._
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.table.api.TableEnvironment
- import org.apache.flink.table.api.scala._
-
- import scala.collection.mutable
-
- object HelloWord {
-
- def main(args: Array[String]): Unit = {
- // 测试数据
- val data = Seq("Flink", "Bob", "Bob", "something", "Hello", "Flink", "Bob")
-
- // Stream运行环境
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- // 最简单的获取Source方式
- val source = env.fromCollection(data).toTable(tEnv, 'word)
-
- // 单词统计核心逻辑
- val result = source
- .groupBy('word) // 单词分组
- .select('word, 'word.count) // 单词统计
-
- // 自定义Sink
- val sink = new RetractSink
- // 计算结果写入sink
- result.toRetractStream[(String, Long)].addSink(sink)
-
- env.execute
- }
- }
-
- class RetractSink extends RichSinkFunction[(Boolean, (String, Long))] {
- private var resultSet: mutable.Set[(String, Long)] = _
-
- override def open(parameters: Configuration): Unit = {
- // 初始化内存存储结构
- resultSet = new mutable.HashSet[(String, Long)]
- }
-
- override def invoke(v: (Boolean, (String, Long)), context: SinkFunction.Context[_]): Unit = {
- if (v._1) {
- // 计算数据
- resultSet.add(v._2)
- }
- else {
- // 撤回数据
- resultSet.remove(v._2)
- }
- }
-
- override def close(): Unit = {
- // 打印写入sink的结果数据
- resultSet.foreach(println)
- }
- }
运行结果如下:
虽然上面用了较长的纸墨介绍简单的WordCount统计逻辑,但source和sink部分都是可以在学习后面算子中被复用的。本例核心的统计逻辑只有一行代码:
- source.groupBy('word).select('word, 'word.count)
所以Table API开发技术任务非常的简洁高效。
四、Table API 算子
虽然Table API与SQL的算子语义一致,但在表达方式上面SQL以文本的方式展现,Table API是以java或者scala语言的方式进行开发。为了大家方便阅读,即便是在《Apache Flink 漫谈系列(08) - SQL概览》中介绍过的算子,在这里也会再次进行介绍,当然对于Table API和SQL不同的地方会进行详尽介绍。
1. 示例数据及测试类
(编辑:核心网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|