• 传统的同步阻塞式I/O编程
  • 基于NIO的非阻塞编程
  • 基于NIO2.0的异步非阻塞(AIO)编程
  • 为什么要使用NIO编程
  • 为什么选择Netty

  第二章 NIO 入门

    2.1 传统的BIO编程

      2.1.1 BIO 通信模型图

      2.1.2 同步阻塞式I/O创建的TimeServer源码分析

package com.phei.netty.bio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class TimeServer {
    public static void main(String[] args)throws IOException{
        int port = 8080;
        if(args != null && args.length > 0){
            try{
                port = Integer.valueOf(port);
            }catch(NumberFormatException e){
//                port = 8080;
            }
        }
        ServerSocket server = null;
        try{
//            如果端口合法且没有被占用,服务端监听成功
            server = new ServerSocket(port);
            System.out.println("The time server is start in port:" + port);
            Socket socket = null;
            while(true){
//                如果没有客户端接入,则主线程阻塞在ServerSocket的accept操作上
                socket = server.accept();
                new Thread(new TimeServerHandler(socket)).start();
            }
        }finally{
            if(server != null){
                System.out.println("The time server close");
                server.close();
                server = null;
            }
        }
    }
}
package com.phei.netty.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Date;

public class TimeServerHandler implements Runnable {

    private Socket socket;

    public TimeServerHandler() {
        // TODO Auto-generated constructor stub
    }

    public TimeServerHandler(Socket socket) {
        super();
        this.socket = socket;
    }

    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try{
//            输入流,获取客户端输出流信息
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
//            输出流,放到客户端输入流中
            out = new PrintWriter(this.socket.getOutputStream(),true);
            String currentTime = null;
            String body = null;
            while(true){
//                获取客户端输出的信息
                body = in.readLine();
                if(body == null){
                    break;
                }
                System.out.println("The time server receive order : " + body);
                currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
//                发送信息到客户端输入流中
                out.println(currentTime);
            }
        }catch(Exception e){
            if(in != null){
                try{
                    in.close();
                }catch(IOException e1){
                    e1.printStackTrace();
                }
            }
            if(out != null){
                out.close();
                out = null;
            }
            if(this.socket != null){
                try{
                    this.socket.close();
                }catch(IOException e1){
                    e1.printStackTrace();
                }
                this.socket = null;
            }
        }
    }

}

      2.1.3 同步阻塞式I/O创建的TimeClient源码分析

package com.phei.netty.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class TimeClient {

    public static void main(String[] args){
        int port = 8080;
        if(args != null && args.length > 0){
            try{
                port = Integer.valueOf(args[0]);
            }catch(NumberFormatException e){

            }
        }
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try{
            socket = new Socket("127.0.0.1",port);
//            输入流,获取服务端输出流信息
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
//            输出流,放到服务端输入流中
            out = new PrintWriter(socket.getOutputStream(),true);
//            发送信息到服务端
            out.println("QUERY TIME ORDER");
            System.out.println("Send order 2 server succeed.");
//            读取输入流的信息
            String resp = in.readLine();
            System.out.println("Now is : " + resp);
        }catch(Exception e){

        }finally{
            if(out != null){
                out.close();
                out = null;
            }
            if(in != null){
                try{
                    in.close();
                }catch(IOException e){
                    e.printStackTrace();
                }
                in = null;
            }
            if(socket != null){
                try{
                    socket.close();
                }catch(IOException e){
                    e.printStackTrace();
                }
                socket = null;
            }
        }
    }
}

    2.2 伪异步I/O编程

      2.2.1 伪异步I/O模型图

      2.2.2 伪异步I/O创建的TimeServer源码分析

package com.phei.netty.pio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import com.phei.netty.bio.TimeServerHandler;

public class TimeServer {

    public static void main(String[] args) throws IOException{
        int port = 8080;
        if(args != null && args.length > 0){
            try{
                port = Integer.valueOf(args[0]);
            }catch(NumberFormatException e){

            }
        }
        ServerSocket server = null;
        try{
            server = new ServerSocket(port);
            System.out.println("The time server is start in port:" + port);
            Socket socket = null;
            //创建I/O任务线程池
            TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool(50,10000);
            while(true){
                socket = server.accept();
//                当接收到新的客户端连接时,将请求Socket封装成一个Task,然后调用线程池的execute方法执行,从而避免了每个请求接入都创建一个新的线程。
                singleExecutor.execute(new TimeServerHandler(socket));
            }
        }finally{
            if(server != null){
                System.out.println("The time server close");
                server.close();
                server = null;
            }
        }
    }
}
package com.phei.netty.pio;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TimeServerHandlerExecutePool {
    private ExecutorService executor;

    public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){
        executor = 
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); } public void execute(Runnable task){ executor.execute(task); } }

      2.2.3 伪异步I/O弊端分析

    2.3 NIO编程

      2.3.1 NIO类库简介

        1.缓冲区Buffer

          ByteBuffer

          CharBuffer

          ShortBuffer

          IntBuffer

          LongBuffer

          FloatBuffer

          DoubleBuffer

        2.通道Channel

          网络读写:SelectableChannel

          文件操作:FileChannel

        3.多路复用器Selector

          多路复用器提供选择已经就绪的任务的能力:Selector回不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写时间,这个Channel就处                               于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

      2.3.2 NIO服务端序列图

      2.3.3 NIO创建的TimeServer源码分析

package com.phei.netty.nio;

public class TimeServer {
    public static void main(String[] args){
        int port = 8080;
        if(args != null && args.length > 0){
            try{
                port = Integer.valueOf(args[0]);
            }catch(NumberFormatException e){

            }
        }
//        创建多路复用类MultiplexerTimeServer。它是一个独立的线程,负责轮询多路复用器Selctor,可以处理多个客户端的并发接入。
        MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
        new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
    }
}
package com.phei.netty.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;
    private ServerSocketChannel servChannel;
    private volatile boolean stop;

    /*
     * 初始化多路复用器,绑定监听端口
     * 在构造方法中创建多路复用器Selector、ServerSocketChannel,对Channel和TCP参数进行配置。
     * 将ServlerSocketChannel设置为异步非阻塞模式,它的backlog设为1024。
     * 系统资源初始化成功后,将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT操作位。
     * 如果资源初始化失败,则退出。
     */
    public MultiplexerTimeServer(int port){
        try{
            //Opens a selector.
            selector = Selector.open();
            //Opens a server-socket channel.
            servChannel = ServerSocketChannel.open();
//            Adjusts this channel's non-blocking mode.
            servChannel.configureBlocking(false);
//            Retrieves a server socket associated with this channel.
//            Binds the ServerSocket to a specific address (IP address and port number).
//            1024 : requested maximum length of the queue of incoming connections.
            servChannel.socket().bind(new InetSocketAddress(port),1024);
//            Registers this channel with the given selector, returning a selection key.
//            SelectionKey.OP_ACCEPT : Operation-set bit for socket-accept operations.
            servChannel.register(selector,SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port:" + port);
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop(){
        this.stop = true;
    }

    /**
     * 在线程的run方法的while循环体中循环遍历selector,它的休眠时间为1s。
     * 无论是否有读写等事件发生,selector每隔1s都被唤醒一次。
     * selector也提供了一个无参的select方法:当有处于就绪状态的Channel时,selector将返回该Channel的SelectionKey集合。
     * 通过对就绪状态的Channel集合进行迭代,可以进行网络的异步读写操作。
     */
    @Override
    public void run() {
        while(!stop){
            try{
//                Selects a set of keys whose corresponding channels are ready for I/O operations.
//                timeout - If positive, block for up to timeout milliseconds, more or less, while waiting for a channel to become ready; 
//          if zero, block indefinitely; must not be negative selector.select(1000); // Returns this selector's selected-key set. Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try{ handleInput(key); }catch(Exception e){ if(key != null){ // Requests that the registration of this key's channel with its selector be cancelled. key.cancel(); if(key.channel() != null){ key.channel().close(); } } } } }catch (Throwable t) { t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if(selector != null){ try{ selector.close(); }catch(IOException e){ e.printStackTrace(); } } } /* * 处理新接入的客户端请求信息,根据SelectionKey的操作位进行判断即可获知网络时间的类型, * 通过ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例。 * 完成之后相当于完成了TCP的三次握手,TCP物理链路正是建立。 * 需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,如TCP接收和发送缓冲区的大小等。 */ private void handleInput(SelectionKey key) throws IOException{ // Tells whether or not this key is valid. if(key.isValid()){ // Tests whether this key's channel is ready to accept a new socket connection. // 处理新接入的请求消息 if(key.isAcceptable()){ // Returns the channel for which this key was created. ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // Accepts a connection made to this channel's socket. SocketChannel sc = ssc.accept(); // Adjusts this channel's blocking mode. sc.configureBlocking(false); // Registers this channel with the given selector, returning a selection key. // The interest set for the resulting key sc.register(selector, SelectionKey.OP_READ); } /* * 用于读取客户端的请求消息。 * 首先创建一个ByteBuffer,由于事先无法得知客户端发送的码流大小,作为历程,开辟一个1MB的缓冲区。 * 然后调用SocketChannel的read方法读取请求码流。 */ // Tests whether this key's channel is ready for reading. if(key.isReadable()){ SocketChannel sc = (SocketChannel) key.channel(); // Allocates a new byte buffer. ByteBuffer readBuffer = ByteBuffer.allocate(1024); // Reads a sequence of bytes from this channel into the given buffer. int readBytes =sc.read(readBuffer); if(readBytes > 0){ // 对readBuffer进行flip操作,将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作。 // Flips this buffer. readBuffer.flip(); // 根据缓冲区刻度的字节个数创建字节数组 // Returns the number of elements between the current position and the limit. byte[] bytes = new byte[readBuffer.remaining()]; // 调用ByteBuffer的get操作将缓冲区可读字节数组复制到新创建的字节数组中 // Relative bulk get method. readBuffer.get(bytes); String body = new String(bytes,"UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; doWrite(sc,currentTime); }else if(readBytes < 0){ // 对端链路关闭 key.cancel(); sc.close(); }else{ // 读到0字节,忽略 ; } } } } /* * 将应答消息异步发送给客户端。 * 由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现“写半包”问题。 * 需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕, * 然后可以通过ByteBuffer的hasRemain()方法判断消息是否发送完成。 */ private void doWrite(SocketChannel channel,String response) throws IOException{ if(response != null && response.trim().length() > 0){ byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // Relative bulk put method (optional operation). writeBuffer.put(bytes); writeBuffer.flip(); // 调用SocketChannel的write方法将缓冲区中的字节数组发送出去 // Writes a sequence of bytes to this channel from the given buffer. channel.write(writeBuffer); } } }

      2.3.4 NIO客户端序列图

      2.3.5 NIO创建的TimeClient源码分析

package com.phei.netty.nio;

public class TimeClient {

    public static void main(String[] args){
        int port = 8080;
        if(args != null && args.length > 0){
            try{
                port = Integer.valueOf(args[0]);
            }catch(NumberFormatException e){

            }
        }
        new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
    }
}
package com.phei.netty.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public TimeClientHandle() {
        super();
        // TODO Auto-generated constructor stub
    }

    /*
     * 使用构造函数初始化NIO的多路复用器和SocketChannel对象。
     * 创建SocketChannel之后将其设置为异步非阻塞模式。
     * 在此可以设置SocketChannel的TCP参数
     */
    public TimeClientHandle(String string, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try{
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
    }

    /*
     * 作为示例,连接是成功的,所以不需要做重连操作,因此将其放到循环之前。
     */
    @Override
    public void run() {
        try{
//            如果连接成功、如果没有成功
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }
//        轮询多路复用器Selector。当有就绪的Channel时,执行handleInput(key)方法。
        while(!stop){
            try{
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
//                        当有就绪的Channel时执行
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
//        多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
        if(selector != null){
            try{
                selector.close();
            }catch(IOException e){
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws IOException{
//        Tells whether or not this key is valid.
        if(key.isValid()){
//            Returns the channel for which this key was created.
            SocketChannel sc = (SocketChannel) key.channel();
//            判断是否连接成功
//            如果处于连接状态,说明服务端已经返回ACK应答消息。
//            Tests whether this key's channel has either finished, or failed to finish,
//            its socket-connection operation.
            if(key.isConnectable()){
//                调用SecoketChannel的finishConnect()方法。
//                如果返回值为true,说明客户端连接成功;
//                如果返回值为false或者抛出IOException,说明连接失败
//                Finishes the process of connecting a socket channel.
                if(sc.finishConnect()){
//                    将SocketChannel注册到多路复用器上,注册SelectionKey.OP_READ操作位,监听网络读操作,
//                    然后发送请求消息给服务端
                    sc.register(selector, SelectionKey.OP_READ);
//                    构造请求消息体,然后对其编码,写入到发送缓冲区中,最后调用SocketChannel的write方法进行发送。存在"半包写"
//                    最后通过hasRemaining()方法对发送结果进行判断
                    doWrite(sc);
                }else{
                    System.exit(1);//连接失败,进程退出
                }
//                测试此键的通道是否已准备好进行读取。
                if(key.isReadable()){
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//                    将字节序列从此通道读入给定的缓冲区。
                    int readBytes = sc.read(readBuffer);
                    if(readBytes > 0){
//                        反转此缓冲区。
                        readBuffer.flip();
//                        返回当前位置与限制之间的元素数。
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes,"UTF-8");
                        System.out.println("Now is : " + body);
                        this.stop = true;
                    }else if(readBytes < 0){
                        //对端链路关闭
                        key.cancel();
                        sc.close();
                    }else{
                        ; //读到0字节,忽略
                    }
                }
            }
        }
    }

    private void doConnect() throws IOException{
//        如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
//        连接此通道的套接字
        if(socketChannel.connect(new InetSocketAddress(host, port))){
//            连接成功,将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_READ
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        }else{
//            如果没有直接连接成功,则说明服务端没有返回TCP握手应答,但这并不代表连接失败。
//            需要将SocketChannel注册到多路复用器Selector上,注册SelectionKey.OP_CONNECT,
//            当服务端返回TCP syn-ack 消息后,Selector就能够轮询到这个SocketChannel处于连接就绪状态。
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void doWrite(SocketChannel sc) throws IOException{
        byte[] req = "QUERY TIME ORDER".getBytes();
//        分配一个新的字节缓冲区。
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
//        将字节序列从给定的缓冲区中写入此通道。
        sc.write(writeBuffer);
//        告知在当前位置和限制之间是否有元素。
        if(!writeBuffer.hasRemaining()){
//            如果缓冲区中的消息全部发送完成,打印
            System.out.println("Send order 2 server succeed.");
        }
    }
}

     服务端控制台:

The time server
The time server receive order : QUERY TIME ORDER

     客户端控制台:

Send  server succeed.

      socketChannel.connect(new InetSocketAddress(host, port):返回false

      key.isReadable():返回false

      WHY????????????????????????????

     2.4 AIO编程

      NIO 2.0 引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

      异步通道提供一下两种方式获取操作结果:

        通过 java.util.concurrent.Future 类来表示异步操作的结果;

        在执行异步操作的时候传入一个 java.nio.channels.CompletionHandler接口的实现类作为操作完成的回调。

      NIO 2.0 的异步套接字通道是真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

      2.4.1 AIO创建的TimeServer源码分析

package com.phei.netty.aio;

public class TimeServer {

    public static void main(String[] args){
        int port = 8080;
        if(args != null && args.length > 0){
            try{
                port = Integer.valueOf(args[0]);
            }catch(NumberFormatException e){
                //采用默认值
            }
        }
//        创建异步的时间服务器处理类
        AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
//        启动线程
        new Thread(timeServer,"AIO-AsyncTimeServerHandler-001").start();
    }
}
package com.phei.netty.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

public class AsyncTimeServerHandler implements Runnable {

    private int port;
    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

//    创建一个异步的服务端通道AsynchronousServerSocketChannel,然后调用它的bind方法绑定监听端口。
    public AsyncTimeServerHandler(int port){
        this.port = port;
        try{
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
            asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
            System.out.println("The time server is start in port : " + port);
        }catch(IOException e){
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
//        在完成一组正在执行的操作之前,允许当前的线程一直阻塞。
//        在本例中,我们让线程在此阻塞,防止服务端执行完成退出。
//        在实际项目应用中,不需要启动独立的线程来处理AsynchronousServerSocketChannel,这里仅仅是个demo演示
        latch = new CountDownLatch(1);
//        用于接收客户端的连接,由于是异步操作,可以传递一个CompletionHandler<SsynchronousSocketChannel,? super A>类型的handler实例接收accept操作成功的通知消息
        doAccept();
        try{
            latch.await();
        }catch(InterruptedException e){
            e.printStackTrace();
        }
    }
    private void doAccept() {
        asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
    }

}
package com.phei.netty.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {

    @Override
    public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
//        从attachment获取成员变量AsynchronousServerSocketChannel,然后继续调用她的accept方法
//        调用AsynchronousServerSocketChannel的accept方法后,
//        如果有新的客户端连接接入,系统将回调传入的CompletionHandler实例的completed方法,表示新的客户端已经接入成功。
//        因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用它的accept方法,
//        接收其他的客户端连接,最终形成一个循环。每当接收一个客户读连接成功之后,在异步连接新的客户端连接。
        attachment.asynchronousServerSocketChannel.accept(attachment, this);
//        链路建立成功之后,服务端需要接收客户端的请求消息。
//        创建新的ByteBuffer,预分配1MB的缓冲区。
        ByteBuffer buffer = ByteBuffer.allocate(1024);
//        通过调用AsynchronousSocketChannel的read方法进行异步读操作。
//        ByteBuffer dst : 接收缓冲区,用于从异步Channel中读取数据包;
//        A attachment : 异步Channel携带的附件,通知回调的时候作为入参使用;
//        CompletionHandler<Integer,? super A> : 接收通知回调的业务Handler,在例程中为ReadCOmpletionHandler
        result.read(buffer,buffer,new ReadCompletionHandler(result));
    }

    @Override
    public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
        exc.printStackTrace();
        attachment.latch.countDown();
    }

}
package com.phei.netty.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;

public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

    private AsynchronousSocketChannel channel;

//    将AsynchronousSocketChannel通过参数传递到ReadCompletionHandler中,
//    当作成员变量来使用,主要用于读取半包消息和发送应答
    public ReadCompletionHandler(AsynchronousSocketChannel channel) {
        if(this.channel == null){
            this.channel = channel;
        }
    }

//    读取到消息后的处理
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
//        对attachment进行flip操作,为后续冲缓冲区读取数据做准备。
        attachment.flip();
//        根据缓冲区的刻度字节数创建byte数组
        byte[] body = new byte[attachment.remaining()];
        attachment.get(body);
        try{
//            通过new String方法创建请求消息,对请求消息进行判断
            String req = new String(body,"UTF-8");
            System.out.print("The time server receive order : " + req);
            String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
//            调用doWrite方法发送给客户端
            doWriter(currentTime);
        }catch(UnsupportedEncodingException e){
            e.printStackTrace();
        }
    }

    private void doWriter(String currentTime) {
//        对当前事件进行合法性校验
        if(currentTime != null && currentTime.trim().length() > 0){
//            调用字符串的解码方法将应答消息编码成字节数组,然后将它复制到发送缓冲区writeBuffer中
            byte[] bytes = currentTime.getBytes();
            ByteBuffer writerBuffer = ByteBuffer.allocate(bytes.length);
            writerBuffer.put(bytes);
            writerBuffer.flip();
//            调用AsynchronousSocketChannel的异步write方法
            channel.write(writerBuffer,writerBuffer,new CompletionHandler<Integer, ByteBuffer>() {

                @Override
                public void completed(Integer result, ByteBuffer buffer) {
//                    如果没有发送完成,继续发送,知道发送成功
                    if(buffer.hasRemaining()){
                        channel.write(buffer,buffer,this);
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try{
                        channel.close();
                    }catch(IOException e){
                        //ingonre on close
                    }
                }
            });
        }
    }

//    当发送异常的时候,对异常Throwable进行判断:如果I/O异常,就关闭链路,释放资源;
//    如果是其他异常,按照业务自己的逻辑进行处理。本例程作为简单的demo,没有对异常进行分类判断,只要发生了读写异常,就关闭链路,释放资源。
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try{
            this.channel.close();
        }catch(IOException e){
            e.printStackTrace();
        }
    }

}

      2.4.2 AIO创建的TimeClient源码分析

      2.4.3 AIO版本时服务器的运行结果

    2.5 4中I/O的对比

      2.5.1 概念澄清

        1.异步非阻塞I/O

        2.多路复用器Selector

        3.伪异步I/O

      2.5.2 不同I/O模型对比

    2.6 选择Netty的理由

      2.6.1 不选择Java原生NIO编程的原因

      2.6.2 为什么选择Netty

    2.7 总结

    

  

随机推荐

  1. 微信聊天记录查看器(程序+源码) - iOS版

    本文版权归cxun所有,如有转载请注明出处与本文链接,谢谢!原文地址:http://www.cnblogs.com/cxun/p/4338643.html Updates [2016.10.14]感谢 ...

  2. JS - IE or not:判断是否为IE浏览器方法

    问题:使用JS判断是否为IE浏览器 方法: 1.IE='\v'=='v'  (失败!) if('\v'=='v') // true only in IE 2.IE=(!+"\v1" ...

  3. [转]epoll技术

    在linux的网络编程中,很长的时间都在使用select来做事件触发.在linux新的内核中,有了一种替换它的机制,就是epoll. 相比于select,epoll最大的好处在于它不会随着监听fd数目 ...

  4. codeforces #305 D Mike and Fish

    正解貌似是大暴搜? 首先我们考虑这是一个二分图,建立网络流模型后很容易得出一个算法 S->行 容量为Num[X]/2; 行->列 容量为1 且要求(x,y)这个点存在 列->T 容量 ...

  5. CTE在Oracle和Sqlserver中使用的差异

    CTE是一个很好用的工具,他可以帮助我们清晰代码结构,减少临时表使用,同时oracle和sqlserver都提供支持.但在oracle和sqlserver中使用CTE也存在一定区别. Oracle使用 ...

  6. 【异常处理】java.lang.NoClassDefFoundError

    Exception in thread"main" java.lang.NoClassDefFoundError:org/apache/commons/lang/exception ...

  7. 理解Ajax

    1.优化原则 优化的目的是希望降低程序的整体开销.虽然在程序中有许多因素可以优化,但是通常人们会认为这个开销就是程序的执行时间.其实我们更应该把重点放在对程序整体开销最大的那部分.   2.一切都是权 ...

  8. [Swust OJ 541]--排列字典序问题

    题目链接:http://acm.swust.edu.cn/problem/0541/ Time limit(ms): 2000 Memory limit(kb): 65535 n个元素{1,2,... ...

  9. jquery+css3打造一款ajax分页插件

    原文:[原创]jquery+css3打造一款ajax分页插件 最近公司的项目将好多分页改成了ajax的前台分页以前写的分页插件就不好用了,遂重写一个 支持IE6+,但没有动画效果如果没有硬需求,个人认 ...

  10. 编译原理子cygwin的使用

    目的:熟悉cygwin环境的使用,学习使用lex写简单的词法分析程序,会在cygwin环境下使用flex调试lex写的程序 内容:使用cygwin下的flex工具将exam1.l和exam2.l编译并 ...