加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

发布时间:2018-11-08 10:42:25 所属栏目:教程 来源:孙金城
导读:一、实际问题 我们知道在流计算场景中,数据是源源不断的流入的,数据流永远不会结束,那么计算就永远不会结束,如果计算永远不会结束的话,那么计算结果何时输出呢?本篇将介绍Apache Flink利用持续查询来对流计算结果进行持续输出的实现原理。 二、数据管

接下来我们把上面隐式存在的时间属性timestamp作为表flink_tab_ts(timestamp,user,clicks三列,无主键)的一列,再写一个 触发器(Trigger) 示例观察一下:

Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

  1. // INSERT 的时候查询一下数据flink_tab_ts,将结果写到trigger.sql中 
  2. DELIMITER ;; 
  3. create trigger flink_tab_ts_trigger_insert after insert 
  4. on flink_tab_ts for each row 
  5. begin 
  6. select ts, user, clicks from flink_tab_ts into OUTFILE '/Users/jincheng.sunjc/testdir/atas/trigger.sql'; 
  7. end ;; 
  8. DELIMITER ; 

上面的trigger要将查询结果写入本地文件,默认MySQL是不允许写入的,我们查看一下:

  1. MySQL> show variables like '%secure%'; 
  2. +--------------------------+-------+ 
  3. | Variable_name | Value | 
  4. +--------------------------+-------+ 
  5. | require_secure_transport | OFF | 
  6. | secure_file_priv | NULL | 
  7. +--------------------------+-------+ 
  8. 2 rows in set (0.00 sec) 

上面secure_file_priv属性为NULL,说明MySQL不允许写入file,我需要修改my.cnf在添加secure_file_priv=''打开写文件限制;

  1. MySQL> show variables like '%secure%'; 
  2. +--------------------------+-------+ 
  3. | Variable_name | Value | 
  4. +--------------------------+-------+ 
  5. | require_secure_transport | OFF | 
  6. | secure_file_priv | | 
  7. +--------------------------+-------+ 
  8. 2 rows in set (0.00 sec) 

下面我们对flink_tab_ts进行INSERT操作:

对flink_tab_ts进行INSERT操作

我们再来看看6次trigger 查询计算的结果:

Apache Flink 漫谈系列 - 持续查询(Continuous Queries)

大家到这里发现我写了Trigger的存储过程之后,每次在数据表flink_tab_ts进行DML操作的时候,Trigger就会触发一次查询计算,产出一份新的计算结果,观察上面的查询结果发现,结果表不停的增加(Append only)。

2. 有PK的Update场景

我们利用flink_tab_ts的6次DML操作和自定义的触发器TriggerL来介绍了什么是持续查询,做处理静态查询与持续查询的关系。那么上面的演示目的是为了说明持续查询,所有操作都是insert,没有基于主键的更新,也就是说Trigger产生的结果都是append only的,那么大家想一想,如果我们操作flink_tab这张表,按主键user进行插入和更新操作,同样利用Trigger机制来进行持续查询,结果是怎样的的呢? 初始化表,trigger:

  1. drop table flink_tab; 
  2. create table flink_tab( 
  3. user VARCHAR(100) NOT NULL, 
  4. clicks INT NOT NULL, 
  5. PRIMARY KEY (user) 
  6. ); 
  7.  
  8. DELIMITER ;; 
  9. create trigger flink_tab_trigger_insert after insert 
  10. on flink_tab for each row 
  11. begin 
  12. select user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql'; 
  13. end ;; 
  14. DELIMITER ; 
  15.  
  16. DELIMITER ;; 
  17. create trigger flink_tab_trigger_ after update 
  18. on flink_tab for each row 
  19. begin 
  20. select ts, user, clicks from flink_tab into OUTFILE '/tmp/trigger.sql'; 
  21. end ;; 
  22. DELIMITER ; 

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读