Netty(十 )源码解析之 客户端启动、Pipline源码分析-Java专区论坛-技术-SpringForAll社区

Netty(十 )源码解析之 客户端启动、Pipline源码分析

1. 客户端启动

服务端启动已经说完了,接下来看下客户端启动,有了之前的经验,现在看客户端会容易的很多:

public class SomeClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    //客户端可以指定自己的 主机名 和 端口号
                    //主机名不能随便写,要和自己机器上DNS一样
                    .localAddress("myNettyClient", 8000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

NioEventLoopGroup就不用分析了,和服务端一样。只要看连接connect方法:

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
...
//io.netty.bootstrap.Bootstrap#connect
    public ChannelFuture connect(String inetHost, int inetPort) {
        return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
    }
    public ChannelFuture connect(SocketAddress remoteAddress) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        }

        validate();
        //localAddress之前Demo演示过可以指定本地主机端口和主机名
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }
    
    public Bootstrap validate() {
        super.validate();
        //验证,这个handler就是bootstrap.handle指定的ChannelInitializer
        if (config.handler() == null) {
            throw new IllegalStateException("handler not set");
        }
        return this;
    }
    //io.netty.bootstrap.AbstractBootstrap#validate
    public B validate() {
    //EventLoopGroup不能为空
        if (group == null) {
            throw new IllegalStateException("group not set");
        }
        //ChannelFactory不能为空
        if (channelFactory == null) {
            throw new IllegalStateException("channel or channelFactory not set");
        }
        return self();
    }
}

看下核心方法io.netty.bootstrap.Bootstrap#doResolveAndConnect

//io.netty.bootstrap.Bootstrap#doResolveAndConnect
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 创建并初始化了channel,然后将channel注册到了selector
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    // 若异步操作完成
    if (regFuture.isDone()) {
    // 如果异步操作失败直接返回
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        // 解析地址,并连接Server
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        //promise之前有说过,是一个可修改的异步结果
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    // 解析地址,并连接Server
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

这里分两步:

  • 初始化并注册channel
  • 解析地址,并连接Server

1.1 初始化并注册channel

final ChannelFuture regFuture = initAndRegister();这个方法调用的是抽象类
io.netty.bootstrap.AbstractBootstrap#initAndRegister方法,这个和Server端是完全一样的逻辑

  • 创建一个channel
  • 初始化channel
  • 将当前channel注册给selector
//io.netty.bootstrap.AbstractBootstrap#initAndRegister
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 创建一个channel
        channel = channelFactory.newChannel();
        // 初始化channel
        init(channel);
    } catch (Throwable t) {
...
    }

    // 将当前channel注册给selector
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    return regFuture;
}

和Server端的区别有两个:

  • 第一是channelFactory.newChannel()创建的通道是io.netty.channel.socket.nio.NioSocketChannel
  • 第二就是init初始化方法的实现,客户端用的是io.netty.bootstrap.Bootstrap,所以要看io.netty.bootstrap.Bootstrap#init方法
    //io.netty.bootstrap.Bootstrap#init
    void init(Channel channel) throws Exception {
        // 将handler放入pipeline
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());
        // 使用option初始化
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
        // 使用attr初始化
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }
    

    最重要的区别就是初始化channel的方法,客户端只会设置option,attr,handler。 而服务端还会在channel的ChannelPipeline中添加一个特殊ChannelInitializer,其中会再次向pipeline中添加ServerBootstrapAcceptor处理器,该处理器用于处理client的连接,一旦客户端来连接,该处理器的channelRead方法就会被触发,可以直接接受到对应客户端的Channel通道对象,然后通过child的option,attr,handler进行初始化,并通过child的NioEventLoopGroup.register进行注册

1.2 解析地址,并连接Server

channel注册成功后会执行 解析地址,并连接Server 的方法:

//io.netty.bootstrap.Bootstrap#doResolveAndConnect0
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        // 获取到当前channel所绑定的eventLoop
        final EventLoop eventLoop = channel.eventLoop();
        // 创建一个地址解析器(其中封装着一个格式匹配器)
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        // 若地址解析器不支持指定的Server的地址,或该地址已经被解析过了
        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            // 解析器不知道如何处理指定的远程地址,或者它已经被解析。
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        // 解析Server地址:将主机名映射为ip
        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        // 若异步操作直接结束:成功,或异常
        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            // 若异常,则关闭channel,修改异步结果
            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                // 若成功,则连接Server
                // resolveFuture.getNow()  直接获取异步结果
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        // Wait until the name resolution is finished.
        // 为异步操作添加监听器:异步操作结束:成功,或异常
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

上面异步操作的处理就不说了,一共三个核心流程:

  • final AddressResolver resolver = this.resolver.getResolver(eventLoop); — 创建地址解析器
  • final Future resolveFuture = resolver.resolve(remoteAddress); —解析地址
  • doConnect(resolveFuture.getNow(), localAddress, promise); —连接Server

1.2.1 创建地址解析器

final AddressResolver resolver = this.resolver.getResolver(eventLoop);

public abstract class AddressResolverGroup<T extends SocketAddress> implements Closeable {
...
/**
     * Note that we do not use a {@link ConcurrentMap} here because it is usually expensive to instantiate a resolver.
     * 注意,我们在这里没有使用{@link ConcurrentMap},因为实例化一个解析器通常开销很大。
     */
    private final Map<EventExecutor, AddressResolver<T>> resolvers =
            new IdentityHashMap<EventExecutor, AddressResolver<T>>();
            
    public AddressResolver<T> getResolver(final EventExecutor executor) {
        if (executor == null) {
            throw new NullPointerException("executor");
        }

        if (executor.isShuttingDown()) {
            throw new IllegalStateException("executor not accepting a task");
        }

        AddressResolver<T> r;
        synchronized (resolvers) {
            r = resolvers.get(executor);
            if (r == null) {
                final AddressResolver<T> newResolver;
                try {
                //创建解析器,一个抽象方法具体实现在子类
                    newResolver = newResolver(executor);
                } catch (Exception e) {
                    throw new IllegalStateException("failed to create a new resolver", e);
                }
//可以看出来一个EventLoop对应一个专门的解析器
                resolvers.put(executor, newResolver);
                //添加一个监听器
                executor.terminationFuture().addListener(new FutureListener<Object>() {
                    @Override
                    //当executor被终止时这个方法会被触发,即删除executor对应的解析器
                    public void operationComplete(Future<Object> future) throws Exception {
                        synchronized (resolvers) {
                            resolvers.remove(executor);
                        }
                        newResolver.close();
                    }
                });

                r = newResolver;
            }
        }

        return r;
    }
    
    protected abstract AddressResolver<T> newResolver(EventExecutor executor) throws Exception;
    ...
}

我们看下resolver是谁就能知道newResolver = newResolver(executor)创建的解析器是什么:

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
...
private static final AddressResolverGroup<?> DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;

private volatile AddressResolverGroup<SocketAddress> resolver = (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;
...
}

这个resolver就是DefaultAddressResolverGroup.INSTANCE

public final class DefaultAddressResolverGroup extends AddressResolverGroup<InetSocketAddress> {

    public static final DefaultAddressResolverGroup INSTANCE = new DefaultAddressResolverGroup();

    private DefaultAddressResolverGroup() { }

    @Override
    protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) throws Exception {
        return new DefaultNameResolver(executor).asAddressResolver();
    }
}

看下asAddressResolver返回的是什么

public abstract class InetNameResolver extends SimpleNameResolver<InetAddress> {
    private volatile AddressResolver<InetSocketAddress> addressResolver;
...
    public AddressResolver<InetSocketAddress> asAddressResolver() {
        AddressResolver<InetSocketAddress> result = addressResolver;
        if (result == null) {
            synchronized (this) {
                result = addressResolver;
                if (result == null) {
                    addressResolver = result = new InetSocketAddressResolver(executor(), this);
                }
            }
        }
        return result;
    }
}

可以看出来解析器用的就是io.netty.resolver.InetSocketAddressResolver

1.2.2 解析地址

final Future resolveFuture = resolver.resolve(remoteAddress);

//io.netty.resolver.AbstractAddressResolver#resolve(java.net.SocketAddress)
public final Future<T> resolve(SocketAddress address) {
//先判断支不支持,不支持直接返回不支持处理
    if (!isSupported(checkNotNull(address, "address"))) {
        // Address type not supported by the resolver
        return executor().newFailedFuture(new UnsupportedAddressTypeException());
    }
//如果已经解析过了直接赋值返回
    if (isResolved(address)) {
        // Resolved already; no need to perform a lookup
        @SuppressWarnings("unchecked")
        final T cast = (T) address;
        return executor.newSucceededFuture(cast);
    }

    try {
        @SuppressWarnings("unchecked")
        final T cast = (T) address;
        final Promise<T> promise = executor().newPromise();
        // 解析Server地址
        doResolve(cast, promise);
        return promise;
    } catch (Exception e) {
        return executor().newFailedFuture(e);
    }
}

//io.netty.resolver.AbstractAddressResolver#doResolve
protected abstract void doResolve(T unresolvedAddress, Promise<T> promise) throws Exception;

doResolve是一个抽象方法,具体实现在子类,之前我们已经分析过resolver是io.netty.resolver.InetSocketAddressResolver:

public class InetSocketAddressResolver extends AbstractAddressResolver<InetSocketAddress> {
...
    @Override
    protected void doResolve(final InetSocketAddress unresolvedAddress, final Promise<InetSocketAddress> promise)
            throws Exception {
        // Note that InetSocketAddress.getHostName() will never incur a reverse lookup here,
        // because an unresolved address always has a host name.
        // 注意,InetSocketAddress.getHostName()在这里不会导致反向查找,因为未解析的地址总是有主机名。
        // 上面注释意思因为InetSocketAddress.getHostName()方法有这样的机制:
        // 如果InetSocketAddress给的是主机名则该方法直接返回主机名,
        // 如果给的是ip地址,则会进行ip地址反向查找返回主机名,即该方法总会返回主机名
        
        //这个nameResolver之前分析过程也能看到,就是DefaultNameResolver
        nameResolver.resolve(unresolvedAddress.getHostName())
                .addListener(new FutureListener<InetAddress>() {
                //解析操作是异步的,异步操作成功了会修改promise
                    @Override
                    public void operationComplete(Future<InetAddress> future) throws Exception {
                        if (future.isSuccess()) {
                            promise.setSuccess(new InetSocketAddress(future.getNow(), unresolvedAddress.getPort()));
                        } else {
                            promise.setFailure(future.cause());
                        }
                    }
                });
    }
    ...
}

nameResolver是DefaultNameResolver,resolve方法实现在其父类SimpleNameResolver

public abstract class SimpleNameResolver<T> implements NameResolver<T> {
...
    @Override
    public final Future<T> resolve(String inetHost) {
        final Promise<T> promise = executor().newPromise();
        return resolve(inetHost, promise);
    }
    ...
    public Future<T> resolve(String inetHost, Promise<T> promise) {
        checkNotNull(promise, "promise");

        try {
            doResolve(inetHost, promise);
            return promise;
        } catch (Exception e) {
            return promise.setFailure(e);
        }
    }
    
    protected abstract void doResolve(String inetHost, Promise<T> promise) throws Exception;
    ...
}

doResolve方法实现在子类DefaultNameResolver:

public class DefaultNameResolver extends InetNameResolver {
...
    @Override
    protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
        try {
        //inetHost此时应是一个主机名
            promise.setSuccess(SocketUtils.addressByName(inetHost));
        } catch (UnknownHostException e) {
            promise.setFailure(e);
        }
    }
...
}

跟SocketUtils.addressByName(inetHost)方法:

//io.netty.util.internal.SocketUtils#addressByName
public static InetAddress addressByName(final String hostname) throws UnknownHostException {
    try {
//访问控制器,执行特权
//特权行为就是PrivilegedExceptionAction
        return AccessController.doPrivileged(new PrivilegedExceptionAction<InetAddress>() {
            @Override
            public InetAddress run() throws UnknownHostException {
                // 根据主机名获取到ip,这个是JDK提供的方法
                // 给定主机的名称,确定主机的IP地址。
                return InetAddress.getByName(hostname);
            }
        });
    } catch (PrivilegedActionException e) {
        throw (UnknownHostException) e.getCause();
    }
}

我们看下AccessController

/**
 * Performs the specified {@code PrivilegedExceptionAction} with
 * privileges enabled.  The action is performed with <i>all</i> of the
 * permissions possessed by the caller's protection domain.
 * 使用启用的特权执行指定的{@code PrivilegedExceptionAction}。
 * 该操作是通过调用者的保护域所拥有的所有 权限来执行的。
 * 
 * 解释:只有具有all权限才能执行action
 */
@CallerSensitive
public static native <T> T doPrivileged(PrivilegedExceptionAction<T> action) throws PrivilegedActionException;

//另一个重载的方法,参数中就可以指定权限,只有拥有指定的这个权限才能运行action
//不指定的情况下是all权限
@CallerSensitive
public static <T> T doPrivileged(PrivilegedAction<T> action,AccessControlContext context, Permission... perms) {

    AccessControlContext parent = getContext();
    if (perms == null) {
        throw new NullPointerException("null permissions parameter");
    }
    Class <?> caller = Reflection.getCallerClass();
    DomainCombiner dc = (context == null) ? null : context.getCombiner();
    return AccessController.doPrivileged(action, createWrapper(dc,
        caller, parent, context, perms));
}

在这里插入图片描述

看下PrivilegedExceptionAction和PrivilegedAction的区别:

/**
 * A computation to be performed with privileges enabled.  The computation is
 * performed by invoking {@code AccessController.doPrivileged} on the
 * {@code PrivilegedAction} object.  This interface is used only for
 * computations that do not throw checked exceptions; computations that
 * throw checked exceptions must use {@code PrivilegedExceptionAction}
 * instead.
 * 在启用特权的情况下执行的计算。计算是通过在{@code PrivilegedAction}对象上调
 * 用{@code AccessController.doPrivileged}来执行的。此接口仅用于不会抛出已
 * 检查异常的计算;抛出受检查异常的计算必须使用{@code}代替。
 */

public interface PrivilegedAction<T> {
    T run();
}



/**
 * A computation to be performed with privileges enabled, that throws one or
 * more checked exceptions.  The computation is performed by invoking
 * {@code AccessController.doPrivileged} on the
 * {@code PrivilegedExceptionAction} object.  This interface is
 * used only for computations that throw checked exceptions;
 * computations that do not throw
 * checked exceptions should use {@code PrivilegedAction} instead.
 * 在启用特权的情况下执行的一种计算,它抛出一个或多个已检查异常。
 * 计算是通过在{@code PrivilegedExceptionAction}对象上调
 * 用{@code AccessController.doPrivileged}来执行的。
 * 此接口仅用于抛出已检查异常的计算;不抛出检查异常的计算应该使用{@code PrivilegedAction}代替。
 */

public interface PrivilegedExceptionAction<T> {
T run() throws Exception;
}

1.2.3 连接Server

doConnect(resolveFuture.getNow(), localAddress, promise);

  • resolveFuture.getNow():返回的就是解析后的IP地址+端口号,是要连接的Server端的地址
  • localAddress是本地的地址
//io.netty.bootstrap.Bootstrap#doConnect
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

直接看connect方法,此时channel应该是NioSocketChannel,connect方法实现在其父类AbstractChannel:

//io.netty.channel.AbstractChannel#connect
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}
//io.netty.channel.DefaultChannelPipeline#connect
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}
//io.netty.channel.AbstractChannelHandlerContext#connect
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return connect(remoteAddress, null, promise);
}
//io.netty.channel.AbstractChannelHandlerContext#connect
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }
//pipline中找到下一个实现connect方法的节点
//节点的概念在后面pipline讲解的时候说
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    //节点中也有执行器,重点下次说
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}
//io.netty.channel.AbstractChannelHandlerContext#invokeConnect
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}

继续跟((ChannelOutboundHandler) handler()).connect,这个ChannelOutboundHandler我们要跟的是头节点HeadContext(至于为什么后面讲pipline和Inbound、OutBound处理器的时候就明白了,先记住就好了):

//io.netty.channel.DefaultChannelPipeline.HeadContext#connect
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}
//io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        //doConnect连接方法
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
...
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}


//io.netty.channel.socket.nio.NioSocketChannel#doConnect
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        // 将本地地址绑定到channel
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        // 客户端的连接就绪指的是,一旦建立了连接,或者尝试连接失败,套接字通道都将变得可连接
        // 这个时候如果Selector.select注册了连接事件,就会发现其SelectionKey是可连接的
        // 连接Server
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
        // 如果连接没有直接成功,则
            // 初始化当前channel的key感兴趣的操作为连接操作
            // 之后Selector监听到连接就绪事件后会继续尝试调用连接方法
            // java.nio.channels.SocketChannel#finishConnect然后完成连接
            // 在分析NioEventLoop任务执行的时候讲过
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

将本地地址绑定到channel,即客户端也可以指定本地绑定哪个端口和Server端进行连接:

//io.netty.channel.socket.nio.NioSocketChannel#doBind0
private void doBind0(SocketAddress localAddress) throws Exception {
    // 若当前JDK平台使用的版本>=7,则...
    if (PlatformDependent.javaVersion() >= 7) {
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}
//io.netty.util.internal.SocketUtils#bind
public static void bind(final SocketChannel socketChannel, final SocketAddress address) throws IOException {
    try {
        AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws IOException {
                socketChannel.bind(address);
                return null;
            }
        });
    } catch (PrivilegedActionException e) {
        throw (IOException) e.getCause();
    }
}

2. ChannelPipeline

channelPipeline本身是一个双向链表,默认有头head和尾tail节点
添加处理器,实际上先把处理器封装成节点context ,再加入到pipline中
添加也能删除

2.1 ChannelPipeline的创建

创建是在Channel初始化执行构造的时候,之前有跟过,这里以NioServerSocketChannel为例:
io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel(java.nio.channels.ServerSocketChannel)
io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)

//io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    // 为Netty的channel生成id
    id = newId();
    // 底层操作对象
    unsafe = newUnsafe();
    // 创建当前channel所绑定的channelPipeline
    pipeline = newChannelPipeline();
}
//io.netty.channel.AbstractChannel#newChannelPipeline
protected DefaultChannelPipeline newChannelPipeline() {
//this就是当前Channel
    return new DefaultChannelPipeline(this);
}

我们看下Pipline的构造做了什么:

//io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
//两个头尾节点
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

可以看到初始化pipline过程中会直接给pipline链的头和尾添加特殊的节点。

2.1.1 TailContext和HeadContext

无论头节点还是尾节点都会继承AbstractChannelHandlerContext:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
//当前节点的前后节点
//由此可以看出是节点之间组成的是一个双向链表结构
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    ...
}

即一个节点由三部分构成:前节点,后节点,当前节点数据

先看下尾节点:
注意TailContext也是一个ChannelInboundHandler,也是一个处理器,可以处理通道入栈相关事件

//看到继承自AbstractChannelHandlerContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

    TailContext(DefaultChannelPipeline pipeline) {
        // 参数1:当前节点所要加入的pipeline
        // 参数2:当前节点封装的执行器executor,节点里的处理器就是通过executor执行的
        // 参数3:节点名称,简单类名+"#0",这里是tailContext#0
        // 参数4:节点类型
        super(pipeline, null, TAIL_NAME, TailContext.class);
        // 修改当前节点所封装处理器的状态
        setAddComplete();
    }
    
    //该方法实现在其父类AbstractChannelHandlerContext
    final boolean setAddComplete() {
        for (;;) {
        //先获取当前节点状态
            int oldState = handlerState;
            //如果当前状态是删除完成
            if (oldState == REMOVE_COMPLETE) {
                return false;
            }
            //状态不是删除完成则:
            // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
            // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
            // exposing ordering guarantees.
            // CAS:Compare and Swap,即比较再交换
            //修改处理器状态为添加成功
            if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return true;
            }
        }
    }
    //看下几个比较重要的节点的方法的实现
@Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    //io.netty.channel.DefaultChannelPipeline#onUnhandledInboundException
        onUnhandledInboundException(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage
    //从通道读数据,一旦走到最后一个节点
        onUnhandledInboundMessage(msg);
    }
...

}

//io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage
protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
    //一旦走到最后一个节点
        // 释放msg
        ReferenceCountUtil.release(msg);
    }
}
//io.netty.channel.DefaultChannelPipeline#onUnhandledInboundException
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
    //一旦走到最后一个节点,异常也会释放掉
        ReferenceCountUtil.release(cause);
    }
}

可以看出来,如果Channel的事件走到最后一个节点TailContext,它会负责处理很多的资源释放逻辑。

看下super构造AbstractChannelHandlerContext

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        // 初始化执行标识
        // 即标记当前处理器的哪些方法需要执行
        this.executionMask = mask(handlerClass);
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
    ...
}

重点关注一下标记逻辑,这个标记逻辑可以标记当前节点的处理器哪些方法可以执行,哪些方法永远不会执行,当调用在pipline中的节点的各个fire方法的时候会用到,下一章会讲,这里只是先标记了,但这个设计思路方案可以借鉴:

final class ChannelHandlerMask {
...

    /**
     * Return the {@code executionMask}.
     */
    static int mask(Class<? extends ChannelHandler> clazz) {
        // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
        // lookup in the future.
        Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
        //先从缓存取,以类为单位
        Integer mask = cache.get(clazz);
        if (mask == null) {
            mask = mask0(clazz);
            cache.put(clazz, mask);
        }
        return mask;
    }

    /**
     * Calculate the {@code executionMask}.
     */
    private static int mask0(Class<? extends ChannelHandler> handlerType) {
        // 初始化一个整数,代表含义是标记允许执行处理器的exeptionCaught方法
        // 即:使用了二进制数上的二进制位代表了每一个方法是否允许执行
        int mask = MASK_EXCEPTION_CAUGHT;

        try {
            // 解决当前处理器是InboundHandler的情况
            if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            //先做了一个与操作,ChannelInboundHandler对应的所有方法都标记为允许执行
            //然后依次判断ChannelInboundHandler中每个方法是否要跳过
            //如果跳过,则该处理器对应的方法就会标记为不允许执行
            //判断逻辑在isSkippable
                mask |= MASK_ALL_INBOUND;

                if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_REGISTERED;
                }
                if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                    mask &= ~MASK_CHANNEL_UNREGISTERED;
                }
...
            }

            // 解决当前处理器是OutboundboundHandler的情况
            // 同理
            if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
                mask |= MASK_ALL_OUTBOUND;

                if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_BIND;
                }
                if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                        SocketAddress.class, ChannelPromise.class)) {
                    mask &= ~MASK_CONNECT;
                }
...
            }

            if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
                mask &= ~MASK_EXCEPTION_CAUGHT;
            }
        } catch (Exception e) {
            // Should never reach here.
            PlatformDependent.throwException(e);
        }

        return mask;
    }

    @SuppressWarnings("rawtypes")
    private static boolean isSkippable(final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
        return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
            @Override
            public Boolean run() throws Exception {
                // 若当前方法上出现了@Skip注解,则返回true,代表跳过
                return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(Skip.class);
            }
        });
    }
...
}

上面可以看出来,只要对于的处理器的方法上加了@Skip注解,则不会执行该方法
而一般我们继承的ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter的所有方法都被@Skip标记了,所以只有我们重写的方法不会被@Skip标记,才会允许执行。
在这里插入图片描述

看下Head节点:
TailContext是一个ChannelInboundHandler
而HeadContext既是ChannelOutboundHandler又是ChannelInboundHandler

注意:ChannelOutboundHandler和ChannelInboundHandler是用来控制入栈和出栈相关的事件处理能力的,不代表当前节点在链表中的位置

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, HeadContext.class);
        //比尾节点多了一个Unsafe
        //因为它是一个ChannelOutboundHandler,ChannelOutboundHandler
        //里面有一堆方法bind,connect,disConnect,close等
        //这些方法都需要调用Unsafe执行
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
...
}

关于TailContext和HeadContext下一章还会详细说明。

2.2 ChannelInitializer源码分析

在介绍pipline添加处理器之前,先介绍一下处理器。
处理器分两类,一个是平常我们实现的继承ChannelOutboundHandlerAdapter和ChannelInboundHandlerAdapter的处理器,还有一个就是ChannelInitializer,它也继承 ChannelInboundHandlerAdapter,但它是一个特殊的处理器,叫做通道初始化器,他的生命周期和历史使命非常短暂,历史使命就是执行initChannel方法完成通道的初始化,执行完毕就会被销毁。

先了解一下ChannelInitializer
在这里插入图片描述

本身也是ChannelInboundHandlerAdapter,所以一旦添加成功和被删除,会触发下面指定的方法:
在这里插入图片描述

ChannelInitializer类上面有个注解:@Sharable

/**
 * Indicates that the same instance of the annotated {@link ChannelHandler}
 * can be added to one or more {@link ChannelPipeline}s multiple times
 * without a race condition.
 * 表明被该注解标记的{@link ChannelHandler}的相同实例可以在没有竞争条件
 * 的情况下多次添加到一个或多个{@link ChannelPipeline}中。
 * <p>
 * If this annotation is not specified, you have to create a new handler
 * instance every time you add it to a pipeline because it has unshared
 * state such as member variables.
 * 如果不加这个注解,每次添加到pipline都需要创建一个新的handle实例,因为状态不共享
 * <p>
 */
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
    // no value
}

在这里插入图片描述

也可以将这个处理器实例添加到多个pipline上,ChannelInitializer就是这样用的,看下面分析:
那ChannelInitializer为什么要加这个注解?因为ChannelInitializer是被服务端接受的所有客户端连接的Channel所共享的

ChannelInitializer里面有一个很关键的成员变量initMap,也是被共享的,存放的是各个pipline中封装了当前ChannelInitializer处理器的节点:

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
...
    // We use a Set as a ChannelInitializer is usually shared between all Channels in a Bootstrap /
    // ServerBootstrap. This way we can reduce the memory usage compared to use Attributes.
    //我们使用一个Set集合作为通道初始化器,它通常在引导程序中的所有通道之间共享。
    //这样,与使用属性相比,我们可以减少内存的使用。
    //这个Set集合存放的是各个pipline中封装了当前ChannelInitializer处理器的节点!
    //即节点是多例的,节点内的handler是单例的,共享的
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
            new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    protected abstract void initChannel(C ch) throws Exception;
....
}

重要类,实例的数量关系:

NioEventLoopGroup在不指定的情况下默认有当前主机逻辑处理器数量的2倍的NioEventLoop,我的机器是32

ServerBootstrap.bind绑定端口的时候,即服务端每监听一个端口,会创建一个Channel,并在初始化的时候添加一个连接处理器,用来处理客户端的连接请求

即一个端口号,对应一个ServerChannel,一个ServerChannel会绑定一个连接处理器

每个端口上的连接处理器在接受到客户端的连接请求时,每一个连接请求都会创建一个NioSocketChannel,而在初始化这个NioSocketChannel的时候,所有端口上的连接处理器用的是同一个通道初始化器,回顾一下服务端Channel添加连接处理器的代码,如下:

//io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {
    // 处理bootstrap中的option设置属性
...

    // 处理bootstrap中的attr设置属性
...

    // 向pipeline中添加处理器
    ChannelPipeline p = channel.pipeline();

    // 获取bootstrap中设置的所有child开头的属性
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
...
    // ChannelInitializer是一个处理器,其存在的意义是,为pipeline添加其它处理器
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            // 获取bootstrap中配置的handler()
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // ch.eventLoop()是获取到当前channel所绑定的evenLoop
            // 然后再使用该eventLoop所绑定的线程来执行指定的任务
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 向pipeline中添加ServerBootstrapAcceptor处理器
                    // 该处理器用于处理client的连接
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

//当client发送来连接请求时,ServerBootstrapAcceptor处理器会触发channelRead()方法的执行
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 注意,这里client发送来的就是连接当前Server的子channel
    final Channel child = (Channel) msg;

    // 初始化这个子channel
    // 对用于处理client 读写请求的子channel设置handler,以及添加到对应的selector中
    
    // 这个childHandler就是ServerBootstrap.childHandler设置的ChannelInitializer
    // 是被所有childChannel初始化时共享的!
    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 将当前子channel注册到selector
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

关键几行代码就是:

  • final ChannelHandler currentChildHandler = childHandler;
  • pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
  • child.pipeline().addLast(childHandler);

即ServerBootstrap.childHandler设置的ChannelInitializer,是被所有childChannel初始化时共享的!

2.2.1 ChannelInitializer的生命周期流程分析

再回到ChannelInitializer,它是一个ChannelInboundHandlerAdapter ,就有handlerAdded方法,当前处理器被成功添加到pipline的时候会触发,或者说当前处理器成功添加到Channel里面,这个方法就会被触发,而当当前处理器被移除的时候就会触发handlerRemoved方法:

PS:添加时机一种情况就是上面child.pipeline().addLast(childHandler);里面还有将处理器封装成节点的逻辑,后面跟

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
...
//initMap可以理解存放了 当前处理器封装的节点 的集合
    private final Set<ChannelHandlerContext> initMap = Collections.newSetFromMap(
            new ConcurrentHashMap<ChannelHandlerContext, Boolean>());

    protected abstract void initChannel(C ch) throws Exception;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    //已经添加成功了才会触发,所以处理器已经封装成节点了
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        initMap.remove(ctx);
    }
    //先看initChannel方法:
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    //initMap添加的就是当前ChannelInitializer处理器所封装的节点
        if (initMap.add(ctx)) {
            try {
                // 调用自定义的ChannelInitializer实例的initChannel()方法
                // 该方法就是我们实现的
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
            //执行完以后从pipline中查找自己,有的话就删除
            //注意删除的是节点,不是当前处理器
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }
    ...
}

可以看到,调用完ChannelInitializer实例的initChannel()方法后,就会执行删除的逻辑将自己封装的节点从pipline上删除,看下pipline是如何查找节点和删除节点的:

2.3 处理器的查找 和 删除

public class DefaultChannelPipeline implements ChannelPipeline {
    // 从pipeline中查找指定处理器对应节点
    public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }
//从head节点的下一个节点开始
        AbstractChannelHandlerContext ctx = head.next;
        for (;;) {

            if (ctx == null) {
                return null;
            }
//如果节点封装的处理器和我指定的处理器一样就返回
            if (ctx.handler() == handler) {
                return ctx;
            }

            ctx = ctx.next;
        }
    }
    
    public final ChannelPipeline remove(ChannelHandler handler) {
        // getContextOrDie(handler)  从pipeline中查找对应节点,如果查询不到会报错
        remove(getContextOrDie(handler));
        return this;
    }
    
    private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    //查找
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
        //为空,抛异常,就死了..
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
    //删除节点
    private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    //断言,不满足条件,就抛出异常
        assert ctx != head && ctx != tail;

        synchronized (this) {
            // 从链表中删除节点
            remove0(ctx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we remove the context from the pipeline and add a task that will call
            // ChannelHandler.handlerRemoved(...) once the channel is registered.
            if (!registered) {
            //如果当前channel没有注册,处理一些异常情况
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }
            //删除以后让执行器执行"修改节点状态"的任务
//ctx是一个节点,一个节点包含一个执行器,让执行器线程处理处理器的操作
//这个执行器和EventLoop默认是同一个
            EventExecutor executor = ctx.executor();
            //判断当前线程和执行器线程是不是同一个线程
            if (!executor.inEventLoop()) {
            //不是则用执行器线程执行
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                    //修改节点状态
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        // 是则直接用执行器线程执行
        // 修改节点状态
        callHandlerRemoved0(ctx);
        return ctx;
    }
    
    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }
    
    private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            ctx.callHandlerRemoved();
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
    ...
}

pipline删除节点后,可以看到会触发节点的callHandlerRemoved方法,该方法会修改节点的状态,并触发对应的处理器的handlerRemoved方法:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
            // 修改状态前会触发对应处理器的handlerRemoved方法
                // 触发当前节点处理器中重写的handlerRemoved()
                // 即又回到io.netty.channel.ChannelInitializer#handlerRemoved方法
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            // 修改处理器状态为已删除
            setRemoved();
        }
    }
    //替换状态
final void setRemoved() {
        handlerState = REMOVE_COMPLETE;
    }
    ...
}

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
...
//即又回到io.netty.channel.ChannelInitializer#handlerRemoved方法
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    //从initMap删除节点
    //ctx节点是多例的!ChannelInitializer是共享的,是单例的!
    //所以节省内存
        initMap.remove(ctx);
    }
}

io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.ChannelHandlerContext)跟完了,再看下io.netty.channel.ChannelInitializer#removeState方法:
这个方法触发意义在于为了保险,如果ChannelInitializer处理器handlerRemoved方法被重写,没有删除的情况,也能保证删除

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
...
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            if (initChannel(ctx)) {

                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }
    private void removeState(final ChannelHandlerContext ctx) {
        // The removal may happen in an async fashion if the EventExecutor we use does something funky.
    //判断节点状态
        if (ctx.isRemoved()) {
            initMap.remove(ctx);
        } else {
            // The context is not removed yet which is most likely the case because a custom EventExecutor is used.
            // Let's schedule it on the EventExecutor to give it some more time to be completed in case it is offloaded.
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    initMap.remove(ctx);
                }
            });
        }
    }
    ...
}

//io.netty.channel.AbstractChannelHandlerContext#isRemoved
public boolean isRemoved() {
//判断状态是否移除成功
    return handlerState == REMOVE_COMPLETE;
}

2.4 处理器的添加

现在看一下pipline是如何添加处理器的,pipline中有很多add方法可以添加处理器:
在这里插入图片描述
我们以常用的io.netty.channel.DefaultChannelPipeline#addLast(java.lang.String, io.netty.channel.ChannelHandler)开始分析:

public class DefaultChannelPipeline implements ChannelPipeline {
...
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

//name可以为处理器指定名称
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检测处理器是否被重复添加
            checkMultiplicity(handler);

            // 将处理器封装为节点
            newCtx = newContext(group, filterName(name, handler), handler);

            // 将新建节点添加到链表尾部
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            // 如果registered为false,则意味着该通道尚未在eventLoop上注册。
            // 在本例中,我们将上下文添加到管道中,并添加一个在注册了通道后
            // 调用ChannelHandler.handlerAdded(…)的任务。
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        // 修改状态,触发handlerAdded()方法的执行
        callHandlerAdded0(newCtx);
        return this;
    }
    ...
}

大致分为四个核心流程:

  • (1) 检查处理器是否被多次添加
  • (2) 创建新节点
  • (3) 插入新节点
  • (4) 执行 handlerAdded()回调

(1) 检查处理器是否被多次添加

checkMultiplicity(handler);

public class DefaultChannelPipeline implements ChannelPipeline {
...
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            // 若不能共享且已经被添加过了
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    ...
}

(2) 创建新节点

newCtx = newContext(group, filterName(name, handler), handler);

先看filterName,生成处理器名称(作为节点名称):

public class DefaultChannelPipeline implements ChannelPipeline {
...
    private String filterName(String name, ChannelHandler handler) {
        // 若没有为处理器指定名称,则自动生成一个名称
        if (name == null) {
            return generateName(handler);
        }
        // 若指定了名称,则需要检测名称是否重复
        checkDuplicateName(name);
        return name;
    }
    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        // 从缓存中获取指定处理器的名称
        String name = cache.get(handlerType);
        // 若这个名称为null,则生成一个
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }
        // 判断并处理生成的名称重复问题(只和pipline链上的节点判断)
        if (context0(name) != null) {
            // 去掉最后一个数字
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.

            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }
    private static String generateName0(Class<?> handlerType) {
        // 命名规则为:处理器简单类名后跟#0
        return StringUtil.simpleClassName(handlerType) + "#0";
    }
    //查找pipline中是否有重名的处理器节点
    private AbstractChannelHandlerContext context0(String name) {
        // 从head节点的下一个节点开始遍历
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                // 返回节点实例,说明名称已冲突
                return context;
            }
            context = context.next;
        }
        // 返回null,说明名称不冲突
        return null;
    }

//如果指定了名称,发现名称重复会报错
    private void checkDuplicateName(String name) {
        if (context0(name) != null) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }
    ...
}

即如果不指定,默认处理器名为:处理器简单类名后跟#0,如果已经存在,0递增#1,#2…
如果指定了名称,发现名称重复会报错

在看newContext创建节点:

public class DefaultChannelPipeline implements ChannelPipeline {
...
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    //构造需要一个pipline,一个执行器,节点名,处理器
    //创建节点是new,关键要看childExecutor,节点的执行器初始化时机就在这
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    private EventExecutor childExecutor(EventExecutorGroup group) {
    //通常情况是null,如果是null,则默认用的是当前channel的eventloop
    //如果我们指定了group,我们分析一下
        if (group == null) {
            return null;
        }
        // 该值用于决定是否为每个group固定一个executor
        // pin:固定的意思
        Boolean pinEventExecutor = channel.config()
                .getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        // 若pinEventExecutor的值为false,则不为group绑定executor,就会为每一个
        // 节点从group中轮询一个executor
        // 如果不是固定,就走group.next()
        if (pinEventExecutor != null && !pinEventExecutor) {
            return group.next();
        }

        // 若pinEventExecutor的值为true,则为每一个group固定一个不变的executor
        // map的key为group,value为其指定的用于固定的executor
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        if (childExecutors == null) {
            // Use size of 4 as most people only use one extra EventExecutor.
            childExecutors = this.childExecutors =
                    new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }
        // Pin one of the child executors once and remember it so that the same child executor
        // is used to fire events for the same channel.
        EventExecutor childExecutor = childExecutors.get(group);
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        return childExecutor;
    }
    ...
}

//看下DefaultChannelHandlerContext的构造:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        //通常没有特殊设置,executor是null
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler;
    }
    
//super的构造
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                                  String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        // 初始化执行标识
        this.executionMask = mask(handlerClass);
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
...
}

PS:netty的pipeline上的节点所绑定的执行器是是什么时候初始化的?addLast的时候传的是null啊
一般不用初始化,如果addLast的时候没有指定,那么节点在使用执行器的时候,直接用节点绑定的Channel的EventLoop:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
...
    public EventExecutor executor() {
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return executor;
        }
    }
    ...
}

(3) 插入新节点

addLast0(newCtx);

public class DefaultChannelPipeline implements ChannelPipeline {
...
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    ...
}

(4) 执行 handlerAdded()回调

callHandlerAddedInEventLoop(newCtx, executor);

public class DefaultChannelPipeline implements ChannelPipeline {
...
    private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
        newCtx.setAddPending();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                callHandlerAdded0(newCtx);
            }
        });
    }
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
        //触发节点处理器的handlerAdded方法
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);
                //如果发生异常会删除节点,触发处理器的HandlerRemoved方法
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
            //处理节点处理器ExceptionCaught方法
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
    ...
}

看下上面调用节点的各个方法具体实现:

//上面调用节点对应各种回调方法:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
...
//修改节点状态为添加中
    final void setAddPending() {
        boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
        assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
    }
    //触发处理器添加成功的方法
    final void callHandlerAdded() throws Exception {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        if (setAddComplete()) {
            // 触发处理器的handlerAdded()方法的执行
            handler().handlerAdded(this);
        }
    }
    //修改节点状态为添加成功
    final boolean setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE) {
                return false;
            }
            // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
            // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
            // exposing ordering guarantees.
            if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return true;
            }
        }
    }
    ...
}

 

原文地址:https://blog.csdn.net/weixin_41947378/category_10273110.html

请登录后发表评论

    没有回复内容