深究线程池原理并根据业务场景更改其工作流程

文章介绍

写这篇文章是因为我们后台有一个看板页面,并且指标非常多,查询条件也非常复杂导致没法做宽表只能实时从各个数据源查询出来并计算;想让接口响应时间更短一些更改了很多次线程池参数,但都达不到理想状态,为此研究了一下线程池原理,并且实现了一个适合自己业务场景的线程池工作。
本文会从为什么使用线程池、如何使用线程池、线程池的整体工作流程、线程池源码、手动实现一个适合实时io型密集型的线程池。

线程池

java中创建线程需要分配一定的内存和系统资源,频繁的创建销毁线程会带来较大的系统开销;同时创建大量的线程也会使cpu需要消耗更多的时间来管理线程,而影响整体性能;同时因为无法控制线程的数量,可能导致并发过多影响效率以及响应时间等一系列的问题。此时就需要我们有一个可以集中管理线程的工具线程池。 线程池的主要优点包括:
1.降低资源消耗:通过重复利用已创建的线程,减少了线程创建和销毁的开销。
2.提高响应速度:任务无需等待线程创建就能立即执行。
3.提高线程的可管理性:可以统一管理线程的数量、状态等。

使用

public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1,
            2,
            1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);

                    return t;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()
    );

    executor.execute(任务);
    executor.submit(有返回结果的任务);

    executor.shutdown(销毁);
    executor.shutdownNow(立即销毁);
}

通过上述代码可以创建一个线程池,当然大部分人会直接使用spring自带的,那么我们来看一下传入构造方法中的各个参数以及他们的含义:

public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize, 
                          long keepAliveTime, 
                          TimeUnit unit, 
                          BlockingQueue<Runnable> workQueue, 
                          ThreadFactory threadFactory, 
                          RejectedExecutionHandler handler 
                          )

在接下来的原理详解中会逐步分析各个参数的含义以及线程池如何执行与销毁的。

原理

ThreadPoolExecutor状态属性

打开ThreadPoolExecutor类直接看到的就是以下属性,熟悉以下属性的含义可以让我们理解ThreadPoolExecutor源码更加容易。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


private static final int COUNT_BITS = Integer.SIZE - 3;





private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;



private static final int RUNNING    = -1 << COUNT_BITS;



private static final int SHUTDOWN   =  0 << COUNT_BITS;



private static final int STOP       =  1 << COUNT_BITS;



private static final int TIDYING    =  2 << COUNT_BITS;



private static final int TERMINATED =  3 << COUNT_BITS;





private static int runStateOf(int c)     { return c & ~COUNT_MASK; }





private static int workerCountOf(int c)  { return c & COUNT_MASK; }

execute方法

无论是调用execute还是submit最终都是调用execute方法进行任务执行,两者的区别是submit对任务封装了一层RunnableFuture。execute代码如下:

public void execute(Runnable command) {

    if (command == null)
        throw new NullPointerException();


    int c = ctl.get();


    if (workerCountOf(c) < corePoolSize) {


        if (addWorker(command, true))

            return;


        c = ctl.get();
    }


    if (isRunning(c) && workQueue.offer(command)) {


        int recheck = ctl.get();


        if (! isRunning(recheck) && remove(command))

            reject(command);

        else if (workerCountOf(recheck) == 0)

            addWorker(null, false);
    }


    else if (!addWorker(command, false))


        reject(command);
}

通过上述方法,可以很清楚这是非常常见的面试题,线程池的执行流程:

  1. 当提交一个新任务到线程池时,线程池会检查当前运行的线程数是否少于核心线程数(corePoolSize)。
  2. 如果当前运行的线程数少于核心线程数,则创建一个新线程来执行任务,即使有空闲的核心线程也创建新的线程。
  3. 如果当前运行的线程数等于或者多于核心线程数,但任务队列未满,则将任务添加到任务队列中等待执行。
  4. 如果任务队列已满,线程池会检查当前运行的线程数是否少于最大线程数(maximumPoolSize)。
  5. 如果当前运行的线程数少于最大线程数,则创建新的非核心线程来执行任务。
  6. 如果当前运行的线程数等于最大线程数,且任务队列已满,则根据拒绝策略来处理这个任务。
  7. 队列中如果线程数等于0,则创建一个非核心线程来处理队列中的任务。

addWorker 添加线程

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (int c = ctl.get();;) {


* runStateAtLeast方法就是比较一下两个状态的大小,这里用来判断线程池状态
* private static boolean runStateAtLeast(int c, int s) {
*    return c >= s;
*}
**/     








        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;


        for (;;) {

            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;

            if (compareAndIncrementWorkerCount(c))

                break retry;


            c = ctl.get();  

            if (runStateAtLeast(c, SHUTDOWN))

                continue retry;

        }
    }


    boolean workerStarted = false;

    boolean workerAdded = false;

    Worker w = null;
    try {

        w = new Worker(firstTask);

        final Thread t = w.thread;


        if (t != null) {

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {

                int c = ctl.get();


                if (isRunning(c) || 
                    (runStateLessThan(c, STOP) && firstTask == null)) {


                    if (t.getState() != Thread.State.NEW)
                        throw new IllegalThreadStateException();


                    workers.add(w);


                    workerAdded = true;


                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                }
            } finally {

                mainLock.unlock();
            }

            if (workerAdded) {

                t.start();
                workerStarted = true;
            }
        }
    } finally {

        if (! workerStarted)


            addWorkerFailed(w);
    }

    return workerStarted;
}
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {

        if (w != null)

            workers.remove(w);

        decrementWorkerCount();

        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

runWorker 执行任务

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    final Thread thread;

    Runnable firstTask;

    Worker(Runnable firstTask) {

        setState(-1); 

        this.firstTask = firstTask;

        this.thread = getThreadFactory().newThread(this);
    }


    public void run() {
        runWorker(this);
    }
final void runWorker(Worker w) {

    Thread wt = Thread.currentThread();

    Runnable task = w.firstTask;

    w.firstTask = null;

    w.unlock(); 


    boolean completedAbruptly = true;
    try {

        while (task != null || (task = getTask()) != null) {
            w.lock();

            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())

                wt.interrupt();
            try {

                beforeExecute(wt, task);
                try {

                    task.run();

                    afterExecute(task, null);
                } catch (Throwable ex) {

                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {

                task = null;

                w.completedTasks++;
                w.unlock();
            }
        }

        completedAbruptly = false;
    } finally {

        processWorkerExit(w, completedAbruptly);
    }
}

processWorkerExit 工作线程退出

private void processWorkerExit(Worker w, boolean completedAbruptly) {

    if (completedAbruptly) 

        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {

        completedTaskCount += w.completedTasks;

        workers.remove(w);
    } finally {
        mainLock.unlock();
    }


    tryTerminate();


    int c = ctl.get();

    if (runStateLessThan(c, STOP)) {

        if (!completedAbruptly) {

            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            if (min == 0 && ! workQueue.isEmpty())

                min = 1;

            if (workerCountOf(c) >= min)

                return; 
        }



        addWorker(null, false);
    }
}

getTask 工作线程获取任务

private Runnable getTask() {

    boolean timedOut = false; 
    for (;;) {

        int c = ctl.get();




        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {

            decrementWorkerCount();
            return null;
        }


        int wc = workerCountOf(c);



        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;


        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {

            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {





            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;

            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

reject 拒绝策略

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

拒绝策略完全是执行各种实现RejectedExecutionHandler的策略类。线程池提供的拒绝策略: 拒绝策略:线程池提供的拒绝策略,一般不适合你的业务场景时,你就自己定义即可。

  • AbortPolicy:抛出异常!
  • CallerRunsPolicy:让提交任务的线程处理这个任务!
  • DiscardPolicy:啥也不做,任务没了!
  • DiscardOldestPolicy:扔掉队列最前面的任务,尝试把当前任务添加进去!

实现适合自身业务的线程池

背景

在平时业务开发过程中,有一些数据查询场景需要并行查询一些任务返回给前端;此时就会有一些并行任务,希望每一个查询任务进入到线程池中都立刻会有线程来处理,而不是放到任务队列中,因为只要放到任务队列中等待线程take,那么整个接口的响应时间就会增加。

此时如何设置核心线程数、最大线程数、最大队列是一个很头疼的问题,刚开始引入了hippo4j,它是一个可以动态的修改线程线程池属性的框架,我们通过观察线程池的各种指标对线程池属性进行修改,最后的结果就是核心线程数量非常多,因为只有队列满了才会创建非核心线程,如果队列设置为0又担心任务太多,导致任务都丢弃了。此时就想更改一下线程池的工作流程,希望来一个任务就创建一个线程,一直创建到核心线程池数为止,后续再来任务判断当前活跃的线程+队列中任务数量是否小于核心线程数,如果小于放入到队列中,如果大于直接创建非核心线程池;这样就能保证所有的任务都立刻有线程执行。

实现

开始是想重写execute方法,当开始写的时候发现很多方法是私有的都没办法直接调用;此时就想到了可以从加入队列入手,只需要把队列的offer方法重写即可。

自定义队列队列:

public class TestTaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private ThreadPoolExecutor executor;

    public TestTaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(ThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {

        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }


        int currentPoolThreadSize = executor.getPoolSize();

        if (executor.getActiveCount() + size() < currentPoolThreadSize) {

            return super.offer(runnable);
        }


        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }


        return super.offer(runnable);
    }


    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}

使用方式:

TestTaskQueue<Runnable> testTaskQueue = new TestTaskQueue<>(DEFAULT_QUEUE_CAPACITY);

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        DEFAULT_POOL_SIZE, DEFAULT_MAX_POOL_SIZE, DEFAULT_THREAD_TIMEOUT, TimeUnit.MINUTES, testTaskQueue,
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        },
        new ThreadPoolExecutor.CallerRunsPolicy());
testTaskQueue.setExecutor(executor);

链接:https://juejin.cn/post/7402444509152952332

请登录后发表评论

    没有回复内容