本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com
刷盘触发条件有三:主动调用 sync 或 fsync 函数可用内存低于阀值 dirtydata 时间达到阀值。 long lastO
Kafka 存在丢消息的问题,消息丢失会发生在 Broker,Producer 和 Consumer 三种。
Broker 丢失消息是由于 Kafka 本身的原因造成的,kafka 为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka 采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于 linux 操作系统决定的。将数据存储到 linux 操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从 page cache 到 file),或者通过 fsync 命令强制刷盘。数据在 page cache 中时,如果系统挂掉,数据会丢失。
Broker 在 linux 服务器上高速读写以及同步到 Replica
上图简述了 broker 写数据以及同步的一个过程。broker 写数据只写到 PageCache 中,而 pageCache 位于内存。这部分数据在断电后是会丢失的。pageCache 的数据通过 linux 的 flusher 程序进行刷盘。刷盘触发条件有三:
- 主动调用 sync 或 fsync 函数
- 可用内存低于阀值
- dirty data 时间达到阀值。dirty 是 pagecache 的一个标识位,当有数据写入到 pageCache 时,pagecache 被标注为 dirty,数据刷盘以后,dirty 标志清除。
Broker 配置刷盘机制,是通过调用 fsync 函数接管了刷盘动作。从单个 Broker 来看,pageCache 的数据会丢失。
Kafka 没有提供同步刷盘的方式。同步刷盘在 RocketMQ 中有实现,实现原理是将异步刷盘的流程进行阻塞,等待响应,类似 ajax 的 callback 或者是 java 的 future。下面是一段 rocketmq 的源码。
|
|
为了解决该问题,kafka 通过 producer 和 broker 协同处理单个 broker 丢失参数的情况。一旦 producer 发现 broker 消息丢失,即可自动进行 retry。除非 retry 次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。那么 producer 是如何检测到数据丢失的呢?是通过 ack 机制,类似于 http 的三次握手的方式。
- acks=0,producer 不等待 broker 的响应,效率最高,但是消息很可能会丢。
- acks=1,leader broker 收到消息后,不等待其他 follower 的响应,即返回 ack。也可以理解为 ack 数为 1。此时,如果 follower 还没有收到 leader 同步的消息 leader 就挂了,那么消息会丢失。按照上图中的例子,如果 leader 收到消息,成功写入 PageCache 后,会返回 ack,此时 producer 认为消息发送成功。但此时,按照上图,数据还没有被同步到 follower。如果此时 leader 断电,数据会丢失。
- acks=-1,leader broker 收到消息后,挂起,等待所有 ISR 列表中的 follower 返回结果后,再返回 ack。-1 等效与 all。这种配置下,只有 leader 写入数据到 pagecache 是不会返回 ack 的,还需要所有的 ISR 返回 “成功” 才会触发 ack。如果此时断电,producer 可以知道消息没有被发送成功,将会重新发送。如果在 follower 收到数据以后,成功返回 ack,leader 断电,数据将存在于原来的 follower 中。在重新选举以后,新的 leader 会持有该部分数据。数据从 leader 同步到 follower,需要 2 步:数据从 pageCache 被刷盘到 disk。因为只有 disk 中的数据才能被同步到 replica。数据同步到 replica,并且 replica 成功将数据写入 PageCache。在 producer 得到 ack 后,哪怕是所有机器都停电,数据也至少会存在于 leader 的磁盘内。
上面第三点提到了 ISR 的列表的 follower,需要配合另一个参数才能更好的保证 ack 的有效性。ISR 是 Broker 维护的一个 “可靠的 follower 列表”,in-sync Replica 列表,broker 的配置包含一个参数:min.insync.replicas。该参数表示 ISR 中最少的副本数。如果不设置该值,ISR 中的 follower 列表可能为空。此时相当于 acks=1。
如上图中:
- acks=0,总耗时 f(t) = f(1)。
- acks=1,总耗时 f(t) = f(1) + f(2)。
- acks=-1,总耗时 f(t) = f(1) + max( f(A) , f(B) ) + f(2)。
性能依次递减,可靠性依次升高。
Producer 丢失消息,发生在生产者客户端。
为了提升效率,减少 IO,producer 在发送数据时可以将多个请求进行合并后发送。被合并的请求咋发送一线缓存在本地 buffer 中。缓存的方式和前文提到的刷盘类似,producer 可以将请求打包成 “块” 或者按照时间间隔,将 buffer 中的数据发出。通过 buffer 我们可以将生产者改造为异步的方式,而这可以提升我们的发送效率。
但是,buffer 中的数据就是危险的。在正常情况下,客户端的异步调用可以通过 callback 来处理消息发送失败或者超时的情况,但是,一旦 producer 被非法的停止了,那么 buffer 中的数据将丢失,broker 将无法收到该部分数据。又或者,当 Producer 客户端内存不够时,如果采取的策略是丢弃消息(另一种策略是 block 阻塞),消息也会被丢失。抑或,消息产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。
producer 采取批量发送的示意图
异步发送消息生产速度过快的示意图
根据上图,可以想到几个解决的思路:
- 异步发送消息改为同步发送消。或者 service 产生消息时,使用阻塞的线程池,并且线程数有一定上限。整体思路是控制消息产生速度。
- 扩大 Buffer 的容量配置。这种方式可以缓解该情况的出现,但不能杜绝。
- service 不直接将消息发送到 buffer(内存),而是将消息写到本地的磁盘中(数据库或者文件),由另一个(或少量)生产线程进行消息发送。相当于是在 buffer 和 service 之间又加了一层空间更加富裕的缓冲层。
Consumer 消费消息有下面几个步骤:
- 接收消息
- 处理消息
- 反馈 “处理完毕”(commited)
Consumer 的消费方式主要分为两种:
- 自动提交 offset,Automatic Offset Committing
- 手动提交 offset,Manual Offset Control
Consumer 自动提交的机制是根据一定的时间间隔,将收到的消息进行 commit。commit 过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit 消息已经提交了。此时消息就丢失了。
|
|
上面的示例是自动提交的例子。如果此时,insertIntoDB(record)
发生异常,消息将会出现丢失。接下来是手动提交的例子:
|
|
将提交类型改为手动以后,可以保证消息 “至少被消费一次”(at least once)。但此时可能出现重复消费的情况,重复消费不属于本篇讨论范围。
上面两个例子,是直接使用 Consumer 的 High level API,客户端对于 offset 等控制是透明的。也可以采用 Low level API 的方式,手动控制 offset,也可以保证消息不丢,不过会更加复杂。
|
|