一、简介

 所谓异步调用其实就是实现一个无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果。但调用者仍需要取线程的计算结果。

 JDK5新增了 Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。

private static final ExecutorService POOL = Executors.newFixedThreadPool(TASK_THRESHOLD, new ThreadFactory() {
        AtomicInteger atomicInteger = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "demo15-" + atomicInteger.incrementAndGet());
        }
    });

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Future<Integer> submit = POOL.submit(() -> 123);
        // 1. get() 方法用户返回计算结果,如果计算还没有完成,则在get的时候会进行阻塞,直到获取到结果为止
        Integer get = submit.get();
        // 2. isDone() 方法用于判断当前Future是否执行完成。
        boolean done = submit.isDone();
        // 3. cancel(boolean mayInterruptIfRunning) 取消当前线程的执行。参数表示是否在线程执行的过程中阻断。
        boolean cancel = submit.cancel(true);
        // 4. isCancelled() 判断当前task是否被取消.
        boolean cancelled = submit.isCancelled();
        // 5. invokeAll 批量执行任务
        Callable<String> callable = () -> "Hello Future";
        List<Callable<String>> callables = Lists.newArrayList(callable, callable, callable, callable);
        List<Future<String>> futures = POOL.invokeAll(callables);
    }

 在Java8中,CompletableFuture 提供了非常强大的 Future 的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。

tips: CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段。

二、CompletableFuture 使用

1. runAsync、supplyAsync

// 无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync、supplyAsync 方法是 CompletableFuture 提供的创建异步操作的方法。需要注意的是,如果没有指定 Executor 作为线程池,将会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码;如果指定线程池,则使用指定的线程池运行。以下所有的方法都类同。

public class Demo1 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println(123));

        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "CompletableFuture");
        System.out.println(supplyAsync.get());
    }
}

2. whenComplete、exceptionally

// 执行完成时,当前任务的线程执行继续执行 whenComplete 的任务。
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
// 执行完成时,把 whenCompleteAsync 这个任务提交给线程池来进行执行。
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

当 CompletableFuture 的计算完成时,会执行 whenComplete 方法;当 CompletableFuture 计算中抛出异常时,会执行 exceptionally 方法。

public class Demo2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> runAsync = CompletableFuture.supplyAsync(() -> 123456);
        runAsync.whenComplete((t, throwable) -> {
            System.out.println(t);
            if (throwable != null) {
                throwable.printStackTrace();
            }
        });
        runAsync.whenCompleteAsync((t, throwable) -> {
            System.out.println(t);
            if (throwable != null) {
                throwable.printStackTrace();
            }
        });
        runAsync.exceptionally((throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
            }
            return null;
        });
        TimeUnit.SECONDS.sleep(2);
    }
}

3. thenApply、handle

// T:上一个任务返回结果的类型
// U:当前任务的返回值类型

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化

handle 方法和 thenApply 方法处理方式基本一样。不同的是 handle 是在任务完成后再执行,还可以处理异常的任务。thenApply 只可以执行正常的任务,任务出现异常则不执行 thenApply 方法。

public class Demo3 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // thenApply
        CompletableFuture<Integer> thenApply = CompletableFuture.supplyAsync(() -> 123).thenApply(t -> t * t);
        System.out.println(thenApply.get());

       // handle
        CompletableFuture<Integer> handle = CompletableFuture.supplyAsync(() -> {
            int i = 10 / 0;
            return new Random().nextInt(10);
        }).handle((t, throwable) -> {
            if (throwable != null) {
                throwable.printStackTrace();
                return -1;
            }
            return t * t;
        });
        System.out.println(handle.get());
    }
}

4. thenAccept、thenRun

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

thenAccept 接收任务的处理结果,并消费处理。无返回结果。

thenRun 跟 thenAccept 方法不一样的是,不关心任务的处理结果。只要上面的任务执行完成,就开始执行 thenRun。

public class Demo4 {

    public static void main(String[] args) {
        // thenAccept
        CompletableFuture<Void> thenAccept = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenAccept(System.out::println);

       // thenRun
        CompletableFuture<Void> thenRun = CompletableFuture.supplyAsync(() -> new Random().nextInt(10)).thenRun(() -> System.out.println(123));
    }
}

5. thenCombine、thenAcceptBoth

 // T 表示第一个 CompletionStage 的返回结果类型
 // U 表示第二个 CompletionStage 的返回结果类型
 // V表示 thenCombine/thenAcceptBoth 处理结果类型
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

thenCombine、thenAcceptBoth 都是用来合并任务 —— 等待两个 CompletionStage 的任务都执行完成后,把两个任务的结果一并来处理。区别在于 thenCombine 有返回值;thenAcceptBoth 无返回值。

public class Demo5 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // thenCombine
        CompletableFuture<String> thenCombine = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenCombine(CompletableFuture.supplyAsync(() -> "str"),
                        // 第一个参数是第一个 CompletionStage 的处理结果
                        // 第二个参数是第二个 CompletionStage 的处理结果
                        (i, s) -> i + s
                );
        System.out.println(thenCombine.get());

        // thenAcceptBoth
        CompletableFuture<Void> thenAcceptBoth = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> "str"),
                        (i, s) -> System.out.println(i + s));
    }
}

6. applyToEither、acceptEither、runAfterEither、runAfterBoth

  • applyToEither:两个 CompletionStage,谁执行返回的结果快,就用那个 CompletionStage 的结果进行下一步的处理,有返回值。
  • acceptEither:两个 CompletionStage,谁执行返回的结果快,就用那个 CompletionStage 的结果进行下一步的处理,无返回值。
  • runAfterEither:两个 CompletionStage,任何一个完成了,都会执行下一步的操作(Runnable),无返回值。
  • runAfterBoth:两个 CompletionStage,都完成了计算才会执行下一步的操作(Runnable),无返回值。

由于这几个方法含义相近,使用更加类似,我们就以 applyToEither 来介绍...

// T 两个 CompletionStage 组合运算后的结果类型
// U 下一步处理运算的结果返回值类型
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
public class Demo6 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> applyToEither = CompletableFuture.supplyAsync(() -> {
            int nextInt = new Random().nextInt(10);
            try {
                Thread.sleep(nextInt);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f1=" + nextInt);
            return nextInt;
        }).applyToEither(CompletableFuture.supplyAsync(() -> {
            int nextInt = new Random().nextInt(10);
            try {
                Thread.sleep(nextInt);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("f2=" + nextInt);
            return nextInt;
        }), i -> i);

        System.out.println(applyToEither.get());
    }
}

7. thenCompose

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

public class Demo7 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> thenCompose = CompletableFuture.supplyAsync(() -> new Random().nextInt(10))
                .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * i));
        System.out.println(thenCompose.get());

    }
}

参考博文:https://www.jianshu.com/p/6bac52527ca4

Java8 CompletableFuture 编程的更多相关文章

  1. 关于Java8函数式编程你需要了解的几点

    函数式编程与面向对象的设计方法在思路和手段上都各有千秋,在这里,我将简要介绍一下函数式编程与面向对象相比的一些特点和差异. 函数作为一等公民 在理解函数作为一等公民这句话时,让我们先来看一下一种非常常 ...

  2. Java8 函数式编程详解

    Java8 函数式编程详解 Author:Dorae Date:2017年11月1日23:03:26 转载请注明出处 说起Java8,可能很多人都已经知道其最大的改进,就是引入了Lambda表达式与S ...

  3. Java8函数式编程探秘

    引子 将行为作为数据传递 怎样在一行代码里同时计算一个列表的和.最大值.最小值.平均值.元素个数.奇偶分组.指数.排序呢? 答案是思维反转!将行为作为数据传递. 文艺青年的代码如下所示: public ...

  4. Java8 CompletableFuture组合式的编程(笔记)

    * 实现异步API public double getPrice(String product) { return calculatePrice(product); } /** * 同步计算商品价格的 ...

  5. [一] java8 函数式编程入门 什么是函数式编程 函数接口概念 流和收集器基本概念

      本文是针对于java8引入函数式编程概念以及stream流相关的一些简单介绍 什么是函数式编程?   java程序员第一反应可能会理解成类的成员方法一类的东西 此处并不是这个含义,更接近是数学上的 ...

  6. Java8 CompletableFuture

    http://colobu.com/2016/02/29/Java-CompletableFuture/ http://www.deadcoderising.com/java8-writing-asy ...

  7. Java8函数式编程学习笔记(初探)

    编程语言的整个目的就在于操作值,要是按照历史上编程语言的传统,这些值被成为一等值,而编程语言中的其他结构也许有助于表示值的结构,但在程序执行期间不能传递,因此为二等值,比如方法和类等则是二等值,类可以 ...

  8. Java8函数式编程

    在Java8的 java.util.function中包含以下几个接口 1.Function,先上源码 /* * Copyright (c) 2010, 2013, Oracle and/or its ...

  9. [三]java8 函数式编程Stream 概念深入理解 Stream 运行原理 Stream设计思路

    Stream的概念定义   官方文档是永远的圣经~     表格内容来自https://docs.oracle.com/javase/8/docs/api/   Package java.util.s ...

  10. java8函数式编程(转载)

    1. 概述 1.1 函数式编程简介 我们最常用的面向对象编程(Java)属于命令式编程(Imperative Programming)这种编程范式.常见的编程范式还有逻辑式编程(Logic Progr ...

随机推荐

  1. react-router配合webpack实现按需加载

    很久没有写博客了.一直感觉没有什么要写的,但是这个东西确实有必要的.使用react开发,不可能一直打包到一个文件.小项目肯定没有问题,但是变大一旦到几兆,这个问题就很严重.现在又Commonjs,AM ...

  2. Meet Github

    Source: http://www.liaoxuefeng.com/ Here only the local part. Install on windows download: https://g ...

  3. Bootstrap 响应式设计

    本教程讲解如何在网页布局中应用响应式设计.在课程中,您将学到响应式 Web 设计.随着移动设备的普及,如何让用户通过移动设备浏览您的网站获得良好的视觉效果,已经是一个不可避免的问题了.响应式 Web ...

  4. POJ - 2183 Bovine Math Geniuses

    “模拟“题,运用哈希,不断地按照一定运算规律对一个结果进行计算,如果重复出现就停止并且输出该数.注意到仔细看题,这种题一定要细心! POJ - 2183 Bovine Math Geniuses Ti ...

  5. Oracle Redo Log 机制 小结(转载)

    Oracle 的Redo 机制DB的一个重要机制,理解这个机制对DBA来说也是非常重要,之前的Blog里也林林散散的写了一些,前些日子看老白日记里也有说明,所以结合老白日记里的内容,对oracle 的 ...

  6. javascript url几种编码方式

    1.escape() 不能直接用于URL编码,它的真正作用是返回一个字符的Unicode编码值.比如“春节”的返回结果是%u6625%u8282,escape()不对"+"编码主要 ...

  7. CodeMIrror 简单使用

    代码高亮是程序员的刚需,不管是在笔记类,论坛类,博客类web网站中,都对代码高亮提出要求,不高亮的代码阅读体验很差,codeMirror是一个前端代码高亮库,使用方便. codeMirror可以直接在 ...

  8. PHP Version之PHP5.2.x到5.3.x

    不向下兼容的变化 1.  在5.3的所有绑定扩展中应用了新的内部参数解析API,当给函数传递不兼容的参数时将返回NULL,但有些例外,比如函数get_class()在出现错误时返回FALSE 2.  ...

  9. iOS 按钮连续提交执行一次(如留言提交,多次拍照问题)

    在很多项目中暴力测试时会出现多次点击执行一个方法  可以用下面的语句进行解决 //先将未到时间执行前的任务取消. [[self class] cancelPreviousPerformRequests ...

  10. UVA - 247 Calling Circles Floyd判圈

    思路:利用的Floyd判圈,如果i能到j,j也能到i说明i和j在同一个圈里.每个人的名字可用map编号.最后DFS打印答案即可. AC代码 #include <cstdio> #inclu ...