Netty(七)源码解析 之 Reactor 模型、Netty的服务端启动源码-Java专区论坛-技术-SpringForAll社区

Netty(七)源码解析 之 Reactor 模型、Netty的服务端启动源码

Netty 源码解析

1. Reactor 模型

在解析 Netty 源码之前,我们首先要搞清楚 Reactor 模型。因为现在的网络通信框架,大多数都是基于 Reactor 模型进行设计和开发的,Netty 也不例外。

1.1 Reactor 单线程模型

在这里插入图片描述

Selector.select()方法查询Channel是否就绪,遍历完所有Channel作为一次轮询,一次轮询后没有任何就绪,如果设置了阻塞时间,判断是否超时,超时直接返回0,否则一直阻塞继续轮询,只要有任意一个Channel就绪了立即返回

Reactor 单线程模型,指的是当前的 Sever 会为每一个通信对端形成一个 Channel,而所有这些 Channel 都会与一个线程相绑定,该线程用于完成它们间的所有通信处理。该线程需要完成的操作有:

  • 若当前为 Server,则该线程需要接收并处理 Client 的连接请求
  • 若当前为 Client,则该线程需要向 Server 发起连接
  • 读取通信对端的消息
  • 向通信对端发送消息

1.2 Reactor 线程池模型

在这里插入图片描述

Reactor 单线程模型中使用一个线程处理所有通信对端的所有请求,在高并发场景中会严重影响系统性能。所以,就将单线程模型中的这一个线程替换为了一个线程池。大大提高了系统性能。

1.3 Reactor 多线程池模型

在这里插入图片描述

若请求连接的并发量是数以百万计的,且 IO 操作还比较耗时,此时的 Server 即使采用的是 Reactor 线程池模型,系统性能也会急剧下降。此时,可以将连接操作与 IO 操作分开处理,形成 Reactor 的多线程模型。

当客户端通过处理连接请求的 Channel 连接上 Server 后,系统会为该客户端再生成一个子 Channel 专门用于处理该客户端的 IO 请求。这两类不同的 Channel 连接着两类不同的线程池。而线程池中的线程数量,可以根据需求分别设置。提高了系统性能。

1.4 Netty-Server 的 Reactor 模型

在这里插入图片描述

Netty-Server 采用了多线程池模型。不过线程池是由 EventLoopGroup 充当。EventLoopGroup中的每一个 EventLoop 都绑定着一个线程,用于处理该 Channel 与当前 Server 间的操作。一个 Channel 只能与一个 EventLoop 绑定,但一个 EventLoop 可以绑定多个 Channel。即 Channel与 EventLoop 间的关系是 n:1。

经过生产下实践证明的,一般对于百万级的 QPS,parentGroup 设置为 2 ,childGroup设置为 4 ,就完全没有问题。

1.5 Netty-Client 的 Reactor 模型

在这里插入图片描述

Netty-Client 采用的是线程池模型。因为其只需要与 Server 连接一次即可,无需区分连接请求与 IO 请求。

强调一下:以上模型可以看出,一个EventLoop专门绑定一个Selector

Netty 框架中导入外部工程

2. Netty 服务端启动

我们先来分析第一个流程:Netty 服务端启动,一共包括四个子流程

  • 创建服务端 Channel
  • 初始化服务端 Channel
  • 将Channel注册给 Selector
  • 端口绑定

入口

Channel的创建是在bind里面触发的
在这里插入图片描述

public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    // 指定要创建Channel类型
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 获取channel中的Pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
                            pipeline.addLast(new StringDecoder());
                            // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

先跟bind方法:

//io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}


//io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
public ChannelFuture bind(SocketAddress localAddress) {
validate();//做一些验证
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
  • validate()做一些验证:

    public B validate() {
        if (group == null) {  // EventLoopGroup不能为空
            throw new IllegalStateException("group not set");
        }
        if (channelFactory == null) { //ChannelFactory不能为空
            throw new IllegalStateException("channel or channelFactory not set");
        }
        return self();
    }
    

继续看doBind方法:

该方法核心逻辑就两件事:

一是创建初始化channel,并注册到Selector(其中注册到Selector这个操作是异步的
二就是将这个初始化和注册成功的channel绑定到指定端口

因为第一个逻辑是异步执行的,所以这里有两个分支:

  • 如果异步操作已经完成,创建一个可修改的异步结果对象,直接进行绑定端口号操作,返回
  • 如果异步操作还未完成,创建一个正在注册的可修改异步结果对象直接返回,然后通过监听器,一但异步操作完成,在进行绑定端口

其中注册到Selector这个异步操作的结果是成功还是失败的逻辑判断是在doBind0里处理的,即在绑定端口的时候先确定channel是否已经注册成功,这个在本章最后端口绑定的时候会讲

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 创建、初始化channel,并将其注册到Selector
    final ChannelFuture regFuture = initAndRegister();
    // 从异步结果中获取channel
    final Channel channel = regFuture.channel();
    // 获取异步操作执行过程中发生的异常直接返回
    // 注意:只要有异常,说明此异步操作已经完成结束了
    if (regFuture.cause() != null) {
        return regFuture;
    }
// 通常走到这里的时候,说明channel已经创建成功并且初始化成功了(看initAndRegister逻辑可以推断)
// 但是注册到Selector的操作是异步的,有可能还在处理也有可能已经完成了

    // 判断当前异步操作是否完成(完成不代表成功,也有可能是异常)
    if (regFuture.isDone()) {   // 若异步操作完成
        // At this point we know that the registration was complete and successful.
        // 创建一个可修改的异步结果对象channelFuture
        ChannelPromise promise = channel.newPromise();
        // 绑定端口号,regFuture的成功与否是在这个方法里面判断的
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {  // 若异步操作未完成
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 为异步操作添加监听器
        regFuture.addListener(new ChannelFutureListener() {
            // 当异步操作完成(成功,异常),就会触发该方法的执行
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 获取异步操作执行过程中发生的异常
                Throwable cause = future.cause();
                if (cause != null) {  // 异步执行过程发生异常
                    // Registration on the EventLoop failed so fail 
                    // the ChannelPromise directly to not cause an
                    
                    // IllegalStateException once we try to access the 
                    // EventLoop of the Channel.
                    // 修改异步结果为:失败
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    // 注册,也是修改
                    promise.registered();
                    // 绑定端口号
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

一些需要注意的点:

现在我们详细跟initAndRegister方法:

该方法内部包含了我们需要关注的四个核心流程中的其中三个:

  • 创建一个Channel
  • 初始化Channel
  • 将当前channel注册给selector
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 创建一个channel
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(),GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 将当前channel注册给selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

2.1 创建服务端 Channel

先看创建一个channel的分支,看到channel = channelFactory.newChannel();
那么channelFactory是什么?什么时候创建的?
我们回到一开始的SomeServer启动类,看到channel(Class<? extends C> channelClass)方法,该方法是用来指定创建的channel的类型,实际上就是这个方法创建channelFactory的:
在这里插入图片描述

//io.netty.bootstrap.AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    // 创建一个channelFactory然后返回自己
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

//io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}

//io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
//赋值到成员变量
    this.channelFactory = channelFactory;
    return self();
}

上面代码可以看出channelFactory就是io.netty.channel.ReflectiveChannelFactory
好我们现在看一下channelFactory.newChannel()方法的具体实现:
在这里插入图片描述

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
        // 我们通过io.netty.bootstrap.AbstractBootstrap#channel传进来的是
        // io.netty.channel.socket.nio.NioServerSocketChannel
            // 初始化NioServerSocketChannel的构造器
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }
    
    @Override
    public T newChannel() {
        try {
            // 使用反射机制,调用其无参构造器,创建channel
            return constructor.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }
    ...
}

可以看出来,是通过Class的无参构造创建Channel的,而这个Class就是io.netty.channel.socket.nio.NioServerSocketChannel,我们接着看NioServerSocketChannel的无参构造在初始化的时候都做了什么?找到NioServerSocketChannel的无参构造:

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

...
    public NioServerSocketChannel() {
        // 我们Netty的channel实际上是对原生的NIO的channel的封装
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    ...
    
// 获取到一个全局性的provider,用来创建Channel或者Selector
// nio讲解的时候介绍过
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
*  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
*  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
*  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
// 通过全局性的provider,创建一个原生的NIO的channel
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 封装
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 创建channel的配置对象
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

可以看到通过全局性的provider,创建一个原生的NIO的ServerSocketChannel,再看super的构造都做了什么

//io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

//io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    //SelectableChannel ch,此时ch就是Nio原生的ServerSocketChannel对象
    this.ch = ch;
    //存储此channel需要关注的兴趣集
    //此时就是SelectionKey.OP_ACCEPT,接收事件
    this.readInterestOp = readInterestOp;
    try {
        // 指定channel为非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

//继续看父类构造
//io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
    // 此时parent是null
    this.parent = parent;
    // 为Netty的channel生成id
    id = newId();
    // 创建一个底层操作对象
    unsafe = newUnsafe();
    // 创建当前channel所绑定的channelPipeline
    pipeline = newChannelPipeline();
}

protected ChannelId newId() {
    return DefaultChannelId.newInstance();
}

 protected DefaultChannelPipeline newChannelPipeline() {
     return new DefaultChannelPipeline(this);
 }
  • 看下id是由什么组成的
    //io.netty.channel.DefaultChannelId#newInstance
    public static DefaultChannelId newInstance() {
        return new DefaultChannelId();
    }
    
    //io.netty.channel.DefaultChannelId#DefaultChannelId
    private DefaultChannelId() {
        //看到整个ID是由machine(mark地址)、process(进程)、sequence(序列)、timestamp(时间戳)、random(随机数)组成
        data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
        int i = 0;
    
        // machineId
        System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
        i += MACHINE_ID.length;
    
        // processId
        i = writeInt(i, PROCESS_ID);
    
        // sequence
        i = writeInt(i, nextSequence.getAndIncrement());
    
        // timestamp (kind of)
        i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
    
        // random
        int random = PlatformDependent.threadLocalRandom().nextInt();
        i = writeInt(i, random);
        assert i == data.length;
    
        hashCode = Arrays.hashCode(data);
    }
    
  • 看下unsafe 是什么对象
    不安全操作应永不从用户代码被调用。这些方法仅用于实现实际传输,必须从I/O线程调用,以下方法除外
    在这里插入图片描述
    unsafe 提供一堆方法用来实际传输,例如 写数据write、注册register、刷新flush、connect连接、close关闭、bind绑定…
    在这里插入图片描述
    Unsafe这些方法就是最底层了,封装的都是和Nio的交互!!!!
  • 再看下创建NioServerSocketChannelConfig配置对象
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    

    其中javaChannel()返回的就是之前创建的ServerSocketChannel对象 NioServerSocketChannelConfig的继承体系:
    NioServerSocketChannelConfig -> DefaultServerSocketChannelConfig -> ServerSocketChannelConfig -> ChannelConfig
    在这里插入图片描述
    实际上是一个channel的配置属性set集合,后面会通过它对channel进行配置

  • ChannelPipeline后面会专门讲,这里只要记住是在NioServerSocketChannel构造里初始化的

总结

这里创建了一个 channel,主要完成了以下几个流程:

  • ServerBootstrap在指定Channel通道类型的时候,底层创建了一个ReflectiveChannelFactory,该工厂可以通过使用指定的Class的构造器来创建Channel,而我们指定的就是NioServerSocketChannel
  • ServerBootstrap在调用bind方法的时候,底层会通过channelFactory.newChannel()创建Channel,即ReflectiveChannelFactory
  • 在利用构造器创建Channel实例的时候,会触发对应类的类初始化和构造器方法,我们主要关注的是NioServerSocketChannel构造器初始化都做了哪些事:
    • 通过全局性的SelectorProvider,创建一个原生的NIO的channel
    • 生成了 channel 的 id,ChannelId(包括介绍了ChannelId由哪些组成)
    • 创建了真正的数据传输对象 Unsafe
    • 创建并绑定了 ChannelPipeline
    • 设置当前Channel需要关注的事件(OP_ACCEPT事件)
    • 指定channel为非阻塞
    • 创建 channel 的的配置类 NioServerSocketChannelConfig对象

2.2 初始化服务端 Channel

继续分析源码之前,说明一个事情:
一般用Netty的时候,ServerBootstrap是可以使用option()/childOption()方法进行一些TCP相关配置
在这里插入图片描述
除了option可以通过attr()/childAttr()添加属性,这些属性是绑定在channel上的,可以在处理器中使用
在这里插入图片描述
例如:
在这里插入图片描述

继续分析,看初始化channel的方法io.netty.bootstrap.AbstractBootstrap#init

入口一开始分析过(忘记的看上面):
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister
io.netty.bootstrap.AbstractBootstrap#init

该方法是一个抽象方法,具体实现在其子类,我们是Server端所以看
io.netty.bootstrap.ServerBootstrap#init,主要分三大步:

  • 处理bootstrap中的option设置属性
  • 处理bootstrap中的attr设置属性
  • 向pipeline中添加ChannelInitializer处理器
    (目的是为了添加ServerBootstrapAcceptor处理器,该处理器用于处理client的连接,通过bootstrap设置的childOption和childAttr属性都会传给这个处理器,当接受连接的时候为客户端的channel设置这些属性
//io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {
    // 处理bootstrap中的option设置属性
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 处理bootstrap中的attr设置属性
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        // 将bootstrap中设置的所有attr属性配置给channel
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    // 向pipeline中添加处理器
    ChannelPipeline p = channel.pipeline();

    // 获取bootstrap中设置的所有child开头的属性
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // ChannelInitializer是一个处理器,其存在的意义是,为pipeline添加其它处理器
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 获取bootstrap中配置的handler()
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // ch.eventLoop()是获取到当前channel所绑定的evenLoop
            // 然后再使用该eventLoop所绑定的线程来执行指定的任务
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 向pipeline中添加ServerBootstrapAcceptor处理器
                    // 该处理器用于处理client的连接
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

A、设置 options

先获取options,看options0()方法:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
...
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

//io.netty.bootstrap.AbstractBootstrap#options0
final Map<ChannelOption<?>, Object> options0() {
    return options;
}
...
}

什么时候赋值的呢?之前说过在使用Netty入口类的时候,可以用option()/childOption()进行配置

//io.netty.bootstrap.AbstractBootstrap#option
public <T> B option(ChannelOption<T> option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {  // 初始化options
            options.put(option, value);
        }
    }
    return self();
}

//io.netty.bootstrap.ServerBootstrap#childOption
//只有Server端才需要child,设置的属性是给接受到的客户端连接的Channel用的
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    if (childOption == null) {
        throw new NullPointerException("childOption");
    }
    if (value == null) {
        synchronized (childOptions) {
            childOptions.remove(childOption);
        }
    } else {
        synchronized (childOptions) {
            childOptions.put(childOption, value);
        }
    }
    return this;
}

在看下setChannelOptions方法,为Channel设置Option,Option配置最终放到哪了?注意此时处理的是为parent的option,即为接受客户端连接的Channel设置option

//io.netty.bootstrap.AbstractBootstrap#setChannelOptions
static void setChannelOptions(
        Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
    // 遍历通过bootstrap设置的所有option
    for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
        setChannelOption(channel, e.getKey(), e.getValue(), logger);
    }
}

//io.netty.bootstrap.AbstractBootstrap#setChannelOption
private static void setChannelOption(
        Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
    try {
        // 将设置的option配置给channel的config
        if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
            logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
        }
    } catch (Throwable t) {
        logger.warn(
                "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
    }
}

看下channel.config()返回的是啥,因为我们的channel是NioServerSocketChannel,所以看下io.netty.channel.socket.nio.NioServerSocketChannel#config方法

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {
//...
private final ServerSocketChannelConfig config;

public NioServerSocketChannel(ServerSocketChannel channel) {
// 封装
super(null, channel, SelectionKey.OP_ACCEPT);
// 初始化channel的配置对象
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

@Override
public ServerSocketChannelConfig config() {
return config;
}
//...
}

可以看出来返回的就是NioServerSocketChannelConfig,调用的就是io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption

//io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption
public <T> boolean setOption(ChannelOption<T> option, T value) {
//我们是NioChannel,肯定走第一个分支
if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
}
return super.setOption(option, value);
}

//io.netty.channel.socket.nio.NioChannelOption#setOption
static <T> boolean setOption(Channel jdkChannel, NioChannelOption<T> option, T value) {
java.nio.channels.NetworkChannel channel = (java.nio.channels.NetworkChannel) jdkChannel;
if (!channel.supportedOptions().contains(option.option)) {
return false;
}
if (channel instanceof ServerSocketChannel && option.option == java.net.StandardSocketOptions.IP_TOS) {
// Skip IP_TOS as a workaround for a JDK bug:
// See http://mail.openjdk.java.net/pipermail/nio-dev/2018-August/005365.html
return false;
}
try {
//看到底层还是用Nio的setOption方法
channel.setOption(option.option, value);
return true;
} catch (IOException e) {
throw new ChannelException(e);
}
}

B、 设置 attr

和Options同理,ServerBootstrap类是可以用attr()/childAttr()方法添加属性,这些属性最终会绑定到channel上,可以通过channel获取到,一般用在处理器,看下取值和赋值的方法:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
...
    private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
    
//io.netty.bootstrap.AbstractBootstrap#attrs0  取值
    final Map<AttributeKey<?>, Object> attrs0() {
        return attrs;
    }
...

//io.netty.bootstrap.AbstractBootstrap#attr   设值
    public <T> B attr(AttributeKey<T> key, T value) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        if (value == null) {
            synchronized (attrs) {
                attrs.remove(key);
            }
        } else {
            synchronized (attrs) {
                attrs.put(key, value);
            }
        }
        return self();
    }
}

在看下将这些值如何绑定到channel的,调用channel.attr(key).set(e.getValue())方法,注意此时是在处理parent的attr

//io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {
    // 处理bootstrap中的option设置属性
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    // 处理bootstrap中的attr设置属性
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        // 将bootstrap中设置的所有attr属性配置给channel
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    ...
}

我们知道channel就是NioServerSocketChannel,NioServerSocketChannel的attr方法,实际上是继承自DefaultAttributeMap,所以NioServerSocketChannel本身就是一个属性Map:
在这里插入图片描述
就不看了,知道就好了
在这里插入图片描述

C、 添加处理器

注意上面说的设置option和attr,这些参数都是为ServerSocketChannel设置使用的,ServerSocketChannel是用来接受客户端连接请求的,而childOption和childAttr设置的参数都是为了在Server端接受到客户端连接请求后,为其客户端的channel设置使用的,所以下面可以看到会先将child的option和attr先交给专门处理client连接的处理器

@Override
void init(Channel channel) throws Exception {
...
 // 向pipeline中添加处理器
 ChannelPipeline p = channel.pipeline();

 // 获取bootstrap中设置的所有child开头的属性
 final EventLoopGroup currentChildGroup = childGroup;
 final ChannelHandler currentChildHandler = childHandler;
 final Entry<ChannelOption<?>, Object>[] currentChildOptions;
 final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
 synchronized (childOptions) {
     currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
 }
 synchronized (childAttrs) {
     currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
 }
...
}

上面的childGroup和childHandler,childOptions,childAttrs都是我们之前通过ServerBootstrap设置的:
在这里插入图片描述

@Override
void init(Channel channel) throws Exception {
...
    // 向pipeline中添加处理器
    ChannelPipeline p = channel.pipeline();

    // 获取bootstrap中设置的所有child开头的属性
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    // ChannelInitializer是一个处理器,其存在的意义是,为pipeline添加其它处理器
    // 并且initChannel执行完后pipline上会移除这个处理器封装的节点(后面专门说)
    // 注意,此时initChannel方法不会立即执行,只有当当前channel注册成功之后才会执行
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 获取bootstrap中配置的handler()
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // ch.eventLoop()是获取到当前channel所绑定的evenLoop
            // 然后再使用该eventLoop所绑定的线程来执行指定的任务
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 向pipeline中添加ServerBootstrapAcceptor处理器
                    // 该处理器用于处理client的连接
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

给监听客户端连接请求的channel添加一个ChannelInitializer处理器(后面会详细介绍),为pipeline添加其它处理器,一但添加完,ChannelInitializer就没用了,就会被回收(回收的是处理器封装的节点,处理器本身不会回收,后面会讲,不是本篇重点)。

为什么不直接调用p.addLast?而是添加一个ChannelInitializer再添加ServerBootstrapAcceptor?

第一,如果是希望把添加ServerBootstrapAcceptor处理器这个逻辑异步处理的话,是一定要放在ChannelInitializer.initChannel这个方法里,因为此时channel还没有注册,也就意味着还没有和eventLoop绑定,希望通过eventLoop执行异步任务,而此时ch.eventLoop()肯定是null会报错(注册逻辑后面讲)

第二,至于为什么添加ServerBootstrapAcceptor处理器的逻辑一定要异步处理,我也不是很清楚,但是Netty几乎所有的操作都是通过异步任务处理的,如果有人明白这个异步的必要性,求赐教!!!

添加的ChannelInitializer会做哪些事?

  • 获取ServerBootstrap中配置的handler(),如果配置了会加进到ChannelPipeline (服务端一般不配handle,配置childHandler)
    在这里插入图片描述
  • 获取当前channel绑定的evenLoop的线程执行指定的任务
    向pipeline中添加ServerBootstrapAcceptor处理器,该处理器用于处理client的连接

接下来我们看下ServerBootstrapAcceptor处理器(是个内部类)逻辑:

继承了ChannelInboundHandlerAdapter ,当client发送来连接请求时,会触发channelRead()方法的执行

  • 此时通道收到的msg就是当前Server的子channel
  • 初始化这个子channel(为客户端的子channel设置handler,option,attr)
  • 将当前子channel注册到selector(此时用的是childGroup进行注册的)
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;

    ServerBootstrapAcceptor(
            final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
            Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        //将传进来的都保存到成员变量里
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;

        // Task which is scheduled to re-enable auto-read.
        // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
        // not be able to load the class because of the file limit it already reached.
        //
        // See https://github.com/netty/netty/issues/1328
        enableAutoReadTask = new Runnable() {
            @Override
            public void run() {
                channel.config().setAutoRead(true);
            }
        };
    }

    // 当client发送来连接请求时,会触发channelRead()方法的执行
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 注意,这里client发送来的就是连接当前Server的子channel
        final Channel child = (Channel) msg;

        // 初始化这个子channel
        // 对用于处理client 读写请求的子channel设置handler,option,attr
        child.pipeline().addLast(childHandler);

        setChannelOptions(child, childOptions, logger);

        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        try {
            // 将当前子channel注册到selector
            // 注意这里用的就是childGroup了
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                    //如果注册失败,强制关闭
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }

    private static void forceClose(Channel child, Throwable t) {
        child.unsafe().closeForcibly();
        logger.warn("Failed to register an accepted channel: {}", child, t);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final ChannelConfig config = ctx.channel().config();
        if (config.isAutoRead()) {
            // stop accept new connections for 1 second to allow the channel to recover
            // See https://github.com/netty/netty/issues/1328
            config.setAutoRead(false);
            ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);
        }
        // still let the exceptionCaught event flow through the pipeline to give the user
        // a chance to do something with it
        ctx.fireExceptionCaught(cause);
    }
}

总结

初始化服务端Channel总共有三步:

  • A、设置 options
    • Option参数怎么来的?在启动程序中通过ServerBootstrap.option可以设置值
    • Option参数最终设置到哪了?通过io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption进行设置的,底层最终调用nio的ServerSocketChannel.setOption方法
    • Option参数有什么用?设置的都是TCP相关的参数
  • B、 设置 attr
    • Attr参数怎么来的?在启动程序中通过ServerBootstrap.attr可以赋值
    • Attr参数存哪里?NioServerSocketChannel继承自DefaultAttributeMap,参数绑定在自己身上
    • Attr参数有什么用?pipeline链中的处理器可以获取到这些参数
  • C、 添加处理器
    • 这个时候会给监听客户端连接的Channel添加一个ChannelInitializer(大概原因是想调用Channel对应的eventLoop的线程执行异步任务,通过异步任务添加ServerBootstrap.handler指定的处理器,和ServerBootstrapAcceptor处理器,但是这个时候Channel还没有注册到Selector,eventLoop还没绑定
    • 异步任务中会为当前监听的channel添加一个ServerBootstrapAcceptor处理器,该处理器用于处理client的连接(会将当前监听的channel,childEventLoopGroup,childOptions,childAttrs都通过构造传给这个处理器,这些东西都是在启动类的时候通过ServerBootstrap设置的)
    • ServerBootstrapAcceptor的逻辑:
      • 当client发送来连接请求时,会触发channelRead()方法的执行(channelRead的msg就是客户端对应的Channel
      • 然后初始化这个子channel,为该channel添加childHandler,设置childOptions和childAttrs
      • 将当前子channel注册到selector(这个selector是childGroup中的selector)

2.3 将 Channel 注册给 Selector

入口:
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister
回到io.netty.bootstrap.AbstractBootstrap#initAndRegister方法:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 创建一个channel
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 将当前channel注册给selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
        //如果异常,已注册,则直接关闭
            channel.close();
        } else {
        //否则强制关闭
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

注册核心方法就是 ChannelFuture regFuture = config().group().register(channel)
config().group()返回的就是NioEventLoopGroup,即我们ServerBootstrap里设置的parentGroup

//group就是通过io.netty.bootstrap.ServerBootstrap#group设置的parentGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

注意NioEventLoopGroup继承MultithreadEventLoopGroup,register方法默认实现在MultithreadEventLoopGroup中,我们看下MultithreadEventLoopGroup.register方法:

//io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
    // 从parentGroup中根据算法选择一个eventLoop来完成注册
    return next().register(channel);
}

//io.netty.channel.MultithreadEventLoopGroup#next
public EventLoop next() {
    return (EventLoop) super.next();
}

//io.netty.util.concurrent.MultithreadEventExecutorGroup#next
public EventExecutor next() {
    return chooser.next();
}

chooser.next()返回一个EventExecutor(就是EventLoop后面详细介绍),而chooser是一个算法选择器,有两个实现,通常执行的是GenericEventExecutorChooser里面维护了一个EventExecutor的数组(后面也会详细讲,不在本篇重点)
在这里插入图片描述

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    //EventExecutor的数组
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

继续追踪EventLoop的register方法,MultithreadEventLoopGroup是从EventLoopGroup里面选,返回一个EventLoop,所以现在看SingleThreadEventLoop
在这里插入图片描述

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    //Unsafe!!!之前说过底层最终通过它来处理各种连接,之前初始化已经跟过创建逻辑了
    promise.channel().unsafe().register(this, promise);
    return promise;
}

… }


> NioSocketChannel里创建的Unsafe,是`NioSocketChannelUnsafe`
> 
> ```
> //io.netty.channel.AbstractChannel#AbstractChannel
> protected AbstractChannel(Channel parent) {
>     this.parent = parent;
>     // 为Netty的channel生成id
>     id = newId();
>     // 底层操作对象
>     unsafe = newUnsafe();
>     // 创建当前channel所绑定的channelPipeline
>     pipeline = newChannelPipeline();
> }
> 
> //io.netty.channel.AbstractChannel#newUnsafe
> protected abstract AbstractUnsafe newUnsafe();
> 
> 
> //io.netty.channel.socket.nio.NioSocketChannel#newUnsafe
> protected AbstractNioUnsafe newUnsafe() {
>     return new NioSocketChannelUnsafe();
> }
> ```
> 
> NioSocketChannelUnsafe是NioSocketChannel的内部类:  
> ![在这里插入图片描述](https://img-blog.csdnimg.cn/20200815174604918.png)
> 
> ```
> private final class NioSocketChannelUnsafe extends NioByteUnsafe {
>     @Override
>     protected Executor prepareToClose() {
>         try {
>             if (javaChannel().isOpen() && config().getSoLinger() > 0) {
>                 // We need to cancel this key of the channel so we may not end up in a eventloop spin
>                 // because we try to read or write until the actual close happens which may be later due
>                 // SO_LINGER handling.
>                 // See https://github.com/netty/netty/issues/4449
>                 doDeregister();
>                 return GlobalEventExecutor.INSTANCE;
>             }
>         } catch (Throwable ignore) {
>             // Ignore the error as the underlying channel may be closed in the meantime and so
>             // getSoLinger() may produce an exception. In this case we just return null.
>             // See https://github.com/netty/netty/issues/4449
>         }
>         return null;
>     }
> }
> ```

NioSocketChannelUnsafe 的register方法是继承io.netty.channel.AbstractChannel.AbstractUnsafe的默认实现:

protected abstract class AbstractUnsafe implements Unsafe { …

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//空报错
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    //已注册,修改ChannelFuture的结果
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    //不匹配
    if (!isCompatible(eventLoop)) {
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    // 这里实现了channel与eventLoop的绑定!!!
    AbstractChannel.this.eventLoop = eventLoop;

    // 判断当前正在执行的线程是否是当前eventLoop所绑定的线程
    if (eventLoop.inEventLoop()) {
        // 若当前线程是eventLoop绑定线程,则直接让这个线程来完成注册操作
        register0(promise);
    } else {
        // 当前线程不是eventLoop绑定线程,则首先会创建一个线程
        // 创建线程逻辑后面会讲,不在本篇范围内
        // 然后使用这个新创建的eventLoop线程来完成注册
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

}


几个核心关注点:

-   `Channel和EventLoop的绑定方法就在这!`  
    AbstractChannel.this.eventLoop = eventLoop;  
    一个eventLoop可以绑定多个channel
    
-   eventLoop.inEventLoop()方法:
    
    -   作用:判断当前正在执行的线程是否是当前eventLoop所绑定的线程
    -   目的:第一次走到该方法的时候,当前线程肯定是主线程,不希望让主线程做注册操作,比较耗时,让parent的EventLoopGroup里的线程处理,`即最终注册的操作线程都是EventLoopGroup里的线程`
    -   `eventLoop里面绑定的那个线程,不是一开始就初始化好了这个线程,只有在第一次注册的时候才会创建新的线程`(后面讲,不是这次的重点)
-   判断当前线程是否就是eventLoop的线程:
    
//io.netty.util.concurrent.AbstractEventExecutor#inEventLoop
public boolean inEventLoop() {
    return inEventLoop(Thread.currentThread());
}

//io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}
```

断点调试看一下:  
![在这里插入图片描述](https://img-blog.csdnimg.cn/20200815175536154.png)  
可以看到当前线程并不是EventLoop绑定的线程,EventLoop绑定的线程还是null  
eventLoop.execute第一次执行的时候是没有线程,是如何创建的?这个问题就要从NioEventLoopGroup的构造中开始看了  
简单介绍一下,不是本次重点,后面会专门说:  
io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()  
…各种构造一直走  
io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup

```
//io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
    //核心地方,这个就是具有创建线程功能的总executor
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
//chilren维护的就是当前group中的所有子eventLoop
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
        //构建子EventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
 ...
        }
    }
//chooser就是上面提到过的算法选择器,实现类会根据指定算法从children中返回一个eventLoop
    chooser = chooserFactory.newChooser(children);
...
}
```

看下newChild方法,具体实现在其子类NioEventLoopGroup

```
//io.netty.channel.nio.NioEventLoopGroup#newChild
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
```

这里可以知道NioEventLoopGroup里的子线程类型都是`NioEventLoop`,而NioEventLoop.execute方法执行底层其实是调用构造传入的executor的execute方法,即`ThreadPerTaskExecutor(newDefaultThreadFactory())`(底层怎么调用下一篇会介绍)

看下ThreadPerTaskExecutor,其中就会有创建线程的逻辑:

```
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
```

回到我们真正关注的重点,继续看AbstractUnsafe(具体类是NioSocketChannelUnsafe)具体的注册方法io.netty.channel.AbstractChannel.AbstractUnsafe#register0:

//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        //状态,是不是还从没有注册过,是的话就是第一次注册
        boolean firstRegistration = neverRegistered;
        // 完成注册
        doRegister();
        // 修改状态值
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        // 触发handlerAdded()方法的执行
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        // 触发channelRegistered()方法的执行
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.

        // 若当前channel是激活状态,且是第一次注册,
        // 则触发channelActive()的执行
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                // 执行到这里到时候Channel已经注册成功了
                // 此时SelectionKey的兴趣集还是0
                // beginRead方法会将SelectionKey的兴趣集
                // 设置为之前NioServerSocketChannel构造中设置的值
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

注意点:

  • pipeline.invokeHandlerAddedIfNeeded():触发handlerAdded()方法的执行
    pipeline.fireChannelRegistered():触发channelRegistered()方法的执行
    pipeline.fireChannelActive():触发channelActive()的执行
    以上几个方法都会触发pipline链上的处理器对应的方法的执行:
    (后期会讲,先混个眼熟)
    在这里插入图片描述
    pipeline的分析,也是后期再讲,不是本篇重点

继续看doRegister()方法,其具体实现在AbstractNioChannel

//io.netty.channel.nio.AbstractNioChannel#doRegister
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            // 其实netty的channel的注册,本质上是原生的nio的channel的注册
            // 第二个参数0是形参ops的值,表示当前对任何事件都暂不关注
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

细心的人会发现,这里注册到Selector的时候,ops的值是0,表示对任何事件都不关注,但是我们此时的Channel是NioServerSocketChannel,不是应该需要关注ACCEPT事件吗?
其实默认都是先注册为0,而注册ACCEPT事件的逻辑是在注册成功之后的beginRead方法里

private void register0(ChannelPromise promise) {
    try {
...
        // 完成注册
        doRegister();
...
        if (isActive()) {
            if (firstRegistration) {
            ...
            } else if (config().isAutoRead()) {
                // 执行到这里到时候Channel已经注册成功了
                // 此时SelectionKey的兴趣集还是0
                // beginRead方法会将SelectionKey的兴趣集
                // 设置为之前NioServerSocketChannel构造中设置的值
                beginRead();
            }
        }
    } catch (Throwable t) {
...
    }
}

//io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
        doBeginRead();
    } catch (final Exception e) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireExceptionCaught(e);
            }
        });
        close(voidPromise());
    }
}

//io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    // 因为已经注册到Selector了,所以肯定是有SelectionKey的
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;
    
//这里interestOps应该是0
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
    //其中readInterestOp就是之前在NioServerSocketChannel构造里赋值的
    //当时赋值是SelectionKey.OP_ACCEPT
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

//回顾一下NioServerSocketChannel的构造:
public NioServerSocketChannel(ServerSocketChannel channel) {
    // 封装
    super(null, channel, SelectionKey.OP_ACCEPT);
    // 获取channel的配置对象
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//super构造
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
...
    this.readInterestOp = readInterestOp;
...
}

总结

NioServerSocketChannel 的注册,实际上是由 Channel 所绑定的 EventLoop 中的线程来完成的,而注册的本质是将原生的 NIO 的 Channel 注册到了 Seletor。

步骤:

  • ServerBootstrap调用bind方法后,触发initAndRegister时,会调用config().group().register(channel)方法实现注册,本质是调用了NioEventLoopGroup的register方法
  • NioEventLoopGroup的register方法,会根据算法选择其中一个NioEventLoop,继续调用register方法,即NioEventLoop.register,它是继承SingleThreadEventLoop,具体实现在SingleThreadEventLoop里面
  • SingleThreadEventLoop.register方法实际上是调用了promise.channel().unsafe().register(this, promise)方法,即依靠Unsafe完成注册的,即NioSocketChannelUnsafe,register的具体实现是在io.netty.channel.AbstractChannel.AbstractUnsafe
  • AbstractUnsafe中,会用上面NioEventLoop绑定的线程(而不是主线程,并且第一次调用的时候才创建并绑定线程的,一开始NioEventLoop绑定的线程是null)来进行注册操作,底层还是利用Nio的api注册到了Selector,此时还会触发一系列pipline的事件(注册的时候兴趣集是0,之后在beginRead方法里修改了SelectionKey的兴趣集为ACCEPT)

2.4 端口绑定

现在我们看最后一个流程,端口绑定,回到doBind:
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 创建、初始化channel,并将其注册到Selector
    final ChannelFuture regFuture = initAndRegister();
    // 从异步结果中获取channel
    final Channel channel = regFuture.channel();
    // 获取异步操作执行过程中发生的异常
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 判断当前异步操作是否完成:或者是成功,或者是异常
    if (regFuture.isDone()) {   // 若异步操作成功
        // At this point we know that the registration was complete and successful.
        // 创建一个可修改的异步结果对象channelFuture
        ChannelPromise promise = channel.newPromise();
        // 绑定端口号
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {  // 若异步操作未完成
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 为异步操作添加监听器
        regFuture.addListener(new ChannelFutureListener() {
            // 当异步操作完成(成功,异常),就会触发该方法的执行
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // 获取异步操作执行过程中发生的异常
                Throwable cause = future.cause();
                if (cause != null) {  // 异步执行过程发生异常
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    // 修改异步结果为:失败
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    // 绑定端口号
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

之前我们分析过,一但注册的操作完成,就会触发doBind0进行端口号绑定:

//io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
        //异步操作是成功才会触发绑定端口逻辑
        //这里异步操作就是注册selector的操作
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

可以看到,绑定端口的工作,也是让eventLoop的异步处理的,并且只有在channel注册成功的时候才会进行绑定端口的工作,看下channel.bind(localAddress, promise)方法,实际上是调用NioServerSocketChannel.bind,其具体实现是其父类AbstractChannel中
在这里插入图片描述

//io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
//调用了pipline的bind方法
    return pipeline.bind(localAddress, promise);
}


//继续走
//io.netty.channel.DefaultChannelPipeline#bind
//tail是AbstractChannelHandlerContext,代表的是pipline链上的节点(封装了处理器)中的最后一个节点,后面再说,不是这次本文重点
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

注意:tail代表的是pipline链上的节点(封装了处理器的节点,不是处理器)中的最后一个节点,head代表的是第一个节点,head和tail是Netty定义好的,中间那些节点,是我们添加进去的处理器对应的节点,我们只要添加处理器就行了,Netty会自动封装成节点放到pipline上

//io.netty.channel.AbstractChannelHandlerContext#bind
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }
//这个方法先不讲,后面讲Channel 的 inBound 与 outBound 处理器时会专门讲
//简单理解为获取当前节点的下一个节点
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
    //执行绑定
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
            //执行绑定
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

直接跟invokeBind:

//io.netty.channel.AbstractChannelHandlerContext#invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

ChannelOutboundHandler,后面讲Channel 的 inBound 与 outBound 处理器时会专门讲,现在直接跟bind方法,只要知道这里最终会调io.netty.channel.DefaultChannelPipeline.HeadContext.bind()方法即可,即最终会调到头结点的bind方法:

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, HeadContext.class);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        
        //io.netty.channel.DefaultChannelPipeline.HeadContext#bind
        public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            unsafe.bind(localAddress, promise);
        }
        ...
}

可以看到又调了unsafe,底层代码了:
继续跟

//io.netty.channel.AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    // 若当前channel未被激活,则该方法返回false
    boolean wasActive = isActive();
    try {
        // 绑定
        // 一旦端口被绑定了,则channel就被激活了
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    //
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                // 触发pipline中的处理器中的channelActive()方法的执行
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

doBind绑定,是个抽象方法,具体实现我们需要看NioServerSocketChannel

//io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

如果是客户端NioSocketChannel也差不多:

//io.netty.channel.socket.nio.NioSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
    doBind0(localAddress);
}

private void doBind0(SocketAddress localAddress) throws Exception {
    // 若当前JDK平台使用的版本>=7,则...
    if (PlatformDependent.javaVersion() >= 7) {
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}

//io.netty.util.internal.SocketUtils#bind(java.nio.channels.SocketChannel, java.net.SocketAddress)
public static void bind(final SocketChannel socketChannel, final SocketAddress address) throws IOException {
    try {
        AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws IOException {
                socketChannel.bind(address);
                return null;
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

Netty(七)源码解析 之 Reactor 模型、Netty的服务端启动源码分析_reactor.netty.tcp.sslprovider$protocolsslcontextsp-CSDN博客

请登录后发表评论

    没有回复内容