一次线上线程池任务问题处理历程-Spring专区论坛-技术-SpringForAll社区

一次线上线程池任务问题处理历程

一次线上线程池任务问题处理历程

[作者简介] 王日华,小米信息技术部订单组研发工程师,目前主要负责小米订单中台业务。

一、前言

在一次新功能上线过程中,出现线程池提交任务抛出 RejectedExecutionException 异常,即任务提交执行了拒绝策略的操作。查看业务情况和线程池配置,发现并行执行的任务数是小于线程池最大线程数的,为此展开了一次线程池问题排查历程。

二、业务情景

2.1. 任务描述

每次执行一组任务,一组任务最多有 15 个,多线程执行,每个线程处理一个任务;每次执行完一组任务后,再执行下一组,不存在上一组的任务和下一组一起执行的情况。

2.2. 任务提交流程

036cee4c1020231123115533

2.3. 线程池配置

<bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="14"/>
    <property name="maxPoolSize" value="30"/>
    <property name="queueCapacity" value="1"/>
</bean>

三、出现问题

执行过程中出现 RejectedExecutionException 异常,由于是采用的是默认拒绝策略 AbortPolicy,因此,可以明确知道任务是提交到线程池后,线程池资源已满,导致任务被拒绝。

四、问题排查

4.1. 检查线程池配置

任务最多 15 个一组,核心线程有 14 个,阻塞队列是 1,最大线程 30,理论上 14 个核心线程+1 个阻塞队列即可完成一组任务,连非核心线程都无需使用,为什么会出现线程被占满的情况?

4.2. 检查业务代码

检查是否存在线程池被多处使用,或者有多批任务被同时执行的情况,并没有发现错误;

4.3. 线下重现

  • 配置线程池
  • <bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="14"/>
        <property name="maxPoolSize" value="30"/>
        <property name="queueCapacity" value="1"/>
    </bean>

    • 建立 demo 代码
    • @RunWith(SpringRunner.class)
      @SpringBootTest
      public class SpringBootStartApplicationTests {
      
          @Resource
          private ThreadPoolTaskExecutor executor;
      
          @Test
          public void contextLoads() throws Exception {
              // 一共 10 批任务
              for(int i = 0; i < 10; i++) {
                  // 每次执行一批任务
                  doOnceTasks();
                  System.out.println("---------------------------------------" + i);
              }
          }
      
          /**
           * 每次完成 15 个任务后,再进行下一次任务
           */
          private void doOnceTasks(){
              List<Future> futureList = Lists.newArrayListWithCapacity(15);
              for(int i = 0; i < 15; ++i){
                  Future future = executor.submit(()->{
                      // 随机睡 0-5 秒
                      int sec = new Double(Math.random() * 5).intValue();
                      LockSupport.parkNanos(sec * 1000 * 1000 * 1000);
                      System.out.println(Thread.currentThread().getName() + "  end");
                  });
                  futureList.add(future);
              }
      
              // 等待所有任务执行结束
              for(Future future : futureList){
                  try {
                      future.get();
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      }

      • 异常重现
      • 883dda98c520231123115716

        五、线程池源码阅读

        5.1. 线程池执行任务流程

        • 当工作线程数 < corePoolSize 时,新创建一个新线程执行新提交任务,即使此时线程池中存在空闲线程;
        • 当工作线程数 == corePoolSize 时,新提交任务将被放入 workQueue 中;
        • 当 workQueue 已满,且工作线程数 < maximumPoolSize 时,新提交任务会创建新的非核心线程执行任务;
        • 当 workQueue 已满,且 工作线程数==maximumPoolSize 时,新提交任务由 RejectedExecutionHandler 处理;
        • c3d4eb7f2d20231123115747

          5.2. execute 线程池提交任务源码

        • class ThreadPoolExecutor{
              public void execute(Runnable command) {
                  // 提交任务不能为 null
                  if (command == null)
                      throw new NullPointerException();
          
                  // 获取控制位 ctl 的值
                  int c = ctl.get();
                  // work 线程数 < 核心线程数
                  if (workerCountOf(c) < corePoolSize) {
                      // 直接创建核心线程,执行任务
                      if (addWorker(command, true))
                          return;
          
                      /*
                          因为没有使用锁,可能会出现并发创建核心线程;
                          走到这里,说明核心线程已经创建满了,此时,重新获取控制位 ctl 的值
                        */
                      c = ctl.get();
                  }
          
                  // 如果线程池还是 RUNNING 状态,并且任务成功提交到阻塞队列中
                  if (isRunning(c) && workQueue.offer(command)) {
                      int recheck = ctl.get();
                      // double-check,再检查一次线程池状态
                      // 如果线程池变成非 RUNNING 状态,则回滚刚才新加的任务
                      if (! isRunning(recheck) && remove(command))
                          // 从阻塞队列中移除任务成功,使用拒绝策略执行任务
                          reject(command);
          
                      // 如果工作线程数==0,则添加一个线程
                      // 主要是兼容核心线程数==0 的情况
                      else if (workerCountOf(recheck) == 0)
                          addWorker(null, false);
                  }
          
                  /*
                      到达这里,则说明核心线程数已满,且阻塞队列已满
                      尝试创建非核心线程执行任务
                  */
                  else if (!addWorker(command, false))
                      // 非核心线程创建失败了,说明是线程数以达到 maximumPoolSize,此时执行拒绝策略
                      reject(command);
              }
          }

          5.3. addWorker 添加 worker 线程

      • class ThreadPoolExecutor{
            /**
             * 添加一个 worker 线程
             * @param firstTask 第一个要执行的 task
             * @param core 是否是核心线程
             * @return 创建成功还是失败
             */
            private boolean addWorker(Runnable firstTask, boolean core) {
                // 定义了一个 retry 标签
                retry:
                for (;;) {
                    // 获取控制位
                    int c = ctl.get();
                    // 获取运行状态
                    int rs = runStateOf(c);
        
                    /**
                     * rs >= SHUTDOWN:即非 RUNNING 状态,只有 RUNNING < SHUTDOWN
                     * ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
                     *      等价于 非 SHUTDOWN 态 ||  firstTask != null || workQueue.isEmpty()
                     *      非 SHUTDOWN 态 == true:SHUTDOWN 态之后的状态,都不允许再添加 worker 线程了,直接返回 false;
                     *      非 SHUTDOWN 态 == false || (firstTask != null) == true:SHUTDOWN 状态下,不允许再添加任务了,返回 false;
                     *      非 SHUTDOWN 态 == false || (firstTask != null) == false || workQueue.isEmpty() == true:SHUTDOWN 状态,没提交新任务,阻塞队列又是空的,没必要再添加线程了
                     */
                    if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                           firstTask == null &&
                           ! workQueue.isEmpty()))
                        return false;
        
                    // CAS 创建 worker 线程
                    for (;;) {
                        // 获取线程数
                        int wc = workerCountOf(c);
                        /*
                        当前线程数大于最大值
                            或
                        当前创建的是核心线程,但线程数量已经>=核心线程数
                            或
                        当前创建非核心线程,但线程数量已经>=maximumPoolSize
                        */
                        if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                            // 不创建,直接返回 false
                            return false;
        
                        // cas 修改 ctl 中的线程数,线程数+1
                        if (compareAndIncrementWorkerCount(c))
                            // cas 修改成功,break goto 结束循环(不会再进入标签下的循环)
                            break retry;
        
                        // 达到这里,说明 cas 增加线程数 1 失败了,此时进行尝试
                        c = ctl.get();
                        // 先判断一下线程池状态有没有改变,如果改变了,则 continue goto(会再进入标签下的循环)
                        // 跳转到最外层的循环,重新检测线程池的状态值
                        if (runStateOf(c) != rs)
                            continue retry;
                    }
                }
        
                boolean workerStarted = false;
                boolean workerAdded = false;
                Worker w = null;
                try {
                    // 创建 worker 对象
                    w = new Worker(firstTask);
                    // 获取 worker 的线程
                    final Thread t = w.thread;
                    if (t != null) {
                        // 加锁
                        final ReentrantLock mainLock = this.mainLock;
                        mainLock.lock();
                        try {
                            // 获取线程池状态
                            int rs = runStateOf(ctl.get());
        
                            /*
                             线程池是 RUNNING 状态
                                或
                             SHUTDOWN 态 且 firstTask == null(这种情况是需要创建线程,消费队列中剩余的任务)
                              */
                            if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                                // 线程是活动状态,则不合法,因为线程是刚创建的,应该是 NEW 状态
                                if (t.isAlive())
                                    throw new IllegalThreadStateException();
        
                                // 将 worker 添加到 list 中
                                workers.add(w);
                                // largestPoolSize 记录该线程池使用过程中,达到最大的线程数
                                int s = workers.size();
                                if (s > largestPoolSize)
                                    largestPoolSize = s;
                                // worker 添加成功,workerAdded 置为 true
                                workerAdded = true;
                            }
                        } finally {
                            mainLock.unlock();
                        }
        
                        // worker 添加成功,此时就可以启动线程
                        if (workerAdded) {
                            t.start();
                            // 启动线程成功,workerStarted 置为 true
                            workerStarted = true;
                        }
                    }
                } finally {
                    // 如果 worker 启动失败,则移除它
                    if (! workerStarted)
                        // workers 移除新加的 worker,并在 ctl 中将 work 线程数量-1
                        addWorkerFailed(w);
                }
                return workerStarted;
            }
        }

        六、问题定位

        6.1. 定位执行拒绝策略入口

        执行拒绝策略的位置只有这两个地方,在这两个地方打上断点,执行 demo,结果发现拒绝策略是在第二处执行的;

      • 26cfe917f920231123115912

        6.2. 定位执行拒绝策略原因

        进入 addWorker 方法,只有这两个地方返回 false,创建线程失败,打断点,执行 demo,发现是在第二处返回 false 的;

      • 0e0e8db8e820231123120015

        七、问题确认

        确实是创建的 worker 线程已经达到最大线程数,无法再创建,然后执行拒绝策略的,为什么会被创建到最大呢,每组任务最大只有 15 个,为什么会用到非核心线程?

        八、定位原因

        8.1. 分析 execute 方法

        在添加非核心线程前,先尝试将任务放到阻塞队列中,如果阻塞队列已满,则尝试添加非核心线程,也就是说,创建非核心线程时:workQueue.offer(command) == false,即阻塞队列已满;

      • 89b59f98e820231123120048

        8.2. 猜测原因

        因为我们阻塞队列只有 1,会不会提交任务的速度比线程从阻塞队列取任务的速度快,进而导致创建非核心线程执行任务,最终的结果就是:在多批任务之后,再无非核心线程可创建,导致执行拒绝策略。

      • 1353a25e1f20231123120138

        8.3. 原因验证

        8.3.1 阻塞队列选择

      • 查看 Spring 的 ThreadPoolTaskExecutor 源码,发现如果阻塞队列数量>0,则使用 LinkedBlockingQueue,否则使用 SynchronousQueue。
      • 382c9fa3d920231123120220

        8.3.2 LinkedBlockingQueue

        • 查看 LinkedBlockingQueue#take 方法,如果队列已空,则所有取元素的线程会阻塞在一个 Lock 的 notEmpty 等待条件上,等有元素入队时,只会调用 signal 方法唤醒一个线程取元素,而不是所有线程。
        • class LinkedBlockingQueue{
              private void signalNotEmpty() {
                  final ReentrantLock takeLock = this.takeLock;
                  // 加锁
                  takeLock.lock();
                  try {
                      // 唤醒一个 take 线程
                      notEmpty.signal();
                  } finally {
                      takeLock.unlock();
                  }
              }
          }

          • 因为一个线程从唤醒到执行是有一段时间间隔的,阻塞被唤醒后,还要等待获取 cpu 时间片,而主线程一直在发布任务,此时就会造成队列中的元素来不及消费,只能创建非核心线程消费的现象。

          九、解决方式

          9.1. 使用 SynchronousQueue

          使用 SynchronousQueue,即阻塞队列大小设置为 0,原因在于:SynchronousQueue 和 LinkedBlockingQueue 维度不一致,SynchronousQueue 是根据是否有等待线程而决定是否入队成功,而 LinkedBlockingQueue 是根据缓冲区,而不管是否已经有等待线程。

          • SynchronousQueue
          • 16400169b020231123120326

            • LinkedBlockingQueue
            • c748632aa920231123120401

              9.2. 根据业务情况配置阻塞队列

              对于我们的业务情况,因为任务最多只有 15 个,将阻塞队列大小设置为 15,这样就保证了不会出现任务被拒绝。


              作者

              王日华,小米信息技术部订单组

              招聘

              信息部是小米公司整体系统规划建设的核心部门,支撑公司国内外的线上线下销售服务体系、供应链体系、ERP 体系、内网 OA 体系、数据决策体系等精细化管控的执行落地工作,服务小米内部所有的业务部门以及 40 家生态链公司。

              同时部门承担大数据基础平台研发和微服务体系建设落,语言涉及 Java、Go,长年虚位以待对大数据处理、大型电商后端系统、微服务落地有深入理解和实践的各路英雄。

            • 来源:https://xiaomi-info.github.io/2019/12/19/theadpool-rejected-task/
请登录后发表评论

    没有回复内容