并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)
在上一篇《并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)》中提到了线程池ThreadPoolExecutor的原理以及它的execute方法。这篇文章是接着上一篇文章写的,如果你没有阅读上一篇文章,建议你去读读。本文解析ThreadPoolExecutor#submit。
对于一个任务的执行有时我们不需要它返回结果,但是有我们需要它的返回执行结果。对于线程来讲,如果不需要它返回结果则实现Runnable,而如果需要执行结果的话则可以实现Callable。在线程池同样execute提供一个不需要返回结果的任务执行,而对于需要结果返回的则可调用其submit方法。
AbstractExecutorService
我们把上一篇文章的代码贴过来
public abstract class AbstractExecutorService implements ExecutorService { // RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
// 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
} protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
} // 提交任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 2. 交给执行器执行,execute 方法由具体的子类来实现
// 前面也说了,FutureTask 间接实现了Runnable 接口。
execute(ftask);
return ftask;
} public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task, result);
// 2. 交给执行器执行
execute(ftask);
return ftask;
} public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 1. 将任务包装成 FutureTask
RunnableFuture<T> ftask = newTaskFor(task);
// 2. 交给执行器执行
execute(ftask);
return ftask;
}
}
尽管submit方法能提供线程执行的返回值,但只有实现了Callable才会有返回值,而实现Runnable的线程则是没有返回值的,也就是说在上面的3个方法中,submit(Callable<T> task)能获取到它的返回值,submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值或者准确来说交给线程处理一下,而最后一个方法submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null。
使用示例
submit(Callable<T> task)
/**
* @author: ChenHao
* @Date: Created in 14:54 2019/1/11
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> callable = new Callable<String>() {
public String call() throws Exception {
System.out.println("This is ThreadPoolExetor#submit(Callable<T> task) method.");
return "result";
}
}; ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(callable);
executor.shutdown();
System.out.println(future.get());
}
}
运行结果:
submit(Runnable task, T result)
/**
* @author: ChenHao
* @Date: Created in 14:54 2019/1/11
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newSingleThreadExecutor();
Data data = new Data();
Future<Data> future = executor.submit(new Task(data), data);
executor.shutdown();
System.out.println(future.get().getName());
}
}
class Data {
String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
} class Task implements Runnable {
Data data;
public Task(Data data) {
this.data = data;
}
@Override
public void run() {
System.out.println("This is ThreadPoolExetor#submit(Runnable task, T result) method.");
data.setName("陈浩");
}
}
运行结果:
submit(Runnable task)
/**
* @author: ChenHao
* @Date: Created in 14:54 2019/1/11
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("This is ThreadPoolExetor#submit(Runnable runnable) method.");
}
}; ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(runnable);
executor.shutdown();
System.out.println(future.get());
}
}
运行结果:
从上面的源码可以看到,这三者方法几乎是一样的,关键就在于:
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
是如何将一个任务作为参数传递给了newTaskFor,然后调用execute方法,最后进而返回ftask的呢?
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
} protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
源码分析
这里我建议大家去看看我之前的一篇文章《Java 多线程(五)—— 线程池基础 之 FutureTask源码解析》
submit(Callable<T> task)
我们看上面的源码中知道实际上是调用了如下代码
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
我们看看 FutureTask 的结构
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0; //初始状态
private static final int COMPLETING = 1; //结果计算完成或响应中断到赋值给返回值之间的状态。
private static final int NORMAL = 2; //任务正常完成,结果被set
private static final int EXCEPTIONAL = 3; //任务抛出异常
private static final int CANCELLED = 4; //任务已被取消
private static final int INTERRUPTING = 5; //线程中断状态被设置ture,但线程未响应中断
private static final int INTERRUPTED = 6; //线程已被中断 //将要执行的任务
private Callable<V> callable; //用于get()返回的结果,也可能是用于get()方法抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes //执行callable的线程,调用FutureTask.run()方法通过CAS设置
private volatile Thread runner; //栈结构的等待队列,该节点是栈中的最顶层节点。
private volatile WaitNode waiters; public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
....
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
我们知道 FutureTask 继承了 Runnable,这里将 Callable<T> callable 的实例封装成 FutureTask 传给 execute(ftask);我们再来看看上一篇文章中线程运行的代码
// 此方法由 worker 线程启动后调用,这里用一个 while 循环来不断地从等待队列中获取任务并执行
// 前面说了,worker 在初始化的时候,可以指定 firstTask,那么第一个任务也就可以不需要从队列中获取
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 该线程的第一个任务(如果有的话)
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环调用 getTask 获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池状态大于等于 STOP,那么意味着该线程也要中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 到这里终于可以执行任务了,这里是最重要的,task是什么?是Worker 中的firstTask属性 task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 一个任务执行完了,这个线程还可以复用,接着去队列中拉取任务执行
// 置空 task,准备 getTask 获取下一个任务
task = null;
// 累加完成的任务数
w.completedTasks++;
// 释放掉 worker 的独占锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 如果到这里,需要执行线程关闭:
// 说明 getTask 返回 null,也就是超过corePoolSize的线程过了超时时间还没有获取到任务,也就是说,这个 worker 的使命结束了,执行关闭
processWorkerExit(w, completedAbruptly);
}
}
由上面第6行代码 task 就是execute(ftask)传入的任务,第26行 task.run(); 实际上就是 new FutureTask<T>(callable).run(),我们看看FutureTask中的run()方法
public void run() {
//保证callable任务只被运行一次
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable < V > c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行任务,上面的例子我们可以看出,call()里面可能是一个耗时的操作,不过这里是同步的
result = c.call();
//上面的call()是同步的,只有上面的result有了结果才会继续执行
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//执行完了,设置result
set(result);
}
}
finally {
runner = null;
int s = state;
//判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在 FutureTask的构造方法中 this.callable = callable; ,因此我们可以知道上面run()方法中第6行 c 就是 代码示例中的 new Callable<String>(),c.call()就是调用 代码示例中new Callable 的call方法,并且这里可以取到返回结果,第22行处设置FutureTask 中 outcome 的值,这样线程就可以取到返回值了。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
取值我就不分析了,我之前的文章里面有详细分析。
submit(Runnable task, T result)
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
} protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
我们来看看FutureTask的另外一个构造方法
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
} static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
上面将 runnable, result 封装成了 RunnableAdapter 作为FutureTask的callable属性,这和上面的submit(Callable<T> task) 是不同的,submit(Callable<T> task)是直接将 Callable<T> task作为FutureTask的callable属性。我们看看FutureTask中的run()方法中第6行 c 就是FutureTask 构造方法中的new RunnableAdapter<T>(task, result) ,c.call()就是调用 RunnableAdapter<T>(task, result) 的call方法,call()中的task.run()就是上面代码示例中new Task(data) 中的 run(),run()方法中业务大代码改变了data对象的属性,callable(Runnable task, T result)中也是传的相同的对象data, 所以,result = c.call(); 就是把更改后的data返回,并且将data设置为设置FutureTask 中 outcome 的值,后面的逻辑就是一样的了。
这里可以看成将同一个data传入线程进行处理,同时这个data也传入FutureTask中,并且在RunnableAdapter通过属性进行保存data,等线程将data处理完了,由于是同一个对象,RunnableAdapter中的result也就是data指向的是同一个对象,然后把此result返回到FutureTask保存在属性outcome中,就可以通过FutureTask.get()取到运行结果了。
如果new FutureTask<T>(runnable, null),则result = c.call(); 返回的值也是null,最后从线程池中get的值也是null。
并发编程(十二)—— Java 线程池 实现原理与源码深度解析 之 submit 方法 (二)的更多相关文章
- 并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)
史上最清晰的线程池源码分析 鼎鼎大名的线程池.不需要多说!!!!! 这篇博客深入分析 Java 中线程池的实现. 总览 下图是 java 线程池几个相关类的继承结构: 先简单说说这个继承结构,E ...
- 并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)
前两篇文章讲了线程池的源码分析,再来看这篇文章就比较简单了, 本文主要讲解 Executors 这个工具类,看看长江创建线程池的几种方法. newFixedThreadPool 生成一个固定大小的线程 ...
- 并发编程(十五)——定时器 ScheduledThreadPoolExecutor 实现原理与源码深度解析
在上一篇线程池的文章<并发编程(十一)—— Java 线程池 实现原理与源码深度解析(一)>中从ThreadPoolExecutor源码分析了其运行机制.限于篇幅,留下了Scheduled ...
- Java并发编程系列-(6) Java线程池
6. 线程池 6.1 基本概念 在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理.如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:如果并发的请求数 ...
- Java并发指南12:深度解读 java 线程池设计思想及源码实现
深度解读 java 线程池设计思想及源码实现 转自 https://javadoop.com/2017/09/05/java-thread-pool/hmsr=toutiao.io&utm_ ...
- 【转载】深度解读 java 线程池设计思想及源码实现
总览 开篇来一些废话.下图是 java 线程池几个相关类的继承结构: 先简单说说这个继承结构,Executor 位于最顶层,也是最简单的,就一个 execute(Runnable runnable) ...
- 线程池 ThreadPoolExecutor 原理及源码笔记
前言 前面在学习 JUC 源码时,很多代码举例中都使用了线程池 ThreadPoolExecutor,并且在工作中也经常用到线程池,所以现在就一步一步看看,线程池的源码,了解其背后的核心原理. 公众号 ...
- 并发编程系列:Java线程池的使用方式,核心运行原理、以及注意事项
并发编程系列: 高并发编程系列:4种常用Java线程锁的特点,性能比较.使用场景 线程池的缘由 java中为了提高并发度,可以使用多线程共同执行,但是如果有大量线程短时间之内被创建和销毁,会占用大量的 ...
- 并发编程(十四)—— ScheduledThreadPoolExecutor 实现原理与源码深度解析 之 DelayedWorkQueue
我们知道线程池运行时,会不断从任务队列中获取任务,然后执行任务.如果我们想实现延时或者定时执行任务,重要一点就是任务队列会根据任务延时时间的不同进行排序,延时时间越短地就排在队列的前面,先被获取执行. ...
随机推荐
- BP神经网络综合评价法
BP神经网络综合评价法是一种交互式的评价方法,一种既能避免人为计取权重的不精确性, 又能避免相关系数求解的复杂性,还能对数量较大且指标更多的实例进行综合评价的方法,它可以根据用户期望的输出不断修改指标 ...
- windows许可证即将过期
win+R 输入 slmgr.vbs -xpr 查看日期 激活工具地址: 链接: https://pan.baidu.com/s/1S5nealQM1bytPYV6CYbgyg 提取码: sbmu 1 ...
- 自我介绍&软工实践博客点评
想想既然写了点评博客,那就顺便向同学们介绍下自己吧. 我是16届计科实验班的,水了两件小黄衫,于是就来当助教了_(:_」∠)_ 实话说身为同届生来当助教,我心里还是有点虚的,而且我还是计科的..感觉软 ...
- linux磁盘满了的处理
1.查看磁盘使用情况 cd / df -h 如果 总量Size和Used一样,按就证明磁盘满了 2.查看当前文件下每个文件大小 du -sh * 一层一层去查,就可以查到占用空间最大的那个文件及产生 ...
- Hibernate HQL ②
分页查询: - setFirstResult(int firstResult):设定从哪一个对象开始检索,参数 firstResult 表示这个对象在查询结果中的索引位置,索引位置的起始值为零.默认情 ...
- HBase shell scan 过滤器用法总结
比较器: 前面例子中的regexstring:2014-11-08.*.binary:\x00\x00\x00\x05,这都是比较器.HBase的filter有四种比较器: (1)二进制比较器:如’b ...
- python中的单向链表实现
引子 数据结构指的是是数据的组织的方式.从单个数据到一维结构(线性表),二维结构(树),三维结构(图),都是组织数据的不同方式. 为什么需要链表? 顺序表的构建需要预先知道数据大小来申请连续的存储空间 ...
- React的类型检测PropTypes
React.propTypes:React.PropTypes 提供很多验证器来验证传入数据的有效性,当向props传入无效数据时,JavaScript 控制台会抛出警告. ; class MyTit ...
- MORE XOR
MORE XOR #include<bits/stdc++.h> using namespace std; ; int a[maxn]; ][maxn]; int main() { ios ...
- Go语言基础之函数
Go语言基础之函数 函数是组织好的.可重复使用的.用于执行指定任务的代码块.本文介绍了Go语言中函数的相关内容. 函数 Go语言中支持函数.匿名函数和闭包,并且函数在Go语言中属于“一等公民”. 函数 ...