李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
11. Kafka消费者分区分配策略
Leefs
2021-09-26 PM
2477℃
0条
[TOC] ### 前言 在本节开始之前,可以先了解一下上一节内容[*Kafka重平衡机制*](https://lilinchao.com/archives/1533.html)。 ### 一、消费者分区分配策略作用 一个`consumer group` 中有多个consumer,一个 topic 有多个partition,所以必然会涉及到partition 的分配问题,即**确定哪个partition 由哪个consumer 来消费**。 ![11.Kafka消费者分区分配策略01.png](https://lilinchao.com/usr/uploads/2021/09/3219685726.png) Kafka 有三种分配策略: + `RoundRobin`(轮询) + `Range`(范围) + `StickyAssignor` 同时Kafka也支持自定义分配策略。 ### 二、分区分配策略详解 #### 2.1 RangeAssignor(默认分配策略) **概述** **范围分区策略:**对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者。 这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。 `RangeAssignor`策略对应的`partition.assignment.strategy`参数值为: ```java org.apache.kafka.clients.consumer.RangeAssignor ``` **如何进行计算分区** > 对于每个被订阅的Topic,设Consumer总数为c,Partition总数为p,那么RangeAssignor会根据`p / c`的结果得出一个区间值r,以及余数值`p % c`,记为m。 将所有Consumer按照预设好的Member ID字典序排序,从第一个Consumer开始顺序分配,前m个Consumer分配连续的(r + 1)个Partition,后(c - m)个Consumer分配连续的r个Partition。 ![11.Kafka消费者分区分配策略05.png](https://lilinchao.com/usr/uploads/2021/09/2146087712.png) 图中有1个Topic,8个Partition,3个Consumer,最终分配的比例为3:3:2,大致是均匀的。 如果我们再多加两个Topic,每个Topic有2个Partition,分配结果又是如何呢? ![11.Kafka消费者分区分配策略06.png](https://lilinchao.com/usr/uploads/2021/09/1754104718.png) **说明** 显然,c2完全没有分配到t2中的Partition,因为c0和c1按照字典序排在它前面,已经被优先分配到了。 **缺点** 如果推广上述情况,很容易发现该策略无法保证平均分配,造成负载倾斜。当订阅了多个Topic时,尤其需要注意`RangeAssignor`的适用性。 #### 2.2 RoundRobinAssignor ##### **概述** **轮询分区策略:**把所有partition和所有consumer线程都列出来,然后按照`hashcode`进行排序,最后通过轮询算法分配partition给消费线程。 **如果所有consumer实例的订阅是相同的,那么partition会均匀分布。** `RoundRobinAssignor`策略对应的`partition.assignment.strategy`参数值为: ```java org.apache.kafka.clients.consumer.RoundRobinAssignor ``` ##### 工作原理 将所有主题的分区组成 `TopicAndPartition` 列表,然后对 `TopicAndPartition` 列表按照 hashCode 进行排序,然后平均分配给每一个消费者的线程。 ![11.Kafka消费者分区分配策略02.png](https://lilinchao.com/usr/uploads/2021/09/4042362504.png) 多个Topic的情况也类似: ![11.Kafka消费者分区分配策略03.png](https://lilinchao.com/usr/uploads/2021/09/2230706380.png) 由于分配时是按所有Partition来的,所以即使Topic之间Partition的数量是不平均的,分配结果也是基本平均的,克服了`RangeAssignor`的缺点。 ##### 缺点 如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个topic,那么在分配分区的时候此消费者将分配不到这个topic的任何分区。 **示例** > opic t0、t1、t2分别有1、2、3个Partition,而Consumer c0订阅了t0,c1订阅了t0~t1,c2订阅了t0~t2 那么分配结果会如下图所示 ![11.Kafka消费者分区分配策略04.png](https://lilinchao.com/usr/uploads/2021/09/3324604510.png) ##### 总结 使用`RoundRobin`策略有两个前提条件必须满足: - 同一个Consumer Group里面的所有消费者的`num.streams`必须相等; - 每个消费者订阅的主题必须相同。 #### 2.3 StrickyAssignor kafka在0.11.x版本支持了StrickyAssignor, 翻译过来叫**粘滞策略**。 **目标** > + 分区的分配尽可能的均匀 > > + 分区的分配尽可能和上次分配保持相同 当两者发生冲突时, 第一个目标优先于第二个目标。 鉴于这两个目标, `StickyAssignor`分配策略的具 体实现要比`RangeAssignor`和`RoundRobinAssignor`这两种分配策略要复杂得多。 `RoundRobinAssignor`策略对应的`partition.assignment.strategy`参数值为: ```java org.apache.kafka.clients.consumer.StickyAssignor ``` **示例一** > - 有3个Consumer:C0、C1、C2 > - 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区 > - 所有Consumer都订阅了这4个分区 ![11.Kafka消费者分区分配策略07.png](https://lilinchao.com/usr/uploads/2021/09/2763282676.png) 上面的例子中,Sticky模式原来分配给C0、C2的分区都没有发生变动,且最终C0、C1达到的均衡的目的。 **示例二** > - 有3个Consumer:C0、C1、C2 > - 3个Topic:T0、T1、T2,它们分别有1、2、3个分区 > - C0订阅T0;C1订阅T0、T1;C2订阅T0、T1、T2 ![11.Kafka消费者分区分配策略08.png](https://lilinchao.com/usr/uploads/2021/09/2118759662.png) 从以上两个例子的分配结果可以看出,`StickyAssignor`是比`RangeAssignor`和`RoundRobinAssignor`更好的分配方式。 ### 三、自定义分配策略 自定义分配策略可以通过继承`PartitionAssignor`接口或者`AbstractPartitionAssignor`抽象类来实现,后者相对比较简单。下面给出一个在Consumer之间随机分配Partition的示例,部分代码复用了`RangeAssignor`的实现,简单易懂。 ```java public class RandomAssignor extends AbstractPartitionAssignor { @Override public String name() { return "random"; } @Override public Map
> assign( Map
partitionsPerTopic, Map
subscriptions ) { Map
> consumersPerTopic = consumersPerTopic(subscriptions); Map
> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) { assignment.put(memberId, new ArrayList
()); } for (Map.Entry
> topicEntry : consumersPerTopic.entrySet()) { String topic = topicEntry.getKey(); List
consumersForTopic = topicEntry.getValue(); Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; for (TopicPartition partition : partitions(topic, numPartitionsForTopic)) { int rand = ThreadLocalRandom.current().nextInt(consumersForTopic.size()); assignment.get(consumersForTopic.get(rand)).add(partition); } } return assignment; } private Map
> consumersPerTopic(Map
consumerMetadata) { Map
> res = new HashMap<>(); for (Map.Entry
subscriptionEntry : consumerMetadata.entrySet()) { String consumerId = subscriptionEntry.getKey(); for (String topic : subscriptionEntry.getValue().topics()) put(res, topic, consumerId); } return res; } } ``` 注意`RandomAssignor`基本上不能用于生产环境,只是个示例而已。 *附参考文章链接:* *https://blog.csdn.net/u4110122855/article/details/103616791* *https://blog.csdn.net/nazeniwaresakini/article/details/108445278*
标签:
Kafka
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1542.html
上一篇
10.【转载】Kafka重平衡机制
下一篇
12.Kafka之offset介绍
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
FastDFS
Java阻塞队列
MyBatis-Plus
DataX
Python
SpringCloud
MyBatis
Flume
Elasticsearch
BurpSuite
VUE
Tomcat
Jenkins
SQL练习题
Java
国产数据库改造
Typora
微服务
链表
Filter
MySQL
Shiro
前端
Java编程思想
序列化和反序列化
容器深入研究
Docker
GET和POST
Golang基础
Eclipse
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭