【异步操作工具 CompletableFuture 详解】

CompletableFuture 用法详解

  • 前言
  • 一、CompletableFuture是什么
  • 二、如何使用
  • 1.CompletableFuture.runAsync
  • 2.CompletableFuture.supplyAsync
  • 3.CompletableFuture.completedFuture/thenAccept/thenCombine/get
  • 4.CompletableFuture.getNow
  • 5.CompletableFuture.isDone/isCancelled/cancel
  • 6.CompletableFuture.get/join/complete/completeExceptionally
  • 7.Completable.thenApply/thenApplyAsync
  • 8.CompletableFuture.thenAccept/thenAcceptAsync
  • 9.CompletableFuture.thenRun/thenRunAsync
  • 10.CompletableFuture.thenAcceptBoth/acceptEither
  • 11.CompletableFuture.runAfterBoth/applyToEither/runAfterEither
  • 12.Completable.thenCompose
  • 13.CompletableFuture.whenComplete/handle
  • 总结

  • 前言

    提示:本文介绍的CompletableFuture用法都是基于JDK 1.8

    随着业务系统的不断复杂化,业务数据量的不断增加,单线程有时已经很难满足现有的系统要求。为了提升用户体验,有时不得不通过异步多线程的代码去加快系统响应速度。这个时候,CompletableFuture 就可以帮大忙了。


    一、CompletableFuture是什么

    CompletableFutureJava JUC 包中的一个工具类,它继承了 Future 接口和 CompletionStage 接口。
    类图

    Future 可以获取到异步执行代码块的返回值,包含了以下五个方法,相对比较简单。

  • cancel(boolean mayInterruptIfRunning) 尝试取消此任务的执行。 如果任务已完成、已取消或由于某些其他原因无法取消,则此尝试将失败。 如果成功,并且调用取消时该任务尚未启动,则该任务永远不应运行。 如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应中断执行此任务的线程以尝试停止该任务
  • isCancelled() 任务是否已经取消
  • isDone() 任务是否不在执行状态,取消/异常/执行结束都会返回true
  • get() 获取任务返回结果,若任务未执行结束,则阻塞等待任务结束返回结果
  • get(long timeout, TimeUnit unit) 设置超时时间获取任务返回结果,超时抛出异常
  • CompletionStage 可以针对任务执行的步骤进行一系列细粒度操作。方法比较多,这里就不列出,在下面具体使用时进行介绍。

    二、如何使用

    1.CompletableFuture.runAsync

    runAsync 方法会开启一个新的线程去执行对应的任务,不存在返回值。并且可以指定对应的线程池执行,不指定的话会使用一个内部默认的线程池 asyncPool。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.function.Supplier;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
    
            // 使用Runnable
            Runnable runnable = () -> {
                System.out.println(Thread.currentThread().getName() + ": execute CompletableFuture.runAsync");
            };
            CompletableFuture.runAsync(runnable);
    
            // 使用Runnable 简写
            CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName() + ": execute CompletableFuture.runAsync");
            });
    
            // 指定线程池(默认使用CompletableFuture的asyncPool)
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName() + ": execute CompletableFuture.runAsync");
            }, executorService);
            // 关闭线程池
            executorService.shutdown();
        }
    }
    

    输出结果如下:可以观察到默认的线程池为ForkJoinPool.commonPool

    ForkJoinPool.commonPool-worker-1: execute CompletableFuture.runAsync
    ForkJoinPool.commonPool-worker-1: execute CompletableFuture.runAsync
    pool-1-thread-1: execute CompletableFuture.runAsync
    

    2.CompletableFuture.supplyAsync

    supplyAsync 方法会开启一个新的线程去执行对应的任务,存在返回值。同时可以指定对应的线程池执行,不指定线程池会使用默认的线程池asyncPool。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.function.Supplier;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            // 存在返回值
            Supplier<String> supplier = () -> {
                System.out.println("supplier");
                return "supplier execute";
            };
    		CompletableFuture.supplyAsync(supplier);
    
    		// 存在返回值简写
    		CompletableFuture.supplyAsync(() -> {
                System.out.println("supplier");
                return "supplier execute";
            });
            
            // 指定线程池
           ExecutorService executorService = Executors.newSingleThreadExecutor();
    	   CompletableFuture.supplyAsync(() -> {
                System.out.println("supplier");
                return "supplier execute";
            }, executorService);
            // 关闭线程池
            executorService.shutdown();
        }
    }
    

    输出结果如下:可以观察到默认的线程池为ForkJoinPool.commonPool

    ForkJoinPool.commonPool-worker-1: supplier
    ForkJoinPool.commonPool-worker-1: supplier
    pool-1-thread-1: supplier
    

    3.CompletableFuture.completedFuture/thenAccept/thenCombine/get

    completedFuture 方法可以创建出一个具有指定value结果的 CompletableFuture,可能有人会问,这有什么作用?别急,可以在后续的一些方法中使用到。
    例如:CompletableFuture.thenCombine 可以接收一个 completableFuture 参数和一个函数式接口。对 completableFuture 参数的结果和当前 completableFuture 实例结果进行函数式接口操作,这时候需要使用到 completedFuture 创建一个实例。
    另外 thenAccept 可以接受一个 Consumer 函数式接口,对于 CompletableFuture 实例结果进行自定义操作。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
        	// 创建返回值为supplyAsync1的CompletableFuture
            CompletableFuture<String> completableFuture1 = CompletableFuture.completedFuture("supplyAsync1");
            try {
            	// get阻塞等待任务执行结束,最后获取结果
                System.out.println(completableFuture1.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
    		
    		// 创建一个具有world返回值的completableFuture
          	CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("world");
    		// 模拟一段业务逻辑之后,返回了一个值
            CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            	// mock一段业务逻辑的返回值(假装执行完一段业务逻辑)
                return "hello";
            });
    
    		// completableFuture2接收completableFuture参数
    		// 和一个BiFunction参数, r1, r2, 分别为两个completableFuture的返回值
    		// thenAccept可以接收一个Consumer 例如 r -> {}, r为thenCombine之后生成的CompletableFuture的结果
            completableFuture2.thenCombine(completableFuture, (r1, r2) -> {
                return r1 + r2;
            }).thenAccept(r -> System.out.println(r));
        }
    }
    

    输出结果:

    supplyAsync1
    helloworld
    

    4.CompletableFuture.getNow

    执行 getNow 方法时将会判断当前任务是否执行结束,若已执行结束则会任务执行完的结果,但是若未执行完,则会直接结束任务返回指定的value结果。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                return "completableFuture: hello";
            });
            // completableFuture未执行结束, 调用getNow 将直接返回指定的value
            System.out.println(completableFuture.getNow("completableFuture: customer value"));
    
            CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                return "completableFuture2: hello";
            });
            // 这里睡眠一秒钟确保completableFuture2任务已经执行结束
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // 执行结束调用getNow将返回任务执行结果
            System.out.println(completableFuture2.getNow("completableFuture2: customer value"));
        }
    }
    

    结果如下:和预期料想的一样

    completableFuture: customer value
    completableFuture2: hello
    

    5.CompletableFuture.isDone/isCancelled/cancel

    isDone 方法判断当前CompletableFuture任务是否已经执行结束,不论是否正常结束,也就是即使抛出异常或者被取消都会返回true。
    isCancelled 方法判断当前CompletableFuture任务是否被取消。
    cancel 方法取消当前CompletableFuture任务。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                return "completableFuture: hello";
            });
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("completableFuture: isDone ? " + completableFuture.isDone());
    
            CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "completableFuture: hello";
            });
            // 取消任务, 方便查看cancel之后 isDone结果
            completableFuture2.cancel(true);
            System.out.println("completableFuture2: isCancelled ? " + completableFuture2.isCancelled());
            System.out.println("completableFuture2: isDone ? " + completableFuture2.isDone());
    
            CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> {
            	// 抛出异常, 查看抛出异常isDone结果
                throw new RuntimeException("eee");
            });
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("completableFuture3: isDone ? " + completableFuture3.isDone());
        }
    }
    

    输出结果

    completableFuture: isDone ? true
    completableFuture2: isCancelled ? true
    completableFuture2: isDone ? true
    completableFuture3: isDone ? true
    

    6.CompletableFuture.get/join/complete/completeExceptionally

    执行 get 方法会阻塞当前线程执行,等待任务执行结束,并获取到返回值。
    执行 join 方法之后会优先执行当前 CompletableFuture 的任务,任务执行结束才会执行后续代码。
    执行 complete 方法时,若方法未执行结束会直接结束任务,并设置自定义结果。
    执行 completeExceptionally 方法时,若方法未执行结束会直接结束任务,并在执行 get 方法时,将直接抛出设置的异常。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync");
            try {
                // 阻塞并获取结果
                System.out.println(completableFuture.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
    
            CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
                // 设置睡眠2s确保超时
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "CompletableFuture supplyAsync";
            });
            try {
                // 设置超时时间阻塞并获取结果
                System.out.println(completableFuture2.get(1, TimeUnit.SECONDS));
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            }
    
            // 直接完成任务,若任务未完成返回值设置为value,若已完成则不做处理
            CompletableFuture<String> completableFuture3 = CompletableFuture.supplyAsync(() -> "completableFuture3 supplyAsync");
            try {
            	// 设置睡眠时间
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // 直接结束任务并设置自定义结果
            completableFuture3.complete("completableFuture3 customer value");
            try {
                System.out.println(completableFuture3.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
    
            // 直接结束任务,如果任务未完成,则在调用get时抛出指定的异常。若已完成则不做处理
            CompletableFuture<String> completableFuture4 = CompletableFuture.supplyAsync(() -> "completableFuture4 supplyAsync");
            // 睡眠一秒则任务执行结束不会抛出异常
    //        try {
    //            Thread.sleep(1000);
    //        } catch (InterruptedException e) {
    //            throw new RuntimeException(e);
    //        }
            completableFuture4.completeExceptionally(new RuntimeException());
            try {
                System.out.println(completableFuture4.get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
    
           CompletableFuture<Void> completableFuture5 = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName() + ": completableFuture5 runAsync");
                // 设置睡眠时间
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            // 主线程等待completableFuture5任务执行结束 才会执行后续代码
            completableFuture5.join();
            System.out.println(Thread.currentThread().getName() + ": 执行我!!!!");
        }
    }
    

    输出结果:

    CompletableFuture supplyAsync
    
    // completableFuture2 超时结果
    java.util.concurrent.TimeoutException
    	at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1784)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    	at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:34)
    completableFuture3 supplyAsync
    
    // completableFuture4 睡眠执行结果
    completableFuture4 supplyAsync
    // completableFuture4 未睡眠
    Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException
    	at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:64)
    Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException
    	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    	at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:62)
    Caused by: java.lang.RuntimeException
    	at com.curtis.demo.future.CompletableFutureTest.main(CompletableFutureTest.java:60)
    
    // completableFuture5 join, 可以看出等待completableFuture5执行结束再执行主线程代码
    ForkJoinPool.commonPool-worker-1: completableFuture5 runAsync
    main: 执行我!!!!
    

    7.Completable.thenApply/thenApplyAsync

    thenApply 获得 completableFuture 的执行结果,并作为参数进行进下一步处理。参数为一个函数式接口 Function,而 thenApplyAsync 只是在 thenApply 的基础上,是一个异步处理,可以指定线程池执行,无其他差异。

    package com.curtis.demo.future;
    
    import java.util.concurrent.*;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync");
            completableFuture.thenApply((r) -> r + " -- thenApply")
                    .thenAccept(r -> System.out.println(Thread.currentThread().getName() + ": " + r));
            // 异步
            completableFuture.thenApply((r) -> r + " -- thenApply")
                    .thenAcceptAsync(r -> System.out.println(Thread.currentThread().getName() + ": " + r));
            // 异步并指定线程
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            completableFuture.thenApply((r) -> r + " -- thenApply")
                    .thenAcceptAsync(r -> System.out.println(Thread.currentThread().getName() + ": " + r), executorService);
    		// 关闭线程池
            executorService.shutdown();
        }
    }
    

    输出结果:可以看出执行线程的差异,未指定线程会有默认的线程池线程执行。

    main: CompletableFuture supplyAsync -- thenApply
    ForkJoinPool.commonPool-worker-1: CompletableFuture supplyAsync -- thenApply
    pool-1-thread-1: CompletableFuture supplyAsync -- thenApply
    

    8.CompletableFuture.thenAccept/thenAcceptAsync

    package com.curtis.demo.future;
    
    import java.util.concurrent.*;
    import java.util.function.Consumer;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync");
            // 这里不再写简写,将Consumer直接创建出来,便于加深大家的印象
            Consumer<String> consumer = (r) -> System.out.println(Thread.currentThread().getName() + ": consumer " + r);
            completableFuture.thenAccept(consumer);
    	
    		// 这里异步执行, 可以指定线程池
            completableFuture.thenAcceptAsync(consumer);
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            completableFuture.thenAcceptAsync(consumer, executorService);
            executorService.shutdown();
       }
    }
    

    输出结果:可以看出执行线程的差异,未指定线程会有默认的线程池线程执行。

    main: consumer CompletableFuture supplyAsync
    ForkJoinPool.commonPool-worker-1: consumer CompletableFuture supplyAsync
    pool-1-thread-1: consumer CompletableFuture supplyAsync
    

    9.CompletableFuture.thenRun/thenRunAsync

    thenRun 执行指定的 Runnable ,而 thenRunAsync 也仅是进行异步执行。相信大家都发现了,方法上加了 Async 仅仅至少代表异步而已。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture supplyAsync");
    
            completableFuture.thenRun(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + ": thenRun" + completableFuture.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
    
            completableFuture.thenRunAsync(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + ": thenRun" + completableFuture.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
    
    		ExecutorService executorService = Executors.newSingleThreadExecutor();
            completableFuture.thenRunAsync(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + ": thenRun" + completableFuture.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }, executorService);
            // 关闭线程池
            executorService.shutdown();
        }
    }
    

    执行结果:

    main: thenRunCompletableFuture supplyAsync
    ForkJoinPool.commonPool-worker-1: thenRunCompletableFuture supplyAsync
    pool-1-thread-1: thenRunCompletableFuture supplyAsync
    

    10.CompletableFuture.thenAcceptBoth/acceptEither

    thenAcceptBoth 获取到两个CompletableFuture实例的结果,作为 BiConsumer 的参数传入,执行对应的Consumer逻辑。acceptEither 接收执行最快的CompletableFuture实例的结果,作为 Consumer 参数传入,执行对应的Consumer逻辑。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                try {
                	// 保证completableFuture2优先结束
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "CompletableFuture supplyAsync";
            });
            CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> "CompletableFuture2 supplyAsync");
    
    		// 获取到completableFuture2和completableFuture的结果, 针对结果进行一系列操作
            completableFuture.thenAcceptBoth(completableFuture2, (result1, result2) -> {
                System.out.println(Thread.currentThread().getName() + ": result1: " + result1);
                System.out.println(Thread.currentThread().getName() + ": result2: " + result2);
            });
    
    		// 获取到completableFuture2和completableFuture最先执行结束的结果, 针对结果进行一系列操作
            completableFuture.acceptEither(completableFuture2, (r) -> {
                System.out.println(Thread.currentThread().getName() + ": r: " + r);
            });
    
    		// 为了保证主线程不结束,才能看到completableFuture执行结果的打印
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
    
    

    需要注意的是:若代码执行到acceptEither或thenAcceptBoth的位置时,不存在执行结束的任务,这个时候主线程不再关心这个代码的执行,会交给CompletableFuture的默认线程池去执行
    执行结果:

    main: r: CompletableFuture2 supplyAsync
    ForkJoinPool.commonPool-worker-1: result1: CompletableFuture supplyAsync
    ForkJoinPool.commonPool-worker-1: result2: CompletableFuture2 supplyAsync
    
    // 在CompletableFuture2中加入睡眠一秒钟代码会发现执行结果会变成
    ForkJoinPool.commonPool-worker-2: r: CompletableFuture2 supplyAsync
    ForkJoinPool.commonPool-worker-1: result1: CompletableFuture supplyAsync
    ForkJoinPool.commonPool-worker-1: result2: CompletableFuture2 supplyAsync
    

    11.CompletableFuture.runAfterBoth/applyToEither/runAfterEither

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture.supplyAsync");
            CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> "CompletableFuture2.supplyAsync");
    		
    		// 在两个CompletableFuture都执行结束后执行
            completableFuture.runAfterBoth(completableFuture2, () -> {
                System.out.println("runAfterBoth");
                try {
                    System.out.println(completableFuture2.get());
                    System.out.println(completableFuture.get());
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            });
    
            try {
            	// 在两个completableFuture任意一个执行结束后,获取结果对结果进行处理
                System.out.println(completableFuture.applyToEither(completableFuture2, (r) -> "completableFuture.applyToEither" + r).get());
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
    
    		// 在两个completableFuture任意一个执行结束后执行
            completableFuture.runAfterEither(completableFuture2, () -> System.out.println("completableFuture.runAfterEither"));
        }
    }
    

    输出结果:

    runAfterBoth
    CompletableFuture2.supplyAsync
    CompletableFuture.supplyAsync
    
    completableFuture.applyToEitherCompletableFuture.supplyAsync
    completableFuture.runAfterEither
    

    12.Completable.thenCompose

    thenCompose 可以针对 Completable 进行组合式异步操作,例如:构造了一个异步链

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "CompletableFuture.supplyAsync");
    
            // 可以组合 completableFuture 执行异步任务链
            completableFuture.thenCompose((s) -> {
                // completableFuture执行结束后拿到结果s继续执行另一个任务
                return CompletableFuture.supplyAsync(() -> {
                    return "anOtherCompletableFuture supplyAsync --- " + s;
                });
            }).thenCompose((r) -> {
            	// 上一个 thenCompose 执行结束后拿到结果r继续执行另一个任务
                return CompletableFuture.supplyAsync(() -> {
                   return "thenCompose result";
                });
            }).thenAccept(System.out::println);
    
        }
    }
    

    输出结果:

    thenCompose result
    

    13.CompletableFuture.whenComplete/handle

    whenComplete 在任务结束时执行,正常结束是会接收到result,异常结束时会接收到异常信息。该方法,没有返回值,只能对于异常进行一些日志记录等。而 handler 方法会在任务结束时执行,同样可以获取到执行结果和异常信息。区别在于 handler 方法能够修改任务执行结果。

    package com.curtis.demo.future;
    
    import java.util.concurrent.CompletableFuture;
    
    /**
     * @author Curtis
     * @since 2024-04-19 10:56
     */
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                // return "CompletableFuture.supplyAsync";
                // 抛出异常查看打印结果
                throw new RuntimeException();
            });
    
            // 当任务执行结束时执行,可以获取到执行结果和异常信息
            completableFuture.whenComplete((result, ex) -> {
                System.out.println(result);
                if (ex != null) {
                    ex.printStackTrace();
                }
            });
    
            // 当任务执行结束时执行,可以获取到执行结果和异常信息,并且可以修改返回值
            completableFuture.handle((result, ex) -> {
                System.out.println(result);
                if (ex != null) {
    				// ex.printStackTrace();
    				// 修改执行结果
                    return "customer result";
                }
                return result;
            });
        }
    }
    

    执行结果:

    null
    java.util.concurrent.CompletionException: java.lang.RuntimeException
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
    Caused by: java.lang.RuntimeException
    	at com.curtis.demo.future.CompletableFutureTest.lambda$main$0(CompletableFutureTest.java:15)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    	... 5 more
    
    null
    customer result
    
    

    总结

    CompletableFuture 内部封装了一系列针对异步任务的操作,能够在细粒度的一些节点进行一些自定义操作,并且可以获取到异步执行后的返回结果,非常灵活。
    名称上包含 Async 的方法仅代表是该方法异步操作,并且可以指定线程池执行,默认使用内部线程池。

    灵活得组合方法进行使用,可以基本覆盖实现所有场景。优雅,太优雅了。

    作者:技术贪吃怪

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【异步操作工具 CompletableFuture 详解】

    发表回复