Java8异步编程

发布于:2022-12-19 ⋅ 阅读:(954) ⋅ 点赞:(0)

Java8异步编程

引言

  • 参考Java多线程几种实现方式,我们可以很快速的通过new Thread(...).start()开启一个新的线程,但是这样创建线程会有很多坏处:

    • 每次都要新建一个对象,性能差;

    • 建出来的很多个对象是独立的,缺乏统一的管理。如果在代码中无限新建线程会导致这些线程相互竞争,占用过多的系统资源从而导致死机或者 oom;

    • 缺乏许多功能如定时执行、中断等。

  • 因此Java给我们提供好一个十分好用的工具,那就是线程池。线程池(ThreadPool)是一种基于池化思想管理和使用线程的机制。它是将多个线程预先存储在一个"池子"内,当有任务出现时可以避免重新创建和销毁线程所带来性能开销,只需要从"池子"内取出相应的线程执行对应的任务即可。

Java Executors线程池

  • Java 线程池概述

  • Java提供了一个工厂类来构造我们需要的线程池,这个工厂类就是 Executors 。这里主要讲6个创建线程池的方法。默认拒绝策略都为AbortPolicy,即丢弃任务并抛出RejectedExecutionException异常-

    • newCachedThreadPool()

    • newFixedThreadPool(int nThreads)

    • newScheduledThreadPool(int corePoolSize)

    • newSingleThreadExecutor()

    • newSingleThreadScheduledExecutor()

    • newWorkStealingPool()

newCachedThreadPool()

  • 创建缓存线程池。缓存的意思就是这个线程池会根据需要创建新的线程,在有新任务的时候会优先使用先前创建出的线程。线程一旦创建了就一直在这个池子里面了,执行完任务后后续还有任务需要会重用这个线程,若是线程不够用了再去新建线程。

  • public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

  • corePoolSize = 0

  • maximumPoolSize = Integer.MAX_VALUE,即线程数量几乎无限制

  • keepAliveTime = 60s,线程空闲 60s 后自动结束

  • workQueue 为 SynchronousQueue 同步队列,这个队列类似于一个接力棒,入队出队必须同时传递,因为 CachedThreadPool 线程创建无限制,不会有队列等待,所以使用 SynchronousQueue

适用场景

  • 快速处理大量耗时较短的任务,如 Netty 的 NIO 接受请求时,可使用 CachedThreadPool

  • ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
        final int index = i;
        
    // 每次发布任务前根据奇偶不同等待一段时间,如1s,这样就会创建两个线程
    if (i % 2 == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    ​
    // 执行任务
    executorService.execute(() -> System.out.println(Thread.currentThread().getName() + ":" + index));

  • 但注意这里的线程池是无限大的,并没有规定他的大小

newFixedThreadPool(int nThreads)

  • 创建定长线程池,参数是线程池的大小。也就是说,在同一时间执行的线程数量只能是 nThreads 这么多,这个线程池可以有效的控制最大并发数从而防止占用过多资源。超出的线程会放在线程池的一个无界队列里等待其他线程执行完。

  • public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());}

  • corePoolSize 与 maximumPoolSize 相等,即其线程全为核心线程,是一个固定大小的线程池,是其优势

  • keepAliveTime = 0 该参数默认对核心线程无效,而 FixedThreadPool 全部为核心线程

  • workQueue 为 LinkedBlockingQueue(无界阻塞队列),队列最大值为 Integer.MAX_VALUE。如果任务提交速度持续大于任务处理速度,会造成队列大量阻塞。因为队列很大,很有可能在拒绝策略前,内存溢出。是其劣势;

  • FixedThreadPool 的任务执行是无序的

适用场景

  • 可用于 Web 服务瞬时削峰,但需注意长时间持续高峰情况造成的队列阻塞。

newScheduledThreadPool(int corePoolSize)

  • ScheduledThreadPool创建一个可以执行延迟任务的线程池,同时这个线程池也是定长的,参数 corePoolSize 就是线程池的大小,即在空闲状态下要保留在池中的线程数量。而要实现调度需要使用这个线程池的 schedule() 方法,队列使用了DelayedWorkQueue延迟队列

  • ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
    // 三秒后执行
    scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName() + ": 我会在3秒后执行。"),
                    3, TimeUnit.SECONDS);

newSingleThreadExecutor()

  • 创建单线程池,只使用一个线程来执行任务。但是它与 newFixedThreadPool(1, threadFactory) 不同,它会保证创建的这个线程池不会被重新配置为使用其他的线程,也就是说这个线程池里的线程始终如一,同时可以保证先进先出的执行顺序

  • ExecutorService executorService = Executors.newSingleThreadExecutor();

newSingleThreadScheduledExecutor()

  • 创建一个单线程的可以执行延迟任务的线程池

  • ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
    System.out.println("添加任务,时间:" + new Date());
    threadPool.schedule(() -> {
        System.out.println("任务被执行,时间:" + new Date());
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
        }
    }, 2, TimeUnit.SECONDS);

newWorkStealingPool()

  • 创建一个抢占式执行的线程池(任务执行顺序不确定),注意此方法只有在 JDK 1.8+ 版本中才能使用

  • ExecutorService threadPool = Executors.newWorkStealingPool();
    for (int i = 0; i < 10; i++) {
        final int index = i;
        threadPool.execute(() -> {
            System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
        });
    }
    // 确保任务执行完成
    while (!threadPool.isTerminated()) {
    ​
    }

线程池的关闭

  • 线程池启动后需要手动关闭,否则会一直不结束

  • shutdown() : 将线程池状态置成 SHUTDOWN,此时不再接受新的任务,等待线程池中已有任务执行完成后结束

  • shutdownNow() : 将线程池状态置成 SHUTDOWN,将线程池中所有线程中断(调用线程的 interrupt() 操作),清空队列,并返回正在等待执行的任务列表

  • 并且它还提供了查看线程池是否关闭和是否终止的方法,分别为 isShutdown() 和 isTerminated()

自定义线程池ThreadPoolExecutor

概述

  • 阿里巴巴的JAVA开发手册推荐用ThreadPoolExecutor创建线程池,可以规避资源耗尽的风险。因为Executors线程池都不支持自定义拒绝策略。newFixedThreadPool 和 newSingleThreadExecutor主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至 OOM。newCachedThreadPool 和 newScheduledThreadPool主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

自定义线程池参数介绍

  • public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler);

  • corePoolSize:核心线程数,线程池中始终存活的线程数

  • maximumPoolSize:最大线程数,线程池中允许的最大线程数,当线程池的任务队列满了之后可以创建的最大线程数

  • keepAliveTime:最大线程数可以存活的时间,当线程中没有任务执行时,最大线程就会销毁一部分,最终保持核心线程数量的线程

  • unit:keepAliveTime的时间单位,和keepAliveTime配合使用

  • workQueue:一个阻塞队列,用来存储线程池等待执行的任务,均为线程安全。它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种

    • SynchronousQueue(常用):直接提交队列。SynchronousQueue没有容量,所以实际上提交的任务不会被添加到任务队列,总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值(maximumPoolSize),则执行拒绝策略

    • LinkedBlockingQueue(常用):无界的任务队列。当有新的任务来到时,若系统的线程数小于corePoolSize,线程池会创建新的线程执行任务;当系统的线程数量等于corePoolSize后,因为是无界的任务队列,总是能成功将任务添加到任务队列中,所以线程数量不再增加。若任务创建的速度远大于任务处理的速度,无界队列会快速增长,直到内存耗尽。

    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列

    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列

    • DelayQueue:一个使用优先级队列实现的无界阻塞队列,只有在延迟期满时才能从中提取元素

    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。与SynchronousQueue类似,还含有非阻塞方法

    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列

  • threadFactory:线程工厂,用于创建线程,一般情况下使用默认的,即Executors类的静态方法defaultThreadFactory();

  • handler:拒绝策略。当任务太多来不及处理时,如何拒绝任务。系统提供四种,默认策略为 AbortPolicy

    • DiscardOldestPolicy:丢弃任务队列中最早添加的任务,并尝试提交当前任务

    • CallerRunsPolicy:调用主线程执行被拒绝的任务,这提供了一种简单的反馈控制机制,将降低新任务的提交速度

    • DiscardPolicy:默默丢弃无法处理的任务,不予任何处理

    • AbortPolicy:直接抛出异常,阻止系统正常工作

线程池的执行流程

  • 当线程数小于核心线程数时,创建线程

  • 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列

  • 当线程数大于等于核心线程数,且任务队列已满:若线程数小于最大线程数,创建线程;若线程数等于最大线程数,抛出异常,拒绝任务

自定义线程池demo

  • @Slf4j
    public class ThreadPoolService {
    ​
        public static void main(String[] args) throws Exception {
    ​
    ​
            BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
            NameThreadFactory threadFactory = new NameThreadFactory();
            RejectedExecutionHandler handler = new MyIgnorePolicy();
            //默认线程工程 Executors.defaultThreadFactory()
            //默认拒绝策略 new ThreadPoolExecutor.AbortPolicy()
            ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 2000, TimeUnit.MILLISECONDS,
                    workQueue, threadFactory,  handler);
            // 预启动所有核心线程
            executor.prestartAllCoreThreads();
    ​
            for (int i = 1; i <= 10; i++) {
                MyTask task = new MyTask(String.valueOf(i));
                executor.execute(task);
            }
    ​
            //阻塞主线程
            System.in.read();
        }
    ​
        /**
         * Description:自定义线程名字
        */
        static class NameThreadFactory implements ThreadFactory {
            private final AtomicInteger mThreadNum = new AtomicInteger(1);
    ​
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
                System.out.println(t.getName() + " has been created");
                return t;
            }
        }
    ​
        static class MyIgnorePolicy implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                doLog(r, executor);
            }
    ​
            private void doLog(Runnable r, ThreadPoolExecutor e) {
                // 可做日志记录等
                System.err.println( r.toString() + " rejected");
                System.out.println("completedTaskCount: " + e.getCompletedTaskCount());
            }
        }
    ​
        static class MyTask implements Runnable {
            private String name;
    ​
            public MyTask(String name) {
                this.name = name;
            }
    ​
            @Override
            public void run() {
                try {
                    System.out.println(this.toString() + " is running!");
                    // 让任务执行慢点
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    ​
            public String getName() {
                return name;
            }
    ​
            @Override
            public String toString() {
                return "MyTask [name=" + name + "]";
            }
        }
    }

异步编程函数式接口

简介

  • 函数式接口可以参考java8常用新特性,这里主要介绍几种函数式接口,Callable、Runnable、Future、CompletableFuture和FutureTask

Callable和Runnable异同

  • //两个接口的定义
    @FunctionalInterface
    public interface Runnable {
        
        public abstract void run();
    }
    ​
    @FunctionalInterface
    public interface Callable<V> {
        
        V call() throws Exception;
    }
    ​

  • 相同点

  • 都是接口,都可以编写多线程程序,都可以通过线程池启动线程

  • 不同点

  • Runnable没有返回值,Callable可以返回执行结果,是个泛型;Callable接口的call()方法允许抛出异常;而Runnable接口的run()方法的异常只能在内部消化,不能继续上抛

  • public class Test1 {
    ​
        static class Min implements Callable<Integer> {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(1000);
                return 4;
            }
        }
        static class Max implements Runnable{
            @SneakyThrows
            @Override
            public void run() {
                Thread.sleep(1000);
            }
        }
        
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.submit(new Min());
            executorService.submit(new Max());
        }

Future类

future介绍

  • Future是Java5新加的一个接口,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果

  • //Since:1.5
    public interface Future<V> {
        //取消任务的执行,参数指定是否立即中断任务执行,或者等等任务结束
        boolean cancel(boolean mayInterruptIfRunning);
        //任务是否已经取消,任务正常完成前将其取消,则返回 true
        boolean isCancelled();
        //任务是否已经完成。需要注意的是如果任务正常终止、异常或取消,都将返回true
        boolean isDone();
        //等待任务执行结束,然后获得V类型的结果。InterruptedException 线程被中断异常, ExecutionException任务执行异常,如果任务被取消,还会抛出CancellationException
        V get() throws InterruptedException, ExecutionException;
        //同上面的get功能一样,多了设置超时时间。超时会抛出TimeoutException
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

  • 一般情况下,我们会结合Callable和Future一起使用,通过ExecutorService的submit方法执行Callable,并返回Future。

  • public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
           ExecutorService executor = Executors.newCachedThreadPool();
           //Lambda 是一个 callable, 提交后便立即执行,这里返回的是 FutureTask 实例
           Future<String> future = executor.submit(() -> {
               System.out.println("running task");
               Thread.sleep(10000);
               return "return task";
           });
           future.get(2, TimeUnit.SECONDS);
       }

  • 当然Future模式也有它的缺点,它没有提供通知的机制,我们无法得知Future什么时候完成。如果要在future.get()的地方等待future返回的结果,那只能通过isDone()轮询查询。

FutureTask

介绍

  • FutureTask (Java Platform SE 8 )

  • Future是一个接口,是无法生成一个实例的,所以又有了FutureTask。FutureTask实现了RunnableFuture接口,RunnableFuture接口又实现了Runnable接口和Future接口。所以FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。

CompletableFuture类

介绍

  • CompletableFuture (Java Platform SE 8 )

  • CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。它避免了传统回调最大的问题,那就是能够将控制流分离到不同的事件处理器中。CompletableFuture弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。可以直接通过thenAccept、thenApply、thenCompose等方式将前面异步处理的结果交给另外一个异步事件处理线程来处理。

异步任务创建

  • CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法

    • supplyAsync执行CompletableFuture任务,支持返回值

    • runAsync执行CompletableFuture任务,没有返回值

  • 方法名 描述
    runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码
    runAsync(Runnable runnable, Executor executor) 使用指定的thread pool执行异步代码
    supplyAsync(Supplier supplier) 使用ForkJoinPool.commonPool()作为它的线程池执行异步代码,异步操作有返回值
    supplyAsync(Supplier supplier, Executor executor) 使用指定的thread pool执行异步代码,异步操作有返回值
  • public static void main(String[] args) {
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("run,shawn"), executor);
        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.print("supply,shawn");
                    return "shawn"; }, executor);
        //runAsync的future没有返回值,输出null
        System.out.println(runFuture.join());
        //supplyAsync的future,有返回值
        System.out.println(supplyFuture.join());
        executor.shutdown(); // 线程池需要关闭
    }

任务异步回调

  • thenRun/thenRunAsync

  • 做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

  • thenAccept/thenAcceptAsync

  • 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

  • thenApply/thenApplyAsync

  • 第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

  • 注意:上述三个方法如果执行第一个任务的时候,传入了一个自定义线程池:

  • 调用xxx方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。

  • 调用xxxAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

  • //举个例子
    public static void main(String[] args) throws Exception {
            CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                    ()->{
                        System.out.println("原始CompletableFuture方法任务");
                        return "shawn";
                    }
            );
            CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {
                if ("shawn".equals(a)) {
                    return "确实很帅";
                }
                return "考虑考虑";
            });
    ​
            System.out.println(thenApplyFuture.get());
        }

  • exceptionally

  • 某个任务执行异常时,执行的回调方法;并且有抛出异常作为参数,传递到回调方法

  • whenComplete

  • 某个任务执行完成后,执行的回调方法,无返回值;并且whenComplete方法返回的CompletableFuture的result是上个任务的结果

  • handle

  • 某个任务执行完成后,执行回调方法,并且是有返回值的;并且handle方法返回的CompletableFuture的result是回调方法执行的结果

  • //举例
    CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return "shawn";
                }
        );
        //exceptionally使用方法是orgFuture.exceptionally((e)->{...})
        CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {
    ​
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            if ("shawn".equals(a)) {
                System.out.println("666");
                //whenComplete没有返回
                return "好帅啊";
            }
            System.out.println("233333");
            return null;
        });
    ​
        System.out.println(rstFuture.get());
    }
    ​
  • 最后对于complete方法

  • public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
            //在这里执行返回值为World
            // future.complete("World");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //在这里执行结果为Hello
            future.complete("World");
    ​
            try {
                //get() 方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。join() 会抛出未经检查的异常。
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

多个任务组合处理

  • AND组合关系

  • thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。

  • 区别在于:

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值

  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值

  • runAfterBoth 不会把执行结果当做方法入参,且没有返回值。

  • OR组合关系

  • applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。

  • 区别在于:

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值

  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值

  • runAfterEither:不会把执行结果当做方法入参,且没有返回值。

  • //举个例子
    public static void main(String[] args) {
        //第一个异步任务,休眠2秒,保证它执行晚点
        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
            try{
    ​
                Thread.sleep(2000L);
                System.out.println("执行完第一个异步任务");}
            catch (Exception e){
                return "第一个任务异常";
            }
            return "第一个异步任务";
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> future = CompletableFuture
                //第二个异步任务
                .supplyAsync(() -> {
                            System.out.println("执行完第二个任务");
                            return "第二个任务";}
                        , executor)
                //第三个任务
                .acceptEitherAsync(first, System.out::println, executor);
    ​
        executor.shutdown();
    }

  • AllOf

  • 所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常

  • AnyOf

  • 任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常

  • //举例
    public static void main(String[] args) {
    ​
        CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("我执行完了");
        });
        CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
            System.out.println("我也执行完了");
        });
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
            System.out.println("finish");
            //handle可以返回
    //            return "shawn";
        });
        anyOfFuture.join();
    }

  • thenCompose

  • thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;

  • 如果该CompletableFuture实例为null,然后就执行这个新任务

  • //举个例子
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    ​
        CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");
        //第二个异步任务
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> "第二个任务", executor)
                .thenComposeAsync(data -> {
                    System.out.println(data); return f; //使用第一个任务作为返回
                }, executor);
        //get() 方法会抛出经检查的异常,可被捕获,自定义处理或者直接抛出。而 join() 会抛出未经检查的异常。
        System.out.println(future.join());
        executor.shutdown();
    ​
    }

CompletableFuture使用注意点

  • Future需要获取返回值,才能获取异常信息

  • Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法

  • CompletableFuture的get()方法是阻塞的

  • CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间

  • 默认线程池的注意点

  • CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。

  • 自定义线程池时,注意饱和策略

  • 如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离

CompletionService使用

介绍

  • 和CompletableFuture类似,CompletionService是对定义ExecutorService进行了包装,可以一边生成任务,一边获取任务的返回值。让这两件事分开执行,任务之间不会互相阻塞,可以获取最先完成的任务结果。

  • CompletionService的实现原理比较简单,底层通过FutureTask+阻塞队列,实现了任务先完成的话,可优先获取到。也就是说任务执行结果按照完成的先后顺序来排序,先完成可以优化获取到。内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,你调用CompletionService的poll或take方法即可获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果

demo代码

  • ExecutorService executor = Executors.newFixedThreadPool(10);
    //查询用户信息
    CompletionService<Object> baseDTOCompletionService = new ExecutorCompletionService<Object>(executor);
    Callable<Object> userInfoDTOCallableTask = () -> {
        UserInfoParam userInfoParam = buildUserParam(req);
        return userService.queryUserInfo(userInfoParam);
    };
    //banner信息任务
    Callable<Object> bannerDTOCallableTask = () -> {
        BannerParam bannerParam = buildBannerParam(req);
        return bannerService.queryBannerInfo(bannerParam);
    };
    ​
    //提交用户信息任务
    baseDTOCompletionService.submit(userInfoDTOCallableTask);
    //提交banner信息任务
    baseDTOCompletionService.submit(bannerDTOCallableTask);
    ​
    //Future<Object> baseRspDTOFuture = baseDTOCompletionService.poll(1, TimeUnit.SECONDS);

网站公告

今日签到

点亮在社区的每一天
去签到