加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - State

发布时间:2018-10-17 14:00:43 所属栏目:教程 来源:孙金城
导读:【51CTO技术沙龙】10月27日,让我们共同探索AI场景化应用实现之道 实际问题 在流计算场景中,数据会源源不断的流入Apache Flink系统,每条数据进入Apache Flink系统都会触发计算。如果我们想进行一个Count聚合计算,那么每次触发计算是将历史上所有流入的数

如上实现我们了解到分配Key到指定的key-group的逻辑是利用key的hashCode和maxParallelism进行取余操作来分配的。如下图当parallelism=2,maxParallelism=10的情况下流上key与key-group的对应关系如下图所示:

Apache Flink 漫谈系列 - State

如上图key(a)的hashCode是97,与最大并发10取余后是7,被分配到了KG-7中,流上每个event都会分配到KG-0至KG-9其中一个Key-Group中。

每个Operator实例如何获取Key-Groups

了解了Key-Groups概念和如何分配每个Key到指定的Key-Groups之后,我们看看如何计算每个Operator实例所处理的Key-Groups。 在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:

  1. public static KeyGroupRange computeKeyGroupRangeForOperatorIndex( 
  2.       int maxParallelism, 
  3.       int parallelism, 
  4.       int operatorIndex) { 
  5.         GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex); 
  6.         int startGroup = splitRange.getStartGroup(); 
  7.         int endGroup = splitRange.getEndGroup(); 
  8.    return new KeyGroupRange(startGroup, endGroup - 1); 
  9.  
  10. public GroupRange getSplitRange(int numSplits, int splitIndex) { 
  11.         ... 
  12.         final int numGroupsPerSplit = getNumGroups() / numSplits; 
  13.         final int numFatSplits = getNumGroups() % numSplits; 
  14.  
  15.         int startGroupForThisSplit; 
  16.         int endGroupForThisSplit; 
  17.         if (splitIndex < numFatSplits) { 
  18.             startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1); 
  19.             endGroupForThisSplit =   startGroupForThisSplit + numGroupsPerSplit + 1; 
  20.         } else { 
  21.             startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits; 
  22.             endGroupForThisSplit =  startGroupForThisSplit + numGroupsPerSplit; 
  23.         } 
  24.         if (startGroupForThisSplit >= endGroupForThisSplit) { 
  25.                 return GroupRange.emptyGroupRange(); 
  26.         } else { 
  27.                 return new GroupRange(startGroupForThisSplit, endGroupForThisSplit); 
  28.         }} 

上面代码的核心逻辑是先计算每个Operator实例至少分配的Key-Group个数,将不能整除的部分N个,平均分给前N个实例。最终每个Operator实例管理的Key-Groups会在GroupRange中表示,本质是一个区间值;下面我们就上图的case,说明一下如何进行分配以及扩容后如何重新分配。

假设上面的Stateful Operation节点的最大并行度maxParallelism的值是10,也就是我们一共有10个Key-Group,当我们并发是2的时候和并发是3的时候分配的情况如下图:

Apache Flink 漫谈系列 - State

如上算法我们发现在进行扩容时候,大部分state还是落到本地的,如Task0只有KG-4被分出去,其他的还是保持在本地。同时我们也发现,一个job如果修改了maxParallelism的值那么会直接影响到Key-Groups的数量和key的分配,也会打乱所有的Key-Group的分配,目前在Apache Flink系统中统一将maxParallelism的默认值调整到4096,最大程度的避免无法扩容的情况发生。

小结

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读