目录

Kafka消息不提交的情况问题排查

注意
本文最后更新于 2022-06-12,文中内容可能已过时。

最近我在我司接手了Kafka这个三方件的owner,每天都有人找我定位问题,其中有一个问题表象是:

通过kafka-consumer-groups.sh命令查看到master消费组里,一个Topic的一个分区一直有数据写入,但未被消费,LAG一直在增大。需要证实是Kafka服务端或客户端的问题,还是对方微服务的问题。

对方称自己微服务一直消费不到消息,微服务进程也没有挂起。

服务端侧无法得知消息是否已被拉取但未被提交的

构造一个消费者拉取到了消息,但是不提交的场景,如下消费者代码所示:

enable.auto.commit=false关闭了自动提交功能,并且也不手动提交,每10秒拉取一次消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.32.51:9092,192.168.32.52:9092,192.168.32.53:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "group1");
        props.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));

        System.out.println("++++++++++++++++++++++");
        for (int i = 0; i < 10; i++) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            System.out.println("================== records.count() = " + records.count());
            records.iterator().forEachRemaining(System.out::println);
//            consumer.commitSync();
            System.out.println("================== poll");
        }

输出日志如下:

/posts/20220612130722/0eafcf32f27a4b1d8bb189c5b02b5808.png

这段消费者代码重复运行多次,每次都可以拉取到这4条未提交的消息。

在服务端使用命令查看消费者组详情,可知分区1和分区2的都存在LAG,各2条。

因此如果在服务端侧通过运维命令来查看,是无法得知消息是否已被拉取但未被提交的。

1
./kafka-consumer-groups.sh --describe --group group1 --bootstrap-server 192.168.32.51:9092,192.168.32.52:9092,192.168.32.53:9092

/posts/20220612130722/794adafcd2024006b327150109524091.png

已拉取但未提交的消息,会在消费者组触发rebalancing后被重新重复消费

还是上面的消费者代码,继续保持拉取但不提交。

第一个消费者实例从分区1和分区2各拉取了2条消息:

/posts/20220612130722/a056704897064eaaa12fb8f3fd0495b2.png

再起第二个消费者实例,两个实例都是同一份代码,即同一个消费者组,这时触发了rebalancing重新分配消费者对应分区,因为

消费者组里的一个消费者能绑定多个分区,但不能绑定同一个组里其他消费者绑定了的分区。

以下为第一个消费者实例的日志,在重新指定了分区后,这时它还可以再次拉取到其中一个分区的2条未提交的消息。另一个分区已经指定给了第二个消费者实例。

/posts/20220612130722/3cd2f6b8b4434f9bab97f8b1de756de8.png

因此已拉取但未提交的消息,会在消费者组发生变动时触发rebalancing后被重新重复消费。

客户端Debug日志确认是否已经拉取了消息

org.apache.kafka.clients.consumer.internals.Fetcher类打开Debug日志后,搜索关键字Fetch READ_UNCOMMITTED,当recordsSizeInBytes不为0时,说明此次从partition yyq-1拉取到了消息数据。

/posts/20220612130722/c33bdd424905421ab82a2a030786c534.png

也可以从org.apache.kafka.clients.NetworkClient类的Debug日志中搜索

客户端Debug日志确认是否提交消息

修改代码进行手动提交。

org.apache.kafka.clients.NetworkClient 类打开Debug日志后,搜索Sending OFFSET_COMMIT关键字,观察其中的OffsetCommitRequestPartition,可以知道对本次拉取的所有分区的数据的offset做了提交。

/posts/20220612130722/a905ff3a81844a40826ca31c3050709e.png