李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
15.Kafka生产者API
Leefs
2021-10-14 AM
2218℃
2条
[TOC] ### 前言 + 本次运行的Kafka版本是2.8.0单机版; + 在运行之前先要检查防火墙是否关闭,或者是否开启**9092**端口 **查看防火墙状态命令** ```shell firewall-cmd --state ``` **关闭防火墙命令** ```shell systemctl stop firewalld.service ``` *PS:别问我为什么要强调这一点。* ### 一、消息发送流程 Kafka 的 Producer 发送消息采用的是异步发送的方式。 在消息发送的过程中,涉及到了两个线程——**main线程和 Sender线程**,以及一个线程共享变量——`RecordAccumulator`。 main 线程将消息发送给 `RecordAccumulator`,Sender 线程不断从 `RecordAccumulator` 中拉取消息发送到 Kafka broker。 ![15.Kafka生产者API01.png](https://lilinchao.com/usr/uploads/2021/10/3807601075.png) **main线程说明** + 消息通过main线程发送的时候会先经过 interceptors(拦截器)过滤掉一些不要的数据; + 通过拦截器过滤以后在进行序列化操作,方便进行网络传输; + 消息会进一步通过partitioner 来确定这个消息要放哪个partition里面,然后按照分区放到RecordAccumulator 容器里面; + 在进入到RecordAccumulator容器之前消息就已经确定好了要去哪个分区。 Sender守护线程就负责从RecordAccumulator容器里面拉取数据,往对应的Topic的partition里面拷贝。 **相关参数** + **batch.size:**只有数据积累到 batch.size 之后,sender 才会发送数据。 + **linger.ms:**如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。 ### 二、生产者API操作基本步骤 #### 2.1 引入Java依赖包 ```xml
org.apache.kafka
kafka-clients
2.8.0
``` 本次引入的是2.8.0版本对应的依赖包,可以根据自身Kafka版本进行修改为对应的依赖包。 #### 2.2 创建生产者对象 + 使用**KafkaProducer** 创建kafka生产者对象,这时可以发现kafka不允许我们使用空构造来创建对象; + 选用传入properties的方式创建kafka生产者 + 创建生产者的时候,跟控制台命令一样,**我们需要指定集群名称以及序列化器**,而这些相关设置都会存储在我们的配置文件中; + kafka给我们提供了**ProducerConfig**类,并在其中已经给我们提前准备好了我们所需要的key,在向properties中put键值时,可以直接使用producerConfig的静态常量作为key;并传入相应value #### 2.3 向Kafka中发送信息 + 使用kafkaProducer向kafka中发送信息,可以使用其提供的**send()**方法 ; + 使用时可以看到其需要传入**ProducerRecord**以及一个可选的**Callback**; **注:** **ProducerRecord:** 即为每条数据所封装成的对象; **CallBack:**可选;获取函数的回调 #### 2.4 close() 在真实生产环境中,我们可能不需要手动调用close方法关闭kafkaProducer,但是目前的测试阶段,如果不使用close关闭,可能会导致发送的信息在设置等待的时间内,不会被真正的发送; **流在关闭的时候会对数据进行回收操作** ### 三、异步发送API **需要用到的类:** + **KafkaProducer**:需要创建一个生产者对象,用来发送数据 + **ProducerConfig**:获取所需的一系列配置参数 + **ProducerRecord**:每条数据都要封装成一个 ProducerRecord 对象 #### 3.1 不带回调函数的API ```java import org.apache.kafka.clients.producer.*; import java.util.Properties; /** * @author lilinchao * @date 2021/10/13 * @description 不带回调函数的API **/ public class CustomProducer { public static void main(String[] args){ Properties props = new Properties(); //Kafka服务端的主机名(或IP)和端口号 props.put("bootstrap.servers", "192.168.159.135:9092"); //等待所有副本节点的应答 props.put("acks", "all"); //消息发送最大尝试次数 props.put("retries", 0); //一批消息处理大小 props.put("batch.size", 16384); //请求延时 props.put("linger.ms", 1); //发送缓存区内存大小 props.put("buffer.memory", 33554432); //key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer
producer = new KafkaProducer
(props); for (int i = 0;i < 10;i++){ producer.send(new ProducerRecord
("kafkaDemo", Integer.toString(i), "Producer-" + i)); System.out.println("发送:"+i); } producer.close(); } } ``` **在Kafka服务端开启消费者命令** ```shell [root@hadoopserver kafka_2.12-2.8.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.159.135:9092 --topic kafkaDemo ``` **运行结果** ![15.Kafka生产者API02.jpg](https://lilinchao.com/usr/uploads/2021/10/3174092839.jpg) #### 3.2 带回调函数的API 回调函数会在 producer 收到 ack 时调用,为异步调用。 ```java import org.apache.kafka.clients.producer.*; import java.util.Properties; /** * @author lilinchao * @date 2021/10/14 * @description 带回调函数API **/ public class CallBackCustomProducer { public static void main(String[] args){ Properties props = new Properties(); //Kafka服务端的主机名和端口号 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.135:9092"); //等待所有副本节点的应答 props.put(ProducerConfig.ACKS_CONFIG, "all"); //消息发送最大尝试次数 props.put(ProducerConfig.RETRIES_CONFIG, 0); //一批消息处理大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //请求延时 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); //发送缓存区内存大小 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //key序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer
producer = new KafkaProducer
(props); for (int i = 0;i < 10;i++){ producer.send(new ProducerRecord
("kafkaDemo", Integer.toString(i), "Producer-" + i),new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { System.err.println(metadata.partition() + "---" + metadata.offset()); }else{ exception.printStackTrace(); } } }); } producer.close(); } } ``` **运行结果** ![15.Kafka生产者API03.jpg](https://lilinchao.com/usr/uploads/2021/10/2518029628.jpg) **Callback方法有两个参数:** + **RecordMetadata**:回调函数返回信息 + **Exception**:为null,说明消息发送成功,不为null,说明消息发送失败。 *注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。* ### 四、同步发送API 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可。 ```java import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * @author lilinchao * @date 2021/10/14 * @description 同步发送API **/ public class CustomProducer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); //Kafka服务端的主机名和端口号 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.135:9092"); //等待所有副本节点的应答 props.put(ProducerConfig.ACKS_CONFIG, "all"); //消息发送最大尝试次数 props.put(ProducerConfig.RETRIES_CONFIG, 0); //一批消息处理大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //请求延时 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); //发送缓存区内存大小 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); //key序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //value序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer
producer = new KafkaProducer
(props); for (int i = 0;i < 10;i++){ producer.send(new ProducerRecord
("kafkaDemo", Integer.toString(i), "Producer-" + i)).get(); System.out.println("发送:"+i); } producer.close(); } } ``` 通过服务端消费者命令查看消费结果 ```basic Producer-0 Producer-1 Producer-2 Producer-3 Producer-4 Producer-5 Producer-6 Producer-7 Producer-8 Producer-9 ``` *附参考文章链接地址:* *https://blog.csdn.net/weixin_46937640/article/details/119697227*
标签:
Kafka
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/1554.html
上一篇
14.【转载】Kafka事务特性详解
下一篇
16.Kafka消费者API
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Linux
哈希表
栈
Java编程思想
Kafka
稀疏数组
Spark Streaming
Spark Core
CentOS
数学
微服务
数据结构
查找
Elastisearch
ajax
Java
排序
Scala
字符串
DataX
递归
Shiro
Tomcat
持有对象
HDFS
并发编程
JVM
Jenkins
数据结构和算法
Sentinel
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭