线程池调度
Timer
Timer 实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务
private static void method1() {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task 1");
//int i = 1 / 0;//任务一的出错会导致任务二无法执行
Thread.sleep(2000);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
System.out.println("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此任务1的延时,影响了任务2的执行
timer.schedule(task1, 1000);//17:45:56 c.ThreadPool [Timer-0] - task 1
timer.schedule(task2, 1000);//17:45:58 c.ThreadPool [Timer-0] - task 2
}
Scheduled
任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:
- 使用内部类 ScheduledFutureTask 封装任务
- 使用内部类 DelayedWorkQueue 作为线程池队列
- 重写 onShutdown 方法去处理 shutdown 后的任务
- 提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展
构造方法:Executors.newScheduledThreadPool(int corePoolSize)
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 最大线程数固定为 Integer.MAX_VALUE,保活时间 keepAliveTime 固定为 0
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// 阻塞队列是 DelayedWorkQueue
new DelayedWorkQueue());
}
常用 API:
ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u)
:延迟执行任务ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit)
:定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit)
:定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、单位
基本使用:
延迟任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行
javapublic static void main(String[] args){ // 线程池大小为1时也是串行执行 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); // 添加两个任务,都在 1s 后同时执行 executor.schedule(() -> { System.out.println("任务1,执行时间:" + new Date()); //int i = 1 / 0; try { Thread.sleep(2000); } catch (InterruptedException e) { } }, 1000, TimeUnit.MILLISECONDS); executor.schedule(() -> { System.out.println("任务2,执行时间:" + new Date()); }, 1000, TimeUnit.MILLISECONDS); }
定时任务 scheduleAtFixedRate:一次任务的启动到下一次任务的启动之间只要大于等于间隔时间,抢占到 CPU 就会立即执行
javapublic static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(1); System.out.println("start..." + new Date()); pool.scheduleAtFixedRate(() -> { System.out.println("running..." + new Date()); Thread.sleep(2000); }, 1, 1, TimeUnit.SECONDS); } /*start...Sat Apr 24 18:08:12 CST 2021 running...Sat Apr 24 18:08:13 CST 2021 running...Sat Apr 24 18:08:15 CST 2021 running...Sat Apr 24 18:08:17 CST 2021
定时任务 scheduleWithFixedDelay:一次任务的结束到下一次任务的启动之间等于间隔时间,抢占到 CPU 就会立即执行,这个方法才是真正的设置两个任务之间的间隔
javapublic static void main(String[] args){ ScheduledExecutorService pool = Executors.newScheduledThreadPool(3); System.out.println("start..." + new Date()); pool.scheduleWithFixedDelay(() -> { System.out.println("running..." + new Date()); Thread.sleep(2000); }, 1, 1, TimeUnit.SECONDS); } /*start...Sat Apr 24 18:11:41 CST 2021 running...Sat Apr 24 18:11:42 CST 2021 running...Sat Apr 24 18:11:45 CST 2021 running...Sat Apr 24 18:11:48 CST 2021
成员属性
成员变量
shutdown 后是否继续执行周期任务:
javaprivate volatile boolean continueExistingPeriodicTasksAfterShutdown;
shutdown 后是否继续执行延迟任务:
javaprivate volatile boolean executeExistingDelayedTasksAfterShutdown = true;
取消方法是否将该任务从队列中移除:
java// 默认 false,不移除,等到线程拿到任务之后抛弃 private volatile boolean removeOnCancel = false;
任务的序列号,可以用来比较优先级:
javaprivate static final AtomicLong sequencer = new AtomicLong();
延迟任务
ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,具有延迟执行的特点,覆盖 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。对于延时任务调用 FutureTask#run,而对于周期性任务则调用 FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate 模式来设置下次执行时间并重新将任务塞到工作队列
在调度线程池中无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask
成员变量:
任务序列号:
javaprivate final long sequenceNumber;
执行时间:
javaprivate long time; // 任务可以被执行的时间,交付时间,以纳秒表示 private final long period; // 0 表示非周期任务,正数表示 fixed-rate 模式的周期,负数表示 fixed-delay 模式
fixed-rate:两次开始启动的间隔,fixed-delay:一次执行结束到下一次开始启动
实际的任务对象:
javaRunnableScheduledFuture<V> outerTask = this;
任务在队列数组中的索引下标:
java// DelayedWorkQueue 底层使用的数据结构是最小堆,记录当前任务在堆中的索引,-1 代表删除 int heapIndex;
成员方法:
构造方法:
javaScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); // 任务的触发时间 this.time = ns; // 任务的周期,多长时间执行一次 this.period = period; // 任务的序号 this.sequenceNumber = sequencer.getAndIncrement(); }
compareTo():ScheduledFutureTask 根据执行时间 time 正序排列,如果执行时间相同,在按照序列号 sequenceNumber 正序排列,任务需要放入 DelayedWorkQueue,延迟队列中使用该方法按照从小到大进行排序
javapublic int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { // 类型强转 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; // 比较者 - 被比较者的执行时间 long diff = time - x.time; // 比较者先执行 if (diff < 0) return -1; // 被比较者先执行 else if (diff > 0) return 1; // 比较者的序列号小 else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } // 不是 ScheduledFutureTask 类型时,根据延迟时间排序 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
run():执行任务,非周期任务直接完成直接结束,周期任务执行完后会设置下一次的执行时间,重新放入线程池的阻塞队列,如果线程池中的线程数量少于核心线程,就会添加 Worker 开启新线程
javapublic void run() { // 是否周期性,就是判断 period 是否为 0 boolean periodic = isPeriodic(); // 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务 if (!canRunInCurrentRunState(periodic)) cancel(false); // 非周期任务,直接调用 FutureTask#run 执行 else if (!periodic) ScheduledFutureTask.super.run(); // 周期任务的执行,返回 true 表示执行成功 else if (ScheduledFutureTask.super.runAndReset()) { // 设置周期任务的下一次执行时间 setNextRunTime(); // 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程 reExecutePeriodic(outerTask); } }
周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中,方法返回 false,后续的该任务将不会再周期的执行
javaprotected boolean runAndReset() { // 任务不是新建的状态了,或者被别的线程执行了,直接返回 false if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { // 执行方法,没有返回值 c.call(); ran = true; } catch (Throwable ex) { // 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程 setException(ex); } } } finally { // 执行完成把执行线程引用置为 null runner = null; s = state; // 如果线程被中断进行中断处理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } // 如果正常执行,返回 true,并且任务状态没有被取消 return ran && s == NEW; }
java// 任务下一次的触发时间 private void setNextRunTime() { long p = period; if (p > 0) // fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差 time += p; else // fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】 time = triggerTime(-p); }
reExecutePeriodic():准备任务的下一次执行,重新放入阻塞任务队列
java// ScheduledThreadPoolExecutor#reExecutePeriodic void reExecutePeriodic(RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true)) { // 【放入任务队列】 super.getQueue().add(task); // 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行, // 如果不能执行且任务还在队列中未被取走,则取消任务 if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else // 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】 ensurePrestart(); } }
cancel():取消任务
javapublic boolean cancel(boolean mayInterruptIfRunning) { // 调用父类 FutureTask#cancel 来取消任务 boolean cancelled = super.cancel(mayInterruptIfRunning); // removeOnCancel 用于控制任务取消后是否应该从阻塞队列中移除 if (cancelled && removeOnCancel && heapIndex >= 0) // 从等待队列中删除该任务,并调用 tryTerminate() 判断是否需要停止线程池 remove(this); return cancelled; }
延迟队列
DelayedWorkQueue 是支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素
其他阻塞队列存储节点的数据结构大都是链表,延迟队列是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常
成员变量:
容量:
javaprivate static final int INITIAL_CAPACITY = 16; // 初始容量 private int size = 0; // 节点数量 private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 存放节点
锁:
javaprivate final ReentrantLock lock = new ReentrantLock(); // 控制并发 private final Condition available = lock.newCondition(); // 条件队列
阻塞等待头节点的线程:线程池内的某个线程去 take() 获取任务时,如果延迟队列顶层节点不为 null(队列内有任务),但是节点任务还不到触发时间,线程就去检查队列的 leader字段是否被占用
- 如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间
堆顶任务.time - now()
挂起 - 如果被占用,当前线程直接到 available 条件队列不指定超时时间的挂起
java// leader 在 available 条件队列内是首元素,它超时之后会醒过来,然后再次将堆顶元素获取走,获取走之后,take()结束之前,会调用是 available.signal() 唤醒下一个条件队列内的等待者,然后释放 lock,下一个等待者被唤醒后去到 AQS 队列,做 acquireQueue(node) 逻辑 private Thread leader = null;
- 如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间
成员方法
offer():插入节点
javapublic boolean offer(Runnable x) { // 判空 if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; // 队列锁,增加删除数据时都要加锁 final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; // 队列数量大于存放节点的数组长度,需要扩容 if (i >= queue.length) // 扩容为原来长度的 1.5 倍 grow(); size = i + 1; // 当前是第一个要插入的节点 if (i == 0) { queue[0] = e; // 修改 ScheduledFutureTask 的 heapIndex 属性,表示该对象在队列里的下标 setIndex(e, 0); } else { // 向上调整元素的位置,并更新 heapIndex siftUp(i, e); } // 情况1:当前任务是第一个加入到 queue 内的任务,所以在当前任务加入到 queue 之前,take() 线程会直接 // 到 available 队列不设置超时的挂起,并不会去占用 leader 字段,这时需会唤醒一个线程 让它去消费 // 情况2:当前任务【优先级最高】,原堆顶任务可能还未到触发时间,leader 线程设置超时的在 available 挂起 // 原先的 leader 等待的是原先的头节点,所以 leader 已经无效,需要将 leader 线程唤醒, // 唤醒之后它会检查堆顶,如果堆顶任务可以被消费,则直接获取走,否则继续成为 leader 等待新堆顶任务 if (queue[0] == e) { // 将 leader 设置为 null leader = null; // 直接随便唤醒等待头结点的阻塞线程 available.signal(); } } finally { lock.unlock(); } return true; }
java// 插入新节点后对堆进行调整,进行节点上移,保持其特性【节点的值小于子节点的值】,小顶堆 private void siftUp(int k, RunnableScheduledFuture<?> key) { while (k > 0) { // 父节点,就是堆排序 int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; // key 和父节点比,如果大于父节点可以直接返回,否则就继续上浮 if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); }
poll():非阻塞获取头结点,获取执行时间最近并且可以执行的
java// 非阻塞获取 public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 获取队头节点,因为是小顶堆 RunnableScheduledFuture<?> first = queue[0]; // 头结点为空或者的延迟时间没到返回 null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else // 头结点达到延迟时间,【尾节点成为替代节点下移调整堆结构】,返回头结点 return finishPoll(first); } finally { lock.unlock(); } }
javaprivate RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) { // 获取尾索引 int s = --size; // 获取尾节点 RunnableScheduledFuture<?> x = queue[s]; // 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调 queue[s] = null; // s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情 if (s != 0) // 从索引处 0 开始向下调整 siftDown(0, x); // 出队的元素索引设置为 -1 setIndex(f, -1); return f; }
take():阻塞获取头节点,读取当前堆中最小的也就是触发时间最近的任务
javapublic RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; // 保证线程安全 lock.lockInterruptibly(); try { for (;;) { // 头节点 RunnableScheduledFuture<?> first = queue[0]; if (first == null) // 等待队列不空,直至有任务通过 offer 入队并唤醒 available.await(); else { // 获取头节点的延迟时间是否到时 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部 return finishPoll(first); // 逻辑到这说明头节点的延迟时间还没到 first = null; // 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待 if (leader != null) available.await(); else { // 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 在条件队列 available 使用带超时的挂起(堆顶任务.time - now() 纳秒值..) available.awaitNanos(delay); // 到达阻塞时间时,当前线程会从这里醒来来 } finally { // t堆顶更新,leader 置为 null,offer 方法释放锁后, // 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。 if (leader == thisThread) // leader 置为 null 用以接下来判断是否需要唤醒后继线程 leader = null; } } } } } finally { // 没有 leader 线程,头结点不为 null,唤醒阻塞获取头节点的线程, // 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
remove():删除节点,堆移除一个元素的时间复杂度是 O(log n),延迟任务维护了 heapIndex,直接访问的时间复杂度是 O(1),从而可以更快的移除元素,任务在队列中被取消后会进入该逻辑
javapublic boolean remove(Object x) { final ReentrantLock lock = this.lock; lock.lock(); try { // 查找对象在队列数组中的下标 int i = indexOf(x); // 节点不存在,返回 false if (i < 0) return false; // 修改元素的 heapIndex,-1 代表删除 setIndex(queue[i], -1); // 尾索引是长度-1 int s = --size; // 尾节点作为替代节点 RunnableScheduledFuture<?> replacement = queue[s]; queue[s] = null; // s == i 说明头节点就是尾节点,队列空了 if (s != i) { // 向下调整 siftDown(i, replacement); // 说明没发生调整 if (queue[i] == replacement) // 上移和下移不可能同时发生,替代节点大于子节点时下移,否则上移 siftUp(i, replacement); } return true; } finally { lock.unlock(); } }
成员方法
提交任务
schedule():延迟执行方法,并指定执行的时间,默认是当前时间
javapublic void execute(Runnable command) { // 以零延时任务的形式实现 schedule(command, 0, NANOSECONDS); }
javapublic ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 判空 if (command == null || unit == null) throw new NullPointerException(); // 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展,并且【根据延迟时间设置任务触发的时间点】 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>( command, null, triggerTime(delay, unit))); // 延迟执行 delayedExecute(t); return t; }
java// 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间 private long triggerTime(long delay, TimeUnit unit) { // 设置触发的时间 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { // 如果 delay < Long.Max_VALUE/2,则下次执行时间为当前时间 +delay // 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
overflowFree 的原因:如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用 time 相减。那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果
javaprivate long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); // 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出 // 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱 // 不然就把当前 delay 值给调整为 Long.MAX_VALUE + 队首 delay if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
scheduleAtFixedRate():定时执行,一次任务的启动到下一次任务的启动的间隔
javapublic ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); // 任务封装,【指定初始的延迟时间和周期时间】 ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 默认返回本身 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 开始执行这个任务 delayedExecute(t); return t; }
scheduleWithFixedDelay():定时执行,一次任务的结束到下一次任务的启动的间隔
javapublic ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); // 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
运行任务
delayedExecute():校验线程池状态,延迟或周期性任务的主要执行方法
javaprivate void delayedExecute(RunnableScheduledFuture<?> task) { // 线程池是 SHUTDOWN 状态,需要执行拒绝策略 if (isShutdown()) reject(task); else { // 把当前任务放入阻塞队列,因为需要【获取执行时间最近的】,当前任务需要比较 super.getQueue().add(task); // 线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 可以执行 ensurePrestart(); } }
ensurePrestart():开启线程执行任务
java// ThreadPoolExecutor#ensurePrestart void ensurePrestart() { int wc = workerCountOf(ctl.get()); // worker数目小于corePoolSize,则添加一个worker。 if (wc < corePoolSize) // 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize addWorker(null, true); // corePoolSize = 0的情况,至少开启一个线程,【担保机制】 else if (wc == 0) addWorker(null, false); }
canRunInCurrentRunState():任务运行时都会被调用以校验当前状态是否可以运行任务
javaboolean canRunInCurrentRunState(boolean periodic) { // 根据是否是周期任务判断,在线程池 shutdown 后是否继续执行该任务,默认非周期任务是继续执行的 return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
onShutdown():删除并取消工作队列中的不需要再执行的任务
javavoid onShutdown() { BlockingQueue<Runnable> q = super.getQueue(); // shutdown 后是否仍然执行延时任务 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); // shutdown 后是否仍然执行周期任务 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); // 如果两者皆不可,则对队列中【所有任务】调用 cancel 取消并清空队列 if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; // 不需要执行的任务删除并取消,已经取消的任务也需要从队列中删除 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false); } } } } // 因为任务被从队列中清理掉,所以需要调用 tryTerminate 尝试【改变线程池的状态】 tryTerminate(); }