加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

发布时间:2018-11-08 10:42:25 所属栏目:教程 来源:孙金城
导读:一、实际问题 我们知道在流计算场景中,数据是源源不断的流入的,数据流永远不会结束,那么计算就永远不会结束,如果计算永远不会结束的话,那么计算结果何时输出呢?本篇将介绍Apache Flink利用持续查询来对流计算结果进行持续输出的实现原理。 二、数据管

上面查询SQL的代码结构如下(这个图示在Alibaba 企业版Flink的集成IDE环境生成的,了解更多):

Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

上面SQL中我们发现有两层查询计算逻辑,第一个查询计算逻辑是与SOURCE相连的按地区统计订单数量的分组统计,第二个查询计算逻辑是在第一个查询产出的动态表上面进行按订单数量统计地区数量的分组统计,我们一层一层分析。

5. 错误处理

  • 第一层分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

  • 第二层分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;
  • Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

按照第一层分析的结果,再分析第二层产出的结果,我们分析的过程是对的,但是最终写到sink表的计算结果是错误的,那我们错在哪里了呢?

其实当 (SH,2)这条记录来的时候,以前来过的(SH, 1)已经是脏数据了,当(BJ, 2)来的时候,已经参与过计算的(BJ, 1)也变成脏数据了,同样当(BJ, 3)来的时候,(BJ, 2)也是脏数据了,上面的分析,没有处理脏数据进而导致最终结果的错误。那么Apache Flink内部是如何正确处理的呢?

6. 正确处理

  • 第一层分析:SELECT region, count(id) AS order_cnt FROM order_tab GROUP BY region;
  • Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

  • 第二层分析:SELECT order_cnt, count(region) as region_cnt FROM order_count_view GROUP BY order_cnt;
  • Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

上面我们将有更新的事件进行打标的方式来处理脏数据,这样在Apache Flink内部计算的时候 算子会根据事件的打标来处理事件,在aggregate function中有两个对应的方法(retract和accumulate)来处理不同标识的事件,如上面用到的count AGG,内部实现如下:

  1. def accumulate(acc: CountAccumulator): Unit = { 
  2. acc.f0 += 1L // acc.f0 存储记数 
  3.  
  4. def retract(acc: CountAccumulator, value: Any): Unit = { 
  5. if (value != null) { 
  6. acc.f0 -= 1L //acc.f0 存储记数 
  7. }} 

Apache Flink内部这种为事件进行打标的机制叫做 retraction。retraction机制保障了在流上已经流转到下游的脏数据需要被撤回问题,进而保障了持续查询的正确语义。

八、Apache Flink Connector 类型

本篇一开始就对比了MySQL的数据存储和Apache Flink数据存储的区别,Apache Flink目前是一个计算平台,将数据的存储以高度抽象的插件机制与各种已有的数据存储无缝对接。目前Apache Flink中将数据插件称之为链接器Connector,Connnector又按数据的读和写分成Soruce(读)和Sink(写)两种类型。对于传统数据库表,PK是一个很重要的属性,在频繁的按某些字段(PK)进行更新的场景,在表上定义PK非常重要。那么作为完全支持ANSI-SQL的Apache Flink平台在Connector上面是否也支持PK的定义呢?

1. Apache Flink Source

现在(2018.11.5)Apache Flink中用于数据流驱动的Source Connector上面无法定义PK,这样在某些业务场景下会造成数据量较大,造成计算资源不必要的浪费,甚至有聚合结果不是用户“期望”的情况。我们以双流JOIN为例来说明:

  1. SQL: 
  2.  
  3. CREATE TABLE inventory_tab( 
  4. product_id VARCHAR, 
  5. product_count BIGINT 
  6. ); 
  7.  
  8. CREATE TABLE sales_tab( 
  9. product_id VARCHAR, 
  10. sales_count BIGINT 
  11. ) ; 
  12.  
  13. CREATE TABLE join_sink( 
  14. product_id VARCHAR, 
  15. product_count BIGINT, 
  16. sales_count BIGINT, 
  17. PRIMARY KEY(product_id) 
  18. ); 
  19.  
  20. CREATE VIEW join_view AS 
  21. SELECT 
  22. l.product_id, 
  23. l.product_count, 
  24. r.sales_count 
  25. FROM inventory_tab l 
  26. JOIN sales_tab r 
  27. ON l.product_id = r.product_id; 
  28.  
  29. INSERT INTO join_sink 
  30. SELECT 
  31. product_id, 
  32. product_count, 
  33. sales_count 
  34. FROM join_view ; 

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读