NioEventLoop 任务的执行
今天跟核心方法,关于任务的处理
下面是EventLoop第一次执行execute方法的时候,会触发的逻辑,会执行一个核心Runnable任务,该任务会进行selector的选择,然后处理三类任务,以及I/O就绪事件:
- 注册、绑定端口、为NioServerChannel的pipline添加连接处理器等任务都会放到任务队列taskQueue,这是一类
- 还有一类是定时任务,但是定时任务的处理只要定时任务到了执行时间,也会放入taskQueue
- 第三类是收尾任务,会放到tailTasks
除了处理任务,还会处理Selector监听到的这种就绪事件。
SingleThreadEventExecutor.this.run()
//io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {
assert thread == null;
// 调用当前EventLoop所包含的executor(子executor)
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 进行selector的选择,然后执行三类任务
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
我们跟SingleThreadEventExecutor.this.run()具体实现方法io.netty.channel.nio.NioEventLoop#run:
//io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循环
for (;;) {
try {
try {
// ------------------------- 1 selector选择 -------------------
// 计算出选择selector策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // NioEventLoop不支持
continue;
case SelectStrategy.BUSY_WAIT: // Nio不支持
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // NioEventLoop支持的唯一策略
// 若执行这里,说明当前任务队列中没有任务
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
// 若当前线程刚被唤醒,selector立即将其选择的结果返回给我们
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO处理与任务队列中任务的处理所占时间比例
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// ------------------------- 2 处理就绪的IO -------------------
// IO操作的开始时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 执行任务队列中的任务 -------------------
// Ensure we always run tasks.
// IO操作总用时
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
以上代码我们分三大流程:
- 1.selector选择
- 2.处理就绪的IO
- 3.执行任务队列中的任务
1.selector选择
//io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循环
for (;;) {
try {
try {
// ------------------------- 1 selector选择 -------------------
// 计算出选择selector策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: // NioEventLoop不支持
continue;
case SelectStrategy.BUSY_WAIT: // Nio不支持
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // NioEventLoop支持的唯一策略
// 若执行这里,说明当前任务队列中没有任务
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
// 若当前线程刚被唤醒,selector立即将其选择的结果返回给我们
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
...
}
}
selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()):
计算出选择策略,有三种策略:
- SelectStrategy.SELECT值为-1 (只有这个策略会用到,其他都没用)
- SelectStrategy.CONTINUE值为-2
- SelectStrategy.BUSY_WAIT值为-3
先看下hasTasks():
//io.netty.channel.SingleThreadEventLoop#hasTasks
protected boolean hasTasks() {
// 判断 taskQueue 或 tailTasks 任务队列是否为空
return super.hasTasks() || !tailTasks.isEmpty();
}
//io.netty.util.concurrent.SingleThreadEventExecutor#hasTasks
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
即判断当前EventLoop中的普通任务队列和尾部任务队列中是否有任务。
再看selectStrategy.calculateStrategy:
//io.netty.channel.DefaultSelectStrategy
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 若任务队列有任务,则马上进行非阻塞选择
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
有任务走selectSupplier.get(),selectSupplier是啥,是NioEventLoop 中的一个成员变量:
public final class NioEventLoop extends SingleThreadEventLoop {
...
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
// 非阻塞选择
return selectNow();
}
};
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
* 布尔值,用于控制选择器是否被阻塞。select应该跳出它的选择过程。
* 在我们的例子中,我们为选择方法使用了一个超时阻塞方式,而选择方法将在那个时间内阻塞,除非被唤醒。
* 解释:简单来说就是在NioEventLoop的线程在执行SelectStrategy.SELECT策略对应
* 的select方法的时候,里面会用selector.select(long)指定超时时间阻塞的方式
* 进行选择,如果我们希望跳出这个策略对应的整个select方法,可以将wakenUp变量
* 置为true,代表唤醒,内部会自己调用selector.wakeup(),唤醒线程并跳出
* 整个select选择这一步,而去执行后面处理就绪IO事件和任务队列中的任务逻辑。
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
int selectNow() throws IOException {
try {
// NIO底层的非阻塞选择
return selector.selectNow();
} finally {
// restore wakeup state if needed
// wakenUp为true,表示当前eventLoop所绑定的线程刚刚被唤醒
// wakenUp为false,表示当前eventLoop所绑定的线程即将被阻塞
if (wakenUp.get()) {
// 立即将选择的结果写入到当前eventLoop的集合
// 唤醒selector.select方法
selector.wakeup();
}
}
}
...
}
selector.selectNow()的返回值是已经就绪的key的数量,没有就绪的就是0,即selectSupplier.get()的返回值是大于等于0的,所以只要任务队列有任务肯定是不会走到后面case SelectStrategy.SELECT的逻辑,不会进行选择。
selector.wakeup():这个是nio的api,之前应该介绍过,使尚未返回的第一个选择操作立即返回,即如果selector.select()方法在执行中并且被阻塞,调用这个方法可以让selector.select()立即返回
/**
* //java.nio.channels.Selector#wakeup
*
* Causes the first selection operation that has not yet returned to return
* immediately.
* 使尚未返回的第一个选择操作立即返回。
*
* <p> If another thread is currently blocked in an invocation of the
* {@link #select()} or {@link #select(long)} methods then that invocation
* will return immediately. If no selection operation is currently in
* progress then the next invocation of one of these methods will return
* immediately unless the {@link #selectNow()} method is invoked in the
* meantime. In any case the value returned by that invocation may be
* non-zero. Subsequent invocations of the {@link #select()} or {@link
* #select(long)} methods will block as usual unless this method is invoked
* again in the meantime.
*
* <p> Invoking this method more than once between two successive selection
* operations has the same effect as invoking it just once. </p>
*
* @return This selector
*/
public abstract Selector wakeup();
回到selectStrategy.calculateStrategy,再看这里的意思就很清楚了:若任务队列有任务,则马上进行非阻塞选择
,此时返回值肯定是大于等于0,直接走出swith case了,不会被任何一个case选中
//io.netty.channel.DefaultSelectStrategy
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// 若任务队列有任务,则马上进行非阻塞选择
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
否则走case SelectStrategy.SELECT,准备执行SELECT选择策略对应的select方法,只要走这段代码,即代表当前任务队列肯定是没有任何任务的
:
case SelectStrategy.SELECT: // NioEventLoop支持的唯一策略
// 若执行这里,说明当前任务队列中没有任务
// 把wakenUp设置为false,意思是当前线程马上即将要阻塞,阻塞去处理Selector.select进行选择
select(wakenUp.getAndSet(false));
// 若当前线程刚被唤醒,selector立即将其选择的结果返回给我们
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
select(wakenUp.getAndSet(false))的意思是:把wakenUp设置为false,设置为阻塞状态,代表当前线程正在处理“选择操作”,可以认为是正在执行SelectStrategy.SELECT对应的select方法,但不能简单认为是selector.select()方法导致的线程阻塞
,select()这个方法比较复杂,分四大块:
- 1 处理定时任务
- 2 在选择期间,判断任务队列中有新任务加入
- 3 阻塞式选择
- 4 处理导致选择结束阻塞的各种情况(5种)
private void select(boolean oldWakenUp) throws IOException {
//每个NioEventLoop都有一个Selector,在构造里面初始化的
Selector selector = this.selector;
try {
// 计数器,记录当前选择执行的轮数
int selectCnt = 0;
// 获取当前select()开始的时间点
long currentTimeNanos = System.nanoTime();
// delayNanos():从定时任务队列中取出一个定时任务,计算其还有多久就要执行了
// selectDeadLineNanos : 表示这个定时任务要开始执行的时间点
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// --------------------- 1 处理定时任务 ------------------
// 对于马上就要到执行时间的定时任务,立即进行选择
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
// 非阻塞选择
selector.selectNow();
selectCnt = 1;
}
break;
}
// --------------------- 2 在选择期间,任务队列中有新任务加入 ------------------
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
// 非阻塞选择
selector.selectNow();
selectCnt = 1;
break;
}
// --------------------- 3 阻塞式选择 ------------------
// select()方法结束的条件:
// 1)有channel被选择
// 2)seleter.wakeup()被调用
// 3)当前线程被打断
// 4)阻塞时间超时
// 5)其实这里还有一个结束的条件:
// 当长时间没有就绪的channel时,轮询会出现长时间空转,从而会导致CPU占用率飙升,
// 此时会使select()结束
// 注意,timeoutMillis 在这里是作为select()的阻塞时长的
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something, 有channel被选择
// - waken up by user, or 或 seleter.wakeup()被调用
// - the task queue has a pending task. // 任务队列中有挂起的任务
// - a scheduled task is ready for processing // 有定时任务
break;
}
// --------------------- 4 处理一些特殊情况 ------------------
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 代码走到这里,说明select()结束的条件是4)或5)
// 记录当前时间
long time = System.nanoTime();
// 下面的式子等价于:
// time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
// 当前for循环已经执行的时长 >= 阻塞时长
// 若if的这个条件成立,说明前面的select()方法是通过条件4)结束的
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 执行else说明 当前for循环已经执行的时长 < 阻塞时长 ,说明前面的select()是通过
// 条件5)结束的。若空转次数大于等于指定的阈值512,则重新构建selector
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
1.1 处理定时任务
看下delayNanos:从定时任务队列中取出一个定时任务,计算其还有多久就要执行了
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
...
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
//io.netty.util.concurrent.SingleThreadEventExecutor#delayNanos
// currentTimeNanos是当前select()开始的时间点
protected long delayNanos(long currentTimeNanos) {
// 从定时任务队列中获取一个任务
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
// 若没有定时任务,则返回1秒
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
// 返回任务还有多久就要开始执行
return scheduledTask.delayNanos(currentTimeNanos);
}
//获取任务该方法在父类实现中
//io.netty.util.concurrent.AbstractScheduledEventExecutor#peekScheduledTask
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
// 从定时任务队列中取出一个任务
// 定时任务队列中的任务是按照时间升序排列的
return scheduledTaskQueue.peek();
}
...
}
看下scheduledTask.delayNanos
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
...
//返回任务还有多久就要开始执行
//currentTimeNanos是当前select()开始的时间点
public long delayNanos(long currentTimeNanos) {
// currentTimeNanos - START_TIME : 当前定时任务系统已经启动了多久
// deadlineNanos() - (currentTimeNanos - START_TIME) :当前任务还需要等待多久就可以执行了
// 判断取最大值,如果存活时间超过了定时任务指定的时间,会是负数,则返回0,需要马上执行
return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}
// 记录当前类SingleThreadEventExecutor加载的时间点
// 可以理解为定时任务系统启动的时间(类变量)
private static final long START_TIME = System.nanoTime();
public long deadlineNanos() {
// 当前定时任务的执行期限
// 这个期限一定是相对于START_TIME的
return deadlineNanos;
}
...
}
所以long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos),selectDeadLineNanos 代表的就是定时任务队列中最早执行的定时任务要开始执行的时间点,注意如果没有定时任务delayNanos返回的是1秒
看下for循环里面第一个处理定时任务的逻辑:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
// 计数器,记录当前选择执行的轮数
int selectCnt = 0;
// 获取当前select()开始的时间点
long currentTimeNanos = System.nanoTime();
// delayNanos():从定时任务队列中取出一个定时任务,计算其还有多久就要执行了
// selectDeadLineNanos : 表示这个定时任务要开始执行的时间点
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// --------------------- 1 处理定时任务 ------------------
// 对于马上就要到执行时间的定时任务,立即进行选择
// 时间单位都是纳秒,500000L是0.5毫秒,加0.5豪秒再除以1秒,取余的意思是"有快到了的任务,
// 就退出当前select操作,立即选择返回结果,然后结束循环,出去去处理定时任务"
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
// 非阻塞选择
selector.selectNow();
selectCnt = 1;
}
break;
}
...
}
...
如果有马上要执行的定时任务,则selector立即选择,然后break退出循环后,退出select方法,出去以后,走到第三步“处理任务队列中的任务”逻辑,会处理定时任务的逻辑,代码就在下面的在第三步,先看下位置在哪后面在讲定时任务的处理:
//io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循环
for (;;) {
try {
try {
// ------------------------- 1 selector选择 -------------------
// 计算出选择selector策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
...
case SelectStrategy.SELECT: // NioEventLoop支持的唯一策略
// 若执行这里,说明当前任务队列中没有任务
select(wakenUp.getAndSet(false));
// 若当前线程刚被唤醒,selector立即将其选择的结果返回给我们
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
...
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO处理与任务队列中任务的处理所占时间比例
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
...
} else {
// ------------------------- 2 处理就绪的IO -------------------
// IO操作的开始时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 执行任务队列中的任务 -------------------
// Ensure we always run tasks.
// IO操作总用时
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}
}
select()中如果暂时没有快要执行的定时任务,则不会退出循环,走后面的逻辑:
1.2 在选择期间,判断任务队列中是否有新任务加入
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
for (;;) {
...
//如果暂时没有快要执行的定时任务,则不会退出循环,走后面的逻辑:
// --------------------- 2 在选择期间,任务队列中有新任务加入 ------------------
// 在进select方法之前,任务队列中肯定是没有任务的,在select过程中,任务队列是有可能有新任务加入的
// 所以这个if代表的情况是 没有即将执行的定时任务,任务队列有任务 的情况
// 这个时候就需要结束select去执行任务队列里面的任务,和上面定时逻辑退出循环一样
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
//任务队列有任务,并且当前wakenUp是false状态,则置为true唤醒状态
// 唤醒状态不会进行select
// 非阻塞选择
selector.selectNow();
selectCnt = 1;
break;
}
//如果定时任务没有即将执行的,任务队列也没有任务要执行,则继续走下面逻辑:阻塞等待方式select
...
}
...
} catch (CancelledKeyException e) {
...
}
}
如果定时任务没有即将执行的,任务队列也没有任务要执行,则继续走下面逻辑
1.3 阻塞式选择
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
for (;;) {
...
//如果定时任务没有即将执行的,任务队列也没有任务要执行,则继续走下面逻辑:阻塞等待方式select
// --------------------- 3 阻塞式选择 ------------------
// selector.select()方法结束的条件(看源码注释):
// 1)至少有一个channel被选择
// 2)seleter.wakeup()被调用
// 3)当前线程被打断
// 4)阻塞时间超时
// 5)其实这里还有一个结束的条件:
// 当长时间没有就绪的channel时,轮询会出现长时间空转,从而会导致CPU占用率飙升,
// 此时会使select()结束,这是一个BUG,后面会处理这个BUG
// 也算是一个优化吧,避免CPU占用率飙升
// 简单解释:比如selector.select,我们阻塞500个时间单位,当走了10个时间单位后
// 一直没有就绪的channel,select一直在空转,此时CPU占用率飙升,这里就会结束阻塞直接返回
// 等再次轮询到这里继续select的时候,还是空转CPU占用率又飙升,还会立即结束阻塞
// 所以这种情况我们设置的变量selectCnt会一直递增
//
// 注意,timeoutMillis 在这里是作为select()的阻塞时长的
// 如果之前有定时任务,则timeoutMillis 是定时任务执行的时间点
// 如果没有delayNanos默认返回是1秒
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
...
}
...
} catch (CancelledKeyException e) {
...
}
}
1.4 处理导致选择结束阻塞的各种情况(5种)
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
...
for (;;) {
...
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
//下面几种情况会退出循环:
// - Selected something, 有channel被选择
// - waken up by user, or
// oldWakenUp为true,或者wakenUp.get()为ture
// 代表用户主动将wakenUp置为true,同时会调用
// seleter.wakeup()方法来唤醒线程,所以这里代表的
// 是seleter.wakeup()被调用的情况
// 前两种情况考虑的是select()方法结束的1,2两种情况
//
// - the task queue has a pending task. // 任务队列中有挂起的任务
// - a scheduled task is ready for processing // 有定时任务要执行了
// 即最后两种情况是因为select操作的时候又阻塞了一段时间,
// 所以再重新校验一下任务队列是否有任务要处理或者定时任务要执行了
break;
}
// 代码走到这里,说明select()结束不是1),2)两种情况
// 这里考虑的是select()方法结束原因是当前线程被打断的情况
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 代码走到这里,说明select()结束的条件是4)或5)
// 记录当前时间
long time = System.nanoTime();
// 下面的式子等价于:
// time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
// 当前for循环已经执行的时长 >= 阻塞时长
// 若if的这个条件成立,说明前面的select()方法是通过条件4)结束的
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 执行else说明 当前for循环已经执行的时长 < 阻塞时长 ,说明前面的select()
// 是通过条件5)结束的。若空转次数大于等于指定的阈值512,则重新构建selector
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
//重新构建的逻辑就不关注了
selector = selectRebuildSelector(selectCnt);
//构建完成以后又重新置为1
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
注意
:
SELECTOR_AUTO_REBUILD_THRESHOLD
总结:
- 如果任务队列中
有任务
,则会让selector进行非阻塞选择
,直接进入后面
处理就绪IO、任务队列任务的逻辑 - 如果任务队列中
没有任务
,会执行SelectStrategy.SELECT对应的选择策略,执行select方法,进行阻塞式选择,当然不是简单的直接就执行selector.select(long)阻塞式选择,整个select方法有如下四个步骤:- 1.处理定时任务,计算阻塞选择需要阻塞的时间
- 获取最快要执行的定时任务,如果有定时任务,计算该定时任务执行的时间点
- 如果该时间点马上就要到了,直接让selector进行非阻塞选择,然后会立即结束select策略,
进入后面
处理就绪IO/任务队列任务的逻辑 - 如果时间没有马上到,则后面第三步selector阻塞选择的时候,阻塞的时间就是该时间
- 如果该时间点马上就要到了,直接让selector进行非阻塞选择,然后会立即结束select策略,
- 如果没有定时任务,则默认1秒
- 获取最快要执行的定时任务,如果有定时任务,计算该定时任务执行的时间点
- 2.如果没有快要执行的定时任务需要处理,则判断任务队列中有没有任务(在选择策略期间,任务队列中可能会有新任务加入)
- 如果选择策略期间有新任务加入,则直接让selector进行非阻塞选择,然后立即结束选择策略,进入后面逻辑
- 3.如果任务队列没有任务,进行第三步selector.select阻塞时选择,阻塞的时间由上面处理定时任务的时候计算好了
- 有5中情况会立即结束selector.select方法:
- 1)至少有一个channel被选择
- 2)seleter.wakeup()被调用
- 3)当前线程被打断
- 4)阻塞时间超时
- 5)当长时间没有就绪的channel时,轮询会出现长时间空转,从而会导致CPU占用率飙升,为了避免这种情况会使select()结束
- 如果出现·1、2两种结束情况·,或者
select过程中,任务队列中加入新任务
,或者定时任务执行时间到了
则会立即结束选择策略,进入后面处理任务逻辑 - 否则进入第四步,处理剩下的情况
- 有5中情况会立即结束selector.select方法:
- 4.处理导致选择结束阻塞的各种情况(5种)
- 线程被打断的情况
- select()方法是通过条件4结束的情况
- 通过条件5结束的情况
- 该情况是selector轮询空转导致CPU占用率飙升从而结束select方法的情况
- 如果空转次数大于等于指定的阈值512,则重新构建selector
- 1.处理定时任务,计算阻塞选择需要阻塞的时间
2.处理就绪的IO
不管selector选择这一步多么复杂,结束这一步后说明有任务需要处理了,包括就绪IO任务和任务队列的任务(定时任务等),先看处理就绪IO
protected void run() {
// 永久循环
for (;;) {
try {
// ------------------------- 1 selector选择 -------------------
// ...
...
// selector选择结束后:
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO处理与任务队列中任务的处理所占时间比例
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
//100的话等于不用控制,全部都处理完
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// ------------------------- 2 处理就绪的IO -------------------
// IO操作的开始时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 执行任务队列中的任务 -------------------
// Ensure we always run tasks.
// IO操作总用时
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
处理就绪IO核心方法processSelectedKeys:
public final class NioEventLoop extends SingleThreadEventLoop {
...
private SelectedSelectionKeySet selectedKeys;
private void processSelectedKeys() {
// 若selectedKeys是优化过的
if (selectedKeys != null) {
// 优化的
processSelectedKeysOptimized();
} else {
// 一般的
processSelectedKeysPlain(selector.selectedKeys());
}
}
...
}
注意
:
Netty对Nio对SelectionKeySet做过优化,看SelectedSelectionKeySet这个类:
Nio的Selector里面有三个集合,key set/Selected set/cancel set,都是Set
Netty优化就把这些Set都变成数组了,效率更高了
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
//改成数组了
SelectionKey[] keys;
//数组的length
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
...
}
2.1 如果是优化过的Set就执行processSelectedKeysOptimized:
//io.netty.channel.nio.NioEventLoop#processSelectedKeysOptimized
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// 设置为null以后channel关闭的时候可以马上就行gc
// 其就相当于对set集合处理时,要将处理过的key从set集合中删除是一样的,
// 为了避免对key的重复处理
selectedKeys.keys[i] = null;
// 对于NioEventLoop,key中的附件attachement中存放的是当前key所关联的NioChannel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 处理当前遍历的key
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//NioTask是在测试的时候放的,我们不用管
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
//io.netty.channel.nio.NioEventLoop#processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 处理key失效的情况
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// 获取到当前key所有就绪的操作
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 若就绪操作中包含连接操作,处理连接就绪
// 这个一般在客户端连接才会有这种情况
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
// 处理一次就会取消监听连接事件
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 进行连接
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// 处理写就绪
// 当将数据写入到buffer,那么当前channel就处于写就绪
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 还要检查readOps是否为0,以解决可能出现的JDK bug,否则可能导致自旋循环
// 如果没有关注的事件,也unsafe.read,只要read就会打断高速循环
// 处理读就绪 或 接收连接就绪
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
- 连接就绪:对于客户端连接才有这种情况
关于客户端连接就绪操作:
客户端发起连接请求,第一次直接连上了就不说了
如果没连上,这时候会把连接就绪事件注册到Selector,让Selector去监听连接,只要连接了一次没连接上,这个连接就处于连接就绪状态
所以客户端连接,先连一次,成功就成功了,否则就是处于就绪状态//io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#finishConnect public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. // 注意,只有在连接尝试既没有取消也没有超时的情况下,事件循环才会调用此方法。 assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); doFinishConnect(); fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; } } //说了只有客户端才有这种事件,所以doFinishConnect的实现我们看NioSocketChannel //io.netty.channel.socket.nio.NioSocketChannel#doFinishConnect protected void doFinishConnect() throws Exception { //看到调用的是nio的finishConnect方法 //之前讲nio的时候说过 if (!javaChannel().finishConnect()) { throw new Error(); } }
- 写就绪:平常我们调用api向通道写数据,其实都是写入到Buffer,当写入到buffer,那么当前channel就处于写就绪状态,此时就会将给定缓冲区的内容刷新到远程对等点。
- 读就绪和接受连接就绪,都是通过read得到的
unsafe.read()方法有两个实现,message消息的是用来处理接受连接就绪的,byte的是用来处理读就绪的
看下NioMessageUnsafe:private final class NioMessageUnsafe extends AbstractNioUnsafe { private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { ... try { try { do { //读消息 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //将消息传给pipline上的处理器,并触发其channelRead方法 //pipline下一章会介绍 pipeline.fireChannelRead(readBuf.get(i)); } ... } finally { ... } } } //看下读消息的实现 //io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //这就是为什么接受连接触发channelRead方法时 //接受的参数msg就是Channel的原因 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
再看下NioByteUnsafe:
//io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read public final void read() { ... ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); //读消息数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; //将消息传到pipline上的处理器,触发其channelRead方法 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); ... } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { ... } } //读数据一般都是客户端对应的通道发来的数据 //io.netty.channel.socket.nio.NioSocketChannel#doReadBytes protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
2.2 如果selectedKeys没有优化过,是一般的就走processSelectedKeysPlain方法
在回到处理就绪IO,如果selectedKeys没有优化过,是一般的就走processSelectedKeysPlain方法,和优化区别就是一个操作Set一个操作数组:
//io.netty.channel.nio.NioEventLoop#processSelectedKeysPlain
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
3.执行任务队列中的任务
io.netty.channel.nio.NioEventLoop#run中的第三步 执行任务队列中的任务
io.netty.channel.nio.NioEventLoop#run
protected void run() {
// 永久循环
for (;;) {
try {
// ------------------------- 1 selector选择 -------------------
...
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio用于控制IO处理与任务队列中任务的处理所占时间比例
// ioRatio初始值50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
//如果是100,不用控制,全部处理完
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// ------------------------- 2 处理就绪的IO -------------------
// IO操作的开始时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// ------------------------- 3 执行任务队列中的任务 -------------------
// Ensure we always run tasks.
// IO操作总用时
final long ioTime = System.nanoTime() - ioStartTime;
// ioTime * [(100 - ioRatio) / ioRatio]
// 如果ioRatio是20,则(100-20)/20 = 4 ,即ioTime * 4
// 如果ioRatio是50,则(100-50)50 = 1 ,即ioTime * 1
// ioRatio用于控制IO处理与任务队列中任务的处理所占时间比例
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
接下来看任务处理方法:
//io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
protected boolean runAllTasks(long timeoutNanos) {
// 将定时任务队列中所有可以执行的任务(即已经超过定时时间的任务还没处理的任务)添加到taskQueue
fetchFromScheduledTaskQueue();
// 从taskQueue中获取一个任务
Runnable task = pollTask();
// 若该任务为null,说明当前任务队列中没有任务了,
// 此时执行tailTasks中的收尾任务
if (task == null) {
afterRunningAllTasks();
return false;
}
// ScheduledFutureTask.nanoTime()返回的是ScheduledFutureTask类的存活时间
// 可以理解为异步任务系统启动到现在的时间
// 计算runAllTasks方法最多执行到哪个时间点必须结束,控制处理任务的时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
// 遍历执行taskQueue中的所有任务
for (;;) {
// 执行当前遍历的任务
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// 每64次任务检查一次超时
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
} // end-for
// 执行tailTasks中的收尾任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
该方法三个核心流程:
- 将定时任务队列中所有可以执行的任务(即已经超过定时时间还没处理的任务)添加到taskQueue
- 执行当前遍历的任务
- 执行tailTasks中的收尾任务
3.1 将定时任务队列中所有可以执行的任务(即已经超过定时还没处理的任务)添加到taskQueue
//io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue
// 将定时任务队列中所有可以执行的任务添加到taskQueue
private boolean fetchFromScheduledTaskQueue() {
// 获取当前时间相对于定时任务系统开始的时间的时长(即定时任务系统已经运行的时间)
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
// 从定时任务队列中取出一个最紧急的需要执行的定时任务
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
// 在定时任务不空的前提下,将任务添加到taskQueue
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
// 若没有添加成功,则重新放回到定时任务队列
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
// 从定时任务队列中再取出一个最紧急的需要执行的定时任务
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
从定时任务队列取一个最紧急的已经要执行的任务
//io.netty.util.concurrent.AbstractScheduledEventExecutor#pollScheduledTask(long)
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
// 从定时任务队列中取一个任务
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ?
null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
// 任务的期限时间都是相对于定时任务系统开始的那个时间的
// 若配置的期限时间比定时任务系统已经运行的时间小,说明这个任务早就应该执行了
if (scheduledTask.deadlineNanos() <= nanoTime) {
// 从定时任务队列中删除该任务
scheduledTaskQueue.remove();
// 返回该任务,以将其添加到taskQUeue去执行
return scheduledTask;
}
return null;
}
由此可以看出来,fetchFromScheduledTaskQueue方法就是将此时可以执行的所有异步任务添加到普通任务队列taskQueue
pollTask取任务:
//io.netty.util.concurrent.SingleThreadEventExecutor#pollTask
protected Runnable pollTask() {
assert inEventLoop();
return pollTaskFrom(taskQueue);
}
//io.netty.util.concurrent.SingleThreadEventExecutor#pollTaskFrom
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
// 从任务队列中取出一个任务,只要其不是一个唤醒任务,则直接返回
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
3.2 执行当前遍历的任务
safeExecute
//io.netty.util.concurrent.AbstractEventExecutor#safeExecute
protected static void safeExecute(Runnable task) {
try {
// 任务的run()最终在这里执行了
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
3.2 执行tailTasks中的收尾任务
afterRunningAllTasks
//io.netty.channel.SingleThreadEventLoop#afterRunningAllTasks
protected void afterRunningAllTasks() {
//tailTasks就是那个尾部任务队列
runAllTasksFrom(tailTasks);
}
//io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasksFrom
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
// 从任务队列中获取一个任务
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
// task为null,说明taskQueue中的任务全部执行完毕
if (task == null) {
return true;
}
}
}
//io.netty.util.concurrent.SingleThreadEventExecutor#pollTaskFrom
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
// 从任务队列中取出一个任务,只要其不是一个唤醒任务,则直接返回
for (;;) {
Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) {
continue;
}
return task;
}
}
原文地址:https://blog.csdn.net/weixin_41947378/category_10273110.html
没有回复内容