风离不摆烂学习日志Day15 Java 异步线程池 工具类以及使用

场景描述

我们在组装请求参数的时候经常会调用别的接口来得到这些参数 而且这些接口是独立的 我们可以利用异步线程池来完成这些接口的请求操作 可以节约请求时间 最长请求时间取决于最长接口耗时那个 我们可以封装一个请求的工具类 去批量请求 得到批量返回

线程初始工具类 (公共)

package top.flya.utils;


import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程工具类 ExecutorUtils
 * @author flya
 * @date 2019/12/18
 */

@Slf4j
public class ExecutorUtils {

    private static int corePoolSize = 10; // 线程池维护线程的最少数量

    private static int maxPoolSize = 100; // 线程池维护线程的最大数量

    private static int queueCapacity = 100; // 缓存队列

    private static Long keepAliveTime = 1L;  //设置线程池维护线程所允许的空闲时间 1L

    public  static volatile ThreadPoolExecutor threadPoolExecutorInstance = null; // 线程池

    // 单例模式 懒汉式: 线程安全 但是效率低 为了效率使用双重检查锁
    public ExecutorUtils() {
    }


    /**
     * 自定义线程池初始参数
     * @param corePoolSite 线程池维护线程的最少数量
     * @param maxPoolSite 线程池维护线程的最大数量
     * @param queueCapacity 缓存队列
     * @param keepAliveTime 设置线程池维护线程所允许的空闲时间
     */
    public static void initialize(int corePoolSite, int maxPoolSite, int queueCapacity, long keepAliveTime) {
        ExecutorUtils.corePoolSize = corePoolSite;
        ExecutorUtils.maxPoolSize = maxPoolSite;
        ExecutorUtils.queueCapacity = queueCapacity;
        ExecutorUtils.keepAliveTime = keepAliveTime;
    }

    /**
     * 获取线程池 初始化
     * @return
     */
    public static ThreadPoolExecutor getThreadPoolExecutorInstance() {
        if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {
            synchronized (ExecutorUtils.class) {
                if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {// 双重检查
                    log.info("初始化线程池");
                    threadPoolExecutorInstance = new ThreadPoolExecutor(corePoolSize,
                            maxPoolSize, keepAliveTime,
                            java.util.concurrent.TimeUnit.SECONDS,
                            new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueCapacity),
                            new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); //.ThreadPoolExecutor.CallerRunsPolicy() 的意思是:如果线程池已经满了,那么就由调用者所在的线程来执行任务
                }
            }
        }
        return threadPoolExecutorInstance;
    }



}

示例

 @SneakyThrows
    @GetMapping("/async")
    public Fl async() {

        ThreadPoolExecutor executor = ExecutorUtils.getThreadPoolExecutorInstance(); //获取线程池


        CompletableFuture<List<PublicUser>> futureOne = CompletableFuture.supplyAsync(() -> {
            //获取公众号用户
            List<PublicUser> publicUsers = publicUserMapper.selectList(new QueryWrapper<>());
            return publicUsers;
        }, executor);

        CompletableFuture<List<PrivateUser>> futureTwo = CompletableFuture.supplyAsync(() -> {
            //获取私人用户
            List<PrivateUser> privateUsers = privateUserMapper.selectList(new QueryWrapper<>());
            return privateUsers;
        }, executor);

//        executor.shutdown();  //这里不能关闭线程池,因为是公共的,如果关闭了,其他地方就不能用了

//        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);//等待所有任务执行完毕 这个只能等待线程池关闭,但是线程池不能关闭,因为是公共的

        CompletableFuture[] completableFutures = Arrays.asList(futureOne, futureTwo).toArray(new CompletableFuture[0]);

        CompletableFuture.allOf(completableFutures).join();


//        CompletableFuture.allOf(futureOne, futureTwo);//等待所有任务执行完毕
        List<PublicUser> publicUsers = futureOne.get();
        List<PrivateUser> privateUsers = futureTwo.get();


        return FlResultUtils.success(MapUtil.builder().put("publicUsers", publicUsers).put("privateUsers", privateUsers).build());
    }

进一步封装

   public static void batchExec(CompletableFuture[] completableFutures)
    {
        CompletableFuture.allOf(completableFutures).join();
        //TODO 任务执行完毕后的处理
    }

拒绝策略 阻塞问题

img

线程池嵌套使用造成死锁问题(代码层面避免)

线程池内的线程又使用了线程池内的线程,父子线程相互等待:

假设 executorService 线程数最大为10
10个请求同时到达时,线程池被打满,子任务请求被迫进入阻塞队列
但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成
主线程执行join()进入阻塞状态,因为永远获取不到结果,永远无法恢复,造成服务故障