创建一个channel实例,并把它register到eventLoopGroup中之后,这个channel然后处于inactive状态,仍然是不可用的。只有在bind或connect方法调用成功之后才能正常。因此bind或connect算是channel初始化的最后一步,本章这就重点分析这两个功能的实现。

  接下来的代码分析如果没有特别说明,都是以NioSocketChannel为例。

  bind实现

  bind方法的调用栈如下:

io.netty.channel.AbstractChannel#bind(java.net.SocketAddress)
io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress)  io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeBind
io.netty.channel.DefaultChannelPipeline.HeadContext#bind
io.netty.channel.AbstractChannel.AbstractUnsafe#bind
io.netty.channel.socket.nio.NioSocketChannel#doBind
io.netty.channel.socket.nio.NioSocketChannel#doBind0

  为了能简单明了地展示调用关系,这个调用栈忽略了一些调用。可能有多个AbstractChannelHandlerContext的方法在不同的线程中被调用。以后在描述调用栈时也会忽略这一点,不再赘述。

  io.netty.channel.AbstractChannel.AbstractUnsafe#bind执行了主要的bind逻辑,它会调用doBind, 然后在channel的状态从inactive变成active,就调用pipline的fireChannelActive方法触发channelActives事件。doBind是io.netty.channel.AbstractChannel定义的抽象方法。NioSocketChannel只需要实现这个方法,整个bind功能就完整了。

 1     @Override
 2     protected void doBind(SocketAddress localAddress) throws Exception {
 3         doBind0(localAddress);
 4     }
 5     private void doBind0(SocketAddress localAddress) throws Exception {
 6         if (PlatformDependent.javaVersion() >= 7) {
 7             SocketUtils.bind(javaChannel(), localAddress);
 8         } else {
 9             SocketUtils.bind(javaChannel().socket(), localAddress);
10         }
11     }

  SocketUtils封装了通过AccessController调用JDK的socket API接口,事实上还是调用Socket或SocketChannel的bind方法。Nio的三个Channel类实现doBind的代码几乎一样。

  connect实现

  connect的调用栈如下:

io.netty.channel.AbstractChannel#connect(java.net.SocketAddress)
io.netty.channel.DefaultChannelPipeline#connect(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#connect(java.net.SocketAddress, java.net.SocketAddress, io.netty.channel.ChannelPromise)
io.netty.channel.AbstractChannelHandlerContext#invokeConnect
io.netty.channel.DefaultChannelPipeline.HeadContext#connect
io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
io.netty.channel.socket.nio.NioSocketChannel#doConnect

  connect的主要逻辑在io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect中实现,它的流程是:

  1. 调用doConnect方法,这个方法是AbstractNioChanne定义的抽象方法。

  2. 如果doConnect成功,且channel的状态从inactive变成active,则调用pipeline的fireChannelActive方法触发channelActive事件。

  3. 如果doConnection失败,调用close关闭channel。

  io.netty.channel.socket.nio.NioSocketChannel#doConnect中是socket connect API的调用。下面是connect的关键代码。

 1 @Override
 2 public final void connect(
 3         final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
 4     if (!promise.setUncancellable() || !ensureOpen(promise)) {
 5         return;
 6     }
 7
 8     try {
 9         if (connectPromise != null) {
10             // Already a connect in process.
11             throw new ConnectionPendingException();
12         }
13
14         boolean wasActive = isActive();
15         if (doConnect(remoteAddress, localAddress)) {
16             fulfillConnectPromise(promise, wasActive);
17         } else {
18             connectPromise = promise;
19             requestedRemoteAddress = remoteAddress;
20
21             // Schedule connect timeout.
22             int connectTimeoutMillis = config().getConnectTimeoutMillis();
23             if (connectTimeoutMillis > 0) {
24                 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
25                     @Override
26                     public void run() {
27                         ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
28                         ConnectTimeoutException cause =
29                                 new ConnectTimeoutException("connection timed out: " + remoteAddress);
30                         if (connectPromise != null && connectPromise.tryFailure(cause)) {
31                             close(voidPromise());
32                         }
33                     }
34                 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
35             }
36
37             promise.addListener(new ChannelFutureListener() {
38                 @Override
39                 public void operationComplete(ChannelFuture future) throws Exception {
40                     if (future.isCancelled()) {
41                         if (connectTimeoutFuture != null) {
42                             connectTimeoutFuture.cancel(false);
43                         }
44                         connectPromise = null;
45                         close(voidPromise());
46                     }
47                 }
48             });
49         }
50     } catch (Throwable t) {
51         promise.tryFailure(annotateConnectException(t, remoteAddress));
52         closeIfClosed();
53     }
54 }
55
56 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
57     if (promise == null) {
58         return;
59     }
60     boolean active = isActive();
61     boolean promiseSet = promise.trySuccess();
62
63     if (!wasActive && active) {
64         pipeline().fireChannelActive();
65     }
66     if (!promiseSet) {
67         close(voidPromise());
68     }
69 }

  第14,15行和整个fulfillConnectPromise方法处理正常流程。

  第18-52行处理异常流程。代码虽然多,但总结起来就一句话: 设置promis返回错误,确保能够调用close方法

  io.netty.channel.socket.nio.NioSocketChannel#doConnect实现和doBind实现类似:

 1 @Override
 2 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
 3     if (localAddress != null) {
 4         doBind0(localAddress);
 5     }
 6
 7     boolean success = false;
 8     try {
 9         boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
10         if (!connected) {
11             selectionKey().interestOps(SelectionKey.OP_CONNECT);
12         }
13         success = true;
14         return connected;
15     } finally {
16         if (!success) {
17             doClose();
18         }
19     }
20 }

  在第11行,注册OP_CONNECT事件。由于channel在初始化是被设置成非阻塞模式,connect方法可能返回false, 如果返回false表示connect操作没有完成,需要通过selector关注OP_CONNECT事件,把connect变成一个异步过程。只有异步调用io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect之后,connect才算完成。finishConnect在eventLoop中被调用:

 //io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
     int ops = k.interestOps();
     ops &= ~SelectionKey.OP_CONNECT;
     k.interestOps(ops);
     unsafe.finishConnect();
 }

   finishConnection的实现如下:

 //io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect
 @Override
 public final void finishConnect() {
     // Note this method is invoked by the event loop only if the connection attempt was
     // neither cancelled nor timed out.

     assert eventLoop().inEventLoop();
     try {
 9         boolean wasActive = isActive();
10         doFinishConnect();
11         fulfillConnectPromise(connectPromise, wasActive);
     } catch (Throwable t) {
         fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
     } finally {
         // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
         // See https://github.com/netty/netty/issues/1770
         if (connectTimeoutFuture != null) {
             connectTimeoutFuture.cancel(false);
         }
         connectPromise = null;
     }
 }

 //io.netty.channel.socket.nio.NioSocketChannel#doFinishConnect
 @Override
 protected void doFinishConnect() throws Exception {
     if (!javaChannel().finishConnect()) {
         throw new Error();
     }
 }

  9-11行是finishConnection的关键代码, 先调用doFinishConnect执行完成连接之后的操作,NioSocketChannel实现是检查连接是否真的已经完成(27-29行),然后调用fulfillConnectPromise触发事件,设置promise返回值。在前面分析netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect代码时,可以看到在doConnect调用成功以后会立即调用这个方法。这个方法被调用两次是为了确保channelActive事件一定会被触发一次。

  localAddress,remoteAddress实现:得到channel的本地和远程地址

  这个两个方法的实现几乎一样,这里只分析localAddress,它的调用栈如下:

1 io.netty.channel.AbstractChannel#localAddress
2 io.netty.channel.AbstractChannel.AbstractUnsafe#localAddress
3 io.netty.channel.socket.nio.NioSocketChannel#localAddress0

  这个方法不会触发任何事件,因此没有通过pipline调用unsafe,它直接调用unsafe的方法:

 1 //io.netty.channel.AbstractChannel#localAddress
 2 @Override
 3 public SocketAddress localAddress() {
 4     SocketAddress localAddress = this.localAddress;
 5     if (localAddress == null) {
 6         try {
 7             this.localAddress = localAddress = unsafe().localAddress();
 8         } catch (Throwable t) {
 9             // Sometimes fails on a closed socket in Windows.
10             return null;
11         }
12     }
13     return localAddress;
14 }

  在第7行直接调用unsafe的locallAddress方法,这个方法在AbstractUnsafe中实现,它调用了localAddress0,这一个protected的抽象方法,在NioSocketChannel中的实现是:

1 @Override
2 protected SocketAddress localAddress0() {
3     return javaChannel().socket().getLocalSocketAddress();
4 }

 

  

netty源码解解析(4.0)-12 Channel NIO实现:channel初始化的更多相关文章

  1. netty源码解解析(4.0)-11 Channel NIO实现-概览

      结构设计 Channel的NIO实现位于io.netty.channel.nio包和io.netty.channel.socket.nio包中,其中io.netty.channel.nio是抽象实 ...

  2. netty源码解解析(4.0)-10 ChannelPipleline的默认实现--事件传递及处理

    事件触发.传递.处理是DefaultChannelPipleline实现的另一个核心能力.在前面在章节中粗略地讲过了事件的处理流程,本章将会详细地分析其中的所有关键细节.这些关键点包括: 事件触发接口 ...

  3. netty源码解解析(4.0)-17 ChannelHandler: IdleStateHandler实现

    io.netty.handler.timeout.IdleStateHandler功能是监测Channel上read, write或者这两者的空闲状态.当Channel超过了指定的空闲时间时,这个Ha ...

  4. netty源码解解析(4.0)-18 ChannelHandler: codec--编解码框架

    编解码框架和一些常用的实现位于io.netty.handler.codec包中. 编解码框架包含两部分:Byte流和特定类型数据之间的编解码,也叫序列化和反序列化.不类型数据之间的转换. 下图是编解码 ...

  5. netty源码解解析(4.0)-20 ChannelHandler: 自己实现一个自定义协议的服务器和客户端

    本章不会直接分析Netty源码,而是通过使用Netty的能力实现一个自定义协议的服务器和客户端.通过这样的实践,可以更深刻地理解Netty的相关代码,同时可以了解,在设计实现自定义协议的过程中需要解决 ...

  6. netty源码解解析(4.0)-14 Channel NIO实现:读取数据

     本章分析Nio Channel的数据读取功能的实现. Channel读取数据需要Channel和ChannelHandler配合使用,netty设计数据读取功能包括三个要素:Channel, Eve ...

  7. netty源码解解析(4.0)-9 ChannelPipleline的默认实现-链表管理

    io.netty.channel.DefaultChannelPipeline implements ChannelPipleline   DefaultChannelPiple给出了ChannelP ...

  8. netty源码解解析(4.0)-15 Channel NIO实现:写数据

    写数据是NIO Channel实现的另一个比较复杂的功能.每一个channel都有一个outboundBuffer,这是一个输出缓冲区.当调用channel的write方法写数据时,这个数据被一系列C ...

  9. netty源码解解析(4.0)-7 线程模型-IO线程EventLoopGroup和NIO实现(二)

    把NIO事件转换成对channel unsafe的调用或NioTask的调用 processSelectedKeys()方法是处理NIO事件的入口: private void processSelec ...

随机推荐

  1. Python 学习拾遗

    该博文主要适应于python2.7,并没有对py3进行测试. 主要记录学习python过程中容易出现的一些小问题.小错误,相信能给你启发. 1.剔除一个字符串中的所有空格(假设该字符串是s) &quo ...

  2. cocos2d学习记录

    视频 - http://www.manew.com/forum-105-3.html一个论坛帖 - http://www.zhihu.com/question/21114802官网 - http:// ...

  3. Angularjs使用的一些特点

    1.函数会影响到全局命名空间 javascript 尽量避免使用全局变量,因为他们容易被其他文件脚本覆盖. angularjs让所有函数的作用域作用在该模块下面,避免了该问题. 2.angularjs ...

  4. 学习JQ

    目前我对jq的了解还很少,只知道jq比js简单很多,只需引入一个js文件,然后,在js中很多行才能实现的代码,jq中或许一行就搞掂了,比如根据类名获取元素,$(".类名")即可,对 ...

  5. MIS框架开发计划

    计划开发模块 缓存模块 全球化模块(时间转换.货币转换.语言切换.度量转换.时区转换) 用户模块 用户短消息模块 日志模块(系统日志.用户操作日志.安全审计日志) 权限模块 配置模块 事件模块(观察者 ...

  6. Python抓取第一网贷中国网贷理财每日收益率指数

    链接:http://www.p2p001.com/licai/index/id/147.html 所需获取数据链接类似于:http://www.p2p001.com/licai/shownews/id ...

  7. centos7.2中文乱码解决办法

    centos7.2 中文乱码解决办法 1.查看安装中文包: 查看系统是否安装中文语言包 (列出所有可用的公共语言环境的名称,包含有zh_CN) # locale -a |grep "zh_C ...

  8. [Mysql]备份同库中一张表的历史记录 insert into ..select

    需求 现在有个这么一个需求,mysql中有个表,数据增长的很快,但是呢这个数据有效期也就是1个月,一个月以前的记录不太重要了,但是又不能删除.为了保证这个表的查询速度,需要一个简单的备份表,把数据倒进 ...

  9. Beta冲刺(4/7)

    目录 摘要 团队部分 个人部分 摘要 队名:小白吃 组长博客:hjj 作业博客:beta冲刺(4/7) 团队部分 后敬甲(组长) 过去两天完成了哪些任务 整理博客 ppt模板 接下来的计划 做好机动. ...

  10. 02-spark sql

    1.概念 Spark SQL是一个用来处理结构化数据的Spark组件. 优点:  ①SparkSQL是一个SQL解析引擎,将SQL解析成特殊的RDD(DataFrame),然后在Spark集群中运行 ...