Kafka 丢消息的处理 - 今日头条

本文由 简悦 SimpRead 转码, 原文地址 www.toutiao.com

刷盘触发条件有三:主动调用 sync 或 fsync 函数可用内存低于阀值 dirtydata 时间达到阀值。 long lastO

https://p3.toutiaoimg.com/origin/pgc-image/ba7a945dd218455ea878d47724611392?from=pc

Kafka 存在丢消息的问题,消息丢失会发生在 Broker,Producer 和 Consumer 三种。

Broker 丢失消息是由于 Kafka 本身的原因造成的,kafka 为了得到更高的性能和吞吐量,将数据异步批量的存储在磁盘中。消息的刷盘过程,为了提高性能,减少刷盘次数,kafka 采用了批量刷盘的做法。即,按照一定的消息量,和时间间隔进行刷盘。这种机制也是由于 linux 操作系统决定的。将数据存储到 linux 操作系统种,会先存储到页缓存(Page cache)中,按照时间或者其他条件进行刷盘(从 page cache 到 file),或者通过 fsync 命令强制刷盘。数据在 page cache 中时,如果系统挂掉,数据会丢失。

https://p3.toutiaoimg.com/origin/pgc-image/90e2a2ad61184cd6a538dfe716bd4296?from=pc

Broker 在 linux 服务器上高速读写以及同步到 Replica

上图简述了 broker 写数据以及同步的一个过程。broker 写数据只写到 PageCache 中,而 pageCache 位于内存。这部分数据在断电后是会丢失的。pageCache 的数据通过 linux 的 flusher 程序进行刷盘。刷盘触发条件有三:

  • 主动调用 sync 或 fsync 函数
  • 可用内存低于阀值
  • dirty data 时间达到阀值。dirty 是 pagecache 的一个标识位,当有数据写入到 pageCache 时,pagecache 被标注为 dirty,数据刷盘以后,dirty 标志清除。

Broker 配置刷盘机制,是通过调用 fsync 函数接管了刷盘动作。从单个 Broker 来看,pageCache 的数据会丢失。

Kafka 没有提供同步刷盘的方式。同步刷盘在 RocketMQ 中有实现,实现原理是将异步刷盘的流程进行阻塞,等待响应,类似 ajax 的 callback 或者是 java 的 future。下面是一段 rocketmq 的源码。

1
2
3
4
5
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

service.putRequest(request);

boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盘

为了解决该问题,kafka 通过 producer 和 broker 协同处理单个 broker 丢失参数的情况。一旦 producer 发现 broker 消息丢失,即可自动进行 retry。除非 retry 次数超过阀值(可配置),消息才会丢失。此时需要生产者客户端手动处理该情况。那么 producer 是如何检测到数据丢失的呢?是通过 ack 机制,类似于 http 的三次握手的方式。

  • acks=0,producer 不等待 broker 的响应,效率最高,但是消息很可能会丢。
  • acks=1,leader broker 收到消息后,不等待其他 follower 的响应,即返回 ack。也可以理解为 ack 数为 1。此时,如果 follower 还没有收到 leader 同步的消息 leader 就挂了,那么消息会丢失。按照上图中的例子,如果 leader 收到消息,成功写入 PageCache 后,会返回 ack,此时 producer 认为消息发送成功。但此时,按照上图,数据还没有被同步到 follower。如果此时 leader 断电,数据会丢失。
  • acks=-1,leader broker 收到消息后,挂起,等待所有 ISR 列表中的 follower 返回结果后,再返回 ack。-1 等效与 all。这种配置下,只有 leader 写入数据到 pagecache 是不会返回 ack 的,还需要所有的 ISR 返回 “成功” 才会触发 ack。如果此时断电,producer 可以知道消息没有被发送成功,将会重新发送。如果在 follower 收到数据以后,成功返回 ack,leader 断电,数据将存在于原来的 follower 中。在重新选举以后,新的 leader 会持有该部分数据。数据从 leader 同步到 follower,需要 2 步:数据从 pageCache 被刷盘到 disk。因为只有 disk 中的数据才能被同步到 replica。数据同步到 replica,并且 replica 成功将数据写入 PageCache。在 producer 得到 ack 后,哪怕是所有机器都停电,数据也至少会存在于 leader 的磁盘内。

上面第三点提到了 ISR 的列表的 follower,需要配合另一个参数才能更好的保证 ack 的有效性。ISR 是 Broker 维护的一个 “可靠的 follower 列表”,in-sync Replica 列表,broker 的配置包含一个参数:min.insync.replicas。该参数表示 ISR 中最少的副本数。如果不设置该值,ISR 中的 follower 列表可能为空。此时相当于 acks=1。

https://p3.toutiaoimg.com/origin/pgc-image/12eae60ccef543c08f22a37f7f73ff5e?from=pc

如上图中:

  • acks=0,总耗时 f(t) = f(1)。
  • acks=1,总耗时 f(t) = f(1) + f(2)。
  • acks=-1,总耗时 f(t) = f(1) + max( f(A) , f(B) ) + f(2)。

性能依次递减,可靠性依次升高。

Producer 丢失消息,发生在生产者客户端。

为了提升效率,减少 IO,producer 在发送数据时可以将多个请求进行合并后发送。被合并的请求咋发送一线缓存在本地 buffer 中。缓存的方式和前文提到的刷盘类似,producer 可以将请求打包成 “块” 或者按照时间间隔,将 buffer 中的数据发出。通过 buffer 我们可以将生产者改造为异步的方式,而这可以提升我们的发送效率。

但是,buffer 中的数据就是危险的。在正常情况下,客户端的异步调用可以通过 callback 来处理消息发送失败或者超时的情况,但是,一旦 producer 被非法的停止了,那么 buffer 中的数据将丢失,broker 将无法收到该部分数据。又或者,当 Producer 客户端内存不够时,如果采取的策略是丢弃消息(另一种策略是 block 阻塞),消息也会被丢失。抑或,消息产生(异步产生)过快,导致挂起线程过多,内存不足,导致程序崩溃,消息丢失。

https://p3.toutiaoimg.com/origin/pgc-image/fe58a19299ce4e3ba3defc65b3945400?from=pc

producer 采取批量发送的示意图

https://p3.toutiaoimg.com/origin/pgc-image/a245fa29829349048c6ed9924ecff54b?from=pc

异步发送消息生产速度过快的示意图

根据上图,可以想到几个解决的思路:

  • 异步发送消息改为同步发送消。或者 service 产生消息时,使用阻塞的线程池,并且线程数有一定上限。整体思路是控制消息产生速度。
  • 扩大 Buffer 的容量配置。这种方式可以缓解该情况的出现,但不能杜绝。
  • service 不直接将消息发送到 buffer(内存),而是将消息写到本地的磁盘中(数据库或者文件),由另一个(或少量)生产线程进行消息发送。相当于是在 buffer 和 service 之间又加了一层空间更加富裕的缓冲层。

Consumer 消费消息有下面几个步骤:

  1. 接收消息
  2. 处理消息
  3. 反馈 “处理完毕”(commited)

Consumer 的消费方式主要分为两种:

  • 自动提交 offset,Automatic Offset Committing
  • 手动提交 offset,Manual Offset Control

Consumer 自动提交的机制是根据一定的时间间隔,将收到的消息进行 commit。commit 过程和消费消息的过程是异步的。也就是说,可能存在消费过程未成功(比如抛出异常),commit 消息已经提交了。此时消息就丢失了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 自动提交开关
props.put("enable.auto.commit", "true");
// 自动提交的时间间隔,此处是1s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
        // 调用poll后,1000ms后,消息状态会被改为 committed
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records)
    insertIntoDB(record); // 将消息入库,时间可能会超过1000ms
}

上面的示例是自动提交的例子。如果此时,insertIntoDB(record) 发生异常,消息将会出现丢失。接下来是手动提交的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 关闭自动提交,改为手动提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
        // 调用poll后,不会进行auto commit
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    buffer.add(record);
  }
  if (buffer.size() >= minBatchSize) {
    insertIntoDb(buffer);
                // 所有消息消费完毕以后,才进行commit操作
    consumer.commitSync();
    buffer.clear();
  }
}

将提交类型改为手动以后,可以保证消息 “至少被消费一次”(at least once)。但此时可能出现重复消费的情况,重复消费不属于本篇讨论范围。

上面两个例子,是直接使用 Consumer 的 High level API,客户端对于 offset 等控制是透明的。也可以采用 Low level API 的方式,手动控制 offset,也可以保证消息不丢,不过会更加复杂。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             // 精确控制offset
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }