Java异步编程CompletableFuture(串行,并行,批量执行)

🍓 简介:java系列技术分享(👉持续更新中…🔥)
🍓 初衷:一起学习、一起进步、坚持不懈
🍓 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正🙏
🍓 希望这篇文章对你有所帮助,欢迎点赞 👍 收藏 ⭐留言 📝

🍓 更多文章请点击

文章目录

  • 一、什么是CompletableFuture?
  • 二、CompletableFuture的结构
  • 三、基础方法介绍
  • 3.1 `get()` 、`join()`、`isDone`方法介绍
  • 3.2 使用 `runAsync()` 开启一个子线程去执行`无结果`
  • 3.3 使用 `supplyAsync()` 开启一个子线程去执行`有返回结果`
  • 3.4 CompletableFuture API 的所有方法都有两个重载的方法`一个接受Executor`线程池
  • 3.4.1 线程池工具类
  • 四、CompletableFuture常用方法介绍
  • 消息打印小工具类(`用不用都行`)
  • 4.1 串行
  • 4.1.1 前一个返回有结果再执行下一个 thenCompose 、 thenApply()、thenApplyAsync、 thenAccept() 、thenRun()
  • 4.2 并行
  • 4.2.1两个同时执行,等待结果一起返回thenCombine()
  • 4.2.2 两个任务都完成了,不关注执行结果的进行下一步操作runAfterBoth/runAfterBothAsync
  • 4.2.3 两个任务并行进行用快的那个的结果作为后续处理applyToEither\applyToEitherAsync
  • 4.3 异常处理
  • 4.3.1 使用 exceptionally() 回调处理异常
  • 4.3.2 使用 handle() 方法处理异常
  • 4.3.3 使用 whenComplete()/whenCompleteAsync() 方法处理异常
  • 4.4 批量执行(组合多个CompletableFuture)
  • 4.4.1 CompletableFuture.allOf()
  • `没有返回值`
  • `有返回值`
  • 4.4.2 CompletableFuture.anyOf()
  • 图1

    一、什么是CompletableFuture?

        在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,并且会通知主线程它的进度,成功或者失败。在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成。主线程可以并行的执行其他任务。 使用这种并行方式,可以极大的提高程序的性能。

    CompletableFuture 是 Java 8 引入的一个类,用于简化异步编程模型。它是 Future 接口的一个增强版本,提供了更加丰富的功能和更灵活的用法

       CompletableFuture是一个Java库中的类,用于处理异步编程。它提供了一种简化异步操作的方式,可以更方便地编写异步代码,并处理异步任务的结果。

       使用CompletableFuture可以简化异步编程的复杂性,并提供更灵活和强大的功能。它是Java 8中新增的功能之一,为开发者提供了更好的异步编程体验。

    CompletableFuture可以用于以下几种情况:

  • 异步执行任务:可以使用CompletableFuture来执行一个耗时的任务,而不会阻塞主线程。

  • 组合多个异步任务的结果:可以将多个CompletableFuture串联起来,以便在它们都完成后执行一些操作。

  • 处理异常情况:可以使用CompletableFuture来处理异步任务中可能出现的异常情况。

  • 并发执行多个任务 :可以使用CompletableFuture来同时执行多个任务,并等待它们全部完成。

  • 二、CompletableFuture的结构


    CompletableFuture它同时实现了FutureCompletionStage两个接口。

  • Future : 它代表一个异步计算的结果。它可以用于提交一个任务并在未来的某个时间获取它的结果。Future提供了一些方法来检查任务的完成状态、取消任务以及获取任务的结果。
  • CompletionStage : CompletionStage是Java 8引入的一个接口,它扩展了Future的功能,并提供了更强大的异步编程支持。它表示一个异步计算的阶段,可以将多个阶段链接在一起形成一个复杂的异步计算流水线。CompletionStage提供了一系列方法来描述和组合异步计算的各个阶段,例如thenApply、thenCompose、thenCombine等
  • CompletableFuture : 是Java 8中新增加的一个类,它实现了Future和CompletionStage两个接口。CompletableFuture提供了更加灵活和强大的异步编程模型,可以用于处理复杂的异步计算场景。它提供了丰富的方法来处理异步计算的结果、执行转换操作、组合多个异步计算等。通过CompletableFuture,我们可以更加方便地编写异步代码,实现更高效的并发编程。
  • 三、基础方法介绍

    因为方法非常多,这里我对常用的进行介绍

    3.1 get()join()isDone方法介绍

  • get()join() 都会等待异步操作完成并返回结果,但 get() 在等待期间线程被中断时会抛出异常,而 join() 不会。
  • isDone() 用于检查异步操作是否完成,而不等待操作完成,适用于非阻塞的轮询场景。
  • 3.2 使用 runAsync() 开启一个子线程去执行无结果

    // Using Lambda Expression
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    });
    

    3.3 使用 supplyAsync() 开启一个子线程去执行有返回结果

    // Using Lambda Expression
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    });
    
    System.out.println("结果 : "+future.get());
    

    3.4 CompletableFuture API 的所有方法都有两个重载的方法一个接受Executor线程池

    static CompletableFuture<Void>  runAsync(Runnable runnable)
    static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    

    你可能想知道,runAsync() supplyAsync()方法在单独的线程中执行他们的任务。但是我们不会永远只创建一个线程。CompletableFuture可以从全局的ForkJoinPool.commonPool()获得一个线程中执行这些任务。但是你也可以创建一个线程池并传给runAsync() 和supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。

    这里使用了简单的线程池创建

    Executor executor = Executors.newFixedThreadPool(10);
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }, executor);
    

    3.4.1 线程池工具类

    可以注入线程池,进行传递给runAsync() 和supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Configuration
    public class MongoThreadPoolConfig {
        //参数初始化
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        //核心线程数量大小
        private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
        //线程池最大容纳线程数
        private static final int maxPoolSize = CPU_COUNT * 2 + 1;
        //阻塞队列
        private static final int workQueue = 20;
        //线程空闲后的存活时长
        private static final int keepAliveTime = 30;
    
        @Bean("asyncTaskExecutor")
        public ThreadPoolTaskExecutor getAsyncExecutor() {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            //核心线程数
            threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
            //最大线程数
            threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);
            //等待队列
            threadPoolTaskExecutor.setQueueCapacity(workQueue);
            //线程前缀
            threadPoolTaskExecutor.setThreadNamePrefix("taskExecutor-");
            //线程池维护线程所允许的空闲时间,单位为秒
            threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveTime);
            // 线程池对拒绝任务(无线程可用)的处理策略
            threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            threadPoolTaskExecutor.initialize();
            return threadPoolTaskExecutor;
        }
    }
    

    四、CompletableFuture常用方法介绍

    消息打印小工具类(用不用都行)

    主要观察和学习CompletableFuture方法

    import java.util.StringJoiner;
    /**
     * 小工具
     */
    @Slf4j
    public class SmallTool {
    
        /** 
         * 睡眠
         */ 
        public static void sleepMillis(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /** 
         * 打印 message
         */ 
        public static void printTimeAndThread(String tag) {
            String result = new StringJoiner("\t|\t")
                    .add(String.valueOf(System.currentTimeMillis()))
                    .add(String.valueOf(Thread.currentThread().getId()))
                    .add(Thread.currentThread().getName())
                    .add(tag)
                    .toString();
            System.out.println(result);
        }
    }
    

    4.1 串行

    4.1.1 前一个返回有结果再执行下一个 thenCompose 、 thenApply()、thenApplyAsync、 thenAccept() 、thenRun()

    作用都是使得一个阶段的结果可以作为下一个阶段的输入

    尽管它们的目标相同,但它们在使用方式和功能上有一些区别。

  • thenCompose()thenApply()方法用于将前一个阶段的结果与下一个阶段进行组合
  • thenApplyAsync()方法在此基础上实现异步执行
  • thenAccept()方法用于处理前一个阶段的结果而不需要返回值
  • thenRun()方法用于在前一个阶段完成后执行一些操作
  • 不同的方法适用于不同的场景,可以根据实际需求选择合适的方法来组合异步计算。

        public static void main(String[] args) {
            SmallTool.printTimeAndThread("小白进入餐厅");
            SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
    
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("厨师炒菜");
                SmallTool.sleepMillis(200);
                return "番茄炒蛋";
            }).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("服务员打饭");
                SmallTool.sleepMillis(100);
                return dish + " + 米饭";
            }));
    			
            SmallTool.printTimeAndThread("小白在打王者");
            SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));
        }
    

    类似

    4.2 并行

    4.2.1两个同时执行,等待结果一起返回thenCombine()

        public static void main(String[] args) {
            SmallTool.printTimeAndThread("小白进入餐厅");
            SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
    
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("厨师炒菜");
                SmallTool.sleepMillis(200);
                return "番茄炒蛋";
            }).thenCombine(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("服务员蒸饭");
                SmallTool.sleepMillis(300);
                return "米饭";
            }), (dish, rice) -> {
                SmallTool.printTimeAndThread("服务员打饭");
                SmallTool.sleepMillis(100);
                return String.format("%s + %s 好了", dish, rice);
            });
    
            SmallTool.printTimeAndThread("小白在打王者");
            SmallTool.printTimeAndThread(String.format("%s ,小白开吃", cf1.join()));
    
        }
    

    4.2.2 两个任务都完成了,不关注执行结果的进行下一步操作runAfterBoth/runAfterBothAsync

     public static void main(String[] args) {
            SmallTool.printTimeAndThread("小白进入餐厅");
            SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");
    
            CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("厨师炒菜");
                SmallTool.sleepMillis(200);
                return "番茄炒蛋";
            });
            cf1.runAfterBoth(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("服务员蒸饭");
                SmallTool.sleepMillis(300);
                return "米饭";
            }), () -> {
                SmallTool.printTimeAndThread("厨师和服务员都执行完了");
            }).join();
            SmallTool.printTimeAndThread("结束");
        }
    

    4.2.3 两个任务并行进行用快的那个的结果作为后续处理applyToEither\applyToEitherAsync

        public static void main(String[] args) {
            SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");
            SmallTool.printTimeAndThread("等待 700路 或者 800路 公交到来");
    
            CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("700路公交正在赶来");
                SmallTool.sleepMillis(1);
                return "700路到了";
            }).applyToEither(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("800路公交正在赶来");
                SmallTool.sleepMillis(3);
                return "800路到了";
            }), firstComeBus -> firstComeBus);
    
            SmallTool.printTimeAndThread(String.format("%s,小白坐车回家", bus.join()));
        }
    

    4.3 异常处理

    如果在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply()会被调用并且future将以一个异常结束。如果在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,同样的,future将以异常结束。

    4.3.1 使用 exceptionally() 回调处理异常

  • exceptionally()当前一个阶段的计算出现异常时,会执行该函数,并将异常作为输入参数。你可以在这里记录这个异常并返回一个默认值。
  • public static void main(String[] args) {
            SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");
            SmallTool.printTimeAndThread("等待 700路 或者 800路 公交到来");
    
            CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("700路公交正在赶来");
                SmallTool.sleepMillis(100);
                return "700路到了";
            }).applyToEither(CompletableFuture.supplyAsync(() -> {
                SmallTool.printTimeAndThread("800路公交正在赶来");
                SmallTool.sleepMillis(200);
                return "800路到了";
            }), firstComeBus -> {
                SmallTool.printTimeAndThread(firstComeBus);
                if (firstComeBus.startsWith("700")) {
                    throw new RuntimeException("撞树了……");
                }
                return firstComeBus;
            }).exceptionally(e -> {
                SmallTool.printTimeAndThread(e.getMessage());
                SmallTool.printTimeAndThread("小白叫出租车");
                return "出租车 叫到了";
            });
    
            SmallTool.printTimeAndThread(String.format("%s,小白坐车回家", bus.join()));
        }
    

    如果上方不好理解,可以看下面方法

    public static void main(String[] args) {
    		Integer age = -1;
    		
    		CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    		    if(age < 0) {
    		        throw new IllegalArgumentException("Age can not be negative");
    		    }
    		    if(age > 18) {
    		        return "Adult";
    		    } else {
    		        return "Child";
    		    }
    		}).exceptionally(ex -> {
    		    System.out.println("Oops! We have an exception - " + ex.getMessage());
    		    return "Unknown!";
    		});
    		
    		System.out.println("Maturity : " + maturityFuture.get()); 
    }
    

    4.3.2 使用 handle() 方法处理异常

    API提供了一个更通用的方法 – handle()从异常恢复,无论一个异常是否发生它都会被调用。

    public static void main(String[] args) {
    		Integer age = -1;
    		CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    		    if(age < 0) {
    		        throw new IllegalArgumentException("Age can not be negative");
    		    }
    		    if(age > 18) {
    		        return "Adult";
    		    } else {
    		        return "Child";
    		    }
    		}).handle((res, ex) -> {
    		    if(ex != null) {
    		        System.out.println("Oops! We have an exception - " + ex.getMessage());
    		        return "Unknown!";
    		    }
    		    return res;
    		});
    		
    		System.out.println("Maturity : " + maturityFuture.get());
    }
    

    4.3.3 使用 whenComplete()/whenCompleteAsync() 方法处理异常

    whenComplete()handle()方法在功能上确实有一些相似之处。它们都可以在异步计算完成后执行一些操作,并且都可以处理结果或异常。

    区别:

  • whenComplete()主要用于执行副作用操作,没有返回值
  • handle()主要用于处理结果和异常,并返回一个新的结果
  • 根据具体需求,可以选择使用不同的方法来满足需求。

    参考:

    	//有异常时执行exceptionally()
        CompletableFuture.runAsync(() -> System.out.println("执行自己的操作"), asyncTaskExecutor)
                 .exceptionally(e -> {
                     log.info("获取浙农码失败{}", e.getMessage());
                     return null;
                 });
         //需要返回值
         CompletableFuture.runAsync(() -> System.out.println("执行自己的操作"), asyncTaskExecutor)
                 .handle((result,ex) -> {
                     if (ex != null) log.info("品种推荐同步执行失败", ex);
                     return result;
                 });
         //没有返回值
         CompletableFuture.runAsync(() -> System.out.println("执行自己的操作"), asyncTaskExecutor)
                 .whenComplete((result,ex) -> {
                     if (ex != null) log.info("品种推荐同步执行失败", ex);
                 });
    

    4.4 批量执行(组合多个CompletableFuture)

  • 我们使用thenCompose() thenCombine()把两个CompletableFuture组合在一起。现在如果你想组合任意数量的CompletableFuture,应该怎么做?我们可以使用以下两个方法组合任意数量的CompletableFuture。
    static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
    static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
    
  • 区别

  • CompletableFuture.allOf()返回一个CompletableFuture<Void>对象,表示所有传入的CompletableFuture都已经完成
  • completableFuture.anyOf()返回一个CompletableFuture<Object>对象,表示任意一个传入的CompletableFuture对象已经完成
  • 4.4.1 CompletableFuture.allOf()

    所有传入的CompletableFuture都已经完成

    案例中你只需关注allOf()方法的使用就可以了

    没有返回值
        public static void main(String[] args) {
    
            SmallTool.printTimeAndThread("小白和小伙伴们 进餐厅点菜");
            // 点菜
            ArrayList<Dish> dishes = new ArrayList<>();
            for (int i = 1; i <= 12; i++) {
                Dish dish = new Dish("菜" + i, 1);
                dishes.add(dish);
            }
            // 做菜
            ArrayList<CompletableFuture> cfList = new ArrayList<>();
            for (Dish dish : dishes) {
                CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> dish.make());
                cfList.add(cf);
            }
            // 等待所有任务执行完毕
            CompletableFuture.allOf(cfList.toArray(new CompletableFuture[cfList.size()])).join();
        }
    

    使用Lambda表达式(和上面逻辑一样)

     public static void main(String[] args) {
    
            SmallTool.printTimeAndThread("小白和小伙伴们 进餐厅点菜");
       			CompletableFuture[] dishes = IntStream.rangeClosed(1, 12)
                    .mapToObj(i -> new Dish("菜" + i, 1))
                    .map(dish -> CompletableFuture.runAsync(dish::make))
                    .toArray(size -> new CompletableFuture[size]);
    
            CompletableFuture.allOf(dishes).join();
        }
    
  • 使用IntStream.rangeClosed(1, 12)创建了一个整数流,表示1到12之间的数字
  • 使用mapToObj方法将每个数字转换为一个Dish对象。这里使用了一个自定义的Dish类,表示一个菜品,其中包含菜品名称和制作所需时间
  • 使用map方法将每个Dish对象转换为一个CompletableFuture对象。这里使用了CompletableFuture.runAsync方法,该方法会接收一个Runnable参数,并在后台线程中异步执行Runnable的任务。这里使用了方法引用dish::make,表示异步执行Dish对象的make方法。
  • 最后,使用toArray方法将生成的CompletableFuture对象存储在一个CompletableFuture数组中。该数组将用于后续操作。
  • 有返回值
      @Slf4j
    public class OrderJob  {
        
        @Autowired
        private businessServie businessServie;
        
        @Resource(name ="asyncTaskExecutor")
        private ThreadPoolTaskExecutor asyncTaskExecutor;
    
        public void execute() {
            //1.从表中查询出1000条订单数据处理
            List<Order> orderList = businessServie.selectList();
            
            //每条子线程处理数据的数量
            int perCount = 100;
            //拆分集合,每个100
            List<List<Order>> partitions = Lists.partition(orderList, perCount);
            log.info("批量处理多线程开始,本次处理的订单数量:{}",orderList.size());
            log.info("多线程数量:{}",partitions.size());
    
            List<CompletableFuture> futures = Lists.newArrayList();
            for(List<Order> dataList : partitions){
                List<Order> finalDataList = dataList;
                CompletableFuture<List<Order>> future = CompletableFuture.supplyAsync(() -> createOrder(finalDataList),asyncTaskExecutor);
                futures.add(future);
            }
            //等待所有线程执行完成
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
            //获取线程返回结果,封装集合
            List<Order> updateList = new ArrayList<>();
    
            futures.stream().forEach(future->{
                try {
                    List<Order> list = (List<Order>) future.get();
                    if(CollectionUtils.isNotEmpty(list)){
                        updateList.addAll(list);
                    }
                }catch (Exception e){
                    log.error("获取多线程返回结果数据异常",e);
                }
            });
            businessServie.updateBatchList(updateList);
        }
    }
    

    4.4.2 CompletableFuture.anyOf()

    任意一个传入的CompletableFuture对象已经完成

    //调用5次同时执行,等待最快返回的结果
        List<CompletableFuture<String>> futures = new ArrayList<>();
         for (int i = 0; i < 5; i++) {
             int finalI = i;
             CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                 String result = HttpRequest.post(urlRequestId)
                         .body(JSON.toJSONString(body))
                         .headerMap(header, true)
                         .timeout(3000)
                         .execute()
                         .body();
                 log.info("获取第三方接口信息");
                 return result;
             });
             futures.add(future);
         }
    
         // 等待任意一个CompletableFuture完成
         CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()]));
         String join = (String) anyFuture.join();
    

    这里也可以设置等待时间

    等待任意一个CompletableFuture完成,最多等待5秒,如果在指定的时间内没有得到结果,就会抛出TimeoutException异常。

    try {
        String join = (String) anyFuture.get(5, TimeUnit.SECONDS);
        // 处理获取到的结果
    } catch (TimeoutException e) {
        // 处理超时异常
        System.out.println("获取结果超时");
        // 其他操作
    }
    

    作者:Dream_sky分享

    物联沃分享整理
    物联沃-IOTWORD物联网 » Java异步编程CompletableFuture(串行,并行,批量执行)

    发表回复