转载请注明出处:http://blog.csdn.net/ns_code/article/details/17465497

 

Executor框架简介

Java 5之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。

Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。

ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当素有已经提交了的任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

Executor执行Runnable任务

ExecutorService实例,而后调用该实例的execute(Runnable command)方法即可。一旦Runnable任务传递到execute()方法,该方法便会自动在一个线程上执行。下面是是Executor执行Runnable任务的示例代码:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;   

    public class TestCachedThreadPool{
        public static void main(String[] args){
            ExecutorService executorService = Executors.newCachedThreadPool();
    //      ExecutorService executorService = Executors.newFixedThreadPool(5);  //创建不同类型的线程池
    //      ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 5; i++){
                executorService.execute(new TestRunnable());
                System.out.println("************* a" + i + " *************");
            }
            executorService.shutdown();
        }
    }   

    class TestRunnable implements Runnable{
        public void run(){
            System.out.println(Thread.currentThread().getName() + "线程被调用了。");
        }
    }  

Executor执行Callable任务

在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable<T> task) 方法来执行,并且返回一个 <T>Future<T>,是表示任务等待完成的 Future。

Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常而Callable又返回结果,而且当获取返回结果时可能会抛出异常。Callable中的call()方法类似Runnable的run()方法,区别同样是有返回值,后者没有。

当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。

下面给出一个Executor执行Callable任务的示例代码:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;   

    public class CallableDemo{
        public static void main(String[] args){
            ExecutorService executorService = Executors.newCachedThreadPool();
            List<Future<String>> resultList = new ArrayList<Future<String>>();   

            //创建10个任务并执行
            for (int i = 0; i < 10; i++){
                //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
                Future<String> future = executorService.submit(new TaskWithResult(i));
                //将任务执行结果存储到List中
                resultList.add(future);
            }   

            //遍历任务的结果
            for (Future<String> fs : resultList){
                    try{
                        while(!fs.isDone);//Future返回如果没有完成,则一直循环等待,直到Future返回完成
                        System.out.println(fs.get());     //打印各个线程(任务)执行的结果
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }catch(ExecutionException e){
                        e.printStackTrace();
                    }finally{
                        //启动一次顺序关闭,执行以前提交的任务,但不接受新任务
                        executorService.shutdown();
                    }
            }
        }
    }   

    class TaskWithResult implements Callable<String>{
        private int id;   

        public TaskWithResult(int id){
            this.id = id;
        }   

        /**
         * 任务的具体过程,一旦任务传给ExecutorService的submit方法,
         * 则该方法自动在一个线程上执行
         */
        public String call() throws Exception {
            System.out.println("call()方法被自动调用!!!    " + Thread.currentThread().getName());
            //该返回结果将被Future的get方法得到
            return "call()方法被自动调用,任务返回的结果是:" + id + "    " + Thread.currentThread().getName();
        }
    }  

自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法来创建线程池,用该类很容易实现自定义的线程池,这里先贴上示例程序:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;   

    public class ThreadPoolTest{
        public static void main(String[] args){
            //创建等待队列
            BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
            //创建线程池,池中保存的线程数为3,允许的最大线程数为5
            ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
            //创建七个任务
            Runnable t1 = new MyThread();
            Runnable t2 = new MyThread();
            Runnable t3 = new MyThread();
            Runnable t4 = new MyThread();
            Runnable t5 = new MyThread();
            Runnable t6 = new MyThread();
            Runnable t7 = new MyThread();
            //每个任务会在一个线程上执行
            pool.execute(t1);
            pool.execute(t2);
            pool.execute(t3);
            pool.execute(t4);
            pool.execute(t5);
            pool.execute(t6);
            pool.execute(t7);
            //关闭线程池
            pool.shutdown();
        }
    }   

    class MyThread implements Runnable{
        @Override
        public void run(){
            System.out.println(Thread.currentThread().getName() + "正在执行。。。");
            try{
                Thread.sleep(100);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }  

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

maximumPoolSize:池中允许的最大线程数。

keepAliveTime:线程池中的空闲线程所能持续的最长时间。

unit:持续时间的单位。

workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。

根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法讲一个Runnable任务添加到线程池中时,按照如下顺序来处理:

1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

4、如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。

总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。

另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

我们大致来看下Executors的源码,newCachedThreadPool的不带RejectedExecutionHandler参数(即第五个参数,线程数量超过maximumPoolSize时,指定处理方式)的构造方法如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }  

它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue<Runnalbe>决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。

再来看newFixedThreadPool的不带RejectedExecutionHandler参数的构造方法,如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }  

它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,敢于LinkedBlockingQueue下面会说。

下面说说几种排队的策略:

1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。

2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。

3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。

java并发编程(十七)Executor框架和线程池的更多相关文章

  1. Java并发编程(您不知道的线程池操作)

    Java并发编程(您不知道的线程池操作) 这几篇博客,一直在谈线程,设想一下这个场景,如果并发的线程很多,然而每个线程如果执行的时间很多的话,这样的话,就会大量的降低系统的效率.这时候就可以采用线程池 ...

  2. 转:【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)

      Executor框架简介 在Java5之后,并发编程引入了一堆新的启动.调度和管理线程的API.Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.coc ...

  3. Java 并发编程&mdash;&mdash;Executor框架和线程池原理

    Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务 ...

  4. 《Java并发编程实战》学习笔记 线程安全、共享对象和组合对象

    Java Concurrency in Practice,一本完美的Java并发参考手册. 查看豆瓣读书 推荐:InfoQ迷你书<Java并发编程的艺术> 第一章 介绍 线程的优势:充分利 ...

  5. 读书笔记-----Java并发编程实战(一)线程安全性

    线程安全类:在线程安全类中封装了必要的同步机制,客户端无须进一步采取同步措施 示例:一个无状态的Servlet @ThreadSafe public class StatelessFactorizer ...

  6. Java并发编程--Fork/Join框架使用

    上篇博客我们介绍了通过CyclicBarrier使线程同步,可是上述方法存在一个问题,那就是假设一个大任务跑了2个线程去完毕.假设线程2耗时比线程1多2倍.线程1完毕后必须等待线程2完毕.等待的过程线 ...

  7. Java并发编程(三)后台线程(Daemon Thread)

    后台线程,守护线程(Daemon Thread) 所谓的后台线程,就是指这种线程并不属于程序中不可或缺的部分,因此当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中的所有后台线程.通过setD ...

  8. 《Java并发编程实战》第二章 线程安全性 读书笔记

    一.什么是线程安全性 编写线程安全的代码 核心在于要对状态訪问操作进行管理. 共享,可变的状态的訪问 - 前者表示多个线程訪问, 后者声明周期内发生改变. 线程安全性 核心概念是正确性.某个类的行为与 ...

  9. Java 并发编程(三)为线程安全类中加入新的原子操作

    Java 类库中包括很多实用的"基础模块"类.通常,我们应该优先选择重用这些现有的类而不是创建新的类.:重用能减少开发工作量.开发风险(由于现有类都已经通过測试)以及维护成本.有时 ...

随机推荐

  1. Js中caller和callee的区别

    1 :caller 返回一个调用当前函数的引用 如果是由顶层调用的话 则返回null (举个栗子哈 caller给你打电话的人  谁给你打电话了 谁调用了你 很显然是下面a函数的执行 只有在打电话的时 ...

  2. [翻译]AKKA笔记 - LOGGING与测试ACTORS -2 (一)

    在前两章 ( 一 , 二 ) ,我们大致讲了Actor和message是怎么工作的,让我们看一下日志和测试我们的 TeacherActor . RECAP 这是上一节我们的Actor代码: class ...

  3. HTML学习笔记——选择器

    1> ID选择器.交叉选择器.群组选择器.子代选择器 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN& ...

  4. 【SIGGRAPH】用【有说服力的照片真实】技术实现最终幻想15的视觉特效

    原文:西川善司 http://www.4gamer.net/games/075/G007535/20160726064/   最终幻想15的演讲会场.相当大,听众非常多.      在本次计算机图形和 ...

  5. centos7通过yum安装mysql,并授权远程连接

    安装: CentOS 7的yum源中没有正常安装MySQL的mysql-sever文件,需要去官网上下载(通过安装mysql的yum容器,再通过yum安装mysql) 注:安装前,需要卸载所有的mar ...

  6. 学习WordPress必须知道的函数(转)

    WordPress是目前十分流行的独立博客程序,因傻瓜化安装和使用,其在网民中的应用已近乎普及.但也因为很多新入门的用户几乎对WordPress 程序没有任何了解,造成使用中碰到问题无法解决,求助也十 ...

  7. 58. Length of Last Word

    题目: Given a string s consists of upper/lower-case alphabets and empty space characters ' ', return t ...

  8. 转:Android 2.3 代码混淆proguard技术介绍

    ProGuard简介 ProGuard是一个SourceForge上非常知名的开源项目.官网网址是:http://proguard.sourceforge.net/. Java的字节码一般是非常容易反 ...

  9. Drupal安装及使用问题解决列表

    #1. 启动 Clean URL 修改Apache的配置文件(如httpd.conf),打开 LoadModule rewrite_module modules/mod_rewrite.so选项.然后 ...

  10. (转)JS获取当前对象大小以及屏幕分辨率等

    Code highlighting produced by Actipro CodeHighlighter (freeware)http://www.CodeHighlighter.com/--> ...