风离不摆烂学习日志Day16 Java 异步线程池 处理策略
风离不摆烂学习日志Day16 Java 异步线程池 处理策略
四种基本拒绝策略:
1.AbortPolicy 默认 – 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
2.CallerRunsPolicy
常用 – 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
3.DiscardOldestPolicy – 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
4.DiscardPolicy – 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
拒绝策略生效原理
新任务到达线程池时
每当有新的任务到线程池时,
- 第一步:先判断线程池中当前线程数量是否达到了
corePoolSize
,若未达到,则新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理,若当前线程数量已达到corePoolSize
,则进入下一步;- 第二步:判断工作队列(
workQueue
)是否已满,未满则将新的任务提交到工作队列中,满了则进入下一步;- 第三步:判断线程池中的线程数量是否达到了
maxuPoolSize
,如果未达到,则新建一个工作线程来执行这个任务,如果达到了则使用饱和策略来处理这个任务。注意: 在线程池中的线程数量超过corePoolSize时,每当有线程的空闲时间超过了keepAliveTime
,这个线程就会被终止。直到线程池中线程的数量不大于corePoolSize为止。
核心线程数:m,最大线程数: n(一般 n > m),线程池队列最大容量: j,请求进入线程总数: s;
线程进场后(s<m),线程会直接启动直到启动的线程数达到核心线程数(s=m);
线程继续进场(m<s<m+j),线程开始排队,此时启动的线程数不会增加直到队列饱和(s=m+j);
线程继续进场(m+j<s<n+j),每进场一个线程,该线程就会启动;直到启动的线程达到最大线程数(s=n+j);
线程继续进场(s>n+j),此时触发拒绝策略;
落地思路分析
异步处理需要考虑可能出现的问题最大有六点
- 合理控制处理线程的数量
处理线程的数量不要超过CPU核数
等于是最好的 核心线程数量设置为最大线程数的3/4 保证留余 - 找到线程池队列的最大容量 尽量减少触发拒绝策略
- 选择合适的线程池队列(
有界队列
无界队列
同步移交队列
)
- 系统并发量 当采用
CallerRunsPolicy
策略的时候 触发的时候 被拒绝的会被交由主线程来处理 并发部分进行优化,优化的核心就是思想就是:控制并发请求的流量
- 避免异步之间的依赖关系造成死锁 即互相等待(这种情况建议是不使用异步 具体场景具体分析)
- 要获取异步函数的返回值可以使用 Future,但是Future 的get方法是阻塞的,使用时需要注意 等所有异步任务结束之后再调用获取方法
针对上述四点 我们可以优化上一篇文章的代码 来解决上述问题
首先查资料可得知
IO密集型(某大厂实践经验) 核心线程数 = CPU核数 / (1-阻塞系数)
CPU密集型:核心线程数 = CPU核数 + 1
=================================================================================
(1)、CPU密集型
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading 很高。
在多重程序系统中,大部分时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部分时间用在三角函数和开根号的计算,便是属于CPU bound的程序。
CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。
(2)、IO密集型
IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。
I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。
二来 关于队列的选取
1、无界队列
队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列作为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。
阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而博主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。
当然这种队列,maximumPoolSize 的值也就无效了。
当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。
这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
2、有界队列
当使用有限的 maximumPoolSizes 时,有界队列有助于防止资源耗尽,但是可能较难调整和控制。
常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。
PriorityBlockingQueue中的优先级由任务的Comparator决定。
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。
3、同步移交队列
如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。
SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。
只有在使用无界线程池或者有饱和策略时才建议使用该队列。
上一篇文章代码改进
原: ExecutorUtils
package top.flya.utils;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import top.flya.dao.entity.PublicUser;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程工具类 ExecutorUtils
* @author flya
* @date 2019/12/18
*/
@Slf4j
public class ExecutorUtils {
private static int corePoolSize = 10; // 线程池维护线程的最少数量
private static int maxPoolSize = 100; // 线程池维护线程的最大数量
private static int queueCapacity = 100; // 缓存队列
private static Long keepAliveTime = 1L; //设置线程池维护线程所允许的空闲时间 1L
public static volatile ThreadPoolExecutor threadPoolExecutorInstance = null; // 线程池
// 单例模式 懒汉式: 线程安全 但是效率低 为了效率使用双重检查锁
public ExecutorUtils() {
}
/**
* 自定义线程池初始参数
* @param corePoolSite 线程池维护线程的最少数量
* @param maxPoolSite 线程池维护线程的最大数量
* @param queueCapacity 缓存队列
* @param keepAliveTime 设置线程池维护线程所允许的空闲时间
*/
public static void initialize(int corePoolSite, int maxPoolSite, int queueCapacity, long keepAliveTime) {
ExecutorUtils.corePoolSize = corePoolSite;
ExecutorUtils.maxPoolSize = maxPoolSite;
ExecutorUtils.queueCapacity = queueCapacity;
ExecutorUtils.keepAliveTime = keepAliveTime;
}
/**
* 获取线程池 初始化
* @return
*/
public static ThreadPoolExecutor getThreadPoolExecutorInstance() {
if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {
synchronized (ExecutorUtils.class) {
if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {// 双重检查
log.info("初始化线程池");
threadPoolExecutorInstance = new ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveTime,
java.util.concurrent.TimeUnit.SECONDS,
new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueCapacity),
new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); //.ThreadPoolExecutor.CallerRunsPolicy() 的意思是:如果线程池已经满了,那么就由调用者所在的线程来执行任务
}
}
}
return threadPoolExecutorInstance;
}
public static void batchExec(CompletableFuture[] completableFutures)
{
CompletableFuture.allOf(completableFutures).join();
//TODO 任务执行完毕后的处理
}
}
核心改进点为自动获取配置的核心线程数
package top.flya.utils;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import lombok.extern.slf4j.Slf4j;
import top.flya.dao.entity.PublicUser;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程工具类 ExecutorUtils
* @author flya
* @date 2019/12/18
*/
@Slf4j
public class ExecutorUtils {
private static final int N_CPU = Runtime.getRuntime().availableProcessors();
private static int corePoolSize = (int) (0.75*N_CPU); // 线程池维护线程的最少数量 0.75*N_CPU
private static int maxPoolSize = N_CPU; // 线程池维护线程的最大数量 设置为cpu核数
private static int queueCapacity = 300; // 缓存队列
private static Long keepAliveTime = 1L; //设置线程池维护线程所允许的空闲时间 1L
public static volatile ThreadPoolExecutor threadPoolExecutorInstance = null; // 线程池
/**
* 并发处理的大小,防止触发风控系统的流控 我们商城系统没做限制
*/
private static final int batchSize = 100;
// 单例模式 懒汉式: 线程安全 但是效率低 为了效率使用双重检查锁
public ExecutorUtils() {
}
/**
* 自定义线程池初始参数
* @param corePoolSite 线程池维护线程的最少数量
* @param maxPoolSite 线程池维护线程的最大数量
* @param queueCapacity 缓存队列
* @param keepAliveTime 设置线程池维护线程所允许的空闲时间
*/
public static void initialize(int corePoolSite, int maxPoolSite, int queueCapacity, long keepAliveTime) {
ExecutorUtils.corePoolSize = corePoolSite;
ExecutorUtils.maxPoolSize = maxPoolSite;
ExecutorUtils.queueCapacity = queueCapacity;
ExecutorUtils.keepAliveTime = keepAliveTime;
}
/**
* 获取线程池 初始化
* @return
*/
public static ThreadPoolExecutor getThreadPoolExecutorInstance() {
if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {
synchronized (ExecutorUtils.class) {
if (threadPoolExecutorInstance == null||threadPoolExecutorInstance.isShutdown()) {// 双重检查
log.info("doProcess with cpu numbers: {}, batchSize: {}", N_CPU, batchSize);
threadPoolExecutorInstance = new ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveTime,
java.util.concurrent.TimeUnit.SECONDS,
new java.util.concurrent.ArrayBlockingQueue<Runnable>(queueCapacity),
new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); //.ThreadPoolExecutor.CallerRunsPolicy() 的意思是:如果线程池已经满了,那么就由调用者所在的线程来执行任务
}
}
}
return threadPoolExecutorInstance;
}
public static void batchExec(CompletableFuture[] completableFutures)
{
CompletableFuture.allOf(completableFutures).join();
//TODO 任务执行完毕后的处理
}
}