您好,欢迎来到化拓教育网。
搜索
您的当前位置:首页kafka consumer 自动提交 offset

kafka consumer 自动提交 offset

来源:化拓教育网

org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    client.maybeTriggerWakeup();

    long startMs = time.milliseconds();
    // 这里面触发自动提交
    coordinator.poll(startMs, timeout);

    // Lookup positions of assigned partitions
    boolean hasAllFetchPositions = updateFetchPositions();

    // 对拉取到的数据,更新 position
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;

    // 发送拉取数据请求
    fetcher.sendFetches();

    long nowMs = time.milliseconds();
    long remainingTimeMs = Math.max(0, timeout - (nowMs - startMs));
    long pollTimeout = Math.min(coordinator.timeToNextPoll(nowMs), remainingTimeMs);

    // We do not want to be stuck blocking in poll if we are missing some positions
    // since the offset lookup may be backing off after a failure
    if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
        pollTimeout = retryBackoffMs;

    client.poll(pollTimeout, nowMs, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}

结论就是:consumer 拉取到消息后,会更新保存的位点信息,下次拉取消息前,若自动提交的时间到了,就会把位点信息提交到 broker。

转载于:https://www.cnblogs.com/allenwas3/p/11473827.html

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo9.cn 版权所有 赣ICP备2023008801号-1

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务