Netty(八)源码解析 之 NioEventLoopGroup和NioEventLoop源码-Java专区论坛-技术-SpringForAll社区

Netty(八)源码解析 之 NioEventLoopGroup和NioEventLoop源码

NioEventLoopGroup和NioEventLoop源码分析

1. NioEventLoopGroup的构造分析

EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();

我们先从NioEventLoopGroup开始,NioEventLoopGroup根据名字就能知道里面肯定包含了很多NioEventLoop
先看下NioEventLoop继承体系:
在这里插入图片描述
关键点:NioEventLoop既是EventLoop,同时也是EventExecutor

看下顶层Executor执行器接口解释:

//java.util.concurrent.Executor
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     * 在将来的某个时候执行给定的命令。该命令可以在新线程中执行,
     * 可以在合用线程中执行,也可以在调用线程中执行,由{@code Executor}实现决定。
     * 
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

在看下NioEventLoopGroup的继承体系:
在这里插入图片描述

本质也是一个执行器,也可以execute,它是一个线程池实现的Executor,而NioEventLoop是线程实现的Executor

现在跟NioEventLoopGroup源码,看构造干了啥:

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    public NioEventLoopGroup() {
        this(0);
    }
    public NioEventLoopGroup(int nThreads) {
        // 这里的executor为”总“executor
        this(nThreads, (Executor) null);
    }
    public NioEventLoopGroup(int nThreads, Executor executor) {
    //拿到了一个全局的Selector提供者,nio讲过
        this(nThreads, executor, SelectorProvider.provider());
    }
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
        //准备进入父类构造了
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    //io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup
    //注意是父类的构造了
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    //如果nThreads为0,则取默认值DEFAULT_EVENT_LOOP_THREADS,默认值为当前主机逻辑处理器数量的2倍
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    //注意又是父类ExecutorGroup构造了
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
//核心方法就在这,另起一页!!                            
...
}
    ...
}

逻辑处理器:
如果nThreads为0,则取默认值DEFAULT_EVENT_LOOP_THREADS,看下这个值是什么:

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements  EventLoopGroup {
 private static final int DEFAULT_EVENT_LOOP_THREADS;

 static {
     // 该默认值为当前主机逻辑处理器数量的2倍
     DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
             "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
         logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
     }
 }
 ...
}

在这里插入图片描述
我的电脑内核8核的,所以逻辑处理器数量是16,逻辑处理器数量2倍就是32
我在我的机器上启动Dubug,看下
在这里插入图片描述

紧接着上面跟踪的构造代码:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        // 创建一个总executor,这个executor将来会为每一个EventLoop创建一个子executor,
        // 子executor在执行execute方法的时候,可以通过这个总的executor的
        // 线程工厂创建一个新的线程,让新线程处理任务
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    // 使用一个数组来存放当前group中所包含的eventLoop
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 创建每一个eventLoop实例,并初始化到相应的数组元素中
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // 创建一个EventLoop数组元素的选择器
    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

注意:

  • new ThreadPerTaskExecutor(newDefaultThreadFactory());
    之前分析类图我们知道NioEventLoopGroup,本身就是一个executor,这行代码可以看出它还包含了一个executor(ThreadPerTaskExecutor),即executor嵌套executor
    其实NioEventLoop也是一个executor,同时也包含了一个executor,后面会看到

这段代码中有三个重要流程:

  • 初始化一个”总”的executor
    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    创建一个总executor,这个executor将来会为每一个EventLoop创建一个子executor,子executor在执行execute方法的时候,可以通过这个总的executor的线程工厂创建一个新的线程,让新线程处理任务(后面会看到)
  • 创建每一个eventLoop实例(即NioEventLoop),并初始化到相应的数组元素中
    children[i] = newChild(executor, args);
  • 创建一个EventLoop数组元素的选择器
    chooser = chooserFactory.newChooser(children);

1.1 初始化一个”总”的executor

先看executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()):
首先看下这个ThreadFactory是什么:

//io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory
protected ThreadFactory newDefaultThreadFactory() {
    // getClass()的结果为NioEventLoopGroup类
    return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}

//io.netty.util.concurrent.DefaultThreadFactory
public class DefaultThreadFactory implements ThreadFactory {
...
    public DefaultThreadFactory(Class<?> poolType, int priority) {
        this(poolType, false, priority);
    }
    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
    //daemon:是否是守护线程
    //priority:线程优先级
        this(toPoolName(poolType), daemon, priority);
    }

    public static String toPoolName(Class<?> poolType) {
        if (poolType == null) {
            throw new NullPointerException("poolType");
        }
        // 获取简单类名,即为NioEventLoopGroup
        String poolName = StringUtil.simpleClassName(poolType);
        switch (poolName.length()) {
            case 0:
                return "unknown";
            case 1:
                return poolName.toLowerCase(Locale.US);
            default:
                // 若第一个字母是大写,且第二个字母是小写
                if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {
                    // 将简单类名的首字母变为小写 ,即为  nioEventLoopGroup
                    return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);
                } else {
                    return poolName;
                }
        }
    }

    public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
        this(poolName, daemon, priority, System.getSecurityManager() == null ?
                Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
    }

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        if (poolName == null) {
            throw new NullPointerException("poolName");
        }
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
        // 最终获取到的prefix的值为    nioEventLoopGroup-线程池id-
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }
}

注意:

  • new DefaultThreadFactory(getClass())
    getClass返回的就是当前实际运行的类的Class
    在这里插入图片描述

可以看到DefaultThreadFactory在构造的时候,会初始化好一个前缀名,以及线程相关参数(是否是守护线程、线程优先级、线程所属group),后面就是通过它创建线程。

在看下ThreadPerTaskExecutor,后面分析线程绑定流程的时候还会看到这个类:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // 这里会创建一个线程,并且启动这个线程,就是执行command的run()方法
        threadFactory.newThread(command).start();
    }
}

1.3 创建一个EventLoop数组元素的选择器

我们不按顺序,先看第三步,创建一个EventLoop数组元素的选择器:
在跟踪构造的时候我们可以知道chooserFactory就是DefaultEventExecutorChooserFactory.INSTANCE,即io.netty.util.concurrent.DefaultEventExecutorChooserFactory#DefaultEventExecutorChooserFactory:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
...

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        // 判断length是否是2的整数次幂
        if (isPowerOfTwo(executors.length)) {
            // 2次幂EventExecutor选择器
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            // 普通EventExecutor选择器
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // 轮询
            // 若a是一个2的整数次幂,则 b & (a-1) 与  b % a  是等价的
            // 但 b & (a-1)  的效率要更高
            // 位运算效率更高
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // 取模算法,普通轮询
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

由此可以看出,两个选择器的区别就是如果线程池数量是2的整数次幂,会采用位运算方式,效率更高,否则就用普通的取模算法。

1.2 创建每一个eventLoop实例,并初始化到相应的数组元素中

现在看最核心的children[i] = newChild(executor, args)方法,是个抽象方法,具体方法在其实现类NioEventLoopGroup

//io.netty.channel.nio.NioEventLoopGroup#newChild
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
//args里面的参数,我们之前通过io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup的构造可以找到具体是哪些参数
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

这里就开始NioEventLoop的源码分析了

2. NioEventLoop构造分析

我们继续看NioEventLoop:

public final class NioEventLoop extends SingleThreadEventLoop {
...

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
        //selectorProvider是用来创建Selector的
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
        //selectStrategy下一章讲任务处理的时候会看到
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        // 创建一个Selector元组,类似于元数据的意思,即Selector的元数据
        // 这里就是初始化了一个Selector
        final SelectorTuple selectorTuple = openSelector();
    // 里面包含了selector,已封装的selector
        selector = selectorTuple.selector;
        // 未封装的selector,最原始的selector
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    
//继续看super构造
//io.netty.channel.SingleThreadEventLoop#SingleThreadEventLoop
    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        // 创建一个尾部任务队列,收尾任务存放在这里
        // Netty中有三种类型任务,tailTask是在核心任务完成以后做收尾工作的任务
        tailTasks = newTaskQueue(maxPendingTasks);
    }
    //继续跟super构造
    //io.netty.util.concurrent.SingleThreadEventExecutor#SingleThreadEventExecutor
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        // 使用总的executor为当前eventLoop创建一个executor
        // 这个executor执行的时候会创建一个新线程处理任务
        this.executor = ThreadExecutorMap.apply(executor, this);
        // 创建一个任务队列
        // 处理普通任务的队列,定时任务也放这里面
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
...
}
  • 从NioEventLoop构造可以看到,Selector初始化的时机就是在NioEventLoop构造的时候,并且一个NioEventLoop包含一个Selector!!!!
  • 在其父类SingleThreadEventExecutor构造可以看出,NioEventLoop会通过总的executor创建一个子executor
    (这个子executor一执行任务,就会创建一个线程,让新线程去处理这个任务,NioEventLoop在处理异步任务的时候,如果当前还没有绑定线程,就会使用这个executor去执行任务,执行的过程中会将executor新创建的线程和自己绑定,在执行后续的任务的时候,就直接使用自己绑定的线程,这个后面会说。)
  • newTaskQueue:Mpsc,无锁队列

    //io.netty.channel.nio.NioEventLoop#newTaskQueue
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
                                                    : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }
    
    //io.netty.util.internal.PlatformDependent#newMpscQueue()
    public static <T> Queue<T> newMpscQueue() {
        return Mpsc.newMpscQueue();
    }
    

    创建一个新的{@link Queue},它可以安全地用于多个生产者(不同的线程)和一个消费者(一个线程!)
    在这里插入图片描述
    Netty中几乎所有的操作都是通过异步任务处理的,还包括定时任务,这些任务就放在这些队列里面,即一个EventLoop里面有两个任务队列,可以并发的向队列里添任务,但是任务的处理者就一个,即EventLoop绑定的线程。

看下核心方法this.executor = ThreadExecutorMap.apply(executor, this):
注意此时this是当前类NioEventLoop的实例,executor是NioEventLoopGroup传过来的那个“总”的ThreadPerTaskExecutor

/**
 * io.netty.util.internal.ThreadExecutorMap#apply
 * @param executor  总的executor
 * @param eventExecutor  当前正在创建的eventLoop
 */
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(executor, "executor");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    //返回一个匿名的子executor
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            // executor.execute()会使用这个总的executor所绑定的线程factory创建一个线程
            executor.execute(apply(command, eventExecutor));
        }
    };
}
//io.netty.util.internal.ThreadExecutorMap#apply
//command实际任务
//eventExecutor:当前正在创建的eventLoop,即NioEventLoop
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
        @Override
        public void run() {
            // 为了保证线程安全,这里为当前线程指定其所关联的eventLoop
            setCurrentEventExecutor(eventExecutor);
            try {
                // 真正任务的执行是在这里开始的
                command.run();
            } finally {
                setCurrentEventExecutor(null);
            }
        }
    };
}

//io.netty.util.internal.ThreadExecutorMap#setCurrentEventExecutor
private static void setCurrentEventExecutor(EventExecutor executor) {
    mappings.set(executor);
}
//mapping是一个ThreadLocal
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();

可以看出来这个子的executor其实就是将要执行的任务封装了一下,然后交给了总的executor

接下来看下executor.execute(apply(command, eventExecutor)),总的executor的execute方法,即ThreadPerTaskExecutor.execute方法:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
//这个factory是DefaultThreadFactory
//这个command是被封装过的command
        // 这里会创建一个线程,并且启动这个线程,就是执行command的run()方法
        threadFactory.newThread(command).start();
    }
}

继续跟newThread,又回到DefaultThreadFactory

public class DefaultThreadFactory implements ThreadFactory {
...
    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

    public static String toPoolName(Class<?> poolType) {...}

    public Thread newThread(Runnable r) {
        // 创建一个线程,使用线程的名称为  nioEventLoopGroup-线程池id-线程id
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }

            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
    protected Thread newThread(Runnable r, String name) {
    //FastThreadLocalThread extends Thread
        return new FastThreadLocalThread(threadGroup, r, name);
    }
}

总结:

上面跟的主要是NioEventLoopGroup和EventLoopGroup初始化过程 ,主要有如下几点:

NioEventLoopGroup本身是一个executor

  • 包含了一个executor,ThreadPerTaskExecutor(总的executor,子EventLoop需要依靠它生成子的executor)
  • 一个数组(EventExecutor[],放的就是NioEventLoop)
  • 一个chooser(根据算法从group中返回一个子eventLoop,根据子EventLoop数量不一样,实现不一样)

NioEventLoop本身也是一个executor

  • 包含了一个executor(使用总的executor为当前eventLoop创建一个executor)
    • 该executor只要执行任务,都会创建一个新的线程,是通过ThreadPerTaskExecutor里面的DefaultThreadFactory.newThread创建的,但是线程创建的时机不在这,这里只是初始化了一个executor,必须调用了executor.execute方法才会创建线程
  • 一个任务队列taskQueue(Mpsc无锁队列,多线程生产者,单线程消费者是线程安全的)
  • 一个尾部任务队列tailTasks(Mpsc无锁队列,多线程生产者,单线程消费者是线程安全的)
  • 一个Selector元组(Selector的初始化时机就在这,可以看出NioEventLoop和Selector 是1:1关系)

NioEventLoop绑定线程的流程分析

NioEventLoop一开始初始化后并没有绑定线程,而是第一次执行channel注册的时候完成绑定的,现在我们分析NioServerSocketChannel注册Selector的任务:

io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister

   final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 创建一个channel
            channel = channelFactory.newChannel();
            // 初始化channel
            init(channel);
        } catch (Throwable t) {
...
        }

        // 将当前channel注册给selector
        //ServerBootstrapConfig.NioEventLoopGroup.register
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

config().group()返回的是NioEventLoopGroup,所以我们看下NioEventLoopGroup.register方法,其实现方法在其父类MultithreadEventLoopGroup

//io.netty.channel.MultithreadEventLoopGroup#register
public ChannelFuture register(Channel channel) {
    // 从parentGroup中根据算法选择一个eventLoop来完成注册
    return next().register(channel);
}

next()方法的默认实现也在父类MultithreadEventExecutorGroup中,会根据chooser指定种算法返回一个EventLoop:

//io.netty.util.concurrent.MultithreadEventExecutorGroup#next
public EventExecutor next() {
 // 使用轮询方式获取下一个EventLoop
 return chooser.next();
}

因为我们配置的线程数默认是32,2的整数次幂,所以实现类chooser走的是PowerOfTwoEventExecutorChooser:

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
 private final AtomicInteger idx = new AtomicInteger();
 private final EventExecutor[] executors;

 PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
     this.executors = executors;
 }

 @Override
 public EventExecutor next() {
     // 轮询
     // 若a是一个2的整数次幂,则 b & (a-1) 与  b % a  是等价的
     // 但 b & (a-1)  的效率要更高
     return executors[idx.getAndIncrement() & executors.length - 1];
 }
}

返回EventLoop后也会调用eventLoop的register方法,我们应该跟NioEventLoop,走的是其父类SingleThreadEventLoop的register方法:

//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
//io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

unsafe()返回的就是NioSocketChannelUnsafe,register方法实现在继承的抽象类中:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
    // 这里实现了channel与eventLoop的绑定
    // 注意此时,eventLoop绑定的线程还没有初始化!!!
    AbstractChannel.this.eventLoop = eventLoop;

    // 判断当前正在执行的线程是否是当前eventLoop所绑定的线程
    if (eventLoop.inEventLoop()) {
        // 若当前线程是eventLoop绑定线程,则直接让这个线程来完成注册操作
        register0(promise);
    } else {
        // 当前线程不是eventLoop绑定线程,则首先会创建一个线程,
        // 然后使用这个新创建的eventLoop线程来完成注册
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

注意

  • 第一次调用的时候,此时eventLoop 在NioEventLoopGroup构造的时候已经初始化好了,eventLoop绑定的executor也初始化好了,但是NioEventLoopGroup绑定线程还没有初始化好!

    //io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
    public boolean inEventLoop(Thread thread) {
    //this.thread:即为当前eventLoop绑定的线程,第一次调用还没绑定,所以是null
        return thread == this.thread;
    }
    

    在这里插入图片描述
    this就是当前NioEventLoop,可以看到this.thread为null,还没有绑定线程

如果当前eventLoop第一次执行任务,还没有绑定线程,所以肯定走else,即

eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});

我们跟NioEventLoop的execute方法,其实现在继承的SingleThreadEventExecutor中:

//io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // 若当前线程是当前EventLoop所绑定的线程,则返回true,否则返回false
    // 第一次执行execute肯定是false,线程还没创建呢
    boolean inEventLoop = inEventLoop();
    // 将任务添加到taskQueue
    // 此时任务是NioServerChannel注册Selector的任务
    addTask(task);
    if (!inEventLoop) {
        // 创建并启动线程
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

看下添加任务

//io.netty.util.concurrent.SingleThreadEventExecutor#addTask
protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // 将任务添加到taskQueue
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    // 放入队列
    return taskQueue.offer(task);
}

protected final void reject(Runnable task) {
    rejectedExecutionHandler.rejected(task, this);
}

可以看到所谓执行注册Selector的任务,只是先将任务放入了普通任务队列!

因为当前eventLoop是第一次执行,还没绑定线程,所以inEventLoop肯定是false,会走startThread():

//io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {
//判断当前eventLoop状态,如果还没开启
    if (state == ST_NOT_STARTED) {
        // 通过CAS将状态修改为已启动
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            try {
                // 启动线程
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_NOT_STARTED);
                PlatformDependent.throwException(cause);
            }
        }
    }
}

继续跟doStratThread:

private void doStartThread() {
    assert thread == null;
    // 调用当前EventLoop所包含的executor(子executor,可以通过总executor创建线程的那个匿名executor)
    executor.execute(new Runnable() {
        @Override
        public void run() {
        //绑定线程的核心方法就在这!!!!!!!!
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                //进行selector的选择,然后执行三类任务
                //this指的是当前正在访问这段代码的对象,当在内部类中使用this指的就是内部类的对象,
//为了访问外层类对象,就可以使用外层类名.this来访问,一般也只在这种情况下使用这种
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
...
}
        }
    });
}

可以看到,因为当前NioEventLoop还没有绑定线程,同时也代表当前NioEventLoop还没有启动,所以会让当前NioEventLoop绑定的子executor执行一个非常重要的核心任务!子executor只要执行任务就会创建一个线程来处理这个任务,我们只要在执行任务的过程中,将这个新创建的线程和自己绑定,即可完成绑定。即此时做了两个非常重要的事:

  • 1.创建并绑定线程
  • 2.开启核心任务(这个核心任务其实就是通过此时绑定的线程,轮询处理Selector.select的各种I/O事件、各种Netty中的普通任务、定时任务、尾部任务等,下一章会详细讲。)

上面run()方法中的thread = Thread.currentThread()就是绑定线程的核心方法,因为如果正在走到这个方法的时候,其实已经创建好了新线程。

EventLoop对应的Selector.select的核心方法其实就是SingleThreadEventExecutor.this.run()里面,下次讲,这次重点不在这。
我们跟 executor.execute方法,其实就走到之前跟过的地方:

//io.netty.util.internal.ThreadExecutorMap#apply
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(executor, "executor");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            // executor.execute()会使用这个总的executor所绑定的线程factory创建一个线程
            // 注意:此时的command不是注册Selector的command了,而是上面进行selector的选择的command了
            executor.execute(apply(command, eventExecutor));
        }
    };
}

注意:此时的command不是注册Selector的command了,注册Selector的commond已经被加入队列了,这里是上面说的那个核心任务
这个executor就是总的executor,在看总的executor:

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        // 这里会创建一个线程,并且启动这个线程,就是执行command的run()方法
        threadFactory.newThread(command).start();
    }
}

总结

  • 通过以上分析,我们知道了当从NioEventLoopGroup中获取一个NioEventLoop后,只要这个NioEventLoop是第一次执行任务(通常第一次执行的任务都是注册channel的任务),即EventLoop第一次执行execute方法的时候,会先把要执行的任务先放入队列,然后通过子executor开启核心任务(核心任务是专门处理selector的选择、各种I/O事件,普通任务、定时任务等),子executor会通过总executor创建一个线程来处理任务,新线程处理这个核心任务的过程中,会完成NioEventLoop和线程的绑定,而这个核心任务就是用来处理队列中的任务的,并且还有其他Selector.select的各种事件的处理、定时任务等,任务的处理下一章会讲。
  • 只要通过NioEventLoop.execute执行任务,都只是把任务加入队列,由NioEventLoop开启的核心任务去处理队列中的任务。

看下NioEventLoop已经绑定线程,开启状态下执行异步任务的流程分析

我们在回到io.netty.bootstrap.AbstractBootstrap#doBind,看下另一个任务,绑定端口号的任务:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 创建、初始化channel,并将其注册到Selector
    final ChannelFuture regFuture = initAndRegister();
    // 从异步结果中获取channel
    final Channel channel = regFuture.channel();
    // 获取异步操作执行过程中发生的异常
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 判断当前异步操作是否完成:或者是成功,或者是异常
    if (regFuture.isDone()) {   // 若异步操作成功
        // At this point we know that the registration was complete and successful.
        // 创建一个可修改的异步结果对象channelFuture
        ChannelPromise promise = channel.newPromise();
        // 绑定端口号
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {  // 若异步操作未完成
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 为异步操作添加监听器
        regFuture.addListener(new ChannelFutureListener() {
            // 当异步操作完成(成功,异常),就会触发该方法的执行
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 获取异步操作执行过程中发生的异常
                Throwable cause = future.cause();
                if (cause != null) {  // 异步执行过程发生异常
...
                } else {
                    promise.registered();
                    // 绑定端口号
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

//io.netty.bootstrap.AbstractBootstrap#doBind0
//该方法的执行,肯定channel已经注册到Selector了
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
//绑定任务
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

我们看这个execute方法,还是进入到SingleThreadEventExecutor:
当绑定端口任务进来的时候,这个时候已经创建过并绑定好线程了,肯定是true,只会做添加任务动作

//io.netty.util.concurrent.SingleThreadEventExecutor#execute
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // 若当前线程是当前EventLoop所绑定的线程,则返回true,否则返回false
    // 当绑定端口任务进来的时候,这个时候肯定是true,只会做添加任务动作
    boolean inEventLoop = inEventLoop();
    // 将任务添加到taskQueue
    addTask(task);
    if (!inEventLoop) {
  ...
    }
...
}

整体总结:

NioEventLoopGroup

  • 本身是一个executor
  • 包含了一个总的executor(ThreadPerTaskExecutor)
    • 会为group中的每个子eventLoop都创建一个匿名子executor
    • 该executor封装了一个DefaultThreadFactory线程工厂,在执行execute方法的时候可以创建一个线程,只负责创建线程!线程和eventLoop绑定线程的逻辑不在这里!在EventLoop.execute里触发
  • 包含一个数组,封装EventLoop的
  • 包含了一个chooser,两种策略都是轮询,根据是否是2的整次数幂决定用哪种算法,效率高底的问题

NioEventLoop

  • 本身也是一个executor
  • 包含了一个子executor,是通过总的executor生成的
  • 包含了一个selector元组和SelectStrategy
  • 包含两个任务队列:tailTasks(尾部任务队列),taskQueue(任务队列)

任务跟踪:

  • 什么时候Channel和EventLoop绑定?什么时候EventLoop和线程绑定?
    • 简单来说是在执行channel注册Selector的时候,进行绑定的,先Channel和EventLoop绑定,后EventLoop和线程绑定
    • 细说的话:
    • Channel和EventLoop:
      • 在ServerBootstrap执行bind方法,创建、初始化channel后,执行EventLoopGroup.register(channel)注册方法的时候,底层会调用Unsafe.register,在Unsafe.register中会进行 AbstractChannel.this.eventLoop = eventLoop,即完成绑定,之后会执行register0方法,是一个channel注册Selector的任务
    • EventLoop和线程:
      • 在上一步Channel和EventLoop绑定后,会执行register0方法,是一个channel注册Selector的任务,如果当前执行线程不是eventLoop绑定的线程,会通过eventLoop.execute异步执行的,EventLoop执行execute方法时候,会先将注册Selector的任务放入任务队列,然后会判断当前执行线程不是eventLoop绑定的线程(不是的情况就是还没有创建、启动、绑定线程),如果不是会通过子executor.execute方法,执行一个核心的”Selector.select任务”,(只有这个核心任务不是放入队列,立即处理的)该任务中会进行线程绑定thread = Thread.currentThread()(因为子executor里面是调用总的executor,总的executor里面有个线程工厂,具有创建线程执行任务的功能,所以Thread.currentThread()这个时候肯定是新创建的线程)

NioEventLoop 任务的执行(下章讲)

 

原文地址:https://blog.csdn.net/weixin_41947378/category_10273110.html

请登录后发表评论

    没有回复内容