既然包名是api,说明里面肯定都是一些常用的Kafka API了。

一、ApiUtils.scala
顾名思义,就是一些常见的api辅助类,定义的方法包括:
1. readShortString: 从一个ByteBuffer中读取字符串长度和字符串。这个ByteBuffer的格式应该是:2个字节的字符串长度值N+N个字节的字符串
2. writeShortString: 与readShortString相反,先写入2个字节的长度N,然后写入N个字节到ByteBuffer中
3. shortStringLength: 获取符合上面方法中格式的ByteBuffer长度——即2+N
4. readIntInRange: 返回ByteBuffer当前位置出的一个整数值并判断是否在给定的范围内。如果不在直接抛出异常,但其实调用这个方法时总是传入Int.MaxValue,所以通常要是满足的。这个整数值可以代表分区数、分区id、副本数、ISR数或topic数
5. readShortInRange: 与readIntInRange类似,只是这个方法读取一个2字节的short数,这个short数通常都是被用作error code
6. readLongInRange: 与前两个类似,只是它读取一个Long型的数,不过这个方法貌似没有被调用过
二、RequestOrResponse.scala
Kafka中有很多种客户请求(request),该文件定义了一个Request object抽象出了所有请求共同拥有的属性:
OrdinaryConsumerId: 表示follower的副本id
DebuggingConsumerId: 仅供Debug使用
isValidBrokerId: 是否是合法的Broker id,必须是非负值
下面还定义了一个抽象类,这个类特别重要,因为后面所有种类的请求或响应都继承了该类
RequestOrResponse类——即请求或响应类
如果是表示请求,那么子类必须传入一个requestId表示请求的种类(具体种类在RequestKeys中定义);如果是表示响应,那么子类不需要传入任何参数直接调用无参构造函数。这个多功能类定义了4个抽象方法:
1. sizeInBytes: 计算请求或响应所占字节数
2. writeTo: 将请求或响应写入ByteBuffer
3. handleError: 主要用于处理请求时的错误
4. describe: ​只用于请求,返回对该请求的一个描述字符串
三、RequestKeys.scala
定义了所有的请求种类,包括ProduceKey、FetchKey、OffsetsKey等,每种请求都有一个编号。另外还定义了一个Map将请求种类编号与读取请求或响应的函数关联起来以及两个对应的方法分别返回请求种类的名称以及对应的解析函数。
四、GenericRequestAndHeader.scala
一个抽象类,继承了RequestOrResponse类,自然也要实现RequestOrResponse类定义的4个抽象方法
1. writeTo: 写入版本、correlationId、clientId和body
2. sizeInBytes: 2个字节的版本号+4个字节的correlation号+(2 + N)个字节的客户端id+body的字节数
3. toString/describe: 两个方法一起构成了请求的描述字符串
五、GenericResponseAndHeader.scala
与GenericRequestAndHeader对应的response类,代码中写的是extends RequestOrResponse(requestId),由于所有response应该是extends RequestOrResponse(),所以我谨慎的怀疑它这里写错了,不过反正requestId也没有在该类中使用。该类因为继承了RequestOrResponse,自然也要实现那4个方法: writeTo,sizeInBytes, toString和describe。这里就不赘述了。
 
下面将Request和Response组合在一起说了,
六、ProducerRequest.scala/ProducerResponse.scala
在具体说对应的request/response之前,先说说Kafka通用的request和response的结构:(以下大部分内容来自:https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceRequest)
RequestOrResponse => size + (requestMessage 或者 responseMessage)。其中size是一个32位的整数,表明后面request或response的长度。
Request格式 => ApiKey + ApiVersion + CorrelationId + ClientId + RequestMessage
——ApiKey: SHOTRT类型的整数,标识这个request的类型,比如是metadata request、producer request还是fectch request等,具体定义在RequestKeys.scala中
——ApiVersion: SHORT类型的整数,主要用于迭代升级使用,目前的值是0,比如说增加一些request的字段,版本变为1等。不过目前统一是0
——CorrelationId: 4个字节的整数,在服务器端和客户端关联reponse使用
——ClientId: 用户自定义的一个名称,可用于记录日志、监控使用。比如监控不同应用产生的请求数
Response格式 => CorrelationId + ResponseMessage
可以看出,response的格式明显比request来的简明。上面2个组成部分的含义都很清晰就不再详细说了。
okay,既然说了request和response的一些共同的格式,下面我们展开说一些具体的request和response
Metadata API
Kafka有提供了几类API,其中一类API可以查询一些元数据信息,比如
  • 集群中有哪些topic?
  • 每个topic有多少分区?
  • 每个分区当前的leader是哪个broker?
  • 这些broker的host/port是什么?
于是,这类API会发送Metadata request给Kafka的集群并从集群处获得对应的response。需要注意的是,这类API发送的请求能够被集群中任意一个broker处理的,其他API的request没有这样的能力!
客户发出请求后Kafka也不总是返回所有的topic,客户可以提供一个topic列表选择感兴趣的topic。在开始看Metadata request代码之前,我们有必要先看一下Topic元数据是怎么定义的。
TopicMetadata.scala
这个scala文件结构非常鲜明,两组伴生对象,分别定义了topic级别的metadata和partition级别的metadata。先看Partition级别的metadata。
构造函数
partitionId —— partition号
leader —— 该partition的leader broker,可能是空
replicas —— 该partition的所有副本集合
isr —— 该partition的ISR集合
errorCode —— 错误码,初始为NoError
类方法
sizeInBytes —— 该metadata信息总字节数,包括2个字节的错误码+4个字节的分区号+4个字节的leader号+(4+副本集合大小N1* 4) + (4 + ISR集合大小N2*4)。每对括号中第一个4表示集合长度字节,里面的值就是N1或N2
writeTo —— 按照错误码、分区号、leaderId、副本集合大小、副本集合所有ID、ISR大小、ISR集合所有ID的顺序写入ByteBuffer。特别注意的是,如果没有leader,直接写入-1,表示leader node不存在
toString —— 将以上信息组成字符串输出
formatBroker —— 拼成这样的字符串: broker ID + (broker host : broker : port)
    而PartitionMetadata object只提供了一个readFrom方法,从一段ByteBuffer中按照writeTo中的顺序读取各种信息封装到一个PartitionMetadata实例返回。
 
定义好了partition metadata,topic的metadata就比较简单了,只包含topic信息、一组PartitionMetadata和一个ErrorCode。它的方法包括:
sizeInBytes —— 总的字节数 = 2 + size(topic) + 4 + 集合中所有partitionmetadata字节数
writeTo —— 按照error code、topic、partition数、partitionmetadata集合元素的顺序写入ByteBuffer
toString —— 构造一个供打印输出的字符串
    TopicMetadata object也只定义了一个readFrom方法,按照writeTo写入的方法读取到一个ByteBuffer。
    好了,现在可以说ProducerRequest/ProducerResponse了。
TopicMetadataRequest的代码结构和其他request很类似,包括writeTo、sizeInBytes、toString和describe方法等以及对应object定义的readFrom方法。这个类允许用户提供了一个Topic集合以获取指定topic的元数据。如果发生错误,handleError方法会创建一组错误response,然后通过RequestChannel的sendResponse方法返回给客户。
TopicMetadataResponse.scala
response比request要简单得多,类只定义了sizeInBytes和writeTo方法,而object还是定义了readFrom方法来读取response信息
七、UpdateMetadataRequest.scala/UpdateMetadataResponse.scala
既然能够查询元数据,自然也要有更新元数据的API。该request除了公共的信息还包含controller id,controller_epoch,topic+partition -> partition状态的映射以及当前可用的Broker。自然writeTo和readFrom时候都要把这些额外的信息加进去。
八、ConsumerMetadataRequest.scala/ConsumerMetaResponse.scala
consumer的元数据request/response,大多都是公共的request字段,不过有一个需要注意的是group,表示consumer group。另外如果response返回的Broker的host是空,port是-1表示这是一个假的broker——其实就是表示没有broker的意思。
九、ControlledShutdownRequest.scala/ControlledShutdownResponse.scala
关闭一个broker的request和response
十、HeartbeatRequestAndHeader.scala/HeartbeatResponseAndHeader.scala
从名字上看,很像是维持心跳的request和response,不过貌似代码中没有使用到
十一、JoinGroupRequestAndHeader.scala/JoinGroupResponseAndHeader.scala
貌似代码中也没有用到
十二、LeaderAndIsrRequest.scala/LeaderAndIsrResponse.scala
    LeaderAndIsrRequest.scala中其实定义了三组伴生对象: LeaderAndIsr、PartitionStateInfo和LeaderAndIsrRequest。LeaderAndIsr定义了一个leader以及leader epoch, 一组ISR集合以及对应的zookeeper版本,貌似变更leader或ISR时会将leader_epoch值加一。
    再说PartitionStateInfo伴生对象,它包含了该分区的AR集合以及一个LeaderIsrAndControllerEpoch实例,后者简单来说就是保存了leader、isr和controller_epoch信息——后者在controller状态变更时会加1。PartitionStateInfo作为LeaderAndIsrRequest的一部分,也维持了与request/response类似的代码结构,即提供了writeTo、readFrom、sizeInBytes等方法。其中写入/读取的顺序是controller_epoch, leader, leader_epoch, ISR size, ISR set, zkVersion, AR size, AR set。
    最后就是LeaderAndIsrRequest伴生对象了,除了常见的request信息之外它还包括了controllerId, controller_epoch以及一组leader和一组PartitionStateInfo信息。而LeaderAndIsrResponse则返回一个映射,key是topic+partition,value是对应的错误码
十三、StopReplicaRequest.scala/StopReplicaResponse.scala
关闭一组分区的副本的request和response。提request时候还需要额外提供controllerId、controller_epoch和副本所在的分区集合以及一个bool值表明是否删除这些分区。
十四、ProducerRequest.scala/ProducerResponse.scala
从这组request,response开始都是比较重要的Kafka请求了。客户端使用producer API提交发送请求发送消息集合给服务器。Kafka允许一次发送属于多个topic分区的消息。Producer request格式如下:
versionId(2Byte) + correlationId(4Byte) + clientId(2Byte + size(clientId)) + requiredAcks(2Byte) + ackTimeoutMs(4Byte) + Topics (Partitions + MessageSetSize + MessageSet)
几个字段重点说一下,
requiredAcks —— 服务器在响应请求前需接收应答的次数。如果是0,服务器不会响应请求;如果是1,表示服务器会等待数据被写入到本地log然后再发送response;如果是-1,那么就要等ISR中所有副本都提交了才发送response。这个值由属性request.required.acks控制
ackTimeoutMs —— 等待应答的最大超时时间,由属性request.timeout.ms指定,默认是10秒。这只是个近似值,因为很多元素都没有被包含在这个超时间隔内。比如它并不包括网络的延时,也不会计算request在队列中的等待时间等。如果要精确地计算这些部分的时间,还是使用Socket的超时比较好
与ProducerRequest对应的响应类就是ProducerResponse——它的格式如下:
correlationId + topic count + [topic + partition count + [partitionId + errorCode + nextOffset]*]*
每个partition都有自己的errorCode,nextOffset表示消息集合的第一条消息的offset
十五、FetchRequest.scala/FetchResponse.scala
    Fetch API用于获取一些topic分区的一条或多条消息,只需要客户段代码指定topic、分区和开始获取的起始位移即可。通常来说,返回的消息的位移一般都不小于给定的起始位移。但是如果是压缩消息,那么就有可能比起始位移小。这种消息不会太多,因为fetch api的调用者需要自己来过滤掉这些消息。
FetchRequest的格式如下:
versionId + correlationId + clientId + replicaId + maxWait + minBytes + topic count + [topic + partition count + [partitionId + offset + fetchSize]]*
replicaId —— 发起请求的副本所在节点的ID,通常使用时总是要将其设置为-1
maxWait / MinBytes —— 如果maxWait设置为100ms、MinBytes是64KB的话,Kafka server会等待100ms来收集64KB大小的response数据。
FetchResponse与其他的response相比有些特殊,它为不同的子部分定义了两个类:TopicData和PartitionData,因此格式如下:
corrleationId + topic number + [topic + [partitionId errorCode highWaterOffset MessageSetSize MessageSet ]* ]*
值得一提的是,因为这个response可能返回大量的数据,所以Kafka在构建这个reponse的时候使用了sendfile的机制(交由java.NIO包FileChannel来做)
十六、OffsetRequest.scala/OffsetResponse.scala
    这个API主要用于为一组topic分区获取合法的offset范围,和produce和fetch api一样,offset请求也必须发送到分区的leader broker处理——当然可以使用metadata api来获取leader broker id。OffsetResponse返回的是所请求分区每个日志段的起始位移以及日志结束位移(也就是下一条消息被追加到分区的位移)
    OffsetRequest中有两个比较重要的概念: LatestTime和EarliestTime,分别为-1和-2。它们和属性auto.offset.reset关系也很亲密,分别对应于largest和smallest。其中largest表示自动地重试位移到最大位移;smallest表示自动地重设位移为最小位移。
    OffsetRequest格式如下:
versionId + correlationId + clientId + replicaId + topic count + [topic + partition count + [partitionId + partition time + maxNumOffsets]* ]* 
Kafka专门创建了一个PartitionOffsetRequestInfo 类来保存partition time + maxNumOffset。指定Partition time(单位是ms)表示要请求所有在该时间点之前的消息,比如说可以指定OffsetRequest.LatestTime表示请求当前所有消息。
    OffsetResponse比较简单,格式如下:
correlationId + topic count + [ topic + partition count + [partition Id + error Code + offset数组长度 + 每个offset]* ]*
返回的offset数组就是某个分区下每个日志段的起始位移。
十七、OffsetCommitRequest.scala/OffsetCommitResponse.scala
这个request与下面要说的OffsetFetchRequest api都是用于集中式管理位移的。其中OffsetCommitRequest格式如下:
versionId + correlationId + clientId + consumer group Id + consumer generationId(0.8.2以后新加的) + consumer id(0.8.2以后新加的) + topic count + [topic + partition count + [partitionId + offset + timestamp(0.8.2以后新加) + metadata]* ]*
其中,offset,timestamp和metadata都是OffsetAndMetadata类提供的信息。
OffsetCommitResponse就要简单的多,它只有一个版本的格式:
correlationId + topic count + [topic + partition count + [partitionId + errorCode]* ]*
十八、OffsetFetchRequest.scala/OffsetFetchResponse.scala
顾名思义,获取offset信息的request,格式如下:
versionId + correlationId + clientId + consumer group id + topic count + [ topic + partition count + [ partitionId ]* ]*
虽然只有一个格式,但需要注意的是0.8.2以前都是从zookeeper中读offset,从0.8.2之后从kafka中读取offset
OffsetFetchResponse格式如下:
correlationId + topic count + [ topic + partition count + [ partitionId + offset + medata + errorCode]* ]*
其中offset和metadata是保存在OffsetAndMetadata实例中。

【原创】Kakfa api包源代码分析的更多相关文章

  1. 【原创】Kakfa cluster包源代码分析

    kafka.cluster包定义了Kafka的基本逻辑概念:broker.cluster.partition和replica——这些是最基本的概念.只有弄懂了这些概念,你才真正地使用kakfa来帮助完 ...

  2. 【原创】Kakfa log包源代码分析(二)

    八.Log.scala 日志类,个人认为是这个包最重要的两个类之一(另一个是LogManager).以伴生对象的方式提供.先说Log object,既然是object,就定义了一些类级别的变量,比如定 ...

  3. 【原创】Kakfa log包源代码分析(一)

    Kafka日志包是提供的是日志管理系统.主要的类是LogManager——该类负责处理所有的日志,并根据topic/partition分发日志.它还负责flush策略以及日志保存策略.Kafka日志本 ...

  4. 【原创】Kakfa metrics包源代码分析

    这个包主要是与Kafka度量相关的. 一.KafkaTimer.scala 对代码块的运行进行计时.仅提供一个方法: timer——在运行传入函数f的同时为期计时 二.KafkaMetricsConf ...

  5. 【原创】Kakfa network包源代码分析

    kafka.network包主要为kafka提供网络服务,通常不包含具体的逻辑,都是一些最基本的网络服务组件.其中比较重要的是Receive.Send和Handler.Receive和Send封装了底 ...

  6. 【原创】Kakfa common包源代码分析

    初一看common包的代码吓了一跳,这么多scala文件!后面仔细一看大部分都是Kafka自定义的Exception类,简直可以改称为kafka.exceptions包了.由于那些异常类的名称通常都定 ...

  7. 【原创】Kakfa message包源代码分析

    笔者最近在研究Kafka的message包代码,有了一些心得,特此记录一下.其实研究的目的从来都不是只是看源代码,更多地是想借这个机会思考几个问题:为什么是这么实现的?你自己实现方式是什么?比起人家的 ...

  8. 【原创】Kakfa serializer包源代码分析

    这个包很简单,只有两个scala文件: decoder和encoder,就是提供序列化/反序列化的服务.我们一个一个说. 一.Decoder.scala 首先定义了一个trait: Decoder[T ...

  9. 【原创】kafka producer源代码分析

        Kafka 0.8.2引入了一个用Java写的producer.下一个版本还会引入一个对等的Java版本的consumer.新的API旨在取代老的使用Scala编写的客户端API,但为了兼容性 ...

随机推荐

  1. Android异常:唤醒锁未授权。(Caused by: java.lang.SecurityException: Neither user 10044 nor current process has android.permission.WAKE_LOCK.)

    Android异常:Caused by: java.lang.SecurityException: Neither user 10044 nor current process has android ...

  2. C# ACCESS数据库操作类

    这个是针对ACCESS数据库操作的类,同样也是从SQLHELPER提取而来,分页程序的调用可以参考MSSQL那个类的调用,差不多的,只是提取所有记录的数量的时候有多一个参数,这个需要注意一下! usi ...

  3. 用C#访问SSRS自动导出SSRS报表

    一.              新建一个winform应用程序WindowsFormsApplication1 二.              添加web引用 . 报表服务:http://dbpdhk ...

  4. oracle sys sysman system 介绍

    Oracle数据库中SYS.SYSTEM.DBSNMP.SYSMAN四用户的区别 SYS用户: SYS,默认密码为CHANGE_ON_INSTALL,当创建一个数据库时,SYS用户将被默认创建并授予D ...

  5. Android通过类对象的方式实现JSON数据的解析

    1.通过主Activity的Button按钮实现数据的解析 public class MainActivity extends Activity { //定义一个包含Json格式的字符对象 priva ...

  6. jvm的client和server

    最近研究c++代码调用java的jar,发现64位的下的jvm在server路径,而32位的jvm则存在client路径下面,于是十分好奇,查了下,这里做个记录 JVM Server模式与client ...

  7. 数字雨Shopex 4.8.5 SQL Injection Exp

    # -*- coding:utf-8 -* #Author:MXi4oyu #Email:798033502@qq.com #Shopex 4.8.5 SQL Injection Exp #转载请说明 ...

  8. CentOS 7 服务器配置--安装Ftp

    #安装vsftp yum install -y vsftpd #将 /etc/vsftpd/user_list文件和/etc/vsftpd/ftpusers文件中的root这一行注释掉 #root # ...

  9. css中的单冒号和双冒号

    最近突然被别人问起css单冒号和双冒号有什么区别,答曰:"不知道". 虽然还在填坑中,但作为一个跨过了初级的FEer,感觉着实汗颜,刚好今天下午在搜别的问题的时候,突然看到一个对比 ...

  10. docker安装使用教程(Kali2.0)

    一.apt安装 apt直接安装是最好的,因为apt源中的其他docker相关组件,也是与docker匹配的版本. apt-get install docker docker-compose 二.手动安 ...