转载请注明出处:http://blog.csdn.net/ns_code/article/details/17465497

 

Executor框架简介

Java 5之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。

Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。

Executor接口中之定义了一个方法execute(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类。ExecutorService接口继承自Executor接口,它提供了更丰富的实现多线程的方法,比如,ExecutorService提供了关闭自己的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。

ExecutorService的生命周期包括三种状态:运行、关闭、终止。创建后便进入运行状态,当调用了shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务,当素有已经提交了的任务执行完后,便到达终止状态。如果不调用shutdown()方法,ExecutorService会一直处在运行状态,不断接收新的任务,执行新的任务,服务器端一般不需要关闭它,保持一直运行即可。

Executor执行Runnable任务

ExecutorService实例,而后调用该实例的execute(Runnable command)方法即可。一旦Runnable任务传递到execute()方法,该方法便会自动在一个线程上执行。下面是是Executor执行Runnable任务的示例代码:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;   

    public class TestCachedThreadPool{
        public static void main(String[] args){
            ExecutorService executorService = Executors.newCachedThreadPool();
    //      ExecutorService executorService = Executors.newFixedThreadPool(5);  //创建不同类型的线程池
    //      ExecutorService executorService = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 5; i++){
                executorService.execute(new TestRunnable());
                System.out.println("************* a" + i + " *************");
            }
            executorService.shutdown();
        }
    }   

    class TestRunnable implements Runnable{
        public void run(){
            System.out.println(Thread.currentThread().getName() + "线程被调用了。");
        }
    }  

Executor执行Callable任务

在Java 5之后,任务分两类:一类是实现了Runnable接口的类,一类是实现了Callable接口的类。两者都可以被ExecutorService执行,但是Runnable任务没有返回值,而Callable任务有返回值。并且Callable的call()方法只能通过ExecutorService的submit(Callable<T> task) 方法来执行,并且返回一个 <T>Future<T>,是表示任务等待完成的 Future。

Callable接口类似于Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常而Callable又返回结果,而且当获取返回结果时可能会抛出异常。Callable中的call()方法类似Runnable的run()方法,区别同样是有返回值,后者没有。

当将一个Callable的对象传递给ExecutorService的submit方法,则该call方法自动在一个线程上执行,并且会返回执行结果Future对象。同样,将Runnable的对象传递给ExecutorService的submit方法,则该run方法自动在一个线程上执行,并且会返回执行结果Future对象,但是在该Future对象上调用get方法,将返回null。

下面给出一个Executor执行Callable任务的示例代码:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;   

    public class CallableDemo{
        public static void main(String[] args){
            ExecutorService executorService = Executors.newCachedThreadPool();
            List<Future<String>> resultList = new ArrayList<Future<String>>();   

            //创建10个任务并执行
            for (int i = 0; i < 10; i++){
                //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中
                Future<String> future = executorService.submit(new TaskWithResult(i));
                //将任务执行结果存储到List中
                resultList.add(future);
            }   

            //遍历任务的结果
            for (Future<String> fs : resultList){
                    try{
                        while(!fs.isDone);//Future返回如果没有完成,则一直循环等待,直到Future返回完成
                        System.out.println(fs.get());     //打印各个线程(任务)执行的结果
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }catch(ExecutionException e){
                        e.printStackTrace();
                    }finally{
                        //启动一次顺序关闭,执行以前提交的任务,但不接受新任务
                        executorService.shutdown();
                    }
            }
        }
    }   

    class TaskWithResult implements Callable<String>{
        private int id;   

        public TaskWithResult(int id){
            this.id = id;
        }   

        /**
         * 任务的具体过程,一旦任务传给ExecutorService的submit方法,
         * 则该方法自动在一个线程上执行
         */
        public String call() throws Exception {
            System.out.println("call()方法被自动调用!!!    " + Thread.currentThread().getName());
            //该返回结果将被Future的get方法得到
            return "call()方法被自动调用,任务返回的结果是:" + id + "    " + Thread.currentThread().getName();
        }
    }  

自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法来创建线程池,用该类很容易实现自定义的线程池,这里先贴上示例程序:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;   

    public class ThreadPoolTest{
        public static void main(String[] args){
            //创建等待队列
            BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
            //创建线程池,池中保存的线程数为3,允许的最大线程数为5
            ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
            //创建七个任务
            Runnable t1 = new MyThread();
            Runnable t2 = new MyThread();
            Runnable t3 = new MyThread();
            Runnable t4 = new MyThread();
            Runnable t5 = new MyThread();
            Runnable t6 = new MyThread();
            Runnable t7 = new MyThread();
            //每个任务会在一个线程上执行
            pool.execute(t1);
            pool.execute(t2);
            pool.execute(t3);
            pool.execute(t4);
            pool.execute(t5);
            pool.execute(t6);
            pool.execute(t7);
            //关闭线程池
            pool.shutdown();
        }
    }   

    class MyThread implements Runnable{
        @Override
        public void run(){
            System.out.println(Thread.currentThread().getName() + "正在执行。。。");
            try{
                Thread.sleep(100);
            }catch(InterruptedException e){
                e.printStackTrace();
            }
        }
    }  

public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long         keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

maximumPoolSize:池中允许的最大线程数。

keepAliveTime:线程池中的空闲线程所能持续的最长时间。

unit:持续时间的单位。

workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。

根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法讲一个Runnable任务添加到线程池中时,按照如下顺序来处理:

1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;

2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

4、如果线程池中的线程数量等于了maximumPoolSize,有4种才处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。

总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。

另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

我们大致来看下Executors的源码,newCachedThreadPool的不带RejectedExecutionHandler参数(即第五个参数,线程数量超过maximumPoolSize时,指定处理方式)的构造方法如下:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }  

它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue<Runnalbe>决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。

再来看newFixedThreadPool的不带RejectedExecutionHandler参数的构造方法,如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }  

它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,敢于LinkedBlockingQueue下面会说。

下面说说几种排队的策略:

1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。

2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。

3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。

java并发编程(十七)Executor框架和线程池的更多相关文章

  1. Java并发编程(您不知道的线程池操作)

    Java并发编程(您不知道的线程池操作) 这几篇博客,一直在谈线程,设想一下这个场景,如果并发的线程很多,然而每个线程如果执行的时间很多的话,这样的话,就会大量的降低系统的效率.这时候就可以采用线程池 ...

  2. 转:【Java并发编程】之十九:并发新特性—Executor框架与线程池(含代码)

      Executor框架简介 在Java5之后,并发编程引入了一堆新的启动.调度和管理线程的API.Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.coc ...

  3. Java 并发编程&mdash;&mdash;Executor框架和线程池原理

    Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务 ...

  4. 《Java并发编程实战》学习笔记 线程安全、共享对象和组合对象

    Java Concurrency in Practice,一本完美的Java并发参考手册. 查看豆瓣读书 推荐:InfoQ迷你书<Java并发编程的艺术> 第一章 介绍 线程的优势:充分利 ...

  5. 读书笔记-----Java并发编程实战(一)线程安全性

    线程安全类:在线程安全类中封装了必要的同步机制,客户端无须进一步采取同步措施 示例:一个无状态的Servlet @ThreadSafe public class StatelessFactorizer ...

  6. Java并发编程--Fork/Join框架使用

    上篇博客我们介绍了通过CyclicBarrier使线程同步,可是上述方法存在一个问题,那就是假设一个大任务跑了2个线程去完毕.假设线程2耗时比线程1多2倍.线程1完毕后必须等待线程2完毕.等待的过程线 ...

  7. Java并发编程(三)后台线程(Daemon Thread)

    后台线程,守护线程(Daemon Thread) 所谓的后台线程,就是指这种线程并不属于程序中不可或缺的部分,因此当所有的非后台线程结束时,程序也就终止了,同时会杀死进程中的所有后台线程.通过setD ...

  8. 《Java并发编程实战》第二章 线程安全性 读书笔记

    一.什么是线程安全性 编写线程安全的代码 核心在于要对状态訪问操作进行管理. 共享,可变的状态的訪问 - 前者表示多个线程訪问, 后者声明周期内发生改变. 线程安全性 核心概念是正确性.某个类的行为与 ...

  9. Java 并发编程(三)为线程安全类中加入新的原子操作

    Java 类库中包括很多实用的"基础模块"类.通常,我们应该优先选择重用这些现有的类而不是创建新的类.:重用能减少开发工作量.开发风险(由于现有类都已经通过測试)以及维护成本.有时 ...

随机推荐

  1. Tomcat如何配置环境变量

    1, JDK:版本为jdk-7-windows-i586.exe 下载地址: http://www.oracle.com/technetwork/java/javase/downloads/index ...

  2. Git是个好工具

    Git是分布式版本控制系统,我们常用的版本控制工具还有SVN.这里就得区分下什么是分布式版本控制系统,什么是集中化的版本控制系统. 集中化的版本控制系统 集中化的版本控制系统( Centralized ...

  3. mybatis oracle BLOB类型字段保存与读取

    一.BLOB字段 BLOB是指二进制大对象也就是英文Binary Large Object的所写,而CLOB是指大字符对象也就是英文Character Large Object的所写.其中BLOB是用 ...

  4. AsyncTask的缺陷和注意事项

    1. AsyncTask 主要是用来处理后台耗时操作,并将数据更新到主线程的一个工具类. AsyncTask的执行分为四个步骤,每一步都对应一个回调方法,这些方法不应该由应用程序调用,开发者需要做的就 ...

  5. 转载--CentOS 6.3下部署LVS(NAT)+keepalived实现高性能高可用负载均衡

    源地址:http://www.cnblogs.com/mchina/archive/2012/08/27/2644391.html 一.简介 VS/NAT原理图: 二.系统环境 实验拓扑: 系统平台: ...

  6. SharePoint 2013中Office Web Apps的一次排错

    转自http://www.cnblogs.com/awpatp/archive/2013/06/06/3121420.html, 仅供自己查看 笔者尝试在自己的测试环境中为SharePoint 201 ...

  7. windows git的安装配置(转)

    Win7上Git安装及配置过程 http://www.cnblogs.com/sunny5156/archive/2012/10/23/2735799.html   对于需要使用Putty登录的参见 ...

  8. php部分---人员表和民族表的显示、修改、删除

    1.连接数据库 进行网页的显示 <table width="100%" border="1" cellpadding="0" cell ...

  9. URAL 1242 Werewolf(DFS)

    Werewolf Time limit: 1.0 secondMemory limit: 64 MB Knife. Moonlit night. Rotten stump with a short b ...

  10. 关于java.lang.String理解中的一些难点

    最近温习java的一些基础知识,发现以往对String对象认识上的一些不足.特汇总如下,主要是帮助记忆,如能对其他朋友有些启发,不胜欣喜. String在JVM中内存驻留问题 JVM的常量区(Cons ...