io.netty.handler.timeout.IdleStateHandler功能是监测Channel上read, write或者这两者的空闲状态。当Channel超过了指定的空闲时间时,这个Handler会触发一个IdleStateEvent事件。

  在第一次检测到Channel变成active状态时向EventExecutor中提交三个延迟任务:

    ReaderIdleTimeoutTask: 检测read空闲超时。

    WriterIdleTimeoutTask: 检测write空闲超时。

    AllIdleTimeoutTask: 检测所有的空闲超时。

  任何一个延迟任务检测到空闲超时是会触发一个IdleStateEvent。无论如何,延迟任务都会再次把自己提交到EventExecutor中,等待下次执行。

  三个延迟任务对应于三个超时时间,都是可以独立设置的:

 public IdleStateHandler(boolean observeOutput,
             long readerIdleTime, long writerIdleTime, long allIdleTime,
             TimeUnit unit) {
         if (unit == null) {
             throw new NullPointerException("unit");
         }

         this.observeOutput = observeOutput;

         if (readerIdleTime <= 0) {
             readerIdleTimeNanos = 0;
         } else {
             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
         }
         if (writerIdleTime <= 0) {
             writerIdleTimeNanos = 0;
         } else {
             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
         }
         if (allIdleTime <= 0) {
             allIdleTimeNanos = 0;
         } else {
             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
         }
     }

  这个类继承自io.netty.channel.ChannelDuplexHandler, 它是一个有状态的ChannelHandler, 定义了三个状态:

  private byte state; // 0 - none, 1 - initialized, 2 - destroyed

  state属性保存了它的状态。0:初始状态,1:已经初始化, 2: 已经销毁。

  这个ChannelHandler被加入到Channel的pipeline中之后,在Channel已经被register到EventLoop中,且处于Active状态时,会执行一次初始化操作,向EventExecutor提交前面提到的三个延迟任务。这初始化操作在initialize方法中实现。

     private void initialize(ChannelHandlerContext ctx) {
         // Avoid the case where destroy() is called before scheduling timeouts.
         // See: https://github.com/netty/netty/issues/143
         switch (state) {
         case 1:
         case 2:
             return;
         }

         state = 1;
         initOutputChanged(ctx);

         lastReadTime = lastWriteTime = ticksInNanos();
         if (readerIdleTimeNanos > 0) {
             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
         }
         if (writerIdleTimeNanos > 0) {
             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
         }
         if (allIdleTimeNanos > 0) {
             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
         }
     }

  第4-10行,只有处于初始状态时才执行后面的操作,避免多次提交定时任务。

  第11行, 初始化对对Channel的outboundBuffer变化的监视,只有当observeOutput属性设置为true时才开启这个监视。

  第13-25行,分别提交三个延迟任务。

  initialize方法可能在三个地方被调用:

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
            // channelActive() event has been fired already, which means this.channelActive() will
            // not be invoked. We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelActive() event has not been fired yet.  this.channelActive() will be invoked
            // and initialization will occur there.
        }
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Initialize early if channel is active already.
        if (ctx.channel().isActive()) {
            initialize(ctx);
        }
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }

  如果在Channel初始化的时候把这个Handler添加到pipeline中,那么这个Handler的channelActive方法一定会被调用,只需要在channleActive中调用initialize就可以打了。但是Handler可以在任何时候被加入到pipleline中。当ChannelHandler被添加到pipeline中时,Channel可能已经被register到EventLoop中,且已经处于Active状态,这种情况下,channelRegistered和channelActive方法都不会被调用,所以必须在handlerAdded中调用initialize。如果此时,Channnel已经处于Active状态,但还没被注册到EventLoop,只能在channelRegisted中调用initialize。

  

  初始化完成之后,延迟任务到期执行时会把自己再次提交到EventExecutor中,等待下次执行。同时会检查是否满足触发事件的条件,如果是就触发一条自定义的事件。

  

read空闲超时检查

 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
         @Override
         protected void run(ChannelHandlerContext ctx) {
             long nextDelay = readerIdleTimeNanos;
             if (!reading) {
                 nextDelay -= ticksInNanos() - lastReadTime;
             }

             if (nextDelay <= 0) {
                 // Reader is idle - set a new timeout and notify the callback.
                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                 boolean first = firstReaderIdleEvent;
                 firstReaderIdleEvent = false;

                 try {
                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                     channelIdle(ctx, event);
                 } catch (Throwable t) {
                     ctx.fireExceptionCaught(t);
                 }
             } else {
                 // Read occurred before the timeout - set a new timeout with shorter delay.
                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
             }
         }
     }

  4-9行,判断是否read空闲超时。

  11-21行,read空闲超时,重新把自己提交成延迟任务。

  24行,read没有空闲超时,重新把自己提交成延迟任务。

  这里的关键是判断read空闲超时。lastReadTime是最近一次执行read的时间,readerIdleTimeNanos是初始化时设置的空闲超时时间,因此如果readerIdleTimeNanos - (ticksInNanos() - lastReadtime)  <= 0,表示已经read空闲超时了。令人困惑的是第5行,只有在reading==false才检查进行空闲超时的计算。笔者在<<netty源码解解析(4.0)-14 Channel NIO实现:读取数据>>一章中分析过Channel read的实现。一次read操作或触发多个read和一个readComplete事件,read操作由多个步骤组成。这reading属性用来表示正在read的状态。

     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
             reading = true;
             firstReaderIdleEvent = firstAllIdleEvent = true;
         }
         ctx.fireChannelRead(msg);
     }

     @Override
     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
             lastReadTime = ticksInNanos();
             reading = false;
         }
         ctx.fireChannelReadComplete();
     }

  3-4行,在设置了读空闲超时或所有空闲超时的情况下,会吧reading设置成true,表示当前正处于正在read的状态。

  12-14行,在设置了读空闲超时或所有空闲超时的情况下, 如果当前正处于read状态,把reading设置成false,同时更新最近一次执行read的时间。

write空闲超时检查

     private final class WriterIdleTimeoutTask extends AbstractIdleTask {

         @Override
         protected void run(ChannelHandlerContext ctx) {

             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
             if (nextDelay <= 0) {
                 // Writer is idle - set a new timeout and notify the callback.
                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

                 boolean first = firstWriterIdleEvent;
                 firstWriterIdleEvent = false;

                 try {
                     if (hasOutputChanged(ctx, first)) {
                         return;
                     }

                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                     channelIdle(ctx, event);
                 } catch (Throwable t) {
                     ctx.fireExceptionCaught(t);
                 }
             } else {
                 // Write occurred before the timeout - set a new timeout with shorter delay.
                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
             }
         }
     }

  6-8行,检查write空闲超时,和检查read空闲超时类似。

  12-21行,如果write空闲超时,且outboundBuffer中的数据没有变化, 触发write空闲超时事件。

  这里调用了hasOutputChanged方法检查outboundBuffer中的数据是否有变化。笔者在<<netty源码解解析(4.0)-15 Channel NIO实现:写数据>>中分write实现时,已经讲过,每个Channel都以一个outboundBuffer, write的数据会先序列化成Byte流追加到outboundBuffer中,然后再从outboundBuffer中顺序读出Byte流执行真正的write操作。在Handler的write方法没有被调用的情况下,如果outboundBuffer中有数据,且数据发送了变化,表示正在执行真正的write操作,反之则意味着Channel处于不可写的状态,无法执行真正的write操作。write空闲超时事件只会在write空闲超时且没有执行真正write操作的时候才会触发。另外,这个检查有个开关属性,只有observeOutput==true时才会检查。

  

  AllIdleTimeoutTask的实现和WriterIdleTimeoutTask类似,只不过检查超时的条件有些差别:read和write任何一个空闲超时都算超时。

ReadTimeoutHandler实现

  ReadTimeoutHandler继承了IdleStateHandler类,它的功能是在触发read空闲超时事件时触发一个ReadTimeoutException异常,同时关闭Channel。 

    @Override
    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;
        readTimedOut(ctx);
    }

    /**
     * Is called when a read timeout was detected.
     */
    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }

WriteTimeoutHandler实现

  WriteTimeoutHandler继承了ChannelOutboundHandlerAdapter,它的功能是在触发监视Channel的write调用超时,如果超时则关闭掉这个Channel。和ReadTimeoutHandler不同,它监控的不是空闲超时,而是Channel的write方法返回的Promise超时。

  首先在write时候,为每个Promise添加一个监控超时的延迟任务:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        scheduleTimeout(ctx, promise);
        ctx.write(msg, promise);
    }
    private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
        // Schedule a timeout.
        final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
        task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);

        if (!task.scheduledFuture.isDone()) {
            addWriteTimeoutTask(task);

            // Cancel the scheduled timeout if the flush promise is complete.
            promise.addListener(task);
        }
    }

  然后,如果延迟任务执行的时候检查到Promise超时,就触发一个WriteTimeoutException异常,然后关闭掉这个Channel。

    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }

  WriteTimeoutTask类同时实现了Runnable和ChannelFutureListener接口,超时后会调用run方法。

         @Override
         public void run() {
             // Was not written yet so issue a write timeout
             // The promise itself will be failed with a ClosedChannelException once the close() was issued
             // See https://github.com/netty/netty/issues/2159
             if (!promise.isDone()) {
                 try {
                     writeTimedOut(ctx);
                 } catch (Throwable t) {
                     ctx.fireExceptionCaught(t);
                 }
             }
             removeWriteTimeoutTask(this);
         }

  7-10行,promise没有完成,触发WriteTimeoutException或其他异常。

13行,write已经完成,删除当前的WriteTimeoutTask对象。

   如果promise已经完成, 会调用operationComplete方法, 清理掉当前的WriteTimeoutTask对象。

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // scheduledFuture has already be set when reaching here
            scheduledFuture.cancel(false);
            removeWriteTimeoutTask(this);
        }

netty源码解解析(4.0)-17 ChannelHandler: IdleStateHandler实现的更多相关文章

  1. netty源码解解析(4.0)-16 ChannelHandler概览

    本章开始分析ChannelHandler实现代码.ChannelHandler是netty为开发者提供的实现定制业务的主要接口,开发者在使用netty时,最主要的工作就是实现自己的ChannelHan ...

  2. netty源码解解析(4.0)-11 Channel NIO实现-概览

      结构设计 Channel的NIO实现位于io.netty.channel.nio包和io.netty.channel.socket.nio包中,其中io.netty.channel.nio是抽象实 ...

  3. netty源码解解析(4.0)-10 ChannelPipleline的默认实现--事件传递及处理

    事件触发.传递.处理是DefaultChannelPipleline实现的另一个核心能力.在前面在章节中粗略地讲过了事件的处理流程,本章将会详细地分析其中的所有关键细节.这些关键点包括: 事件触发接口 ...

  4. netty源码解解析(4.0)-15 Channel NIO实现:写数据

    写数据是NIO Channel实现的另一个比较复杂的功能.每一个channel都有一个outboundBuffer,这是一个输出缓冲区.当调用channel的write方法写数据时,这个数据被一系列C ...

  5. netty源码解解析(4.0)-14 Channel NIO实现:读取数据

     本章分析Nio Channel的数据读取功能的实现. Channel读取数据需要Channel和ChannelHandler配合使用,netty设计数据读取功能包括三个要素:Channel, Eve ...

  6. netty源码解解析(4.0)-12 Channel NIO实现:channel初始化

    创建一个channel实例,并把它register到eventLoopGroup中之后,这个channel然后处于inactive状态,仍然是不可用的.只有在bind或connect方法调用成功之后才 ...

  7. netty源码解解析(4.0)-4 线程模型-概览

    netty线程体系概览 netty的高并发能力很大程度上由它的线程模型决定的,netty定义了两种类型的线程: I/O线程: EventLoop, EventLoopGroup.一个EventLoop ...

  8. netty源码解解析(4.0)-9 ChannelPipleline的默认实现-链表管理

    io.netty.channel.DefaultChannelPipeline implements ChannelPipleline   DefaultChannelPiple给出了ChannelP ...

  9. netty源码解解析(4.0)-7 线程模型-IO线程EventLoopGroup和NIO实现(二)

    把NIO事件转换成对channel unsafe的调用或NioTask的调用 processSelectedKeys()方法是处理NIO事件的入口: private void processSelec ...

  10. netty源码解解析(4.0)-6 线程模型-IO线程EventLoopGroup和NIO实现(一)

    接口定义 io.netty.channel.EventLoopGroup extends EventExecutorGroup 方法 说明 ChannelFuture register(Channel ...

随机推荐

  1. MSVC 报错 unable to use inline in declaration get error C2054

    晚上用cmake生成了一份lua-cjson的工程文件,msvc6的 编译时报错 后来再stackoverflow找到答案:unable to use inline in declaration ge ...

  2. C# DevExpress 的gridControl或gridView数据导出失败解决方法

    来自:http://blog.csdn.net/lybwwp/article/details/8049464 谢谢 在使用DevExpress 的GridPanel控件的时候出现了一个莫名其妙的现象, ...

  3. uva11624 - Fire!

    uva11624 - Fire! 火在蔓延,人在走.火会蔓延,不会熄灭,我们可以确定某个点着火的时间(广搜).对于J来说,要是他走到某点的时间比火蔓延到该点的时间要短,那么他走到该点的时候,火还没蔓延 ...

  4. js实现input输入框只能输入数字的功能

    <input type="text" style="ime-mode:disabled;" onpaste="return false;&quo ...

  5. JavaScript用法

    JavaScript 用法 JavaScript 语句,会在页面加载时执行. <body> 中的 JavaScript <!DOCTYPE html> <html> ...

  6. python第九天(9-34)

    一:队列的三种模式 先进先出(FIFO) class queue.Queue(maxsize) 后进先出(LIFO) class queue.LifoQueue(maxsize) 优先级顺序(优先级低 ...

  7. PS调出最美海滨城市俯拍照

    原图 一.找一张漂亮的风景照片,美丽的海滩. 二.打开PS做效果把图片放进去然后ctrl+j复制一层,添加滤镜-模糊-特殊模糊. 三.然后在这个图层的基础上添加滤镜-滤镜库-干画笔效果. 四.这个时候 ...

  8. Guava:好用的java类库 学习小记

    基础功能 google guava中定义的String操作 在google guava中为字符串操作提供了很大的便利,有老牌的判断字符串是否为空字符串或者为null,用指定字符填充字符串,以及拆分合并 ...

  9. ipv6无网络访问权限怎么办

    有时IP4和IP6都正常连接,但突然又出现“IPV6无网络访问权限” 这是win7系统下经常发生的事情,如下图. 方法/步骤 1.IPV6没网络权限是正常的因为你没有IPV6的网络环境,那个只有部分教 ...

  10. 给echarts加个“全屏展示”

    echarts的工具箱并没有提供放大/全屏的功能, 查找文档发现可自定义工具https://www.echartsjs.com/option.html#toolbox.feature show代码 t ...