【CompletableFuture】常用方法(三)

发布于:2025-06-18 ⋅ 阅读:(22) ⋅ 点赞:(0)

1. 线程池的使用

默认情况下通过静态方法 runAsyncsupplyAsync 创建的 CompletableFuture 任务会使用公
共的 ForkJoinPool 线程池,其默认的线程数是 CPU 的核数。当然,其线程数可以通过以下 JVM
参数去设置:
option:-Djava.util.concurrent.ForkJoinPool.common.parallelism

问题是:如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 IO
操作,就会导致线程池中所有线程都阻塞在 IO 操作上,从而造成线程饥饿,进而影响整个系统
的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。

所以,建议大家在生产环境使用时,根据不同的业务类型创建不同的线程池,以避免互相影
响。前面第一章为大家介绍了三种线程池:IO 密集型任务线程池、CPU 密集型任务线程池、混合
型任务线程池。大家可以根据不同的任务类型,确定线程池的类型和线程数。

作为演示,这里使用“混合型任务线程池”执行CompletableFuture任务,具体的代码如下:
在这里插入图片描述
在这里插入图片描述

2. 异步任务的串行执行

如果两个异步任务需要串行(当一个任务依赖另一个任务)执行,可以通过 CompletionStage
接口的 thenApply、thenAccept、thenRun 和 thenCompose 四个方法实现。

2.1 thenApply 方法

thenApply 方法有三个重载版本,三个版本的声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

// 后一个任务与前一个任务可以不在同一个线程中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)

// 后一个任务在指定的 executor 线程池中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

thenApply 三个重载版本有一个共同的参数 fn,该参数表示待串行执行的第二个异步任务,
其类型为 Function。fn 的类型声明涉及到两个范型参数,具体如下:

  • 范型参数 T:上一个任务所返回结果的类型。
  • 范型参数 U:当前任务的返回值类型。

作为示例,使用 thenApply 分两步计算(10+10)*2,代码如下:
在这里插入图片描述
在这里插入图片描述

Function<T, R>接口既能接收参数也支持返回值,所以 thenApply 可以将前一个任务的结果,
通过 FunctionR apply(T t) 方法传递给到第二个任务,并且能输出第二个任务的执行结果。

2.1.1 thenApply(Function<? super T, ? extends U> fn)

再举一个🌰

同步执行:后一个任务和前一个任务 在同一个线程中执行(如果前一个任务是异步的,使用的是那个线程)。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程: " + Thread.currentThread().getName());
    return "hello";
}).thenApply(result -> {
    System.out.println("任务2线程: " + Thread.currentThread().getName());
    return result.toUpperCase();
});
future.join();

输出的结果:

任务1线程: ForkJoinPool.commonPool-worker-1
任务2线程: ForkJoinPool.commonPool-worker-1

✅ 任务2复用了任务1的线程。

2.1.2 thenApplyAsync(Function<? super T, ? extends U> fn)

异步执行(默认线程池):后一个任务在公共线程池中异步执行,不一定和前一个任务在同一个线程。

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程: " + Thread.currentThread().getName());
    return "hello";
}).thenApplyAsync(result -> {
    System.out.println("任务2线程: " + Thread.currentThread().getName());
    return result.toUpperCase();
});
future.join();
任务1线程: ForkJoinPool.commonPool-worker-1
任务2线程: ForkJoinPool.commonPool-worker-3

✅ 任务2可能使用不同的线程(ForkJoinPool)执行。

2.1.3 thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)

异步执行(指定线程池):后一个任务在你提供的 Executor 线程池中执行,更灵活控制线程来源。

ExecutorService executor = Executors.newFixedThreadPool(2);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程: " + Thread.currentThread().getName());
    return "hello";
}).thenApplyAsync(result -> {
    System.out.println("任务2线程: " + Thread.currentThread().getName());
    return result.toUpperCase();
}, executor);

future.join();
executor.shutdown();

输出可能:

任务1线程: ForkJoinPool.commonPool-worker-1
任务2线程: pool-1-thread-1

✅ 任务2明确使用了你自己创建的线程池中的线程。

2.1.4

方法名 是否异步 使用线程 是否可指定线程池
thenApply 同前任务线程
thenApplyAsync(fn) 默认线程池(ForkJoinPool)
thenApplyAsync(fn, executor) 指定线程池 ✅ 是

2.2 thenRun 方法

thenRun 与 thenApply 方法不一样的是,不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。

thenApply 方法也有三个重载版本,三个版本的声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenRun(Runnable action);

// 后一个任务与前一个任务可以不在同一个线程中执行
public CompletionStage<Void> thenRunAsync(Runnable action);

// 后一个任务在 executor 线程池中执行
public CompletionStage<Void> thenRunAsync(Runnable action,Executor 
executor);

从方法的声明可以看出,thenRun 方法同 thenApply 方法类似;不同的:前一个任务处理完成后,thenRun 并不会把计算的结果传给后一个任务,而且后一个任务也没有结果输出。

thenRun 系列方法里的 action 参数是 Runnable 类型,所以 thenRun 既不能接收参数也不支持返回值。

2.2.1 方法区别

thenApply / thenApplyAsyncthenRun / thenRunAsync 的核心区别在于:

✅ 是否接收上一个任务的执行结果?

方法 是否接收上一个任务的结果? 是否有返回值? 用途
thenApply ✅ 是 → 作为参数传入 ✅ 有 用上一个任务的结果,返回新值
thenApplyAsync ✅ 是 → 异步处理上一个结果 ✅ 有 同上,但在异步线程中执行
thenRun ❌ 否 → 不关心前面结果 ❌ 无 不需要用前面结果,只是继续执行一个 Runnable 任务
thenRunAsync ❌ 否 → 不关心前面结果 ❌ 无 同上,但在异步线程中执行

1.thenApply 使用前一个结果并返回新值:

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "hello")
    .thenApply(s -> {
        System.out.println("接收到结果: " + s);
        return s.toUpperCase();
    });

System.out.println(future.join()); // 输出:HELLO

thenRun 不关心前一个结果,也没有返回值:

CompletableFuture<Void> future = CompletableFuture
    .supplyAsync(() -> "hello")
    .thenRun(() -> {
        System.out.println("只是继续执行任务,不关心结果");
    });

future.join(); // 只是等待完成,没有结果返回

2.3 thenAccept 方法

thenAccept 折衷了 thenRun、thenApply 的特点,使用此方法,后一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。

一句话总结:thenAccept 用来接收上一个任务的结果并进行处理,但不返回任何值。

方法 是否接收上一个任务的结果 是否有返回值 典型用途
thenApply ✅ 是 ✅ 有 对结果转换后返回新值
thenAccept ✅ 是 ❌ 无 对结果消费(如:打印、保存、发通知等)

thenAccept 方法有三个重载版本,三个版本的声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

// 后一个任务与前一个任务可以不在同一个线程中执行
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

// 后一个任务在指定的 executor 线程池中执行
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)

使用示例:

CompletableFuture.supplyAsync(() -> "hello")
    .thenAccept(result -> {
        System.out.println("收到结果: " + result); // 输出: hello
    });

24. thenCompose 方法

thenCompose 方法在功能上与 thenApply、thenAccept、thenRun 一样,可以对两个任务进行串行的调度操作,第一个任务操作完成时,将其结果作为参数传递给第二个任务。

✅ 一句话总结:
thenCompose 用于将两个 CompletableFuture 链接起来,前一个异步任务的结果作为参数传给下一个返回 CompletableFuture 的方法,从而避免嵌套(flatMap 的效果)。

👇 示例:两个异步任务顺序执行,第一个结果传给第二个

import java.util.concurrent.CompletableFuture;

public class ThenComposeExample {
    public static void main(String[] args) {
        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
            System.out.println("获取用户ID...");
            return "user123";
        }).thenCompose(userId -> {
            // 这个方法返回一个 CompletableFuture<String>
            return getUserDetail(userId);
        });

        System.out.println("最终结果:" + result.join());
    }

    // 模拟一个异步方法:根据用户ID查询用户详情
    private static CompletableFuture<String> getUserDetail(String userId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("获取用户详情 for " + userId);
            return "用户:" + userId + ", 张三";
        });
    }
}

📌 输出示例:

获取用户ID...
获取用户详情 for user123
最终结果:用户:user123, 张三

网站公告

今日签到

点亮在社区的每一天
去签到