当前位置: 主页 > JAVA语言

java线程池最大线程数-java线程池原理

发布时间:2023-02-13 14:07   浏览次数:次   作者:佚名

前言

创建新线程可以通过继承 Thread 类或实现 Runnable 接口来实现,用这两种方式创建的线程在运行后会被虚拟机销毁,并进行垃圾回收,如果线程数量太大,频繁创建和销毁线程会浪费资源,降低效率。线程池的引入很好地解决了上述问题,线程池可以更好的创建、维护、管理线程的生命周期,实现复用,提高资源使用效率,同时也避免了开发者滥用new关键字创建线程的不规范行为。

注意:《阿里巴巴开发手册》明确指出,在实际生产中,线程资源必须通过线程池提供,不允许在应用中显式创建线程。如果不使用线程池,可能会导致系统创建大量同构线程,导致内存消耗或“过度切换”问题。

接下来,

主要讲解线程池核心实现类 ThreadPoolExecutor 核心参数和 Java 中的工作原理,并讲解 Executors 工具类。

ThreadPoolExecutor

ThreadPoolExecutor 是线程池的核心实现类,在 java.util.current 包的 JDK 1.5 中引入,由 Doug Lea 完成。

执行程序接口

Executor 是线程池的顶级接口,在 java.util.并发包中的 JDK 1.5 中引入。

public interface Executor {
    // 该接口中只定义了一个Runnable作为入参的execute方法
    void execute(Runnable command);
}

请参阅执行器接口的实现类图

image

生命周期

线程有生命周期,

线程池也有生命周期,源代码中定义了五种状态。

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

下图显示了线程池状态转换的下图

java线程池原理_java线程池大小设置_java线程池最大线程数

构造 函数

方法

如何使用 ThreadPoolExecutor 创建线程池,看看它是如何构造的

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor 包含 7 个核心参数,参数含义:

核心参数

ThreadPoolExecutor 包含七个核心参数,如果需要自定义线程池,则需要对其中核心的参数有一定程度的了解。

核心池大小

ThreadPoolExecutor 根据构造函数中 corePoolSize 和 maximumPoolSize 设置的边界值自动调整池大小,也可以使用 setCorePoolSize 和 setMaximumPoolSize 动态更改

当一个新的

任务在线程池中提交,并且运行的线程数少于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。

java线程池最大线程数_java线程池原理_java线程池大小设置

如果正在运行的线程数多于 corePoolSize 但小于 maxumPoolSize,则仅当队列已满时,才会创建新线程。

如果 corePoolSize 和 maxumPoolSize 相同,则可以创建固定大小的线程池。如果 maximumPoolSize 设置为 Integer.MAX_VALUE,则意味着在资源允许的情况下,允许线程池容纳任意数量的并发任务。

默认情况下,当新任务到达时,即使是核心线程也会开始创建和启动,如果使用非空队列创建线程池池,则可以通过重写 prestartCoreThread 或 prestartAllCoreThreads 方法动态覆盖线程预启动。

在实际开发中,如果需要自定义线程数,可以参考以下公式:

java线程池最大线程数_java线程池原理_java线程池大小设置

这些参数具有以下含义:

保持活动时间

keepAliveTime 参数用于设置空闲时间。如果池当前有多个 corePoolSize 线程,则冗余线程在空闲时间较长时将被终止,从而在任务数较少时减少线程池资源消耗。如果在给定时间需要处理的任务数增加,则会构造一个新线程。使用 setKeepAliveTime 方法动态更改参数值。

默认情况下,保持活动状态策略仅在超出 corePoolSize 线程时才有效,但方法allowThreadTimeOut 也可用于将此超时策略应用于核心线程,只要 keepAliveTime 值不为零。

工作队列

workQueue 参数用于指定存储已提交任务的队列,任何 BlockingQueue 都可用于传输和保存提交的任务。关于队列大小和线程数之间的关系:

线程池中常用的阻塞队列一般有 SyncQueue、LinkedBlockingQueue、ArrayBlockingQueue,它们都是 BlockingQueue 的实现类,下面简单介绍一下。

同步队列

同步队列不是一个真正的队列,虽然它实现了 BlockingQueue 接口,但它没有存储任务的能力。只需维护一组线程,等待在队列中添加或删除元素,就相当于直接将任务交给执行任务的线程。

如果没有立即可用的线程来运行任务,则尝试对任务进行排队将失败,因此会构造一个新线程。此策略可避免在处理可能具有内部依赖项的请求集时锁定。这种排队方式通常需要无限的最大池大小,以避免拒绝新提交的任务。当任务提交的平均到达速度快于线程处理速度时,就有可能出现无限的线程增长,CachedThreadPool 正式采用了这种形式。

链接阻塞队列

链接阻塞队列

是用链表实现的无界队列,如果使用没有预定义容量的 LinkedBlockingQueue,当所有 corePoolSize 线程都在处理任务时,会导致新任务在队列中等待,并且不会创建超过 corePoolSize 的线程。在这种情况下,最大池大小的值对线程数没有影响。

这种基于队列的处理任务的方式与 SynchronousQueue 处理线程上任务的方式相反。

ArrayBlockingQueue

ArrayBlockingQueue 是通过数组实现的有界队列。当与有限的 maxumPoolSize 一起使用时,有界队列有助于防止资源耗尽,但可能更难调整和控制。使用ArrayBlockingQueue根据应用场景提前估算池和队列的容量,并相互权衡队列大小和最大池大小

螺纹工厂

此参数提供线程池中的线程的创建方式,其中新线程是使用工厂模式 ThreadFactory 创建的,默认情况下,使用 Executors.defaultThreadFactory,这将创建全部位于同一 ThreadGroup 中且具有相同NORM_PRIORITY优先级和非守护程序状态的线程。

您还可以根据

实际场景,可以更改线程名称、线程组、优先级、守护进程状态等,在自定义的情况下,需要注意的是,如果 ThreadFactory 在从 newThread 返回 null 时创建线程失败,执行器会继续,但可能无法执行任何任务。线程应该具有“modifyThread”运行时权限。如果使用池的工作线程或其他线程没有此权限,则服务可能会降级:配置更改可能无法及时生效,并且池的关闭可能会保持可以终止但未完成的状态。

处理器

如果线程池是

饱和且没有足够的线程计数或队列空间来处理提交的任务,或者如果线程池已关闭但仍在处理正在进行的任务,则继续提交的任务将根据线程池的拒绝策略进行处理。

在任一情况下,execute 方法都会调用其 RejectedExecutionHandler 的 rejectedExecution 方法。线程池中提供了四个预定义的处理程序策略:

这些预定义的策略实现 RejectedExecutionHandler 接口,您还可以定义实现类覆盖拒绝策略。

中止策略

查看 AbortPolicy 的源代码,处理程序在拒绝时抛出运行时异常 RejectedExecutionException。

public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }
    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                " rejected from " +
                e.toString());
    }
}

丢弃策略

查看源代码,无法执行的任务被简单地丢弃而不进行任何处理。

public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

java线程池最大线程数_java线程池原理_java线程池大小设置

/** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }

丢弃最旧的策略

看着

源代码中java线程池最大线程数,如果未关闭执行程序,则会丢弃工作队列头部的任务并重试执行(可能会再次失败,从而导致此重复。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }
    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}

调用者运行策略

查看源代码,此策略调用执行自己正在运行的任务的线程,这也提供了一个简单的反馈控制机制,可以减慢新任务的提交速度。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }
    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

挂钩功能

ThreadPoolExecutor 为线程池中的线程提供受保护的可重写钩子函数,以便在初始化或执行任务后执行特殊处理,以及在线程池终止时可以覆盖的终止方法。

java线程池大小设置_java线程池最大线程数_java线程池原理

之前执行

在线程执行 Runnable 之前调用的方法。此方法由将执行任务 r 的线程调用,可用于重新初始化 ThreadLocals 或执行日志记录。此实现不执行任何操作,但可以在子类中自定义。重要的是要注意java线程池最大线程数,要正确嵌套多个覆盖,子类通常应该在此方法结束时调用super.beforeExecute。查看源代码:

protected void beforeExecute(Thread t, Runnable r) { }

后执行完成

给定的 Runnable 任务时调用的方法,由执行该任务的线程调用。请注意,要正确嵌套多个覆盖,子类通常应在此方法的开头调用 super.afterExecute。查看源代码:

protected void afterExecute(Runnable r, Throwable t) { }

终止

执行程序终止时调用的方法。请注意,在此方法中,子类通常应调用super.terminated。查看源代码:

protected void terminated() { }

核心源代码分析

线程池的整体工作流程如下图所示:

java线程池大小设置_java线程池原理_java线程池最大线程数

ThreadPoolExecutor 执行方法分为以下四种情况。

如果当前运行的线程少于 corePoolSize,则会创建一个新线程来执行任务(请注意,此步骤需要获取全局锁)。如果正在运行的线程等于或大于 corePoolSize,则该任务将添加到 BlockingQueue。如果无法将任务添加到 BlockingQueue(队列已满),则会创建一个新线程来处理该任务(请注意,此步骤需要获取全局锁)。如果创建新线程将导致当前正在运行的线程超过 maxumPoolSize,则该任务将被拒绝,并且将调用 RejectedExecutionHandler.rejectedExecution() 方法。

ThreadPoolExecutor 采取上述步骤的总体设计思想是在执行 execute() 方法时尽可能避免获取全局锁(这将是一个严重的可伸缩性瓶颈)。在 ThreadPoolExecutor 完成预热(当前运行的线程数大于或等于 corePoolSize)后,几乎所有的 execute() 方法调用都会执行步骤 2,这不需要获取全局锁。

任务单元工作人员

ThreadPoolExecutor 中的核心任务单元由定义两个重要方法的 worker 内部类实现:runWorker 方法和 addWorker 方法。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;
    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
        this.thread = getThreadFactory().newThread(this);
    }
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    // 省略代码...
}

addWorker 和 runWorker 提交并执行

ThreadPoolExecutor 有两个方法,submit 和 execute,这两种方法的区别是

下面我们简单介绍一下提交和执行的用法

提交方法在执行器服务中定义

接口,抽象类 AbstractExecutorService 在 ExecutorService 中实现了 submit 方法。

java线程池大小设置_java线程池原理_java线程池最大线程数

public Future submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
/**
  * @throws RejectedExecutionException {@inheritDoc}
  * @throws NullPointerException       {@inheritDoc}
  */
public  Future submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
/**
  * @throws RejectedExecutionException {@inheritDoc}
  * @throws NullPointerException       {@inheritDoc}
  */
public  Future submit(Callable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

当提交方法传递给 Runnable 对象时,Future 对象的 get 方法是

使用 null 返回值调用,当传入 Callable 对象时,返回 get 自定义的值,主线程块在返回结果之前等待结果返回,然后再执行,然后再返回结果。

class RunnableDemo implements Runnable{
    @Override
    public void run() {
        System.out.println("RunableDemo is execute");
    }
}
class CallableDemo implements Callable{
    @Override
    public String call() throws Exception {
        return "Call is Done";
    }
}
public class Test {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future call = executorService.submit(new CallableDemo());
        System.out.println("Callable'S Result:"+call.get());
        Future run = executorService.submit(new RunnableDemo());
        System.out.println("Runnable'S Result:"+run.get());
        System.out.println("Current Thread:"+Thread.currentThread().getName());
    }
}

输出结果

Callable'S Result:Call is Done
RunableDemo is execute
Runnable'S Result:null
Current Thread:main
使用

java线程池原理_java线程池大小设置_java线程池最大线程数

submit 方法处理任务执行结果的成败,或执行过程中抛出的异常,暂停其他任务的处理,使用执行及时处理处理程序操作过程中发生的异常情况。

class CallableDemo implements Callable{
    @Override
    public String call() throws Exception {
        return "Call is Done";
    }
}
public class Test {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future call = executorService.submit(new CallableDemo());
        try {
            call.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

我们可以根据具体的业务场景来考虑可能的异常,实现 Callable 的接口抛出,然后由 ThreadPoolExecutor 调用方处理,以提高多线程场景下的容错率。

执行程序类

Executors 是 Executor 框架的一个实用程序类,它提供了几种创建线程池的方法,以及线程池中默认配置(如线程工厂)的处理,然后描述了几种常用的创建线程池的方法。

newSingleThreadExecutor

** 单线程执行器

使用 Executors.newSingleThreadExecutor() 创建,查看源代码

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }

哪里:

因此,newSingleThreadExecutor 可用于处理任务量不大的场景,但不希望频繁创建和销毁需要与虚拟机处于同一周期的场景。

newFixedThreadPool

FixedThreadPool 是使用 Executors.newFixedThreadPool() 创建的,用于查看源代码

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }

哪里:

FixedThreadPool 和 SingleThreadExecutor 一样,如果 nThreads 的值设置得太小,在任务太多的场景下,可能会因为线程太少而造成 OOM,导致任务堆积在队列中,导致 OOM,相对的好处是可以多线程,处理效率得到一定程度的提高。

newCachedThreadPool

CachedThreadPool 是使用 Executors.newCachedThreadPool() 创建的,用于查看源代码

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }

哪里:

在 CachedThreadPool 中,如果主线程提交任务的速度比 maximumPool 中的线程快,则将继续创建新线程,最终导致创建过多线程并耗尽 CPU 和内存资源。忽略 CPU 和内存消耗,在一定程度上 CachedThreadPool 可以快速解决短期并发的问题,因为核心线程数为 0 并且设置了生存时间,这些临时打开的线程会在任务处理后被回收。

执行者是

提供是为了屏蔽线程池,避免过多的参数设置影响易用性,实际上这些参数并不需要一一设置,Executors 也是在源代码中推荐使用的方式,但需要熟悉它们各自的特性。

总结

执行器框架主要由三部分组成,任务,

任务执行器、执行结果、ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 的设计思想也是将这三个关键元素解耦,将任务的提交和执行分开。

参考:__Java并发Programming__Concurrent Programming.com 的艺术:并发编程网络 – ifeve.com |让没有难学的技术

先介绍一下自己,小编13年从交通大学毕业,曾经待过小公司,去了华为OPPO等大厂,18年进入阿里,一直到现在。我知道,大多数初中级Java工程师,如果想提升自己的技能,往往需要探索自己的成长或者报课,但对于培训机构近万元的学费来说,实在是压力很大。自学效率低下且耗时长,很容易碰到天花板和技术停止。因此,我收集了一本《Java开发完整学习资料》送给大家,初衷也很简单,就是帮助想自学但不知道从哪里入手的朋友,同时减轻大家的负担。添加下面的名片以获取全套学习材料