kafka采用Consumer消费者Pull主动拉取数据的方式,当Broker无数据时,消费者空转。Kafka并不删除已消费的消息,各自独立的消费者可消费同一个Broker分区数据。

消费流程

1、消费者发起网络消费请求

# 每批次最小抓取设置(推荐1字节)
fetch.min.bytes
# 每批次最大抓取大小设置(推荐500ms)
fetch.max.bytes
# 未达到大小的超时设置(推荐50M)
fetch.max.wait.ms

2、拉取数据到内存消费队列中

# 单次拉取最大消息条数设置(推荐500条)
max.poll.records

2.1、反序列化处理(对应了Producer端的序列化动作)

2.2、拦截器处理(如:汇总统计记录)

3、数据的后续处理

保存等的消费端动作。

offset

当一个消费者挂掉或重启后,是否还记得消费到的位置了?offset解决了此问题。

对于每一个topic,都会维持一个分区日志,分区中的每一个记录都会分配一个Id来表示顺序,称之为offset,offset用来唯一的标识分区中每条记录,并将每次的消费位置提交到topic中。消费者恢复启动后接着按序消费数据。

自动提交

# 开启自动提交
enable.auto.commit = true
# 每次提交间隔(推荐5秒)
auto.commit.interval.ms = 5000

手动提交

先关闭自动提交后,在Consumer客户端的代码中,通过调用方法函数提交,通常的方法名:

# 同步提交,等提交完成才可下一次再消费
.CommitSync
# 异步提交,可直接进行下一个消费,也有可能提交失败
.CommitAync

指定消费

在Consumer客户端的代码中,手动指定offset的位置进行消费,关联到的方法函数名:

# 按指定时间得出offset值
.offsetsForTimes
# 按指定offset值继续消费
.seek

初始策略

# earliest:	最早消费;无offset时,从头开始消费。
# latest: 最新消费;无offset时,从最新的数据开始消费。
# none: 无offset时,引发异常。
auto.offset.reset = earliest | latest | none

消费现象

重复消费:offset未提交成功,下次消费还是旧的offset。

漏消费:offset提交成功,消费者端后续的数据处理未完成(建议下游步骤事务处理)。

消费者组

为了实现横向扩展,应用程序需要创建一个消费者群组,然后往群组里添加消费者来提高处理效率,群组里的每个消费者只处理一部分消息。

消费者组是逻辑上的一个消费者,是由一个或多个消费者实例组成,具有可扩展性和可容错性,消费者组内的消费者共享一个GroupId组成;组内每个消费者负责消费不同分区数据,并行消费数据;当组内一个消费者挂了之后,其它消费者要自动承担它的消费任务 - 组内再平衡

触发再平衡

消费成员与Broker分区保持心跳连接,或者消费成员处理消息时间过长,会被认为此消费者需要被移除,触发组内消费成员任务再分配。以下配置任其一条件触发再平衡:

# 心跳连接超时的 移除条件(建议45秒)
session.timeout.ms
# 消息处理超时的 移除条件(建议5分钟)
max.poll.interval.ms

再平衡策略

# 再平衡策略配置项(可多策略组合)
partition.assignment.strategy = Range | RoundRobin | Sticky | CooperativeSticky
  • Range:单个Topic内的重新平均分配
  • RoundRobin:所有Topic的全部消费者,一起重新分配
  • Sticky:一次小范围重新分配;仅调整需要的,避免大规模重新分配
  • CooperativeSticky:可多次小范围重新调整,直至最终效果

提升吞吐量

  • 增加分区,增加消费者,两者一一对应起来,并行消费
  • 调整一次最多拉取的消息条数(500条)
  • 调整单次抓取的数据最大容量(50M)

我的 Kafka 旅程 - Consumer的更多相关文章

  1. [Kafka] - Kafka Java Consumer实现(一)

    Kafka提供了两种Consumer API,分别是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API) H ...

  2. 关于Kafka 的 consumer 消费者处理的一些见解

    前言 在上一篇 Kafka使用Java实现数据的生产和消费demo 中介绍如何简单的使用kafka进行数据传输.本篇则重点介绍kafka中的 consumer 消费者的讲解. 应用场景 在上一篇kaf ...

  3. Kafka Producer Consumer

    Producer API org.apache.kafka.clients.producer.KafkaProducer props.put("bootstrap.servers" ...

  4. CDH下集成spark2.2.0与kafka(四十一):在spark+kafka流处理程序中抛出错误java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

    错误信息 19/01/15 19:36:40 WARN consumer.ConsumerConfig: The configuration max.poll.records = 1 was supp ...

  5. 关于Kafka java consumer管理TCP连接的讨论

    本篇是<关于Kafka producer管理TCP连接的讨论>的续篇,主要讨论Kafka java consumer是如何管理TCP连接.实际上,这两篇大部分的内容是相同的,即consum ...

  6. object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord)

    3. object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord)   val stream = ...

  7. [Kafka] - Kafka Java Consumer实现(二)

    Kafka提供了两种Consumer API,分别是:High Level Consumer API 和 Lower Level Consumer API(Simple Consumer API) H ...

  8. 【Kafka】Consumer配置

    从0.9.0.0开始,下面是消费者的配置. 名称 描述 类型 默认值 bootstrap.servers 消费者初始连接kafka集群时的地址列表.不管这边配置的什么地址,消费者会使用所有的kafka ...

  9. 5.kafka API consumer

    1.kafka consumer流程1.1.在启动时或者协调节点故障转移时,消费者发送ConsumerMetadataRequest给bootstrap brokers列表中的任意一个brokers. ...

随机推荐

  1. NodeJS 基于 Dapr 构建云原生微服务应用,从 0 到 1 快速上手指南

    Dapr 是一个可移植的.事件驱动的运行时,它使任何开发人员能够轻松构建出弹性的.无状态和有状态的应用程序,并可运行在云平台或边缘计算中,它同时也支持多种编程语言和开发框架.Dapr 确保开发人员专注 ...

  2. 1000-ms-HashMap 线程安全安全问题

    问题: HashMap是否是线程安全 详解 http://www.importnew.com/21396.html 有源码分析 和代码性能比较 CHM性能最好 HashMap不是线程安全的:Hasht ...

  3. Thingsboard硬网关金鸽BL102采集三菱PLC步骤

    PLC网关金鸽BL102:采集三菱FX-5U数据如何转成MQTT上报?金鸽BL102PLC网关时一款功能强大的PLC数据采集网关,南向可以采集主流的PLC,如三菱.西门子.台达.欧姆龙.施耐德等等PL ...

  4. React报错之Object is possibly null

    正文从这开始~ 类型守卫 使用类型守卫来解决React中useRef钩子"Object is possibly null"的错误.比如说,if (inputRef.current) ...

  5. day13--Java常用类

    Java常用类 1.包装类 1.1什么是包装类? Java 是面向对象的语言,但不是"纯面向对象"的,比如我们经常用到的基本数据类型就不是对象. 在我们实际应用中,经常需要将基本数 ...

  6. Spring提供的API实现文件上传

    Spring为我们提供了文件上传接口MultipartRequest及其实现类StandardMultipartFile StandardMultipartFile是StandardMultipart ...

  7. 海豚调度直播来了 - 即将发版的1.3.0新特性及Roadmap路线

    在过去的3个多月,Apache DolphinScheduler(incuating)和DolphinScheduler社区发生了很多变化,今晚19:30在线直播将为大家介绍最新1.3.0的新特性及R ...

  8. LuoguP1799 数列_NOI导刊2010提高 (动态规划)

    $ f[j]=max(f[i−1][j],f[i−1][j−1]+(x == j) $ #include <iostream> #include <cstdio> #inclu ...

  9. Express 设置请求跨域

    import cors from "cors"; import express from "express"; const app = express(); a ...

  10. Excelize 发布 2.6.0 版本,功能强大的 Excel 文档基础库

    Excelize 是 Go 语言编写的用于操作 Office Excel 文档基础库,基于 ECMA-376,ISO/IEC 29500 国际标准.可以使用它来读取.写入由 Microsoft Exc ...