李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
16.Kafka消费者API
Leefs
2021-10-15 PM
1508℃
0条
[TOC] ### 前言 Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。 **所以 offset 的维护是 Consumer 消费数据是必须考虑的问题。** 如果想了解更多Offset详情信息可参考文章:[12.Kafka之offset介绍](https://lilinchao.com/archives/1546.html) 本次使用的Kafka版本是2.8.0 **导入依赖** ```xml
org.apache.kafka
kafka-clients
2.8.0
``` ### 一、自动提交offset ```java import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * @author lilinchao * @date 2021/10/15 * @description 自动提交offset **/ public class CustomConsumer { public static void main(String[] args){ Properties props = new Properties(); //连接信息 props.put("bootstrap.servers","192.168.159.135:9092"); //消费者组ID props.put("group.id","test"); //是否自动提交offset(消费偏移量) props.put("enable.auto.commit","true"); //自动提交时间间隔 props.put("auto.commit.interval.ms", "1000"); //Key反序列化 props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //Value反序列化 props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者对象 KafkaConsumer
consumer = new KafkaConsumer
(props); //订阅主题(可以订阅多个) consumer.subscribe(Arrays.asList("first","kafkaDemo")); while (true){ //获取数据 ConsumerRecords
records = consumer.poll(100); //解析数据 for(ConsumerRecord
record:records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } } ``` 先运行消费者,在运行生产者生产数据。 **运行结果** ![16.Kafka消费者API01.jpg](https://lilinchao.com/usr/uploads/2021/10/3665191540.jpg) **说明** + **KafkaConsumer:**需要创建一个消费者对象,用来消费数据 + **ConsuemrRecord:**每条数据都要封装成一个 ConsumerRecord 对象,为了使我们能够专注于自己的业务逻辑,Kafka 提供了自动提交offset 的功能。 **自动提交 offset 的相关参数:** **enable.auto.commit:**是否开启自动提交 offset 功能 **auto.commit.interval.ms:**自动提交 offset 的时间间隔 ### 二、手动提交offset #### 2.1 概述 虽然自动提交offset十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此Kafka还提供了手动提交offset的 API。 **手动提交offset的方法有两种:** + commitSync(同步提交) + commitAsync(异步提交) ##### 两种方式的异同 **相同点:** 都会将本次poll 的一批数据最高的偏移量提交 **不同点:** + commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败); + commitAsync 则没有失败重试机制,故有可能提交失败。 #### 2.2 同步提交offset 由于同步提交offset有失败重试机制,故更加可靠。 **代码示例** ```java import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * @author lilinchao * @date 2021/10/15 * @description 同步提交 offset **/ public class CommitSyncConsumer { public static void main(String[] args){ Properties props = new Properties(); //连接信息 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.135:9092"); //消费者组ID props.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //关闭自动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //Key反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //Value反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者对象 KafkaConsumer
consumer = new KafkaConsumer
(props); //订阅主题(可以订阅多个) consumer.subscribe(Arrays.asList("first","kafkaDemo")); while (true){ //获取数据 ConsumerRecords
records = consumer.poll(100); //解析数据 for(ConsumerRecord
record:records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } //同步提交,当前线程会阻塞直到 offset 提交成功 consumer.commitSync(); } } } ``` **运行结果** ![16.Kafka消费者API02.jpg](https://lilinchao.com/usr/uploads/2021/10/2010169155.jpg) #### 2.3 异步提交offset 虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞 吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。 **代码示例** ```java import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Map; import java.util.Properties; /** * @author lilinchao * @date 2021/10/15 * @description 异步提交offset **/ public class CommitAsyncConsumer { public static void main(String[] args){ Properties props = new Properties(); //连接信息 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.135:9092"); //消费者组ID props.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //关闭自动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //Key反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //Value反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //重置offset //earliest:从头开始消费,触发的条件1,换组;条件2:保留的offset指向的数据已经不存在 //latest:默认值,消费最新的数据。 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //创建消费者对象 KafkaConsumer
consumer = new KafkaConsumer
(props); //订阅主题(可以订阅多个) consumer.subscribe(Arrays.asList("first","kafkaDemo")); while (true){ //消费者拉取数据(每隔0.1秒拉取一次) ConsumerRecords
records = consumer.poll(100); //解析数据 for(ConsumerRecord
record:records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } //异步提交 consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map
offsets, Exception exception) { if (exception != null) { System.err.println("Commit failed for" + offsets); } } }); } } } ``` **运行结果** ![16.Kafka消费者API03.jpg](https://lilinchao.com/usr/uploads/2021/10/3355939579.jpg) #### 2.4 数据漏消费和重复消费分析 无论是同步提交还是异步提交offset,都有可能会造成数据的漏消费或者重复消费。 + 先提交offset后消费,有可能造成数据的漏消费; + 而先消费后提交offset,有可能会造成数据的重复消费。 ### 三、自定义存储offset #### 3.1 概述 Kafka 0.9 版本之前,offset存储在 zookeeper,0.9 版本及之后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储offset。 offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。 **当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。** 消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。**因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。** #### 3.2 实现 要实现自定义存储 offset,需要借助 `ConsumerRebalanceListener`,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。 ```java import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.util.*; /** * @author lilinchao * @date 2021/10/15 * @description 自定义存储Offset **/ public class CustomizeOffset { private static Map
currentOffset = new HashMap
(); public static void main(String[] args) { Properties props = new Properties(); //连接信息 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.135:9092"); //消费者组,只要 group.id 相同,就属于同一个消费者组 props.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //关闭自动提交offset props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); //Key反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //Value反序列化 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //创建一个消费者 final KafkaConsumer
consumer = new KafkaConsumer
(props); //消费者订阅主题 consumer.subscribe(Arrays.asList("first","kafkaDemo"), new ConsumerRebalanceListener() { //该方法会在 Rebalance 之前调用 public void onPartitionsRevoked(Collection
partitions) { commitOffset(currentOffset); } //该方法会在 Rebalance 之后调用 public void onPartitionsAssigned(Collection
partitions) { currentOffset.clear(); for (TopicPartition partition : partitions) { consumer.seek(partition, getOffset(partition));//定位到最近提交的 offset 位置继续消费 } } }); while (true) { //消费者拉取数据 ConsumerRecords
records = consumer.poll(100); for (ConsumerRecord
record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()); } //异步提交 commitOffset(currentOffset); } } /** * 获取某分区的最新 offset * @param partition * @return */ private static long getOffset(TopicPartition partition) { return 0; } /** * 提交该消费者所有分区的 offset * @param currentOffset */ private static void commitOffset(Map
currentOffset) { } } ``` **运行结果** ![16.Kafka消费者API04.jpg](https://lilinchao.com/usr/uploads/2021/10/788401066.jpg)
标签:
Kafka
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1559.html
上一篇
15.Kafka生产者API
下一篇
17.Kafka自定义拦截器
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
DataWarehouse
RSA加解密
Jquery
Spark SQL
MySQL
Spark RDD
Kafka
随笔
Hbase
数学
Filter
FastDFS
Shiro
Java工具类
线程池
Quartz
前端
Golang基础
哈希表
Java编程思想
Redis
递归
Spark Streaming
Flume
JavaWEB项目搭建
数据结构
gorm
Typora
Kibana
Hive
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭