kafka.network包主要为kafka提供网络服务,通常不包含具体的逻辑,都是一些最基本的网络服务组件。其中比较重要的是Receive、Send和Handler。Receive和Send封装了底层的入站(inbound)和出站(outbound)字节传输请求,而Handler在此二者间做了一个映射。一个handler就代表了一个函数,该函数接收Receive类型的对象并返回Send类型的对象。用户可以处理过冲中添加逻辑代码,并且需要自行捕获传输读写过程中的异常,将其序列化之后发送给客户端。基本上, 如果你想要研究透这个包的代码,你必须要很了解Java NIO的原理。下面我们对包中的代码进行逐个分析:

一、Transmission.scala
Transmission trait表示一个有状态的网络数据传输。之所以说是有状态的是因为传输分为未完成状态和已完成状态。Transmission提供了一个抽象方法complete表示传输是否完成,同时还定义了2个无返回值的方法: expectIncomplete和expecteComplete分别检测此次传输是否未完成以及此次传输是否已完成——如果检测不通过直接抛出异常。
 
Receive trait继承了Transmission,Receive表示的是一次传输是从channel读取的,定义了两个抽象方法和一个具体方法:
1. buffer: 抽象方法,此次传输读取字节到一个ByteBuffer
2. readFrom: 抽象方法,从一个ReadableByteChannel中读取字节,并返回读取的字节数
3. readCompletely: 具体方法,完整地执行完此次读取传输请求并返回总读取字节数
 
Send trait也继承自Transmission,表示的是此次传输是将字节发送到指定的channel上。Send定义了一个抽象方法和一个具体方法:
1. writeTo: 抽象方法,将待传输的字节发送至指定的channel上,并返回写入的字节数
2. writeCompletely: 具体方法,完整地执行完此次写传输请求并返回总写入字节数
 
除了上述三个trait, 该文件还定义了一个抽象类MultiSend接收一组Send,顺序地执行发送请求。MultiSend是一个泛型类型,接收Send或所有Send的子类,同时构造函数还接收一个列表。该抽象类有三个字段分别保存期望写入的字节数,当前的待发送列表以及总的写入字节数。因为MultiSend继承自Send,它就必须要实现writeTo方法和complete方法:
1. complete: 判断当前的待发送列表中是否为空——如果不为空自然返回false,说明还未完成发送;如果是的话还需要比较总的发送字节数与期望的发送字节数比较,如果不匹配的话直接报错。不返回true的情况统称为不完整的发送(incomplete write)
2. writeTo: 这方法会持续地将待发送的字节发送到channel中直到出现不完整发送。一旦出现的话,该方法会立即返回本次调用中写入的字节数。
二、SocketServer.scala
又是超长的一个文件!! 首先定义了一个SocketServer类,就是提供NIO socket服务的,它是一个多线程模型,由下面三部分组成:
1. 1个接收者线程负责处理新的连接请求
2. N个处理者线程,每一个都有自己的选择器(java.nio.Selector)用于从socket中读取请求
3. M个handler线程,用于处理请求并返回response给处理者线程以便让其进行写入。
 
由于其构造函数非常复杂,我打算就其参数一个一个说:
1. brokerId: Kafka Broker Id
2. host, port: Socket的host和port
3. numProcessorThreads: 处理者线程数
4. maxQueuedRequest: 队列中最大请求数
5. sendBufferSize: 设置SO_SNDBUF值
6. recvBufferSize: 设置SO_RCVBUF值
7. maxRequestSize: 一个请求的最大长度(单位:字节)
8. maxConnectionsPerIp: 每个IP发起的最大连接数
9. connectionsMaxIdleMs: 每个连接的最大空闲时间(单位:毫秒)
10. maxConnectionsPerIpOverrides: 保存了每个IP的当前连接数,以Map[String, Int]格式保存,比如"127.0.0.1" -> 100
在详细展开SocketServer构造函数之前,我们先介绍本文件中其他的类:
ConnectionQuotas类
顾名思义,这个类应该是管理连接配额方面的事情。定义的两个字段也很直观:overrides保存的是Map[String, Int]类型的的map,具体含义是[IP地址,该IP当前连接数];而counts创建了一个可变Map保存该IP的InetAddress对象 ->连接数的映射
该类提供两个方法分别用于增减某个IP的连接数
1. inc: 以同步的方式为该IP增加一个连接,只要没有超过为其分配的最大连接数限制即可,当然了,如果超过了的话抛出异常TooManyConnectionsException。
2. dec: 以同步的方式执行与inc相反的操作,减少某个IP的当前连接数1个。如果操作前连接数就是1 ,直接把该ip记录从map中移除。
AbstractServerThread类
说完了ConnectionQuotas之后,我们就能学习AbstractServerThread类了。该类继承自Runnable,同时还是抽象类。前面说的处理者线程和接收者线程都继承了这个抽象类。AbstractServerThread是它们的基类,同时提提供了很多有用的变量和方法。
 
AbstractServerThread的构造函数接收一个ConnectionQuotas类在接收连接和关闭线程的时候都会用到ConnectionQuotas。在AbstractServerThread的构造函数中初始化了一个java.nio.Selector来管理通道,2个CountDownLatch变量分别表示启动阀门和关闭阀门,最后定义了一个线程安全的AtomicBoolean的字段表征线程当前是否是alive。定义好了这些之后我们再来看它顶一个的方法:
1. shutdown: 发起一个正常的关闭请求,具体方法是将alive设置为false表明该线程已不在处于存活状态,然后调用Selector.wakeup方法唤醒阻塞在select方法上的线程然后一直等待关闭阀门关闭(即shutdownLatch减少变为0)。
2. awaitStartup: 等待线程完全启动——具体方法很简单,就是等启动阀门开启(这里的开启其实是指startupLatch减少变为0)
3. startupComplete: 开启启动阀门——这里的开启其实是指startupLatch减少变为0
4. shutdownComplete: 开启关闭阀门————这里的开启是指将shutdownLatch减少为0
5. isRunning: 判断线程是否依然存活
6. wakeup: 主要是为了唤醒被阻塞在select上的线程——linux上面很常见的做法。通过管道写数据唤醒线程,MemCached的多线程也是这么实现的。
7. close: 接收一个SelectionKey类型,如果不为空的话,去除掉当前attached的对象,然后调用close(channel)的方法关闭socket,关闭channel,并忽略任何异常。
8. closeAll: 关闭所有打开的连接——具体方法就是遍历selector上的所有key,然后直接调用close方法
9. countInterestOps: 遍历所有SelectionKey的所有interest集合
 
Acceptor类
它是一个接收者线程,负责接收并配置新的连接请求。其构造函数参数如下:
1. host/port: Socket的host和port
2. processors: 接收者线程需要保存处理者线程数组
3. sendBufferSize: 设置Socket的SO_SNDBUF
4. recvBufferSize: 设置Socket的SO_RCVBUF
5. connectionQuotas: 持有ConnectionQuotas对象管理每个IP的连接数
Acceptor提供了三个方法:
1. openServerSocket: 顾名思义,就是创建一个socket套接字用于监听入站连接,并返回一个ServerSocketChannel。这个channel被设置成非阻塞模式。
2. accept: 就像这个方法的名字说的,这个方法就是接收一个新的连接请求。需要传入2个参数:SelectionKey和Processor线程。首先从传入的SelectionKey对象中获取一个channel,然后调用accept方法接受新的连接,之后为该套接字的IP地址增加一个连接数,并将该套接字设置为非阻塞模式并且关闭nagle算法且不使用缓存。最后调用处理者线程的accept方法返回。如果连接数已达最大直接打印异常信息,并关闭channel
3. run: 只要线程一直处于运行状态,该方法就会循环检查是否需要建立新连接。首先要注册OP_ACCEPT事件准备好接受客户端连接,然后开启线程。之后不断地检查线程是否存活,如果不在存活关闭socket,channel然后关闭线程。如果一直存活的话首先检查所有准备执行IO操作的key的数目,因为是blocking方法,所以设置了500毫秒的超时。如果存在的key数目大于0,遍历每个key检测其是否准备好接收新的Socket连接,如果准好了直接调用accept接收此连接,否则直接抛出异常。选择processor时候采用了轮询的方式,即顺序循环地指定processor号。
 
Processos类
处理来自单个连接的入站请求。可以并行运行多个Processor线程,每个线程都有自己的selector。还是同样的思路,我们先看构造函数参数:
1. id: 处理者线程id号
2. time: 时间戳
3. maxRequestSize: 一次Socket请求的最大字节数,默认是100MB
4. idleMeter/aggregateIdleMeter: 代码会计算每次Selector.select的时间(最多300ms)并使用idelMeter进行标记(通过mark方法),并调用aggregatedIdleMeter.mark(idleMeter/处理网络请求的总线程数)进行标记——即摊还idleTime到每个线程上。
5. totalProcessorThreads: 用于处理网络请求的线程数,配置文件中默认是2个线程
6. requestChannel: Processor处理请求所使用的channel对象
7. connectionQuotas: 管理连接数的对象,主要是父类构造函数需要这个对象,并没有在该类中做一些特别的操作
8. connectionMaxIdleMs: 设置服务器socket processor线程的空闲超时时间,默认是10分钟
 
该类开始会新建一个并发非阻塞队列保存SocketChannel连接,并创建了一个时间戳保存Processor创建时间,同时还初始化了一个LinkedHashMap以LRU方式管理连接。Processor定义了9个方法,分别是:
1. maybeCloseOldestConnection: 以LRU方法关闭最久的连接。判断方法:如果关闭时刻的时间已经过了当初创建时定好的检查空闲连接窗口,就判断一下那个LinkedHashMap是不是空——如果为空,很简单直接重设下一个检查窗口;如果不为空,直接获取该hashmap的第一个连SelectionKey(LinkedHashMap是有顺序的,所以第一个元素必定是当前map中最先进入的,也就是最久的)并取得该连接的最近一次使用的时间。把该时间往后推设置的最大空闲超时时间(默认是10分钟)并更新到下一次空闲检查的时间戳。如果此时的时间还是晚于已更新的检查时间点,说明一定要关闭这个空闲连接了,直接调用close方法断开这个连接。其实说了那么多,就是一点:获取最久的那个连接的最近一次使用时间点,如果当前时间晚于加上超时时间后的时间点,就关闭那个连接。
2. close: 从LRU map中移除给定的SelectionKey并调用父类的close方法关闭对应的socket
3. channelFor: 根据给定的SelectionKey返回SocketChannel
4. configureNewConnection: 将newConnections队列中所有SocketChannel注册读事件到selector。
5. accept: 将传入的SocketChannel入队列并唤醒sleep的线程
6. processNewResponses: 根据Processor Id获取一个response,如果不为空,从这个channel中拿到SelectionKey,并判断response的类型以采取不同的措施
7. read: 从就绪的channel中读取数据
8. write: 写入数据到就绪的socket上
9. run: 开启线程,只要一直处于运行状态,则配置链接并注册新的repsonse用于写操作。如果发现有准备就绪的,直接获取key和iterator对象遍历key的模式并执行相应的操作,最后关闭线程。
 
三、Handler.scsala
一个很简单的object,定义了2个Handler类型:一个是函数,接收一个Receive并输出到Send对象;另一个也是函数,接收一个(Short, Receive)元组并返回一个Handler
四、ConnectionConfig.scala
连接配置类,主要参数有host, port, sendBufferSize, receiveBufferSize,tcpNoDelay和keepAlive
五、ByteBufferSend.scala
底层保有一个ByteBuffer缓冲区。只提供了一个方法:
writeTo: 就是讲channel待发送的数据发的送到bufer中。
六、BlockingChannel.scala
以伴生对象的方式提供了一个带超时的阻塞式通道。object中定义了默认的buffer大小是-1,主要是用于初始化。BlockingChannel的构造函数接收一个host,一个port,2个buffersize,分别设置SO_SNDBUF和SO_RCVBUF,另外还提供了一个毫秒级的timeout。
该类在内部维护了一个boolean的字段表明是否连接上该channel,还有3个channel,一个SocketChannel,一个ReadableByteChannel和一个GatheringByteChannel,同时还提供了一个锁对象和连接超时字段。下面我们具体地分析一下BlockingChannel提供的方法:
1. isConnected: 表明该channel是否连接上了
2. receive: 判断连接状态,如果未连接直接抛出异常;否则创建一个BoundedByteBufferReceive对象(后面会说到这个对象)主要用于从该channel中读取字节并返回成Receive对象。
3. connect: 以同步的方式连接,如果已连接直接退出,否则直接打开channel,分别设置SO_SNDBUF和SO_RCVBUF, 并设置阻塞模式为true——这也是为什么叫BlockingChannel,并设置超时时间SO_TIMEOUT、SO_KEEPALIVE和TCP_NODELAY,之后连接Socket。连接成功后,将该channel赋值给writeChannel,并新建一个读channel给readChannel——所有这些事情做完之后将connected设置为true,表明此时连接成功。但这些字段都是var,也就是可变的共享对象,因此BlockingChannel不是线程安全的。
4. disconnect: 断开连接。如果channel本身是null的话,writeChannel肯定也是null, readChannel有可能不是null,所以需要单独关闭。最后将connected设置为false表明连接已关闭。
5. send: 如果没有连接自然要抛出异常,否则根据传入的request构建一个BoundedByteBufferSend写入channel,并返回写入的字节数
七、BoundedByteBufferReceive.scala
表示客户端与服务器的连接,继承了Receive类。从名字来看,这应该是一个有界的buffer,构造函数接收的就是buffer大小。提供了三个方法:
1. byteBufferAllocate: 创建一个size大小的ByteBuffer
2. readFrom:sizeBuffer是表示请求大小的buffer,总共4个字节。这个方法就是先要读取sizeBuffer中的大小,即整个请求的大小size,然后分配size大小的contentBuffer,最后返回总的buffer字节数,同时将complete设置为true表明读取完毕
3. buffer: 返回请求内容的buffer
八、BoundedByteBufferSend.scala
与BoundedByteBufferReceive对应的,只是执行channel写入的,就不赘述了。
九、Request.scala
该scala中有两组伴生对象RequestChannel和RequestMetrics。先说RequestMetrics一组,主要是处理与请求相关的度量,比如请求速率(每秒多少个请求)、计算请求在队列中的时间、计算请求在本地broker处理时间等。而RequestChannel内部维护了一个request queue和response queue都是阻塞方式的,并且提供了sendRequest和sendResponse方法分别将请求加入到各自的队列中。同时,还提供了receiveRequest和receiveResponse分别从各自queue中获取request和response

【原创】Kakfa network包源代码分析的更多相关文章

  1. 【原创】Kakfa cluster包源代码分析

    kafka.cluster包定义了Kafka的基本逻辑概念:broker.cluster.partition和replica——这些是最基本的概念.只有弄懂了这些概念,你才真正地使用kakfa来帮助完 ...

  2. 【原创】Kakfa log包源代码分析(二)

    八.Log.scala 日志类,个人认为是这个包最重要的两个类之一(另一个是LogManager).以伴生对象的方式提供.先说Log object,既然是object,就定义了一些类级别的变量,比如定 ...

  3. 【原创】Kakfa log包源代码分析(一)

    Kafka日志包是提供的是日志管理系统.主要的类是LogManager——该类负责处理所有的日志,并根据topic/partition分发日志.它还负责flush策略以及日志保存策略.Kafka日志本 ...

  4. 【原创】Kakfa common包源代码分析

    初一看common包的代码吓了一跳,这么多scala文件!后面仔细一看大部分都是Kafka自定义的Exception类,简直可以改称为kafka.exceptions包了.由于那些异常类的名称通常都定 ...

  5. 【原创】Kakfa message包源代码分析

    笔者最近在研究Kafka的message包代码,有了一些心得,特此记录一下.其实研究的目的从来都不是只是看源代码,更多地是想借这个机会思考几个问题:为什么是这么实现的?你自己实现方式是什么?比起人家的 ...

  6. 【原创】Kakfa api包源代码分析

    既然包名是api,说明里面肯定都是一些常用的Kafka API了. 一.ApiUtils.scala 顾名思义,就是一些常见的api辅助类,定义的方法包括: 1. readShortString: 从 ...

  7. 【原创】Kakfa metrics包源代码分析

    这个包主要是与Kafka度量相关的. 一.KafkaTimer.scala 对代码块的运行进行计时.仅提供一个方法: timer——在运行传入函数f的同时为期计时 二.KafkaMetricsConf ...

  8. 【原创】Kakfa serializer包源代码分析

    这个包很简单,只有两个scala文件: decoder和encoder,就是提供序列化/反序列化的服务.我们一个一个说. 一.Decoder.scala 首先定义了一个trait: Decoder[T ...

  9. 【原创】kafka producer源代码分析

        Kafka 0.8.2引入了一个用Java写的producer.下一个版本还会引入一个对等的Java版本的consumer.新的API旨在取代老的使用Scala编写的客户端API,但为了兼容性 ...

随机推荐

  1. [oc] instancetype vs id for Objective-C 【转】

    原贴地址:http://blog.csdn.net/lyy_whg/article/details/12846055 http://www.iwangke.me/2013/01/06/instance ...

  2. 简单的Elf逆向Writeup

    ElfCrackMe1 html,body,div,span,applet,object,iframe,h1,h2,h3,h4,h5,h6,p,blockquote,pre,a,abbr,acrony ...

  3. JQuery对联广告

    html--------------------------------------------------------------------------------------<!DOCTY ...

  4. TextMesh Pro SpriteAsset Load From Assetbundle

    遇到问题 我们项目分两个Unity的工程,Art(美术资源工程),Client(代码工程) 在Art工程中的TextMeshProUGUI Text中使用Emoji,打包成AB之后,在Client运行 ...

  5. ORA-15137: cluster in rolling patch

    oracle 12.1.0.2,给diskgroup加盘的时候报错ORA-15137: cluster in rolling patch 确认两节点补丁相同 crsctl query crs soft ...

  6. mysql无密码登陆

    mysql登陆不上或者密码忘记可以尝试一下无密码登陆 以下一波神操作!! 首先关闭数据库服务(数据库在Centos7版本以上或者Redhat版本上被改名为mariadb) systemctl stop ...

  7. cherry-pick时的add by us / both modified / delete by us /delete by themk

    简单来说: us=into , them=from 比如你将test分支的某个提交cherry-pick到master分支上,那么us就是master分支,them 就是test分支 参考: http ...

  8. 机器学习笔记(6):多类逻辑回归-使用gluon

    上一篇演示了纯手动添加隐藏层,这次使用gluon让代码更精减,代码来自:https://zh.gluon.ai/chapter_supervised-learning/mlp-gluon.html f ...

  9. JMeter (2) —— JMeter与WebDriver测试用户登陆以CAS SSO为例(101 Tutorial)

    JMeter (2) -- JMeter与WebDriver测试用户登陆以CAS SSO为例(101 Tutorial) 主要内容 JMeter与WebDriver测试用户登陆以CAS SSO为例 环 ...

  10. 【VUE】@click加上v-bind绑定切换类名及动画事件

    好长的名字... 效果是 点击元素,通过改变类名的方式让其改变颜色+移动动画效果,这里用的是v-bind和@click 废话不说 show me the code! <div id=" ...