加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - State

发布时间:2018-10-17 14:00:43 所属栏目:教程 来源:孙金城
导读:【51CTO技术沙龙】10月27日,让我们共同探索AI场景化应用实现之道 实际问题 在流计算场景中,数据会源源不断的流入Apache Flink系统,每条数据进入Apache Flink系统都会触发计算。如果我们想进行一个Count聚合计算,那么每次触发计算是将历史上所有流入的数

我们发现 snapshotState方法的返回值是一个List,T是Tuple2

  1. public interface InputSplit extends Serializable { 
  2.     int getSplitNumber(); 

也就是说,InputSplit我们可以理解为是一个Partition索引,有了这个数据结构我们在看看上面图所示的case是如何工作的?当Source的并行度是1的时候,所有打partition数据都在同一个线程中读取,所有partition的state也在同一个state中维护,State存储信息格式如下:

Apache Flink 漫谈系列 - State

如果我们现在将并发调整为2,那么我们5个分区的State将会在2个独立的任务(线程)中进行维护,在内部实现中我们有如下算法进行分配每个Task所处理和维护partition的State信息,如下:

  1. List<Integer> assignedPartitions = new LinkedList<>(); 
  2. for (int i = 0; i < partitions; i++) { 
  3.         if (i % consumerCount == consumerIndex) { 
  4.                 assignedPartitions.add(i); 
  5.         } 

这个求mod的算法,决定了每个并发所处理和维护partition的State信息,针对我们当前的case具体的存储情况如下:

Apache Flink 漫谈系列 - State

那么到现在我们发现上面扩容后State得以很好的分配得益于OperatorState采用了List的数据结构的设计。另外大家注意一个问题,相信大家已经发现上面分配partition的算法有一个限制,那就是Source的扩容(并发数)是否可以超过Source物理存储的partition数量呢?答案是否定的,不能。目前Apache Flink的做法是提前报错,即使不报错也是资源的浪费,因为超过partition数量的并发永远分配不到待管理的partition。

KeyedState对扩容的处理

对于KeyedState最容易想到的是hash(key) mod parallelism(operator) 方式分配state,就和OperatorState一样,这种分配方式大多数情况是恢复的state不是本地已有的state,需要一次网络拷贝,这种效率比较低,OperatorState采用这种简单的方式进行处理是因为OperatorState的state一般都比较小,网络拉取的成本很小,对于KeyedState往往很大,我们会有更好的选择,在Apache Flink中采用的是Key-Groups方式进行分配。

什么是Key-Groups

Key-Groups 是Apache Flink中对keyed state按照key进行分组的方式,每个key-group中会包含N>0个key,一个key-group是State分配的原子单位。在Apache Flink中关于Key-Group的对象是 KeyGroupRange, 如下:

  1. public class KeyGroupRange implements KeyGroupsList, Serializable { 
  2.         ... 
  3.         ... 
  4.         private final int startKeyGroup; 
  5.         private final int endKeyGroup; 
  6.         ... 
  7.         ...} 

KeyGroupRange两个重要的属性就是 startKeyGroup和endKeyGroup,定义了startKeyGroup和endKeyGroup属性后Operator上面的Key-Group的个数也就确定了。

什么决定Key-Groups的个数

key-group的数量在job启动前必须是确定的且运行中不能改变。由于key-group是state分配的原子单位,而每个operator并行实例至少包含一个key-group,因此operator的最大并行度不能超过设定的key-group的个数,那么在Apache Flink的内部实现上key-group的数量就是最大并行度的值。

GroupRange.of(0, maxParallelism)如何决定key属于哪个Key-Group

确定好GroupRange之后,如何决定每个Key属于哪个Key-Group呢?我们采取的是取mod的方式,在KeyGroupRangeAssignment中的assignToKeyGroup方法会将key划分到指定的key-group中,如下:

  1. public static int assignToKeyGroup(Object key, int maxParallelism) { 
  2.       return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); 
  3.  
  4. public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { 
  5.       return HashPartitioner.INSTANCE.partition(keyHash, maxParallelism); 
  6.  
  7. @Override 
  8. public int partition(T key, int numPartitions) { 
  9.       return MathUtils.murmurHash(Objects.hashCode(key)) % numPartitions; 

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读