程序员

kafka--生产者详解

作者:admin 2021-06-12 我要评论

一、生产者发送消息的过程 1.包装?ProducerRecord 对象 Kafka 会将发送消息包装为 ProducerRecord 对象 ProducerRecord 对象包含了目标主题和要发送的内容同时还...

在说正事之前,我要推荐一个福利:你还在原价购买阿里云、腾讯云、华为云服务器吗?那太亏啦!来这里,新购、升级、续费都打折,能够为您省60%的钱呢!2核4G企业级云服务器低至69元/年,点击进去看看吧>>>)

一、生产者发送消息的过程

1.包装?ProducerRecord 对象

Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。

2.指定分区

接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。

3.放入缓存

分好区的消息不是直接被发送到服务端,而是放入了生产者的一个缓存里面。在这个缓存里面,多条消息会被封装成为一个批次(batch),默认一个批次的大小是 16K。

4.发送消息

Sender 线程启动以后会从缓存里面去获取可以发送的批次。把这些记录批次发送到相应的 broker 上。
?

5.接收返回

服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试
?

二、生产者整体架构

整个生 产者客户端由两个线程协调运行,这两个线程分 别为主线程和 Sender 线程 (发送线 程)。在主线程中由 kafkaProd ucer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器( RecordAccumulator ,也称为消息收 集器〉中。 Sender 线程负责从 RecordAccumulator 获取消息并将其发送到 Ka fka

三、序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kaflca。 而在对侧, 消费者需要用反序列化器(Deserializer)把从Kaflca 中收到的字节数组转换成相应的对象。消息的key和value都使用了字符串, 对应程序中的 序列化器也使用了客户端自带的org.apache.kaflca. common. serialization. StringSerializer, 除了用于 String 类型的序列化器,还有ByteArray、ByteBuffer、 Bytes、 Double、Integer、 Long这几种类 型, 它们都实现了org.apache.kaflca. common. serialization. Serializer接口, 此接口有3个方法:

public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()

configure()方法用来配置当前类,serialize()方法用来执行序列化操作。 而close()方法用来关闭当前的序列化器, 一般情况下close()是个空方法, 如果实现了此方法, 则必须确保此方法的幕等性, 因为这个方法很可能会被KafkaProducer 调用多次。 生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的, 如果生产者使用了 某种序列化器, 比如StringSerializer, 而消费者使用了另 一种序列化器, 比如IntegerSerializer,那么是无法解析出想要的数据的

kakfa支持配置自定义序列化:只需将KafkaProducer的value.serializer 参数设置为CompanySerializer类的全限定名即可。

四、分区器

消息在通过send()方法发往broker 的过程中, 有可能需要经过拦截器(Interceptor)、 序列 化器(Serializer)和分区器(Partitioner)的一 系列作用之后才能被真正地发往 broker。 拦截器?一 般不是必需的, 而序列化器是必需的。 消息 经过 序列化 之后就需要确定它发往的分区 , 如果消息ProducerRecord中指定了 partition字段, 那么就不需要分区器 的作用, 因为par巨巨on代表的就是所要发往的分区号。 如果消息ProducerRecord中没有 指定par巨巨on字段,那么就需要依赖分区器,根据key 这个字段来计算 partition 的值。 分区器的作用 就是为消息 分配分区。
?
Kafka 中提供的默认分区器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner, 它 实现了org.apache.kafka.clients.producer.Partitioner 接口, 这个接口中定义了2个方法, 具体如下所示。
public int partition(S七ring topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster); 
public void close()
其中 partition ()方法用来计算分区号,返回值为 int 类型。 partition ()方法中的参数分别表示 主题 、键、序列化后的 键、值、序列 后的值,以及集群的元数据信息,通过这些信息 可以实 现功能丰富的分区器。c lose() ? 方法在关闭分区器的时候用来回收 一些资源
?
?
在默认分区器 DefaultPartitioner 的实现中, close() 是空方法,而在 partition ()方法中定义了 主要的分区分配逻辑 如果 ke 不为 null ,那 么默认的分区器会对 key 进行哈 希(采用 MurmurHash2 算法 ,具备高运算性能及 低碰 撞率),最终根据得到 哈希值来 算分区号,拥 有相同 key 的消息会被写入同一个分区 如果 key为 null ,那么消息将会以轮询的方式发往主 题内的各个可用分区。
如果key不为null,那么分区号会是所有分区中的任意一个,如果为null则仅会为可用分区中的任意一个
除了使用 Kafka 提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同 DefaultPartitioner 一样实 Partitioner 接口 即可 。默认 分区器在 key null 时不 会选择 非可用 的分区,我们可以通过自 定义 的分 区器 DemoPartitioner 打破这 限制
?
?

五、生产者拦截器

拦截器(? Interceptor )是早在 Kafka 0.10.0.0 中就 已经 引入的 个功能, Kafka 一共有 两种拦 截器 生产者拦截器和消费者拦截器
生产者拦截器既 可以用 来在消息发送前 一些准备工作 比如按照某 个规则过率 不符 合要 的消 息、修改消息 的内容等, 也可以 用来在发送回 调逻辑前做一 些定 制化的需 求,比如统计 类工作。
?
生产者拦截器 使用 很方便,主要是自定义实现 org apache.kafka. clients . producer. Producerlnterceptor 接口。 Producer Interceptor 口中包含三 个方法
public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record ); 
public void onAcknowledgement(RecordMetadata metadata , Exception exception ); 
public void close() ;

KafkaProducer 在将消息序列化和计算分区 前会调 生产者拦截器 onSend() 方法来对消息进行相应 定制化操作。一般来说最好不要修改消息 ProducerRecord 的topic和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏 差。比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩( Log Compaction) 的功能

KafkaProducer 会在消息被应 答( A cknowledgement )之前或消息发送失 败时调用生产者拦 截器的 onAcknowledgement () 方法,优先于用户设定的 Ca llback 之前执行。这个方法运行在 Producer I/O? 线程中,所以这个方法中实现的代码逻辑越简单越好 则会 影响消息的发送 速度。
?
close () 方法主要用于在关闭拦截器时执行一些资源的清理工作。在这 个方 法中抛 出的异 常都会被捕获并记录到日志中,但并不会再向上传递。

六、RecordAccumulator消息累加器(缓冲区)

主要用来缓存消息 Sender 线程可以批量发送,进 减少网络传输 的资源消耗以提升性能 RecordAccumulator 缓存的大 小可以通过生产者客户端参数 buffer. memory 配置,默认值为 33554432B ,即 32MB? 如果生 产者发送 消息的速度超过发 送到服务器的速度 ,则 会导致生产者空间不足,这个时候 KafkaPro ducer send () 方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max block ms 的配置,此参数的默认值为 6 0000,及? 60秒
?
内部结构
主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque )中, 在RecordAccumulator 的内部为每个分区都维护了 个双端队列,队列中的内容就是 Prod uc e r Batch ,即 Deque ProducerBatch >。消息写入缓存 时,追加到双端队列的尾部: Sender 读取消息时 ,从 双端队列的头部读取。注意 Producer Batch 不是 Producer Record, ProducerBatch 中可以包含一至多个 Producer Record 通俗地说, ProducerRecord 是生产者中创建的消息,而 Producer Batch 是指一个消息批次 ProducerRecord 会被包含在 Pro ducer Batch 中,这样可以使字 节的使用更加紧凑。与此同时,将较小的 Producer Record 凑成一个较大 ProducerBatch ,也 可以减少网络请求的次数以提升整体的吞吐量 Producer Batch 和消息的具体格式有关

?

七、kafka发送消息

。 发送消息主要有三种模式: 发 后即忘(fire-and-forget)、 同步(sync)及异步Casync)

send()本身是异步的,但是调用send()后可以通过代码实现同步还是异步,异步:一旦消息被保存在等待发送的消息缓存中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。当然也可以使用同步发送但是性能差,不推荐

简单同步发送实现方法:

在调用send方法后直接调用get方法强行堵塞

 RecordMetadata metadata = producer.send(record).get();

异步实现

通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:

producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.out.println("进行异常处理");
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%s \n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });

八、重要的生产者参数

在kafka生产者中大部分的参数都有合理的默认值,一般不需要修改它们

1.acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。 acks 是生产者客户端中一个非常 重要的 参数 ,它涉及 消息的可靠 性和吞吐量 之间的权衡acks 参数有3 种类型的值(都是字符串类型)

(1)acks=1

默认值即为1?。生产者发送消息之后,只要分区的 leader 副本成功写入消 息,那么它就会收到来自服务端的成功响应,如果消息无法写入 leader 副本,比如在 leader副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入 leader 副本并 返回成功响应给生产者,且在被其他 fo llo wer 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息 acks 设置为1,是消息可靠性和吞吐量之间的折中方

(2)acks = 0

生产者发送消 息之后不需要等待任何服务端的响应 。如果在消息从发送到 写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下, acks 设置为0 可以达到最大的吞吐量。

?

(3)acks = -1或acks =all

生产者在消 息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下, acks 设置为 (all )可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为?JSR 中可能只有 leader 副本,这样就退化成了 acks=1?的情况。要获得更高的消息 可靠性需要配合 min.insync.replicas 参数的联动

注意 acks 参数配置的值是一个字符串类型,而不是整数类型

2.max.request.size

该参数用于控制生产者发送的请求大小,它可以指发送的单个消息的最大值,kafka默认的发送一条消息的大小是1M

3.retries 和?retry.backo.ms

发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。默认是0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常, 比如网络抖动、 le der 副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于0值,以此通过内部重试来恢复而不是一昧地将异常抛给生产者的应用程序。但是不是所有异常都能处理,比如超过消息最大值的异常

retry.backoff.ms用来设置两次重试之间的间隔

4.compression.type

这个参数用来指定消息的压缩方式,默认值为“ none ”,即默认情况下,消息不会被压缩。 该参数还可以配置为“ gzip snappy 和“ z4 对消息进行压缩可以极大地减少网络传输 、降低网络I/O,从而提 高整 体的性能 。消息压 缩是 种使用时间换 间的优化 方式 ,如 果对 时延有一定的要求,则不推荐对消息进行压缩
?

5.linge .ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord )加入 Producer Batch 时间,默认值为 。生产者客户端会在 ProducerBatch 填满或等待时间超过 linger.ms 值时发迭出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

6.?receive.buffer.bytes & send.buffer.byte

这个参数用来设置 Socket 接收消息缓冲区( SO RE CBUF )的大小(滑动窗口协议),默认值接收窗口为 32KB,发生窗口为128KB。如果设置为 -1 ,则使用操作系统的默认值。如果 Producer Kafka 于不同的机房 则可以 适地调大这个参数值

7.timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

timeout.ms 指定了 borker 等待同步副本返回消息的确认时间;
request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间;
metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。
?

;原文链接:https://blog.csdn.net/zhang09090606/article/details/115536102

版权声明:本文转载自网络,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本站转载出于传播更多优秀技术知识之目的,如有侵权请联系QQ/微信:153890879删除

相关文章
  • Windows 10新增磁盘分析工具:文件夹占

    Windows 10新增磁盘分析工具:文件夹占

  • 鸿蒙HarmonyOS环境搭建遇到的坑,分享

    鸿蒙HarmonyOS环境搭建遇到的坑,分享

  • Zabbix5.2由浅入深系列之制作网络设备

    Zabbix5.2由浅入深系列之制作网络设备

  • 转手赚1000,开源抢茅台神器,真香!

    转手赚1000,开源抢茅台神器,真香!

腾讯云代理商
海外云服务器