线程池
线程池虽然使用过,还是再梳理补充一下。
一、什么是线程池
java.util.concurrent.Executors
提供了一个java.util.concurrent.Executor
接口的实现用于创建线程池。
线程池是一种线程使用模式。线程过多会带来内存调度开销,进而影响整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
线程池的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大线程的数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
二、为什么要用线程池
(1)创建线程要花费昂贵的资源和时间,如果任务来了才创建线程那么响应时间会变
长,而且一个进程能创建的线程数有限,如果创建大量线程,内存无法承受。
(2)为了减少创建和销毁线程的次数,让一些线程可以多次使用,可根据系统情况调整执行的线程数量,防止消耗过多内存。
三、线程池的特点与作用
线程池的主要特点:
线程复用;
控制最大并发数;
管理线程;
线程池的主要作用
(1)降低资源消耗。通过重复利用已经创建的线程降低线程创建和销毁的消耗。
(2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
(3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程吃可以进行统一的分配、调优和监控。
四、线程池框架结构
JDK1.5
开始,Java API
提供了 Executor
框架来创建不同的线程池。该框架中用到Executor
、Executors
,ExecutorService
、ThreadPoolExecutor
这几个类。
4.1 线程池的框架结构:
核心类/接口结构:
说明:
(1)Executor
是一个顶级接口,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable
来表示任务。Executor基于”生产者-消费者”模式,提交任务的操作相当于生产者,执行任务的则相当于消费者。
public interface Executor {
void execute(Runnable command);
}
(2)ExecutorService
扩展了Executor。添加了一些用于生命周期管理的方法(同时还提供一些用于任务提交的便利方法
ExecutorService
的生命周期有三种:运行、关闭和已终止。
ExecutorService
在创建时处于运行状态。
void shutdown();
方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完毕—-包括还没开始的任务,这种属于正常关闭。List<Runnable> shutdownNow();
方法将执行粗暴的关闭过程,类似kill命令,它将取消所有运行中的任务,并且不再启动队列中尚未开始的任务,这种属于强行关闭,关闭当前正在执行的任务,并返回所有尚未启动的任务清单。
ExecutorService
关闭后提交的任务将由”拒绝执行处理器”来处理,它会抛弃任务,或者使得execute方法抛出一个RejectedExecutionException
。等所有任务执行完成后,ExecutorService
将转入终止状态。可以调用awaitTermination
来等待ExecutorService
到达终止状态,或者通过isTerminated
来轮询ExecutorService
是否已经终止。通常在调用shutdown之后会立即调用awaitTermination
阻塞等待,从而产生同步地关闭ExecutorService
的效果。
(3)下面两个分支,AbstractExecutorService
分支就是普通的线程池分支,ScheduledExecutorService
是用来创建定时任务的。
(4)线程池启动初期线程不会启动,有任务提交(调用execute或submit)才会启动,直到到达最大数量就不再创建而是进入阻塞队列
主要方法:
4.2 线程池核心构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
// this 调用的真正的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池构造方法的7个重要参数解析
4.2.1 corePoolSize
核心池的大小。是线程池中的常驻核心线程数。类似银行的常开窗口。
(1)在创建了线程池之后,默认情况下,线程池中没有任何线程(线程数为0),而是等待有任务到来才创建线程去执行任务。
(2)当线程池中的线程数达到corePoolSize
后,就会把到达的任务放到缓存队列中。
keepAliveTime
和allowCoreThreadTimeOut
超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize
。
4.2.2 maximumPoolSize
池中允许的最大线程数,线程池能够容纳的最大线程数,此值必须大于等于1。
(1)该参数表示线程池中最多能创建的线程数量。
(2)当任务数量比corePoolSize
大时,任务添加到workQueue
,当workQueue
满了,并且当前线程个数小于maximumPoolSize
,将继续创建线程以处理任务。
(3)maximumPoolSize
表示的就是wordQueue
满了,线程池中最多可以创建的线程数量。
4.2.3 keepAliveTime
表示多余的空闲线程的存活时间。
默认情况下,只有当线程池中的线程数大于corePoolSize
时,这个参数才会起作用。当线程数大于corePoolSize
时,当空闲时间达到keepAliveTime
的值时,多余的空闲线程会被销毁直到只剩下corePoolSize
。
4.2.4 unit
keepAliveTime
时间单位
4.2.5 workQueue
任务队列,用来缓存已经提交还未执行的任务。类似银行候客区。
4.2.6 threadFactory
表示生成线程池中工作线程的线程工厂。用于创建线程,一般默认即可。
4.2.7 handler
由于超出线程范围maximumPoolSize
和workQueue
队列容量而使执行被阻塞时所使用的处理程序拒绝策略,也可以叫拒绝执行处理器。
拒绝执行处理器实际上是定义了拒绝执行线程的行为,实际上也是一种饱和策略,当有界队列被填满后,饱和策略开始发挥作用。线程池类库自带4种拒绝策略。
四种默认拒绝策略
AbortPolicy
-终止策略。直接抛出一个RejectedExecutionException
,也是JDK
默认的拒绝策略。public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
CallerRunsPolicy
-调用者运行策略。该策略既不会抛弃任务,也不抛出异常,而是将任务回退到调用者(让调用者执行)降低任务量。public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
DiscardOldestPolicy
-抛弃最旧的策略。如果线程池没有关闭,就移除队列中最先进入(等待最近)的任务,并且尝试执行任务。public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
DiscardPolicy
-抛弃策略。直接丢弃任务,不予任何处理也不抛弃异常。如果允许任务丢失,这是合适的方案。public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
所有的拒绝策略都是
RejectedExecutionHandler
拒绝执行处理器接口的实现。如果要自定义拒绝策略,需要实现该执行处理器接口。
五、 线程池工作原理
工作流程:
(1)在创建线程池后,等待提交过来的任务请求。
(2)当调用execute()
方法添加一个请求任务时,线程池会进行一系列判断:
- 2.1 如果正在运行的线程数小于
corePoolSize
,那么马上创建线程执行这个任务。 - 2.2 如果正在运行的线程数大于或等于
corePoolSize
,那么将这个任务放入队列。 - 2.3 如果任务队列满了且正在运行的线程数小于
maximumPoolSize
,那么还是要创建非核心线程立即执行这个任务。 - 2.4 如果任务队列满了且正在运行的线程数大于或等于
maximumPoolSize
,那么线程池会启动饱和拒绝策略来执行。
(3)当一个线程完成任务时,它会从队列中取下一个任务来执行。
(4)当一个线程无事可做超过一定的单位(unit
)时间(keepAliveTime
)时,线程池会判断:
- 4.1 如果当前运行的线程数大于
corePoolSize
,那么这个线程就会被销毁。 - 4.2 如果线程池的全部任务完成,线程池将会收缩到
corePoolSize
大小。
六、线程池创建方式:
6.1 固定核心线程数量的线程池
1池定长5参数。
ExecutorService es01 = Executors.newFixedThreadPool(2);
真实的创建方式:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数corePoolSize
和 最大线程数maximumPoolSize
是相同。
保持活跃时间keepAliveTime
是0,表示永不回收。
LinkedBlockingQueue
是任务队列,一个链表式的阻塞队列,也叫无界队列。
适用场景: 执行长期任务。
Demo 01
:
package com.xiaocai.threadpools;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @description: TODO 功能角色说明:
* TODO 描述:
* @author: 张小菜
* @date: 2020/11/2 22:36
* @version: v1.0
*/
public class TheadPoolDemo01 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
try {
for (int i = 1; i <= 10 ; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t 执行卖票操作了");
});
}
} catch (Exception e) {
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}
执行结果:
pool-1-thread-2 执行卖票操作了
pool-1-thread-5 执行卖票操作了
pool-1-thread-2 执行卖票操作了
pool-1-thread-2 执行卖票操作了
pool-1-thread-4 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-3 执行卖票操作了
pool-1-thread-2 执行卖票操作了
pool-1-thread-5 执行卖票操作了
pool-1-thread-4 执行卖票操作了
6.2 单核心线程的线程池
1池1线程
ExecutorService es02 = Executors.newSingleThreadExecutor();
真实的创建方式:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程数corePoolSize
和最大线程数maximumPoolSize
都固定是1,创建的是单线程的线程池。
保持活跃时间keepAliveTime
是0,表示永不回收。
使用了也是阻拦队列LinkedBlockingQueue
。
适用场景: 适合一个任务一个任务的执行场景。
Demo 02 :
package com.xiaocai.threadpools;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @description: TODO 功能角色说明:
* TODO 描述:
* @author: 张小菜
* @date: 2020/11/2 22:36
* @version: v1.0
*/
public class TheadPoolDemo02 {
public static void main(String[] args) {
// 创建线程池
ExecutorService threadPool = Executors.newSingleThreadExecutor();
try {
for (int i = 1; i <= 10 ; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t 执行卖票操作了");
});
}
} catch (Exception e) {
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}
执行结果:
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-1 执行卖票操作了
6.3 可扩容的线程数的线程池
1池N线程,可缓存的、可增长的。
ExecutorService es03 = Executors.newCachedThreadPool();
真实的创建方式:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数corePoolSize
值是0 ;
最大线程数maximumPoolSize
是 231-1,值是2147483647,是在Integer类中定义的:
@Native public static final int MAX_VALUE = 0x7fffffff;
创建的是一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
使用的是SynchronousQueue
,是一个内部只能包含一个元素的队列。
适用场景:适合执行很多短期异步的小程序或负载较轻的服务。
Demo 03 :
package com.xiaocai.threadpools;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @description: TODO 功能角色说明:
* TODO 描述:
* @author: 张小菜
* @date: 2020/11/2 22:36
* @version: v1.0
*/
public class TheadPoolDemo03 {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
for (int i = 1; i <= 10 ; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t 执行卖票操作了");
});
// 暂停
//TimeUnit.SECONDS.sleep(5);
}
} catch (Exception e) {
} finally {
// 关闭线程池
threadPool.shutdown();
}
}
}
执行结果:
pool-1-thread-2 执行卖票操作了
pool-1-thread-1 执行卖票操作了
pool-1-thread-3 执行卖票操作了
pool-1-thread-7 执行卖票操作了
pool-1-thread-4 执行卖票操作了
pool-1-thread-5 执行卖票操作了
pool-1-thread-8 执行卖票操作了
pool-1-thread-6 执行卖票操作了
pool-1-thread-9 执行卖票操作了
pool-1-thread-10 执行卖票操作了
如果放开暂停,则只会一个线程执行。
6.4 指定核心线程数量的调度线程的线程池
ScheduledExecutorService es04 = Executors.newScheduledThreadPool(3);
es04.
创建了有3个调度线程的线程池。
真实的创建方式:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
这里supper方法调用的是父类ThreadPoolExecutor
的构造方法,也就是创建线程的核心方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
使用的 DelayedWorkQueue
优先队列/延迟队列。该队列是定制的优先级队列,只能用来存储RunnableScheduledFutures
任务。堆是实现优先级队列的最佳选择,而该队列正好是基于堆数据结构的实现。
Demo 04 :
//延迟3秒后执行package com.xiaocai.threadpools;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @description: TODO 功能角色说明:
* TODO 描述:
* @author: 张小菜
* @date: 2020/11/2 22:36
* @version: v1.0
*/
public class TheadPoolDemo04_Scheduled {
public static void main(String[] args) {
ScheduledExecutorService es04 = Executors.newScheduledThreadPool(2);
es04.schedule(new Thread(()->{
System.out.println("I im execute after 3s....");
}), 3, TimeUnit.SECONDS);//延迟3秒后执行
es04.schedule(new Thread(()->{
System.out.println("I im execute after 5s....");
}), 5, TimeUnit.SECONDS);//延迟5秒后执行
es04.shutdown();
}
}
执行结果,延迟只会才会执行:
I im execute after 3s....
I im execute after 5s....
6.5 可以任务窃取的线程池
也称呼为抢占式线程池。
ExecutorService es05 = Executors.newWorkStealingPool();//无参数
ExecutorService es05 = Executors.newWorkStealingPool(2);// 有参数
参数名:parallelism 表示线程并行数。
真实创建方式:
无参数的:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
这里调用的是ForkJoinPool
的构造方法:
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
这里使用了this,又调用了本类的其他构造方法:
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
workStealingPool
的第一个参数就是parallelism
,方法里使用的是
Runtime.getRuntime().availableProcessors()
来获取CPU核数作为并行数。
创建一个指定并行级别的线程池来维护足够的线程执行,使用多个队列减少争用。并行级别对应于主动参与或可用与最大线程数参与,任务处理。实际线程数可能动态增长和收缩。
这个属于 ForkJoin
框架内容,后续专门学习。
学习参考:
B站阳哥视频