嘘~ 正在从服务器偷取页面 . . .

Java并发之线程池


线程池

线程池虽然使用过,还是再梳理补充一下。

一、什么是线程池

java.util.concurrent.Executors提供了一个java.util.concurrent.Executor 接口的实现用于创建线程池。

线程池是一种线程使用模式。线程过多会带来内存调度开销,进而影响整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。

线程池的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大线程的数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

二、为什么要用线程池

(1)创建线程要花费昂贵的资源和时间,如果任务来了才创建线程那么响应时间会变

长,而且一个进程能创建的线程数有限,如果创建大量线程,内存无法承受。

(2)为了减少创建和销毁线程的次数,让一些线程可以多次使用,可根据系统情况调整执行的线程数量,防止消耗过多内存。

三、线程池的特点与作用

线程池的主要特点:

  • 线程复用;

  • 控制最大并发数;

  • 管理线程;

线程池的主要作用

(1)降低资源消耗。通过重复利用已经创建的线程降低线程创建和销毁的消耗。

(2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

(3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程吃可以进行统一的分配、调优和监控。

四、线程池框架结构

JDK1.5 开始,Java API提供了 Executor 框架来创建不同的线程池。该框架中用到ExecutorExecutorsExecutorServiceThreadPoolExecutor这几个类。

4.1 线程池的框架结构:

核心类/接口结构:

说明:

(1)Executor是一个顶级接口,它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor基于”生产者-消费者”模式,提交任务的操作相当于生产者,执行任务的则相当于消费者。

public interface Executor {
    void execute(Runnable command);
}

(2)ExecutorService扩展了Executor。添加了一些用于生命周期管理的方法(同时还提供一些用于任务提交的便利方法

ExecutorService的生命周期有三种:运行、关闭和已终止。

ExecutorService在创建时处于运行状态。

  • void shutdown();方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完毕—-包括还没开始的任务,这种属于正常关闭。

  • List<Runnable> shutdownNow();方法将执行粗暴的关闭过程,类似kill命令,它将取消所有运行中的任务,并且不再启动队列中尚未开始的任务,这种属于强行关闭,关闭当前正在执行的任务,并返回所有尚未启动的任务清单。

ExecutorService关闭后提交的任务将由”拒绝执行处理器”来处理,它会抛弃任务,或者使得execute方法抛出一个RejectedExecutionException。等所有任务执行完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过isTerminated来轮询ExecutorService是否已经终止。通常在调用shutdown之后会立即调用awaitTermination阻塞等待,从而产生同步地关闭ExecutorService的效果。

(3)下面两个分支,AbstractExecutorService分支就是普通的线程池分支,ScheduledExecutorService是用来创建定时任务的。

(4)线程池启动初期线程不会启动,有任务提交(调用execute或submit)才会启动,直到到达最大数量就不再创建而是进入阻塞队列

主要方法:

核心类结构

4.2 线程池核心构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    // this 调用的真正的构造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

线程池构造方法的7个重要参数解析

4.2.1 corePoolSize

核心池的大小。是线程池中的常驻核心线程数。类似银行的常开窗口。

(1)在创建了线程池之后,默认情况下,线程池中没有任何线程(线程数为0),而是等待有任务到来才创建线程去执行任务。

(2)当线程池中的线程数达到corePoolSize后,就会把到达的任务放到缓存队列中。

keepAliveTimeallowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize

4.2.2 maximumPoolSize

池中允许的最大线程数,线程池能够容纳的最大线程数,此值必须大于等于1。

(1)该参数表示线程池中最多能创建的线程数量。

(2)当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,并且当前线程个数小于maximumPoolSize,将继续创建线程以处理任务。

(3)maximumPoolSize表示的就是wordQueue满了,线程池中最多可以创建的线程数量。

4.2.3 keepAliveTime

表示多余的空闲线程的存活时间。

默认情况下,只有当线程池中的线程数大于corePoolSize时,这个参数才会起作用。当线程数大于corePoolSize时,当空闲时间达到keepAliveTime的值时,多余的空闲线程会被销毁直到只剩下corePoolSize

4.2.4 unit

keepAliveTime时间单位

4.2.5 workQueue

任务队列,用来缓存已经提交还未执行的任务。类似银行候客区。

4.2.6 threadFactory

表示生成线程池中工作线程的线程工厂。用于创建线程,一般默认即可。

4.2.7 handler

由于超出线程范围maximumPoolSizeworkQueue队列容量而使执行被阻塞时所使用的处理程序拒绝策略,也可以叫拒绝执行处理器。

拒绝执行处理器实际上是定义了拒绝执行线程的行为,实际上也是一种饱和策略,当有界队列被填满后,饱和策略开始发挥作用。线程池类库自带4种拒绝策略。

四种默认拒绝策略

  • AbortPolicy-终止策略。直接抛出一个RejectedExecutionException,也是JDK默认的拒绝策略

    public static class AbortPolicy implements RejectedExecutionHandler {
            public AbortPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
  • CallerRunsPolicy-调用者运行策略。该策略既不会抛弃任务,也不抛出异常,而是将任务回退到调用者(让调用者执行)降低任务量。

    public static class CallerRunsPolicy implements RejectedExecutionHandler {
            public CallerRunsPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
  • DiscardOldestPolicy-抛弃最旧的策略。如果线程池没有关闭,就移除队列中最先进入(等待最近)的任务,并且尝试执行任务。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            public DiscardOldestPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
    }
  • DiscardPolicy-抛弃策略。直接丢弃任务,不予任何处理也不抛弃异常。如果允许任务丢失,这是合适的方案。

      public static class DiscardPolicy implements RejectedExecutionHandler {
            public DiscardPolicy() { }
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }

    所有的拒绝策略都是RejectedExecutionHandler拒绝执行处理器接口的实现。如果要自定义拒绝策略,需要实现该执行处理器接口。

五、 线程池工作原理

线程池工作原理

工作流程:

(1)在创建线程池后,等待提交过来的任务请求。

(2)当调用execute() 方法添加一个请求任务时,线程池会进行一系列判断:

  • 2.1 如果正在运行的线程数小于corePoolSize ,那么马上创建线程执行这个任务。
  • 2.2 如果正在运行的线程数大于或等于corePoolSize,那么将这个任务放入队列
  • 2.3 如果任务队列满了且正在运行的线程数小于maximumPoolSize,那么还是要创建非核心线程立即执行这个任务。
  • 2.4 如果任务队列满了且正在运行的线程数大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。

(3)当一个线程完成任务时,它会从队列中取下一个任务来执行。

(4)当一个线程无事可做超过一定的单位(unit)时间(keepAliveTime)时,线程池会判断:

  • 4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就会被销毁。
  • 4.2 如果线程池的全部任务完成,线程池将会收缩到corePoolSize大小。

线程池工作流程

六、线程池创建方式:

6.1 固定核心线程数量的线程池

1池定长5参数。

 ExecutorService es01 = Executors.newFixedThreadPool(2);

真实的创建方式:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

核心线程数corePoolSize 和 最大线程数maximumPoolSize 是相同。

保持活跃时间keepAliveTime是0,表示永不回收。

LinkedBlockingQueue 是任务队列,一个链表式的阻塞队列,也叫无界队列。

适用场景: 执行长期任务。

Demo 01

package com.xiaocai.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @description: TODO 功能角色说明:
 * TODO 描述:
 * @author: 张小菜
 * @date: 2020/11/2 22:36
 * @version: v1.0
 */
public class TheadPoolDemo01 {

    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        try {
            for (int i = 1; i <= 10 ; i++) {

                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t 执行卖票操作了");
                });
            }
        } catch (Exception e) {

        } finally {
            // 关闭线程池
            threadPool.shutdown();
        }
    }
}

执行结果:

pool-1-thread-2     执行卖票操作了
pool-1-thread-5     执行卖票操作了
pool-1-thread-2     执行卖票操作了
pool-1-thread-2     执行卖票操作了
pool-1-thread-4     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-3     执行卖票操作了
pool-1-thread-2     执行卖票操作了
pool-1-thread-5     执行卖票操作了
pool-1-thread-4     执行卖票操作了

6.2 单核心线程的线程池

1池1线程

ExecutorService es02 = Executors.newSingleThreadExecutor();  

真实的创建方式:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

核心线程数corePoolSize和最大线程数maximumPoolSize都固定是1,创建的是单线程的线程池。

保持活跃时间keepAliveTime是0,表示永不回收。

使用了也是阻拦队列LinkedBlockingQueue

适用场景: 适合一个任务一个任务的执行场景。

Demo 02 :

package com.xiaocai.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: TODO 功能角色说明:
 * TODO 描述:
 * @author: 张小菜
 * @date: 2020/11/2 22:36
 * @version: v1.0
 */
public class TheadPoolDemo02 {

    public static void main(String[] args) {
        //  创建线程池
        ExecutorService threadPool = Executors.newSingleThreadExecutor();


        try {
            for (int i = 1; i <= 10 ; i++) {

                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t 执行卖票操作了");
                });
            }
        } catch (Exception e) {

        } finally {
            // 关闭线程池
            threadPool.shutdown();
        }

    }
}

执行结果:

pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-1     执行卖票操作了

6.3 可扩容的线程数的线程池

1池N线程,可缓存的、可增长的。

ExecutorService es03 = Executors.newCachedThreadPool();

真实的创建方式:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

核心线程数corePoolSize 值是0 ;

最大线程数maximumPoolSize 是 231-1,值是2147483647,是在Integer类中定义的:

 @Native public static final int   MAX_VALUE = 0x7fffffff;

创建的是一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

使用的是SynchronousQueue,是一个内部只能包含一个元素的队列。

适用场景:适合执行很多短期异步的小程序或负载较轻的服务。

Demo 03 :

package com.xiaocai.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: TODO 功能角色说明:
 * TODO 描述:
 * @author: 张小菜
 * @date: 2020/11/2 22:36
 * @version: v1.0
 */
public class TheadPoolDemo03 {

    public static void main(String[] args) {

        ExecutorService threadPool = Executors.newCachedThreadPool();

        try {
            for (int i = 1; i <= 10 ; i++) {

                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"\t 执行卖票操作了");
                });
                // 暂停
                //TimeUnit.SECONDS.sleep(5);
            }
        } catch (Exception e) {

        } finally {
            // 关闭线程池
            threadPool.shutdown();
        }
    }
}

执行结果:

pool-1-thread-2     执行卖票操作了
pool-1-thread-1     执行卖票操作了
pool-1-thread-3     执行卖票操作了
pool-1-thread-7     执行卖票操作了
pool-1-thread-4     执行卖票操作了
pool-1-thread-5     执行卖票操作了
pool-1-thread-8     执行卖票操作了
pool-1-thread-6     执行卖票操作了
pool-1-thread-9     执行卖票操作了
pool-1-thread-10     执行卖票操作了

如果放开暂停,则只会一个线程执行。

6.4 指定核心线程数量的调度线程的线程池

ScheduledExecutorService es04 = Executors.newScheduledThreadPool(3);
es04.

创建了有3个调度线程的线程池。

真实的创建方式:

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

这里supper方法调用的是父类ThreadPoolExecutor的构造方法,也就是创建线程的核心方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

使用的 DelayedWorkQueue优先队列/延迟队列。该队列是定制的优先级队列,只能用来存储RunnableScheduledFutures任务。堆是实现优先级队列的最佳选择,而该队列正好是基于堆数据结构的实现。

Demo 04 :

//延迟3秒后执行package com.xiaocai.threadpools;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @description: TODO 功能角色说明:
 * TODO 描述:
 * @author: 张小菜
 * @date: 2020/11/2 22:36
 * @version: v1.0
 */
public class TheadPoolDemo04_Scheduled {

    public static void main(String[] args) {

        ScheduledExecutorService es04 = Executors.newScheduledThreadPool(2);

        es04.schedule(new Thread(()->{
            System.out.println("I im execute after 3s....");
        }), 3, TimeUnit.SECONDS);//延迟3秒后执行

        es04.schedule(new Thread(()->{
            System.out.println("I im execute  after 5s....");
        }), 5, TimeUnit.SECONDS);//延迟5秒后执行

        es04.shutdown();
    }
}

执行结果,延迟只会才会执行:

I im execute after 3s....
I im execute  after 5s....

6.5 可以任务窃取的线程池

也称呼为抢占式线程池。

ExecutorService es05 = Executors.newWorkStealingPool();//无参数
ExecutorService es05 = Executors.newWorkStealingPool(2);// 有参数

参数名:parallelism 表示线程并行数。

真实创建方式:

无参数的:

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

这里调用的是ForkJoinPool的构造方法:

    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

这里使用了this,又调用了本类的其他构造方法:

private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

workStealingPool 的第一个参数就是parallelism ,方法里使用的是

Runtime.getRuntime().availableProcessors()

来获取CPU核数作为并行数。

创建一个指定并行级别的线程池来维护足够的线程执行,使用多个队列减少争用。并行级别对应于主动参与或可用与最大线程数参与,任务处理。实际线程数可能动态增长和收缩。

这个属于 ForkJoin 框架内容,后续专门学习。

学习参考:

B站阳哥视频

博客原地址



文章作者: Small-Rose /张小菜
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-SA 4.0 许可协议。转载请注明来源 Small-Rose /张小菜 !
评论
  目录