kafka特点
支持多生产者、多消费者、数据持久化于磁盘、具有良好的伸缩性(集群)、性能高。
使用kafka后,生产者与消费者不再耦合,生产者只管生产数据,而不用担心消费者数量,消费速度,消费是否成功。消费者只需要订阅自己关注的主题,在有需要的场景下支持调整偏移量,重新消费数据。
kafka生产者
发送消息的方式
- 同步发送消息 在Java实现里,发送会返回一个Future对象,调用Future.get()方法等待发送结果,如果发送成功,会返回一个包含消息topic,partition,offset等信息的RecordMetadata对象。如果发送失败,则会抛出异常。kafka生产者可以配置为自动重试的方式,对于返回的异常,如果重试超过阈值仍然失败,则会抛出重试异常。如果遇到重试无法解决的异常,例如消息体大小超过限制,kafka不会进行重试,将直接抛出异常。
- 异步发送消息 实现一个回调接口,当kafka服务返回发送结果时,会回调对应的函数,传入返回结果和异常信息(如果存在异常)。
- 发送后即忘记。不关注返回结果,这样如果有配置重试机制,失败了仍然会重试,但是重试也失败,消息就将直接丢失。
kafka通过toppic进行消息分类,topic可以被分为多个partition,不同partition之间不能保证消息有序,但是单个partition内可以保证消息有序。
默认情况下,消息发布时会根据kafka内置的hash算法,均匀分发到不同的分区上,但是可以通过自定义算法(配合消息的key)来把消息映射到指定的分区上,或者直接指定分区。如果消息key为null,并且使用默认的分区器,将会使用轮询的方式分发消息到各个分区。
一些重要配置
acks
这个参数代表kafka集群里,生产者收到多少个broker的相应,才认为消息发送成功了。如果acks=0,即不等待任何相应就认为发送成功,这种形式有可能丢失消息,但性能最高。如果acks=1,则认为收到主节点的回应就认为是发送成功,如果主节点崩了,集群还未重新选主,服务器收到消息会抛出异常,生产者重试发送。如果一个未收到消息的节点成为了新主节点,消息还是会丢失。如果acks=all,则受到所有节点的响应才认为消息发送成功。
compression .type
消息压缩算法,默认情况下消息发送出去是没有经过压缩的。这个参数可以指定压缩算法。
retries
消息发送失败,服务器抛出可重试异常时的重试次数上限。
max.in.flight.requests.per.connection
在收到服务器响应前,客户端最多可以发送多少个消息。如果设置为1,可以保证即使发生了重试发送,写入kafka服务器的消息顺序是跟生产者发送的顺序一致,这样在任何一个partition里,都可以保证前面的消息比后面的消息发送时间要早。这在特定的场景下有用,但是会影响性能。
batch .size
多个消息要发送到同一partition时,生产者会将他们放在同一批次里(类似redis的管道操作),这个参数可以设置单个批次的大小限制。
max.request.size
单个请求的大小限制
kafka消费者
对于同一个消费者群组(group)内的消费者,每个分区(partition)只能被一个消费者消费。也就是说,如果partition数量比消费者多,将会有一个或者多个消费者消费多个partition,如果消费者数量比partition多,将会有消费者永远无法消费消息。
消费者与被被指定为集群协调器的节点会保持心跳,如果心跳停止时间超过阈值,partition与消费者的关系就会发生rehash。当有新的消费者加入时,也可能会发生rehash(取决于当前消费者数量是否已经>=partition数)
如果设置了自动提交offset,在两次自动提交间隔发生了rehash,那么这个partition的消息可能会被重复消费(原理自行理会 =。=懒得画图示意),这种情况需要注意消费者处理消息的幂等性。也可以关闭自动提交功能,使用手动提交。为了避免手动提交的请求持续等待kafka服务的相应,还可以进行异步手动提交,即kafka服务器相应后回调消费者。
- 消费者可以对rehash事件进行监听,以在rehash发生时作一些处理避免出现异常(如前面说的重复消费)。
- 消费者可以使用seek()方法从指定的offset开始读取消息,在一些特殊场景下,可以将offset存储在kafka服务以外的地方,每次直接指定offset读取数据
消费者可以直接为自己指定partition进行消费,这样就没有rehash什么事了,当然如果这个消费者挂了,也就没人消费这个partition了。
一些重要配置
fetch.min.bytes
消费者获取单条记录的最小字节数。kafka服务往消费者写消息时,达到这个数量才会返回给消费者。如果数据量小,消费者空轮询CPU占用高,可适当调大这个配置。
fetch.max.wait.ms
当fetch.min.bytes的数据量长时间不能达到时,这个参数就起作用了。kafka服务器最多等待这么长时间就会将数据返回给消费者,即使数据量未达到fetch.min.bytes的限制。
session.timeout.ms
消费者心跳停止死亡时间。如果停止向服务器发送心跳超过这个时间,则认为消费者死亡,进行rehash。注意与heatbeat.inteval.ms(心跳间隔)搭配使用。
enable.auto.commit
消费者是否自动提交偏移量,默认为true。每次提交的间隔时间由auto.commit.interval.ms决定。
消息存储配置
num.patitions
partition数量的考量:整个主题的吞吐量(系统有2个生产者,4生产者写入速度50MB/s,单个消费者消费速度为5Mb/s,则需要20个消费者才能没有消息堆积,此时则最少需要20个partition)
log.retention.hour/minutes/ms
消息保存时间,默认为1周
log.retention.bytes
通过持久化文件的大小来判定消息是否过期
log.segment.bytes
单个日志文件的大小限制,太小会导致频繁关闭文件,创建新文件。太大则消息过期时间可能比配置的消息过期时间还长,因为当前消息文件未关闭时,里面消息是不会过期的。
log.segment.ms
消息文件关闭的时间限制
message.max.bytes
单个消息大小限制