NioEventLoopGroup和NioEventLoop源码分析
1. NioEventLoopGroup的构造分析
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
我们先从NioEventLoopGroup开始,NioEventLoopGroup根据名字就能知道里面肯定包含了很多NioEventLoop
先看下NioEventLoop继承体系:
关键点:NioEventLoop既是EventLoop,同时也是EventExecutor
看下顶层Executor执行器接口解释:
//java.util.concurrent.Executor
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
* 在将来的某个时候执行给定的命令。该命令可以在新线程中执行,
* 可以在合用线程中执行,也可以在调用线程中执行,由{@code Executor}实现决定。
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
在看下NioEventLoopGroup的继承体系:
本质也是一个执行器,也可以execute,它是一个线程池实现的Executor,而NioEventLoop是线程实现的Executor
现在跟NioEventLoopGroup源码,看构造干了啥:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
// 这里的executor为”总“executor
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
//拿到了一个全局的Selector提供者,nio讲过
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
//准备进入父类构造了
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
//io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup
//注意是父类的构造了
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//如果nThreads为0,则取默认值DEFAULT_EVENT_LOOP_THREADS,默认值为当前主机逻辑处理器数量的2倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//注意又是父类ExecutorGroup构造了
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//核心方法就在这,另起一页!!
...
}
...
}
逻辑处理器:
如果nThreads为0,则取默认值DEFAULT_EVENT_LOOP_THREADS,看下这个值是什么:public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { private static final int DEFAULT_EVENT_LOOP_THREADS; static { // 该默认值为当前主机逻辑处理器数量的2倍 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } } ... }
我的电脑内核8核的,所以逻辑处理器数量是16,逻辑处理器数量2倍就是32
我在我的机器上启动Dubug,看下
紧接着上面跟踪的构造代码:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
// 创建一个总executor,这个executor将来会为每一个EventLoop创建一个子executor,
// 子executor在执行execute方法的时候,可以通过这个总的executor的
// 线程工厂创建一个新的线程,让新线程处理任务
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 使用一个数组来存放当前group中所包含的eventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建每一个eventLoop实例,并初始化到相应的数组元素中
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建一个EventLoop数组元素的选择器
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
注意:
- new ThreadPerTaskExecutor(newDefaultThreadFactory());
之前分析类图我们知道NioEventLoopGroup,本身就是一个executor,这行代码可以看出它还包含了一个executor(ThreadPerTaskExecutor),即executor嵌套executor
其实NioEventLoop也是一个executor,同时也包含了一个executor,后面会看到
这段代码中有三个重要流程:
- 初始化一个”总”的executor
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
创建一个总executor,这个executor将来会为每一个EventLoop创建一个子executor,子executor在执行execute方法的时候,可以通过这个总的executor的线程工厂创建一个新的线程,让新线程处理任务(后面会看到) - 创建每一个eventLoop实例(即NioEventLoop),并初始化到相应的数组元素中
children[i] = newChild(executor, args);
- 创建一个EventLoop数组元素的选择器
chooser = chooserFactory.newChooser(children);
1.1 初始化一个”总”的executor
先看executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()):
首先看下这个ThreadFactory是什么:
//io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory
protected ThreadFactory newDefaultThreadFactory() {
// getClass()的结果为NioEventLoopGroup类
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}
//io.netty.util.concurrent.DefaultThreadFactory
public class DefaultThreadFactory implements ThreadFactory {
...
public DefaultThreadFactory(Class<?> poolType, int priority) {
this(poolType, false, priority);
}
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
//daemon:是否是守护线程
//priority:线程优先级
this(toPoolName(poolType), daemon, priority);
}
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
}
// 获取简单类名,即为NioEventLoopGroup
String poolName = StringUtil.simpleClassName(poolType);
switch (poolName.length()) {
case 0:
return "unknown";
case 1:
return poolName.toLowerCase(Locale.US);
default:
// 若第一个字母是大写,且第二个字母是小写
if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
// 将简单类名的首字母变为小写 ,即为 nioEventLoopGroup
return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
} else {
return poolName;
}
}
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
// 最终获取到的prefix的值为 nioEventLoopGroup-线程池id-
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
}
注意:
- new DefaultThreadFactory(getClass())
getClass返回的就是当前实际运行的类的Class
可以看到DefaultThreadFactory在构造的时候,会初始化好一个前缀名,以及线程相关参数(是否是守护线程、线程优先级、线程所属group),后面就是通过它创建线程。
在看下ThreadPerTaskExecutor,后面分析线程绑定流程的时候还会看到这个类:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
// 这里会创建一个线程,并且启动这个线程,就是执行command的run()方法
threadFactory.newThread(command).start();
}
}
1.3 创建一个EventLoop数组元素的选择器
我们不按顺序,先看第三步,创建一个EventLoop数组元素的选择器:
在跟踪构造的时候我们可以知道chooserFactory就是DefaultEventExecutorChooserFactory.INSTANCE,即io.netty.util.concurrent.DefaultEventExecutorChooserFactory#DefaultEventExecutorChooserFactory:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
...
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 判断length是否是2的整数次幂
if (isPowerOfTwo(executors.length)) {
// 2次幂EventExecutor选择器
return new PowerOfTwoEventExecutorChooser(executors);
} else {
// 普通EventExecutor选择器
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 轮询
// 若a是一个2的整数次幂,则 b & (a-1) 与 b % a 是等价的
// 但 b & (a-1) 的效率要更高
// 位运算效率更高
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
// 取模算法,普通轮询
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
由此可以看出,两个选择器的区别就是如果线程池数量是2的整数次幂,会采用位运算方式,效率更高,否则就用普通的取模算法。
1.2 创建每一个eventLoop实例,并初始化到相应的数组元素中
现在看最核心的children[i] = newChild(executor, args)方法,是个抽象方法,具体方法在其实现类NioEventLoopGroup
//io.netty.channel.nio.NioEventLoopGroup#newChild
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//args里面的参数,我们之前通过io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup的构造可以找到具体是哪些参数
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
这里就开始NioEventLoop的源码分析了
2. NioEventLoop构造分析
我们继续看NioEventLoop:
public final class NioEventLoop extends SingleThreadEventLoop {
...
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
//selectorProvider是用来创建Selector的
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
//selectStrategy下一章讲任务处理的时候会看到
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 创建一个Selector元组,类似于元数据的意思,即Selector的元数据
// 这里就是初始化了一个Selector
final SelectorTuple selectorTuple = openSelector();
// 里面包含了selector,已封装的selector
selector = selectorTuple.selector;
// 未封装的selector,最原始的selector
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
//继续看super构造
//io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 创建一个尾部任务队列,收尾任务存放在这里
// Netty中有三种类型任务,tailTask是在核心任务完成以后做收尾工作的任务
tailTasks = newTaskQueue(maxPendingTasks);
}
//继续跟super构造
//io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 使用总的executor为当前eventLoop创建一个executor
// 这个executor执行的时候会创建一个新线程处理任务
this.executor = ThreadExecutorMap.apply(executor, this);
// 创建一个任务队列
// 处理普通任务的队列,定时任务也放这里面
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
...
}
从NioEventLoop构造可以看到,Selector初始化的时机就是在NioEventLoop构造的时候,并且一个NioEventLoop包含一个Selector!!!!
- 在其父类SingleThreadEventExecutor构造可以看出,NioEventLoop会通过总的executor创建一个子executor
(这个子executor一执行任务,就会创建一个线程,让新线程去处理这个任务,NioEventLoop
在处理异步任务的时候,如果当前还没有绑定线程,就会使用这个executor
去执行任务,执行的过程中会将executor新创建的线程和自己绑定,在执行后续的任务的时候,就直接使用自己绑定的线程,这个后面会说。)
newTaskQueue:Mpsc,无锁队列
//io.netty.channel.nio.NioEventLoop#newTaskQueue protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); } //io.netty.util.internal.PlatformDependent#newMpscQueue() public static <T> Queue<T> newMpscQueue() { return Mpsc.newMpscQueue(); }
创建一个新的{@link Queue},它可以安全地用于多个生产者(不同的线程)和一个消费者(一个线程!)
Netty中几乎所有的操作都是通过异步任务处理的,还包括定时任务,这些任务就放在这些队列里面,即一个EventLoop里面有两个任务队列,可以并发的向队列里添任务,但是任务的处理者就一个,即EventLoop绑定的线程。
看下核心方法this.executor = ThreadExecutorMap.apply(executor, this):注意此时this是当前类NioEventLoop的实例,executor是NioEventLoopGroup传过来的那个“总”的ThreadPerTaskExecutor
/**
* io.netty.util.internal.ThreadExecutorMap#apply
* @param executor 总的executor
* @param eventExecutor 当前正在创建的eventLoop
*/
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
//返回一个匿名的子executor
return new Executor() {
@Override
public void execute(final Runnable command) {
// executor.execute()会使用这个总的executor所绑定的线程factory创建一个线程
executor.execute(apply(command, eventExecutor));
}
};
}
//io.netty.util.internal.ThreadExecutorMap#apply
//command实际任务
//eventExecutor:当前正在创建的eventLoop,即NioEventLoop
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
// 为了保证线程安全,这里为当前线程指定其所关联的eventLoop
setCurrentEventExecutor(eventExecutor);
try {
// 真正任务的执行是在这里开始的
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
//io.netty.util.internal.ThreadExecutorMap#setCurrentEventExecutor
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
//mapping是一个ThreadLocal
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
可以看出来这个子的executor其实就是将要执行的任务封装了一下,然后交给了总的executor
接下来看下executor.execute(apply(command, eventExecutor)),总的executor的execute方法,即ThreadPerTaskExecutor.execute方法:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
//这个factory是DefaultThreadFactory
//这个command是被封装过的command
// 这里会创建一个线程,并且启动这个线程,就是执行command的run()方法
threadFactory.newThread(command).start();
}
}
继续跟newThread,又回到DefaultThreadFactory
public class DefaultThreadFactory implements ThreadFactory {
...
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
public static String toPoolName(Class<?> poolType) {...}
public Thread newThread(Runnable r) {
// 创建一个线程,使用线程的名称为 nioEventLoopGroup-线程池id-线程id
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
protected Thread newThread(Runnable r, String name) {
//FastThreadLocalThread extends Thread
return new FastThreadLocalThread(threadGroup, r, name);
}
}
总结:
上面跟的主要是NioEventLoopGroup和EventLoopGroup初始化过程 ,主要有如下几点:
NioEventLoopGroup本身是一个executor
- 包含了一个executor,ThreadPerTaskExecutor(总的executor,子EventLoop需要依靠它生成子的executor)
- 一个数组(EventExecutor[],放的就是NioEventLoop)
- 一个chooser(根据算法从group中返回一个子eventLoop,根据子EventLoop数量不一样,实现不一样)
NioEventLoop本身也是一个executor
- 包含了一个executor(使用总的executor为当前eventLoop创建一个executor)
- 该executor只要执行任务,都会创建一个新的线程,是通过ThreadPerTaskExecutor里面的DefaultThreadFactory.newThread创建的,
但是线程创建的时机不在这,这里只是初始化了一个executor,必须调用了executor.execute方法才会创建线程
- 该executor只要执行任务,都会创建一个新的线程,是通过ThreadPerTaskExecutor里面的DefaultThreadFactory.newThread创建的,
- 一个任务队列taskQueue(Mpsc无锁队列,多线程生产者,单线程消费者是线程安全的)
- 一个尾部任务队列tailTasks(Mpsc无锁队列,多线程生产者,单线程消费者是线程安全的)
- 一个Selector元组(Selector的初始化时机就在这,可以看出NioEventLoop和Selector 是1:1关系)
NioEventLoop绑定线程的流程分析
NioEventLoop一开始初始化后并没有绑定线程,而是第一次执行channel注册的时候完成绑定的,现在我们分析NioServerSocketChannel注册Selector的任务:
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建一个channel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
...
}
// 将当前channel注册给selector
//ServerBootstrapConfig.NioEventLoopGroup.register
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
config().group()返回的是NioEventLoopGroup,所以我们看下NioEventLoopGroup.register方法,其实现方法在其父类MultithreadEventLoopGroup
//io.netty.channel.MultithreadEventLoopGroup#register
public ChannelFuture register(Channel channel) {
// 从parentGroup中根据算法选择一个eventLoop来完成注册
return next().register(channel);
}
next()方法的默认实现也在父类MultithreadEventExecutorGroup中,会根据chooser指定种算法返回一个EventLoop:
//io.netty.util.concurrent.MultithreadEventExecutorGroup#next public EventExecutor next() { // 使用轮询方式获取下一个EventLoop return chooser.next(); }
因为我们配置的线程数默认是32,2的整数次幂,所以实现类chooser走的是PowerOfTwoEventExecutorChooser:
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { // 轮询 // 若a是一个2的整数次幂,则 b & (a-1) 与 b % a 是等价的 // 但 b & (a-1) 的效率要更高 return executors[idx.getAndIncrement() & executors.length - 1]; } }
返回EventLoop后也会调用eventLoop的register方法,我们应该跟NioEventLoop,走的是其父类SingleThreadEventLoop的register方法:
//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
unsafe()返回的就是NioSocketChannelUnsafe,register方法实现在继承的抽象类中:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
// 这里实现了channel与eventLoop的绑定
// 注意此时,eventLoop绑定的线程还没有初始化!!!
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前正在执行的线程是否是当前eventLoop所绑定的线程
if (eventLoop.inEventLoop()) {
// 若当前线程是eventLoop绑定线程,则直接让这个线程来完成注册操作
register0(promise);
} else {
// 当前线程不是eventLoop绑定线程,则首先会创建一个线程,
// 然后使用这个新创建的eventLoop线程来完成注册
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
注意
:
第一次调用的时候,此时eventLoop 在NioEventLoopGroup构造的时候已经初始化好了,eventLoop绑定的executor也初始化好了,但是NioEventLoopGroup绑定线程还没有初始化好!
//io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop public boolean inEventLoop(Thread thread) { //this.thread:即为当前eventLoop绑定的线程,第一次调用还没绑定,所以是null return thread == this.thread; }
this就是当前NioEventLoop,可以看到this.thread为null,还没有绑定线程
如果当前eventLoop第一次执行任务,还没有绑定线程,所以肯定走else,即
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
我们跟NioEventLoop的execute方法,其实现在继承的SingleThreadEventExecutor中:
//io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 若当前线程是当前EventLoop所绑定的线程,则返回true,否则返回false
// 第一次执行execute肯定是false,线程还没创建呢
boolean inEventLoop = inEventLoop();
// 将任务添加到taskQueue
// 此时任务是NioServerChannel注册Selector的任务
addTask(task);
if (!inEventLoop) {
// 创建并启动线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
看下添加任务
//io.netty.util.concurrent.SingleThreadEventExecutor#addTask
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 将任务添加到taskQueue
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
// 放入队列
return taskQueue.offer(task);
}
protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}
可以看到所谓执行注册Selector的任务,只是先将任务放入了普通任务队列!
因为当前eventLoop是第一次执行,还没绑定线程,所以inEventLoop肯定是false,会走startThread():
//io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {
//判断当前eventLoop状态,如果还没开启
if (state == ST_NOT_STARTED) {
// 通过CAS将状态修改为已启动
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
// 启动线程
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
继续跟doStratThread:
private void doStartThread() {
assert thread == null;
// 调用当前EventLoop所包含的executor(子executor,可以通过总executor创建线程的那个匿名executor)
executor.execute(new Runnable() {
@Override
public void run() {
//绑定线程的核心方法就在这!!!!!!!!
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//进行selector的选择,然后执行三类任务
//this指的是当前正在访问这段代码的对象,当在内部类中使用this指的就是内部类的对象,
//为了访问外层类对象,就可以使用外层类名.this来访问,一般也只在这种情况下使用这种
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
}
可以看到,因为当前NioEventLoop还没有绑定线程,同时也代表当前NioEventLoop还没有启动,所以会让当前NioEventLoop绑定的子executor执行一个非常重要的核心任务
!子executor只要执行任务就会创建一个线程来处理这个任务,我们只要在执行任务的过程中,将这个新创建的线程和自己绑定,即可完成绑定。即此时做了两个非常重要的事:
- 1.创建并绑定线程
- 2.开启核心任务(这个核心任务其实就是通过此时绑定的线程,轮询处理Selector.select的各种I/O事件、各种Netty中的普通任务、定时任务、尾部任务等,下一章会详细讲。)
上面run()方法中的thread = Thread.currentThread()就是绑定线程的核心方法,因为如果正在走到这个方法的时候,其实已经创建好了新线程。
EventLoop对应的Selector.select的核心方法其实就是SingleThreadEventExecutor.this.run()里面,下次讲,这次重点不在这。
我们跟 executor.execute方法,其实就走到之前跟过的地方:
//io.netty.util.internal.ThreadExecutorMap#apply
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
// executor.execute()会使用这个总的executor所绑定的线程factory创建一个线程
// 注意:此时的command不是注册Selector的command了,而是上面进行selector的选择的command了
executor.execute(apply(command, eventExecutor));
}
};
}
注意:此时的command不是注册Selector的command了,注册Selector的commond已经被加入队列了,这里是上面说的那个核心任务
这个executor就是总的executor,在看总的executor:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
// 这里会创建一个线程,并且启动这个线程,就是执行command的run()方法
threadFactory.newThread(command).start();
}
}
总结
通过以上分析,我们知道了当从NioEventLoopGroup中获取一个NioEventLoop后,只要这个NioEventLoop是第一次执行任务(通常第一次执行的任务都是注册channel的任务),即EventLoop第一次执行execute方法的时候,会先把要执行的任务先放入队列,然后通过子executor开启核心任务(核心任务是专门处理selector的选择、各种I/O事件,普通任务、定时任务等),子executor会通过总executor创建一个线程来处理任务,新线程处理这个核心任务的过程中,会完成NioEventLoop和线程的绑定,而这个核心任务就是用来处理队列中的任务的,并且还有其他Selector.select的各种事件的处理、定时任务等,任务的处理下一章会讲。
- 只要通过NioEventLoop.execute执行任务,都只是把任务加入队列,由NioEventLoop开启的核心任务去处理队列中的任务。
看下NioEventLoop已经绑定线程,开启状态下执行异步任务的流程分析
我们在回到io.netty.bootstrap.AbstractBootstrap#doBind,看下另一个任务,绑定端口号的任务:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 创建、初始化channel,并将其注册到Selector
final ChannelFuture regFuture = initAndRegister();
// 从异步结果中获取channel
final Channel channel = regFuture.channel();
// 获取异步操作执行过程中发生的异常
if (regFuture.cause() != null) {
return regFuture;
}
// 判断当前异步操作是否完成:或者是成功,或者是异常
if (regFuture.isDone()) { // 若异步操作成功
// At this point we know that the registration was complete and successful.
// 创建一个可修改的异步结果对象channelFuture
ChannelPromise promise = channel.newPromise();
// 绑定端口号
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 若异步操作未完成
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 为异步操作添加监听器
regFuture.addListener(new ChannelFutureListener() {
// 当异步操作完成(成功,异常),就会触发该方法的执行
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 获取异步操作执行过程中发生的异常
Throwable cause = future.cause();
if (cause != null) { // 异步执行过程发生异常
...
} else {
promise.registered();
// 绑定端口号
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
//io.netty.bootstrap.AbstractBootstrap#doBind0
//该方法的执行,肯定channel已经注册到Selector了
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//绑定任务
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
我们看这个execute方法,还是进入到SingleThreadEventExecutor:当绑定端口任务进来的时候,这个时候已经创建过并绑定好线程了,肯定是true,只会做添加任务动作
//io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 若当前线程是当前EventLoop所绑定的线程,则返回true,否则返回false
// 当绑定端口任务进来的时候,这个时候肯定是true,只会做添加任务动作
boolean inEventLoop = inEventLoop();
// 将任务添加到taskQueue
addTask(task);
if (!inEventLoop) {
...
}
...
}
整体总结:
NioEventLoopGroup
- 本身是一个executor
- 包含了一个总的executor(ThreadPerTaskExecutor)
- 会为group中的每个子eventLoop都创建一个匿名子executor
- 该executor封装了一个DefaultThreadFactory线程工厂,在执行execute方法的时候可以创建一个线程,
只负责创建线程!线程和eventLoop绑定线程的逻辑不在这里!在EventLoop.execute里触发
- 包含一个数组,封装EventLoop的
- 包含了一个chooser,两种策略都是
轮询
,根据是否是2的整次数幂决定用哪种算法,效率高底的问题
NioEventLoop
- 本身也是一个executor
- 包含了一个子executor,是通过总的executor生成的
- 包含了一个selector元组和SelectStrategy
- 包含两个任务队列:tailTasks(尾部任务队列),taskQueue(任务队列)
任务跟踪:
- 什么时候Channel和EventLoop绑定?什么时候EventLoop和线程绑定?
简单来说是在执行channel注册Selector的时候,进行绑定的,先Channel和EventLoop绑定,后EventLoop和线程绑定
- 细说的话:
- Channel和EventLoop:
- 在ServerBootstrap执行bind方法,创建、初始化channel后,执行EventLoopGroup.register(channel)注册方法的时候,底层会调用Unsafe.register,在Unsafe.register中会进行 AbstractChannel.this.eventLoop = eventLoop,即完成绑定,之后会执行register0方法,是一个channel注册Selector的任务
- EventLoop和线程:
- 在上一步Channel和EventLoop绑定后,会执行register0方法,是一个channel注册Selector的任务,如果当前执行线程不是eventLoop绑定的线程,会通过eventLoop.execute异步执行的,EventLoop执行execute方法时候,会先将注册Selector的任务放入任务队列,然后会判断当前执行线程不是eventLoop绑定的线程(不是的情况就是还没有创建、启动、绑定线程),如果不是会通过子executor.execute方法,执行一个核心的”Selector.select任务”,(只有这个核心任务不是放入队列,立即处理的)该任务中会进行线程绑定thread = Thread.currentThread()(因为子executor里面是调用总的executor,总的executor里面有个线程工厂,具有创建线程执行任务的功能,所以Thread.currentThread()这个时候肯定是新创建的线程)
NioEventLoop 任务的执行(下章讲)
原文地址:https://blog.csdn.net/weixin_41947378/category_10273110.html
没有回复内容