kafka如何设置指定分区进行发送和消费?原生代码和整合springboot代码都有

不整合springboot

整合springboot

 

0

Spring Boot 如何自定义kafka 消费者配置 ContainerFactory

本篇博文主要提供一个在 SpringBoot 中自定义 kafka配置的实践,想象这样一个场景:你的系统需要监听多个不同集群的消息,在不同的集群中topic冲突了,所以你需要分别定义kafka消息配置。

此篇文章会在SpringBoot 提供的默认模板上提供扩展,不会因为你自定义了消费者配置,而导致原生SpringBoot的Kakfa模板配置失效。

1 引入 MAVEN 依赖

版本需要你自己指定

2 引入Java配置类

3 yml模板

4 配置释义

点开 KafkaProperties 这个类,可以看到这个是SpringBoot 自动配置kafka的配置类,引入这个实例,就相当于你拿到了SpringBoot kafka配置模板的参数,就是上述贴的配置,然后再此基础上重新定义你需要改变的配置,这里主要讲消费者配置。

代码中举了个重写监听servers的例子:

5 @KafkaListener 使用 containerFactory

如果在@KafkaListener属性中没有指定 containerFactory 那么Spring Boot 会默认注入 name 为“kafkaListenerContainerFactory” 的 containerFactory。具体源码可跟踪:KafkaListenerAnnotationBeanPostProcessor中的常量:

 

0

kafka集群分布式下,如何保证消息的有序性?

前言

kafka可以保证同一个partition的消息的有序性,不能保证不同partition消息的有序性。

为什么kafka不设计成不同partition消息的有序性?

如果要保证多个partition有序,那么既要保证broker保存消息的顺序,又要保证消费消息时的顺序。如果partition1堵了,为了保证消费的有序性,其它partition都只能等待了,这样kafka的多partition还有什么意义呢。

kafka producer 发送消息时,是如何确保单一partition的有序性的?

加锁

kafka producer 发送消息时,是如何确保单一partition的有序性
kafka producer 发送消息时,是如何确保单一partition的有序性

结合我们自己的业务场景,可以在发送消息时,想保证顺序的同一批数据指定相同的message key,因为kafka可以保证相同的key进入同一partition。

比如,用kafka发送GPS数据时,有三个参数(partition key message),其中partition和key是可选的,我们可以通过指定partition或者指定key为gps设备的imei号,这两者都可以保证同一台gps设备的数据进入同一个partition。

并且kafka也保证了消费端同一个partition的消息只被一个Consumer消费。

牺牲性能的做法

当然,我们也可以在创建topic时,只指定一个partition,这样也能保证消息的有序性,不过这样就浪费了kafka的分布式高吞吐的特性,所以不推荐这样做。

消息发送失败,重试机制启动的情况下,如何保证消息的有序性?

正常的理想状态下,上面的配置的确没问题,可是有时,我们还需要考虑消息发送失败的情况。

一般我们会设置重试机制来解决消息发送的问题,如果是非临时性错误导致的消息发送失败(如消息体过大),那么再怎么重试也是没有意义的。如果是临时性错误导致消息发送失败(如网络抖动或分区找不到首领),那么可以设置producer的retries,该值默认是0,比如设置3,重试3次。但是这样可能就会造成乱序的问题,比如消息A先发送,但是发送失败了,等消息B发完后,消息A又重试成功了,这就造成了message A 和 B 的乱序。

如何解决呢?

可以借助 max.in.flight.requests.per.connection 配置项来解决,该参数指定了生产者在收到服务器响应前可以发送几个消息。该值默认是5。它的值越高,吞吐量就越高,但是也越占内存。我们可以把它设为1,这样就保证了即使发生重试,消息也会严格的有序。

但是,还是老问题,高可靠性和高性能,择其一必然会舍其一,这样的配置可以保证消息的可靠性,但是性能也会下降。

0.11版本后,kafka引入事务机制可保证producer挂掉重启后依然保证有序。

 

 

0

windows下kafka启动

windows下kafka启动

因为kafka依赖zookeeper,所以需要先启动zookeeper。

切换到kafka_2.12-2.1.0\bin\windows目录,该目录下是所有windows命令:

在此目录下打开cmd,执行命令zookeeper-server-start.bat ....\config\zookeeper.properties,如下图:

执行命令kafka-server-start.bat ....\config\server.properties,如下图:

看到started,说明Kafka启动成功。

注意:

如果出现‘命令语法不正确’ ,导致不能正常运行,尝试修改配置文件的dataDir(zookeeper.properties),log.dirs(server.properties)。因为默认的是linux的文件目录格式。

0

Kafka面试题整理总结大全

1 .什么是kafka?主要有哪些应用场景?

Kafka是分布式发布-订阅消息系统,它最初是由LinkedIn公司开发的,之后成为Apache项目的一部分,Kafka是一个分布式,可划分的,冗余备份的持久性的日志服务,它主要用于处理流式数据。

流平台具有三个关键功能:

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
  2. 容错的持久方式存储记录消息流: Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险·。
  3. 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

Kafka 主要有两大应用场景:

  1. 消息队列 :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
  2. 数据处理: 构建实时的流数据处理程序来转换或处理数据流。

2 .为什么要使用 kafka,为什么要使用消息队列?

缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。

解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。

冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。

健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。

异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

3.kafka消息模型

队列模型:早期的消息模型

Kafka面试题整理总结大全

使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。 比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

队列模型存在的问题:

假如我们存在这样一种情况:我们需要将生产者产生的消息分发给多个消费者,并且每个消费者都能接收到完成的消息内容。

这种情况,队列模型就不好解决了。很多比较杠精的人就说:我们可以为每个消费者创建一个单独的队列,让生产者发送多份。这是一种非常愚蠢的做法,浪费资源不说,还违背了使用消息队列的目的。

发布-订阅模型:Kafka 消息模型

发布-订阅模型主要是为了解决队列模型存在的问题。

Kafka面试题整理总结大全

发布订阅模型(Pub-Sub) 使用主题(Topic) 作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的

在发布 – 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。所以说,发布 – 订阅模型在功能层面上是可以兼容队列模型的。

Kafka 采用的就是发布 – 订阅模型。

RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。

3.什么是Producer、Consumer、Broker、Topic、Partition?

Kafka 将生产者发布的消息发送到 Topic(主题) 中,需要这些消息的消费者可以订阅这些 Topic(主题),如下图所示:

Kafka面试题整理总结大全

上面这张图也为我们引出了,Kafka 比较重要的几个概念:

  1. Producer(生产者) : 产生消息的一方。
  2. Consumer(消费者) : 消费消息的一方。
  3. Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。

同时,你一定也注意到每个 Broker 中又包含了 Topic 以及 Partition 这两个重要的概念:

  • Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
  • Partition(分区) : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。这正如我上面所画的图一样。

划重点:Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。这样是不是更好理解一点?

3.Kafka 的多副本机制了解吗?带来了什么好处?

还有一点我觉得比较重要的是 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。

生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。

Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?

  1. Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
  2. Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。

Zookeeper 在 Kafka 中的作用知道吗?

要想搞懂 zookeeper 在 Kafka 中的作用 一定要自己搭建一个 Kafka 环境然后自己进 zookeeper 去看一下有哪些文件夹和 Kafka 有关,每个节点又保存了什么信息。 一定不要光看不实践,这样学来的也终会忘记。

下图就是我的本地 Zookeeper ,它成功和我本地的 Kafka 关联上(以下文件夹结构借助 idea 插件 Zookeeper tool 实现)。

Kafka面试题整理总结大全

ZooKeeper 主要为 Kafka 提供元数据的管理的功能。

从图中我们可以看出,Zookeeper 主要为 Kafka 做了下面这些事情:

  1. Broker 注册 :在 Zookeeper 上会有一个专门用来进行 Broker 服务器列表记录的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到/brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
  2. Topic 注册 : 在 Kafka 中,同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:/brokers/topics/my-topic/Partitions/0/brokers/topics/my-topic/Partitions/1
  3. 负载均衡 :上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

Kafka 如何保证消息的消费顺序?

我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:更改用户会员等级、根据会员等级计算订单价格。假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。

我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。

Kafka面试题整理总结大全

每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。Kafka 只能为我们保证 Partition(分区) 中的消息有序,而不能保证 Topic(主题) 中的 Partition(分区) 的有序。

消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。

所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。

Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。

总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:

  1. 1 个 Topic 只对应一个 Partition。
  2. (推荐)发送消息的时候指定 key/Partition。

当然不仅仅只有上面两种方法,上面两种方法是我觉得比较好理解的,

3.Kafka中的ISR、AR又代表什么?ISR的伸缩又指什么?

  • ISR:In-Sync Replicas 副本同步队列
  • AR:Assigned Replicas 所有副本

ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度,当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

4.kafka中的broker 是干什么的?

broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。

5.kafka中的 zookeeper 起到什么作用,可以不用zookeeper么?

zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,

但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和检测broker是否存活等等。

6.kafka follower如何与leader同步数据?

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。完全同步复制要求All Alive Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下,如果leader挂掉,会丢失数据,kafka使用ISR的方式很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,而且Leader充分利用磁盘顺序读以及send file(zero copy)机制,这样极大的提高复制性能,内部批量写磁盘,大幅减少了Follower与Leader的消息量差。

7.什么情况下一个 broker 会从 isr中踢出去?

leader会维护一个与其基本保持同步的Replica列表,该列表称为ISR(in-sync Replica),每个Partition都会有一个ISR,而且是由leader动态维护 ,如果一个follower比一个leader落后太多,或者超过一定时间未发起数据复制请求,则leader将其重ISR中移除 。

8.kafka 为什么那么快?

  • Cache:Filesystem Cache PageCache缓存
  • 顺序写:由于现代的操作系统提供了预读和写技术,磁盘的顺序写大多数情况下比随机写内存还要快。
  • Zero-copy:零拷技术减少拷贝次数
  • Batching of Messages:批量消息处理,合并小的请求,然后以流的方式进行交互,直顶网络上限。
  • Pull 拉模式: 使用拉模式进行消息的获取消费,与消费端处理能力相符。

9.kafka producer如何优化push速度

  • 增加线程
  • 提高 batch.size
  • 增加更多 producer 实例
  • 增加 partition 数
  • 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解
  • 跨数据中心的传输,增加 socket 缓冲区设置以及 OS tcp 缓冲区设置

10.kafka producer push数据,ack  为 0, 1, -1 的时候代表啥, 设置 -1 的时候,什么情况下,leader 会认为一条消息 commit了?

  • 1(默认)  数据发送到Kafka后,经过leader成功接收消息的的确认,就算是发送成功了。在这种情况下,如果leader宕机了,则会丢失数据。
  • 0 生产者将数据发送出去就不管了,不去等待任何返回。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1 producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。当ISR中所有Replica都向Leader发送ACK时,leader才commit,这时候producer才能认为一个请求中的消息都commit了。

11.kafka unclean 配置代表啥,会对 spark streaming 消费有什么影响?

unclean.leader.election.enable 为true的话,意味着非ISR集合的broker 也可以参与选举,这样有可能就会丢数据,spark streaming在消费过程中拿到的 end offset 会突然变小,导致 spark streaming job挂掉。如果unclean.leader.election.enable参数设置为true,就有可能发生数据丢失和数据不一致的情况,Kafka的可靠性就会降低;而如果unclean.leader.election.enable参数设置为false,Kafka的可用性就会降低。

12.如果leader crash时,ISR为空怎么办?

kafka在Broker端提供了一个配置参数:unclean.leader.election,这个参数有两个值:

true(默认):允许不同步副本成为leader,由于不同步副本的消息较为滞后,此时成为leader,可能会出现消息不一致的情况。

false:不允许不同步副本成为leader,此时如果发生ISR列表为空,会一直等待旧leader恢复,降低了可用性。

13.kafka的message格式是什么样的?

一个Kafka的Message由一个固定长度的header和一个变长的消息体body组成。

header部分由一个字节的magic(文件格式)和四个字节的CRC32(用于判断body消息体是否正常)构成。

当magic的值为1的时候,会在magic和crc32之间多一个字节的数据:attributes(保存一些相关属性,比如是否压缩、压缩格式等等);如果magic的值为0,那么不存在attributes属性。

body是由N个字节构成的一个消息体,包含了具体的key/value消息。

14.kafka中consumer group 是什么概念?

同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。

15.Kafka中的消息是否会丢失和重复消费?

要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费。

1、消息发送

消息发送失败

生产者(Producer) 调用send方法发送消息之后,消息可能因为网络问题并没有发送过去。

所以,我们不能默认在调用send方法发送消息之后消息消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 send 方法发送消息实际上是异步的操作,我们可以通过 get()方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:

但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:

如果消息发送失败的话,我们检查失败的原因之后重新发送即可!

另外这里推荐为 Producer 的retries (重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你3次一下子就重试完了。

Kafka 弄丢了消息

Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:

  • 0—表示不进行消息接收是否成功的确认;
  • 1—表示当Leader接收成功时确认;
  • -1—表示Leader和Follower都接收成功时确认;

综上所述,有3种消息生产的情况,下面分情况来分析消息丢失的场景:

(1)acks=0,不和Kafka集群进行消息接收确认,则当网络异常、缓冲区满了等情况时,消息可能丢失;

(2)acks=1、同步模式下,只有Leader确认接收成功后但挂掉了,副本没有同步,数据可能丢失;

我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。

试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。

设置 acks = all

解决办法就是我们设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。

acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 acks = all 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。

设置 replication.factor >= 3

为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 replication.factor >= 3。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。

设置 min.insync.replicas > 1

一般情况下我们还需要设置 min.insync.replicas> 1 ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。min.insync.replicas 的默认值为 1 ,在实际生产中应尽量避免默认值 1。

但是,为了保证整个 Kafka 服务的高可用性,你需要确保 replication.factor > min.insync.replicas 。为什么呢?设想一下假如两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 replication.factor = min.insync.replicas + 1

设置 unclean.leader.election.enable = false

Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false

我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。

2、消息消费

Kafka消息消费有两个consumer接口,Low-level API和High-level API:

Low-level API:消费者自己维护offset等值,可以实现对Kafka的完全控制;

High-level API:封装了对parition和offset的管理,使用简单;

如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了。

解决办法:

针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态。

 

我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。

Kafka面试题整理总结大全

当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。

解决办法也比较粗暴,我们手动关闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。

针对消息重复:===看下面一个面试题===

16.如何保证消息不被重复消费?(如何保证消息消费时的幂等性)

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。拿 Kafka 来举个例子,说说怎么重复消费吧。

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

那出现重复消费的根本原因就是已经消费了数据,但是offset没提交(kafka没有或者不知道该数据已经被消费)。

基于这种原因总结以下几个易造成重复消费的配置:

原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。

举个例子。

有这么个场景。数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,分配的 offset 依次是 152/153/154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。数据 1/2 再次被消费。

原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。例如:

上面代码会导致部分offset没提交,下次启动时会重复消费。 

原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。 

原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。 

原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。

17.为什么Kafka不支持读写分离?

在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。

Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:

(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。

(2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。

18.Kafka中是怎么体现消息顺序性的?

kafka每个partition中的消息在写入时都是有序的,消费时,每个partition只能被每一个group中的一个消费者消费,保证了消费时也是有序的。

整个topic不保证有序。如果为了保证topic整个有序,那么将partition调整为1。

19.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?

offset+1

20.kafka如何实现延迟队列?

Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。

底层使用数组实现,数组中的每个元素可以存放一个TimerTaskList对象。TimerTaskList是一个环形双向链表,在其中的链表项TimerTaskEntry中封装了真正的定时任务TimerTask。

Kafka中到底是怎么推进时间的呢?Kafka中的定时器借助了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都会加入到DelayQueue中。Kafka中的TimingWheel专门用来执行插入和删除TimerTaskEntry的操作,而DelayQueue专门负责时间推进的任务。再试想一下,DelayQueue中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度。如果采用每秒定时推进,那么获取到第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取到第二个超时任务时有需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。Kafka中的定时器真可谓是“知人善用”,用TimingWheel做最擅长的任务添加和删除操作,而用DelayQueue做最擅长的时间推进工作,相辅相成。

21.实现精确一次(exactly once)处理

在分布式环境下,要实现消息一致与精确一次(exactly once)语义处理是很难的。精确一次处理意味着一个消息只处理一次,造成一次的效果,不能多也不能少。

那么kafka如何能够实现这样的效果呢?在介绍之前,我们先来介绍其他两个语义,至多一次(at most once)和至少一次(at least once)。

最多一次和至少一次

最多一次就是保证一条消息只发送一次,这个其实最简单,异步发送一次然后不管就可以,缺点是容易丢数据,所以一般不采用。

至少一次语义是kafka默认提供的语义,它保证每条消息都能至少接收并处理一次,缺点是可能有重复数据。

前面有介绍过acks机制,当设置producer客户端的acks是1的时候,broker接收到消息就会跟producer确认。但producer发送一条消息后,可能因为网络原因消息超时未达,这时候producer客户端会选择重发,broker回应接收到消息,但很可能最开始发送的消息延迟到达,就会造成消息重复接收。

那么针对这些情况,要如何实现精确一次处理的语义呢?

幂等的producer

要介绍幂等的producer之前,得先了解一下幂等这个词是什么意思。幂等这个词最早起源于函数式编程,意思是一个函数无论执行多少次都会返回一样的结果。比如说让一个数加1就不是幂等的,而让一个数取整就是幂等的。因为这个特性所以幂等的函数适用于并发的场景下。

但幂等在分布式系统中含义又做了进一步的延申,比如在kafka中,幂等性意味着一个消息无论重复多少次,都会被当作一个消息来持久化处理。

kafka的producer默认是支持最少一次语义,也就是说不是幂等的,这样在一些比如支付等要求精确数据的场景会出现问题,在0.11.0后,kafka提供了让producer支持幂等的配置操作。即:

props.put(“enable.idempotence”, ture)

在创建producer客户端的时候,添加这一行配置,producer就变成幂等的了。注意开启幂等性的时候,acks就自动是“all”了,如果这时候手动将ackss设置为0,那么会报错。

而底层实现其实也很简单,就是对每条消息生成一个id值,broker会根据这个id值进行去重,从而实现幂等,这样一来就能够实现精确一次的语义了。

但是!幂等的producery也并非万能。有两个主要缺陷

  • 幂等性的producer仅做到单分区上的幂等性,即单分区消息不重复,多分区无法保证幂等性。
  • 只能保持单会话的幂等性,无法实现跨会话的幂等性,也就是说如果producer挂掉再重启,无法保证两个会话间的幂等(新会话可能会重发)。因为broker端无法获取之前的状态信息,所以无法实现跨会话的幂等。

事务的producer

当遇到上述幂等性的缺陷无法解决的时候,可以考虑使用事务了。事务可以支持多分区的数据完整性,原子性。并且支持跨会话的exactly once处理语义,也就是说如果producer宕机重启,依旧能保证数据只处理一次。

开启事务也很简单,首先需要开启幂等性,即设置enable.idempotence为true。然后对producer发送代码做一些小小的修改。

但无论开启幂等还是事务的特性,都会对性能有一定影响,这是必然的。所以kafka默认也并没有开启这两个特性,而是交由开发者根据自身业务特点进行处理。

21.Kafka中的事务是怎么实现的?

在说Kafka的事务之前,先要说一下Kafka中幂等的实现。幂等和事务是Kafka 0.11.0.0版本引入的两个特性,以此来实现EOS(exactly once semantics,精确一次处理语义)。

幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能之后就可以避免这种情况。

开启幂等性功能的方式很简单,只需要显式地将生产者客户端参数enable.idempotence设置为true即可(这个参数的默认值为false)。

Kafka是如何具体实现幂等的呢?Kafka为此引入了producer id(以下简称PID)和序列号(sequence number)这两个概念。每个新的生产者实例在初始化的时候都会被分配一个PID,这个PID对用户而言是完全透明的。

对于每个PID,消息发送到的每一个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将对应的序列号的值加1。

broker端会在内存中为每一对维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比broker端中维护的对应的序列号的值(SN_old)大1(即SN_new = SN_old + 1)时,broker才会接收它。

如果SN_new< SN_old + 1,那么说明消息被重复写入,broker可以直接将其丢弃。如果SN_new> SN_old + 1,那么说明中间有数据尚未写入,出现了乱序,暗示可能有消息丢失,这个异常是一个严重的异常。

引入序列号来实现幂等也只是针对每一对而言的,也就是说,Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等。幂等性不能跨多个分区运作,而事务可以弥补这个缺陷。

事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。

为了使用事务,应用程序必须提供唯一的transactionalId,这个transactionalId通过客户端参数transactional.id来显式设置。事务要求生产者开启幂等特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将enable.idempotence设置为true(如果未显式设置,则KafkaProducer默认会将它的值设置为true),如果用户显式地将enable.idempotence设置为false,则会报出ConfigException的异常。

transactionalId与PID一一对应,两者之间所不同的是transactionalId由用户显式设置,而PID是由Kafka内部分配的。

另外,为了保证新的生产者启动后具有相同transactionalId的旧生产者能够立即失效,每个生产者通过transactionalId获取PID的同时,还会获取一个单调递增的producer epoch。如果使用同一个transactionalId开启两个生产者,那么前一个开启的生产者会报错。

从生产者的角度分析,通过事务,Kafka可以保证跨生产者会话的消息幂等发送,以及跨生产者会话的事务恢复。

前者表示具有相同transactionalId的新生产者实例被创建且工作的时候,旧的且拥有相同transactionalId的生产者实例将不再工作。

后者指当某个生产者实例宕机后,新的生产者实例可以保证任何未完成的旧事务要么被提交(Commit),要么被中止(Abort),如此可以使新的生产者实例从一个正常的状态开始工作。

KafkaProducer提供了5个与事务相关的方法,详细如下:

在消费端有一个参数isolation.level,与事务有着莫大的关联,这个参数的默认值为“read_uncommitted”,意思是说消费端应用可以看到(消费到)未提交的事务,当然对于已提交的事务也是可见的。

这个参数还可以设置为“read_committed”,表示消费端应用不可以看到尚未提交的事务内的消息。

举个例子,如果生产者开启事务并向某个分区值发送3条消息msg1、msg2和msg3,在执行commitTransaction()或abortTransaction()方法前,设置为“read_committed”的消费端应用是消费不到这些消息的,不过在KafkaConsumer内部会缓存这些消息,直到生产者执行commitTransaction()方法之后它才能将这些消息推送给消费端应用。反之,如果生产者执行了abortTransaction()方法,那么KafkaConsumer会将这些缓存的消息丢弃而不推送给消费端应用。

日志文件中除了普通的消息,还有一种消息专门用来标志一个事务的结束,它就是控制消息(ControlBatch)。控制消息一共有两种类型:COMMIT和ABORT,分别用来表征事务已经成功提交或已经被成功中止。

RecordBatch中attributes字段的第6位用来标识当前消息是否是控制消息。如果是控制消息,那么这一位会置为1,否则会置为0,如上图所示。

attributes字段中的第5位用来标识当前消息是否处于事务中,如果是事务中的消息,那么这一位置为1,否则置为0。由于控制消息也处于事务中,所以attributes字段的第5位和第6位都被置为1。

KafkaConsumer可以通过这个控制消息来判断对应的事务是被提交了还是被中止了,然后结合参数isolation.level配置的隔离级别来决定是否将相应的消息返回给消费端应用,如上图所示。注意ControlBatch对消费端应用不可见。

22.Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?

Kafka是一个高性能,高容错,多副本,可复制的分布式消息系统。在整个系统中,涉及到多处选举机制,被不少人搞混,这里总结一下,本篇文章大概会从三个方面来讲解。

  • 控制器(Broker)选主
  • 分区多副本选主
  • 消费组选主

控制器(Broker)选举

所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。

那么如果控制器由于网络原因与zookeeper断开连接或者异常退出,那么其他broker通过watch收到控制器变更的通知,就会去尝试创建临时节点/controller,如果有一个broker创建成功,那么其他broker就会收到创建异常通知,也就意味着集群中已经有了控制器,其他broker只需创建watch对象即可。

如果集群中有一个broker发生异常退出了,那么控制器就会检查这个broker是否有分区的副本leader,如果有那么这个分区就需要一个新的leader,此时控制器就会去遍历其他副本,决定哪一个成为新的leader,同时更新分区的ISR集合。

如果有一个broker加入集群中,那么控制器就会通过Broker ID去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。

集群中每选举一次控制器,就会通过zookeeper创建一个controller epoch,每一个选举都会创建一个更大,包含最新信息的epoch,如果有broker收到比这个epoch旧的数据,就会忽略它们,kafka也通过这个epoch来防止集群产生“脑裂”。

分区副本选举机制

在kafka的集群中,会存在着多个主题topic,在每一个topic中,又被划分为多个partition,为了防止数据不丢失,每一个partition又有多个副本,在整个集群中,总共有三种副本角色:

  • 首领副本(leader):也就是leader主副本,每个分区都有一个首领副本,为了保证数据一致性,所有的生产者与消费者的请求都会经过该副本来处理。
  • 跟随者副本(follower):除了首领副本外的其他所有副本都是跟随者副本,跟随者副本不处理来自客户端的任何请求,只负责从首领副本同步数据,保证与首领保持一致。如果首领副本发生崩溃,就会从这其中选举出一个leader。
  • 首选首领副本:创建分区时指定的首选首领。如果不指定,则为分区的第一个副本。

follower需要从leader中同步数据,但是由于网络或者其他原因,导致数据阻塞,出现不一致的情况,为了避免这种情况,follower会向leader发送请求信息,这些请求信息中包含了follower需要数据的偏移量offset,而且这些offset是有序的。

如果有follower向leader发送了请求1,接着发送请求2,请求3,那么再发送请求4,这时就意味着follower已经同步了前三条数据,否则不会发送请求4。leader通过跟踪 每一个follower的offset来判断它们的复制进度。

默认的,如果follower与leader之间超过10s内没有发送请求,或者说没有收到请求数据,此时该follower就会被认为“不同步副本”。而持续请求的副本就是“同步副本”,当leader发生故障时,只有“同步副本”才可以被选举为leader。其中的请求超时时间可以通过参数replica.lag.time.max.ms参数来配置。

我们希望每个分区的leader可以分布到不同的broker中,尽可能的达到负载均衡,所以会有一个首选首领,如果我们设置参数auto.leader.rebalance.enable为true,那么它会检查首选首领是否是真正的首领,如果不是,则会触发选举,让首选首领成为首领。

消费组选主

在kafka的消费端,会有一个消费者协调器以及消费组,组协调器GroupCoordinator需要为消费组内的消费者选举出一个消费组的leader,那么如何选举的呢?

如果消费组内还没有leader,那么第一个加入消费组的消费者即为消费组的leader,如果某一个时刻leader消费者由于某些原因退出了消费组,那么就会重新选举leader,如何选举?

上面代码是kafka源码中的部分代码,member是一个hashmap的数据结构,key为消费者的member_id,value是元数据信息,那么它会将leaderId选举为Hashmap中的第一个键值对,它和随机基本没啥区别。

对于整个选举算法的详情需要先了解Raft选举算法,kafka是基于该算法来实现leader选举的。

23.Kafka 与传统消息系统之间有三个关键区别

Kafka 持久化日志,这些日志可以被重复读取和无限期保留
Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
Kafka 支持实时的流式处理

24.Kafka 创建 Topic 时如何将分区放置到不同的 Broker 中

副本因子不能大于 Broker 的个数,第一个分区(编号为 0)的第一个副本放置位置是随机从 brokerList 选择的,其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果我们有5个 Broker,5个分区,假设第一个分区放在第四个 Broker 上,那么第二个分区将会放在第五个 Broker 上,第三个分区将会放在第一个 Broker 上;第四个分区将会放在第二个 Broker 上,依次类推,剩余的副本相对于第一个副本放置位置其实是由 nextReplicaShift 决定的,而这个数也是随机产生的。

25.Kafka 新建的分区会在哪个目录下创建?

在启动 Kafka 集群之前,我们需要配置好 log.dirs 参数,其值是 Kafka 数据的存放目录,这个参数可以配置多个目录,目录之间使用逗号分隔,通常这些目录是分布在不同的磁盘
上用于提高读写性能。

当然我们也可以配置 log.dir 参数,含义一样。只需要设置其中一个即可。

如果 log.dirs 参数只配置了一个目录,那么分配到各个 Broker 上的分区肯定只能在这个目录下创建文件夹用于存放数据。

但是如果 log.dirs 参数配置了多个目录,那么 Kafka 会在哪个文件夹中创建分区目录呢?

答案是:Kafka 会在含有分区目录最少的文件夹中创建新的分区目录,分区目录名为 Topic 名+分区 ID。注意,是分区文件夹总数最少的目录,而不是磁盘使用量最少的目录!也就是说,如果你给 log.dirs 参数新增了一个新的磁盘,新的分区目录肯定是先在这个新的磁盘上创建直到这个新的磁盘目录拥有的分区目录不是最少为止。

26.partition 的数据如何保存到硬盘

topic 中的多个 partition 以文件夹的形式保存到 broker,每个分区序号从 0 递增,且消息有序 Partition 文件下有多个 segment(xxx.index,xxx.log)segment 文件里的 大小和配置文件大小一致可以根据要求修改 默认为 1g 如果大小大于 1g 时,会滚动一个新的 segment 并且以上一个 segment 最后一条消息的偏移量命名。

27.Kafka 判断一个节点是否还活着有那两个条件?

节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久

0

kafka集群升级导致broker.id发生变动变动引起的问题

最近遇到一个问题,由于kafka集群升级导致每个broker.id出现了变动,但是topic的partition所在broker的信息依旧是原broker.id,这就造成了所有partition都丢失leader。

发现问题

通过kafka的命令发现topic的partition都没有leader,然后看了下集群监控和日志都没有明显的异常,但是日常显示的broker.id和通过kafka命令显示的有出入,进一步去zookeeper上查看了所有broker.id情况,发现是broker.id变了导致kafka controller无法通过partition依赖的broker.id找到对应的broker。

先说明一下自动生成broker.id的由来,kafka开启了broker.id.generation.enable,也就是自动生成broker.id功能,kafka在早期版本需要人为给每个broker分配一个broker.id,在server.properties配置里申明,从0.9.0版本开始,kafka支持并默认了自动生成broker.id功能。那么kafka是如何自动生成broker.id的呢?有一点是必须的,broker.id在这个kafka集群必须是全局唯一的,分布式下的全局唯一,那就联想到分布式锁了。kafka通过在zookeeper里写空字符串,触发该znode的版本自增,然后把获取的版本号和另一个配置reserved.broker.max.id相加,就得到了该broker节点的broker.id。

kafka查看topic所有partition依赖的broker,leader以及isr

zk查看当前kafka集群自增的broker序号

zk查看当前kafka集群所有的broker.id

zk查看topic所有partition的broker.id

解决问题

手动删除错误broker.id的topic,删除完成后重启kafka集群

0

SpringBoot里使用log4j2把日志推入kafka

生产环境里,有的时候查看日志不太方便,这时,我们可以把平常输出到控制台里的日志输出到kafka里,再进行相应处理。

pom文件

需要注意的是,排除掉默认的spring-boot-starter-logging,引用spring-boot-starter-log4j2

application.properties

log4j2.xml

Controller

0