Netty(六)源码解析 之简述NIO、用NIO实现Socket通信 和 群聊-Java专区论坛-技术-SpringForAll社区

Netty(六)源码解析 之简述NIO、用NIO实现Socket通信 和 群聊

由于 Netty 的底层是 NIO,所以在读 Netty 源码之前,首先了解一下 NIO 网络编程相关知识。
直接看官方的注释,因为注释是最权威的。

1. NIO 简介

NIO,New IO(官方),Non-blocking IO(非官方),是 JDK1.4 中引入的一种新的 IO 标准,是一种同步非阻塞 IO。NIO 是以为单位进行数据处理的,当然块的大小是程序员自己指定的。其相对于 BIO 的以字节/字符为单位所进行的阻塞式处理方式,大大提高了读写效率与并发度。

  • BIO:Blocking IO,同步阻塞 IO
  • NIO:Non-blocking IO,同步非阻塞 IO JDK1.4
  • AIO:异步非阻塞 IO,也称为 NIO2.0 JDK1.7
  • BIO:一个客户端对应一个Channel,一个Channel由一个专门线程负责处理,Channel和线程是1:1关系
  • NIO:一个客户端对应一个Channel,一个线程可以处理多个Channel,即Channel和线程关系是n:1关系
    怎么实现?由Selector(多路复用器)实现,多个Channel注册到Selector上,一个Selector和线程是一对一关系,线程处理哪个Channel的用户请求,由Selector决定,即哪个Channel准备就绪了,就让线程处理哪个Channel的请求

SelectorProvider抽象类简介

先看下类上面的注释:

/**
 * Service-provider class for selectors and selectable channels.
 * 用于选择器和可选择通道的服务提供程序类。
 * 
 * 解释:就是说SelectorProvider这个类专门用来获取selectors和selectable channels的
 * 
 * <p> A selector provider is a concrete subclass of this class that has a
 * zero-argument constructor and implements the abstract methods specified
 * below.  A given invocation of the Java virtual machine maintains a single
 * system-wide default provider instance, which is returned by the {@link
 * #provider() provider} method.  The first invocation of that method will locate
 * the default provider as specified below.
 * 
 * 选择器提供程序是这个类的一个具体子类,它有一个零参数的构造函数,并实现下面指定的抽象方法。
 * Java虚拟机的给定调用维护一个单例的系统范围的默认Provider实例,该实例
 * 由{@link #provider() provider}方法返回。该方法的第一次调用定位到下面指定的缺省provider。
 * 简单来说就是provider方法返回的就是该类的一个实例,且是系统全局范围的,单例的,
 * 下面一段话介绍了该实例会在哪些地方使用:
 * 
 * 解释:就是说SelectorProvider.provider方法可以返回一个默认实例,这个实例
 * 在虚拟机内是全局、单例的
 * 
 * <p> The system-wide default provider is used by the static <tt>open</tt>
 * methods of the {@link java.nio.channels.DatagramChannel#open
 * DatagramChannel}, {@link java.nio.channels.Pipe#open Pipe}, {@link
 * java.nio.channels.Selector#open Selector}, {@link
 * java.nio.channels.ServerSocketChannel#open ServerSocketChannel}, and {@link
 * java.nio.channels.SocketChannel#open SocketChannel} classes.  It is also
 * used by the {@link java.lang.System#inheritedChannel System.inheritedChannel()}
 * method. A program may make use of a provider other than the default provider
 * by instantiating that provider and then directly invoking the <tt>open</tt>
 * methods defined in this class.
 * 
 * 整个系统默认的provider实例会被{@link java.nio.channels.DatagramChannel#open
 * DatagramChannel}, {@link java.nio.channels.Pipe#open Pipe}, {@link
 * java.nio.channels.Selector#open Selector}, {@link
 * java.nio.channels.ServerSocketChannel#open ServerSocketChannel}, 和{@link
 * java.nio.channels.SocketChannel#open SocketChannel}这些类的静态open方法使用。
 * 它也被{@link java.lang.System#inheritedChannel System.inheritedChannel()}方法使用
 * 程序除了使用默认的provider外,还可以使用通过实例化provider,然后直接调用该类中定义的open方法
 * 
 * 解释:上面的provider方法获取到的默认SelectorProvider实例会被上面描述的众多类的方法中被使用
 * 也可以直接通过SelectorProvider的方法获取到Selector和selectable channels
 * 
 * <p> All of the methods in this class are safe for use by multiple concurrent
 * threads.  </p>
 * 这个类中的所有方法对于多个并发线程都是安全的。
 *
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 */

public abstract class SelectorProvider {
...
}

我们可以看下上面描述的各种使用全局默认SelectorProvider实例的场景:

  • DatagramChannel
    public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel{
    ...
        public static DatagramChannel open() throws IOException {
            return SelectorProvider.provider().openDatagramChannel();
        }
    }
    

    看到该方法返回的DatagramChannel继承AbstractSelectableChannel ,就是SelectableChannel

    public abstract class DatagramChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, MulticastChannel{
    ...
    }
    
    public abstract class AbstractSelectableChannel extends SelectableChannel{
    ...
    }
    
  • Pipe的open静态方法也是一样的:
    public abstract class Pipe {
    ...
        public static Pipe open() throws IOException {
            return SelectorProvider.provider().openPipe();
        }
        //虽然返回Pipe,自己,但是Pipe中包含了一个内部类SourceChannel,也是SelectableChannel
        public static abstract class SourceChannel
            extends AbstractSelectableChannel
            implements ReadableByteChannel, ScatteringByteChannel{
            ...
    }
    }
    
  • Selector的open同理:
    public abstract class Selector implements Closeable {
    ...
        public static Selector open() throws IOException {
            return SelectorProvider.provider().openSelector();
        }
    }
    
  • ServerSocketChannel同理:
    //ServerSocketChannel就是SelectableChannel
    public abstract class ServerSocketChannel
        extends AbstractSelectableChannel
        implements NetworkChannel{
        ...
        public static ServerSocketChannel open() throws IOException {
            return SelectorProvider.provider().openServerSocketChannel();
        }
    }
    
  • SocketChannel同理:
    public abstract class SocketChannel
        extends AbstractSelectableChannel
        implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{
        ...
        public static SocketChannel open() throws IOException {
            return SelectorProvider.provider().openSocketChannel();
        }
    }
    
  • System.inheritedChannel:
    public final class System {
    ...
        public static Channel inheritedChannel() throws IOException {
            return SelectorProvider.provider().inheritedChannel();
        }
    }
    

Selector抽象类简介

先看下类上面注释,这个类的注释很长,我都把中文和解释写在里面了,这个类个人认为就是Nio的核心,也是多路复用的核心,注释中把整个Nio执行流程讲解的非常详细,看完基本可以把Nio的执行流程弄的非常明白,建议耐心看完:
注释中分了三大块(注释中用华丽的分割线分隔了~):

  • Selector中的三个键集
  • Selector的选择流程
  • Selector的并发性
/**
 * A multiplexor of {@link SelectableChannel} objects.
 * {@link SelectableChannel}对象的多路复用器。
 * 
 * <p> A selector may be created by invoking the {@link #open open} method of
 * this class, which will use the system's default {@link
 * java.nio.channels.spi.SelectorProvider selector provider} to
 * create a new selector.  A selector may also be created by invoking the
 * {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector}
 * method of a custom selector provider.  A selector remains open until it is
 * closed via its {@link #close close} method.
 * 可以通过调用本类的{@link #open open}方法来创建selector,该方法将使用系统的
 * 默认{@link java.nio.channels.spi.SelectorProvider selector provider}来
 * 创建新的selector。
 * 
 * 还可以通过调用自定义的selector provider的
 * {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector}方法来
 * 创建selector。
 * 
 * 选择器一直保持打开状态,直到通过其{@link #close close}方法关闭。
 * 
 * <a name="ks"></a>
 *
 * <p> A selectable channel's registration with a selector is represented by a
 * {@link SelectionKey} object.  A selector maintains three sets of selection
 * keys:
 * 一个可选择的通道在selector上的注册是由一个{@link SelectionKey}对象表示的。
 * 一个selector维护三组SelectionKey的集合:
 * 
 * 解释:不管一个通道在selector上注册监听几个事件就只会有一个SelectionKey代表这个通道注册。
 * 而SelectionKey里面有一个兴趣集,表名当前channel注册关注了哪些事件
 * 
 * <ul>
 *
 *   <li><p> The <i>key set</i> contains the keys representing the current
 *   channel registrations of this selector.  This set is returned by the
 *   {@link #keys() keys} method. </p></li>
 * 
 *  key set:键集,包含的SelectionKey表示此选择器的当前通道注册的键。
 *   这个集合可以由{@link #keys() keys}方法返回。
 * 
 *   解释:简单来说就是只要一个Channel在Selector上注册了,在Selector里就对应一个SelectionKey
 *   存放在key set里,Selector注册了几个通道,这个集合就有几个SelectionKey(跟channel注册时
 *   关注的事件无关)
 * 
 *   <li><p> The <i>selected-key set</i> is the set of keys such that each
 *   key's channel was detected to be ready for at least one of the operations
 *   identified in the key's interest set during a prior selection operation.
 *   This set is returned by the {@link #selectedKeys() selectedKeys} method.
 *   The selected-key set is always a subset of the key set. </p></li>
 * 
 *   selected-key set:选择键集,是这样的集合,在之前一次选择操作中,里面每个key的channel
 *   都被检测到在键的兴趣集中标识的操作里至少有一个操作准备好。
 *   这个集合由{@link #selectedKeys() selectedKeys}方法返回。
 *   选择键集始终是键集的子集。
 * 
 *   解释:channel在selector上注册需要指定需要关注的事件,可以关注多个事件也可以
 *   一个事件都不关注,在一次selector.select的选择期间,一个通道,只要其注册的通道感兴趣的事件中有
 *   任何一个事件已经就绪,这个通道在selector上注册代表的SelectionKey就会被放进selected-key
 * 
 *   <li><p> The <i>cancelled-key</i> set is the set of keys that have been
 *   cancelled but whose channels have not yet been deregistered.  This set is
 *   not directly accessible.  The cancelled-key set is always a subset of the
 *   key set. </p></li>
 *   cancelleed -key集合放的key是已取消但其通道尚未取消注册的集合。
 *   这个集合不能直接访问。取消键集始终是键集的子集。
 * 
 *   解释:在调用SelectionKey.cancle方法时候,就会将该Key放入cancelleed -key集合,
 *   此时虽然取消了,但是还没有真正开始执行取消注册的操作,并且该key在Key set集合里依然存在,
 *   等下一次进行选择的时候,会把cancelleed -key集合里所有取消掉的key删除,并且Key set集合
 *   里对应的key也会删除,此时才是真正的在Selector里被删除
 * 
 * </ul>
 *
 * <p> All three sets are empty in a newly-created selector.
 * 在新创建的选择器中,这三个集合都是空的。
 * 
 * <p> A key is added to a selector's key set as a side effect of registering a
 * channel via the channel's {@link SelectableChannel#register(Selector,int)
 * register} method.  Cancelled keys are removed from the key set during
 * selection operations.  The key set itself is not directly modifiable.
 * 通过通道的{@link SelectableChannel#register(selector,int) register}方法注册通道时,
 * 其中的一个副作用就是将一个SelectionKey键添加到selector的key set。
 * 在选择操作期间(调用selector.select方法时),将取消的键从取消键集中删除。
 * 这个取消键集本身不能直接修改。
 * 
 * <p> A key is added to its selector's cancelled-key set when it is cancelled,
 * whether by closing its channel or by invoking its {@link SelectionKey#cancel
 * cancel} method.  Cancelling a key will cause its channel to be deregistered
 * during the next selection operation, at which time the key will removed from
 * all of the selector's key sets.
 * 当一个键被取消时,它会被添加到selector的cancelled-key键集中,无论是通过关闭它的通道
 * 还是通过调用它的{@link SelectionKey#cancel cancel}方法。取消一个键将导致它的通道
 * 在下一次选择操作期间取消注册,此时该键将从选择器的所有键集中删除。
 *
 * <a name="sks"></a><p> Keys are added to the selected-key set by selection
 * operations.  A key may be removed directly from the selected-key set by
 * invoking the set's {@link java.util.Set#remove(java.lang.Object) remove}
 * method or by invoking the {@link java.util.Iterator#remove() remove} method
 * of an {@link java.util.Iterator iterator} obtained from the
 * set.  Keys are never removed from the selected-key set in any other way;
 * they are not, in particular, removed as a side effect of selection
 * operations.  Keys may not be added directly to the selected-key set. </p>
 * SelectionKey添加到选择键集是通过选择操作(即执行selector.select方法期间)的。
 * 可以通过调用集合的remove方法或调用集合迭代器的remove方法从选择键集中直接删除一个键。
 * 键不会以任何其他方式从选择键集合中移除;特别注意的是,它们并没有作为选择操作的副作用被移除。
 * SelectionKey不能直接添加到选择键集。
 * 
 * 解释:这段话就说明了两点
 * 第一,选择键集即selected-key set中的元素,只能在selector.selcet方法执行的时候,即
 * 选择操作期间添加,在选择操作期间该key对应的channel关注的事件中有任何一个事件就绪都会将
 * 其添加到selected-key set
 * 第二,选择键集中的元素不会通过任何方式被移除,只能通过调用selected-key set的remove方法
 * 或者通过集合迭代器的remove方法移除key,也就是为什么一般在处理完SelectionKey对应的事件
 * 后,必须手动主动将key移除选择键集的原因
 * 
 * 
 * ---------------------    华丽的分割线    ---------------------
 * 
 * <a name="selop"></a>
 * <h2>Selection</h2>     选择
 *
 * <p> During each selection operation, keys may be added to and removed from a
 * selector's selected-key set and may be removed from its key and
 * cancelled-key sets.  Selection is performed by the {@link #select()}, {@link
 * #select(long)}, and {@link #selectNow()} methods, and involves three steps:
 * 每次选择操作期间,键可能会被添加到选择器的选择键集 或 从选择器的选择键集移除,也可能
 * 通过它的键和取消的键集删除。选择由{@link #select()}、{@link #select(long)}和
 * {@link #selectNow()}方法执行,包括三个步骤:
 * 
 * </p>
 * 
 * 下面介绍的三步就是select方法的逻辑:
 * <ol>
 *
 *   <li><p> Each key in the cancelled-key set is removed from each key set of
 *   which it is a member, and its channel is deregistered.  This step leaves
 *   the cancelled-key set empty. </p></li>
 *   1:取消键集中的每个key都将从它所属的每个键集中删除,其通道也将取消注册。
 *   此步骤将取消键集变为为空集合。
 * 
 *   <li><p> The underlying operating system is queried for an update as to the
 *   readiness of each remaining channel to perform any of the operations
 *   identified by its key's interest set as of the moment that the selection
 *   operation began.  For a channel that is ready for at least one such
 *   operation, one of the following two actions is performed: </p>
 *   2:在选择操作开始时,会查询底层操作系统的更新,以了解每个剩余通道是否准备好
 *   执行由其键的兴趣集标识的任何操作。对于为至少一个这样的操作准备好的通道,
 *   执行以下两个操作中的一个:
 *   <ol>
 *
 *     <li><p> If the channel's key is not already in the selected-key set then
 *     it is added to that set and its ready-operation set is modified to
 *     identify exactly those operations for which the channel is now reported
 *     to be ready.  Any readiness information previously recorded in the ready
 *     set is discarded.  </p></li>
 *     2.1:如果通道的键不在选择键集中,则将其添加到该集中,并修改其ready-operation集,
 *     以准确地识别channel现在报告已准备就绪的操作。丢弃之前记录在准备集中的任何准备信息。
 *    
 *     解释:从这个步骤可以看出,如果SelectionKey是新加入选择键集的话,其SelectionKey
 *     中的准备集中的信息是不会保留上一次的。
 * 
 *     <li><p> Otherwise the channel's key is already in the selected-key set,
 *     so its ready-operation set is modified to identify any new operations
 *     for which the channel is reported to be ready.  Any readiness
 *     information previously recorded in the ready set is preserved; in other
 *     words, the ready set returned by the underlying system is
 *     bitwise-disjoined into the key's current ready set. </p></li>
 *     2.2:否则,通道的键已经在选择键集中,因此它的准备集将被修改,以标识通道被报告为已就绪的
 *     任何新操作。保留之前记录在准备集中的任何准备信息;换句话说,底层系统返回的准备集被
 *     按位分解为键的当前准备集。
 *     
 *     解释:这个步骤描述的情况是channel的键已经在选举键的情况,什么时候会发生这种情况?
 *     首先要了解,selector.select选择的时候,对于我们调用者来说,感觉只是执行了一个同步方法,
 *     调用了一次,但底层其实是多次轮询监听所有的channel,轮询的过程中调用线程会被阻塞,所以
 *     我们感觉是同步的,底层在到达某一个条件会返回结果,然后唤醒调用线程。
 *     所以同个channel可能在多次轮询中都会发现其某个感兴趣的事件已经就绪,而只有第一次发现
 *     的时候其SelectionKey是不在选择键集中的,这个时候会添加进去,之后轮询到同一个channel
 *     会发现选择键集中已经存在,这个时候上一次轮询标记的就绪事件肯定是不能丢弃的,因为还没处理。
 *     
 *     还有一种情况,就是我们已经处理了SelectionKey的业务逻辑,但是没有将其从选择键集中移除,
 *     所以下一次选择操作,这个SelectionKey还是会存在在选择键集中,并被我们重复处理,同时这个
 *     SelectionKey的准备集中的准备信息,上一次记录的信息还是被保留的。
 *     
 *     由此可以推断出,底层系统返回的通道准备信息,应该是按照二进制位,1代表就绪,0代表未就绪,
 *     每一位代表一个事件,返回这样的二进制数字(虽然readyOps方法返回的是int,进制之间可以随意转换),
 *     而NIO处理的时候,如果SelectionKey不在选择键集中,则直接替换原有的准备集的数字,如果已经存在,
 *     则取原来的准备集数字和现在系统返回的数字进行与预算
 *   </ol>
 *
 *   If all of the keys in the key set at the start of this step have empty
 *   interest sets then neither the selected-key set nor any of the keys'
 *   ready-operation sets will be updated.
 *   2:如果此步骤开始时的键集中的所有键的兴趣集都是空的(0也是空,代表不关注任何事件),
 *   那么选择键集 和 键的任何准备集 都不会被更新。
 *
 *   <li><p> If any keys were added to the cancelled-key set while step (2) was
 *   in progress then they are processed as in step (1). </p></li>
 *   3:如果在步骤(2)进行时向取消的键集添加了任何键,则按照步骤(1)处理它们。
 * </ol>
 *
 * <p> Whether or not a selection operation blocks to wait for one or more
 * channels to become ready, and if so for how long, is the only essential
 * difference between the three selection methods. </p>
 * 选择操作是否阻塞以等待一个或多个通道就绪,如果是,等待多长时间,是三种选择方法之间唯一的本质区别。
 * 
 * 解释:Selector中有三个选择操作的方法,select()/select(long timeout)/selectNow()
 * 这一段描述其实就是这三个方法区别
 * 
 * 
 * ---------------------    华丽的分割线    ---------------------
 * 
 * 
 * <h2>Concurrency</h2>    并发性
 *
 * <p> Selectors are themselves safe for use by multiple concurrent threads;
 * their key sets, however, are not.
 * 选择器本身对于多个并发线程来说是安全的;然而,它们的键集却不是。
 * 
 * 解释:简单来首就是我们直接调用Selector各个api是线程安全的,但是如果我们直接取Selector里
 * 的键集自己修改是线程不安全的
 * 
 * <p> The selection operations synchronize on the selector itself, on the key
 * set, and on the selected-key set, in that order.  They also synchronize on
 * the cancelled-key set during steps (1) and (3) above.
 * 选择操作按顺序同步选择器本身、键集和选择键集。它们还在上面的步骤(1)和(3)中同步已取消的键集。
 * 
 * <p> Changes made to the interest sets of a selector's keys while a
 * selection operation is in progress have no effect upon that operation; they
 * will be seen by the next selection operation.
 * 在进行选择操作时,对选择器键的兴趣集所做的更改不会影响该操作;它们将被下一个选择操作看到。
 * 
 * <p> Keys may be cancelled and channels may be closed at any time.  Hence the
 * presence of a key in one or more of a selector's key sets does not imply
 * that the key is valid or that its channel is open.  Application code should
 * be careful to synchronize and check these conditions as necessary if there
 * is any possibility that another thread will cancel a key or close a channel.
 * SelectionKey可能在任何时候被取消,通道也可能在任何时候被关闭。因此,在一个或多个选择器
 * 的键集中出现一个键并不意味着该键是有效的,或者它的通道是打开的。应用程序代码应该小心同步,
 * 并在必要时检查这些条件,看是否有其他线程取消键或关闭通道的可能性。
 * 
 * <p> A thread blocked in one of the {@link #select()} or {@link
 * #select(long)} methods may be interrupted by some other thread in one of
 * three ways:
 * 在Selector.select()和Selector.select(long)方法中阻塞的线程可能会被其他线程以
 * 以下三种方式中断:
 *
 * <ul>
 *
 *   <li><p> By invoking the selector's {@link #wakeup wakeup} method,
 *   </p></li>
 *   通过调用selector的wakeup方法
 *
 *   <li><p> By invoking the selector's {@link #close close} method, or
 *   </p></li>
 *   通过调用selector的{close方法,或者
 * 
 *   <li><p> By invoking the blocked thread's {@link
 *   java.lang.Thread#interrupt() interrupt} method, in which case its
 *   interrupt status will be set and the selector's {@link #wakeup wakeup}
 *   method will be invoked. </p></li>
 *   通过调用被阻塞线程的link java.lang.Thread#interrupt()方法,该方法会将此线程设置为
 *   中断状态(设置中断状态并不会中断线程),线程执行过程发现自己是中断状态会执行
 *   selector的wakeup方法
 * </ul>
 *
 * <p> The {@link #close close} method synchronizes on the selector and all
 * three key sets in the same order as in a selection operation.
 * close方法以与选择操作相同的顺序同步选择器和所有三个键集。
 * 
 * <a name="ksc"></a>
 *
 * <p> A selector's key and selected-key sets are not, in general, safe for use
 * by multiple concurrent threads.  If such a thread might modify one of these
 * sets directly then access should be controlled by synchronizing on the set
 * itself.  The iterators returned by these sets' {@link
 * java.util.Set#iterator() iterator} methods are <i>fail-fast:</i> If the set
 * is modified after the iterator is created, in any way except by invoking the
 * iterator's own {@link java.util.Iterator#remove() remove} method, then a
 * {@link java.util.ConcurrentModificationException} will be thrown. </p>
 * 通常,对于多个并发线程来说,选择器的键集和选择键集并不安全。
 * 如果有一个这样的线程可能直接修改这些集合中的一个,那么应该通过同步该集合本身来控制访问。
 * 这些集合的{@link java.util.Set#iterator()}方法返回的迭代器是快速失败的:
 * 如果在创建迭代器之后修改了集合,除了调用迭代器自己的remove方法外,使用任何方式进行修改,
 * 都会抛出并发修改异常(第一个例子中会演示这个情况!)
 * 
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 *
 * @see SelectableChannel
 * @see SelectionKey
 */

public abstract class Selector implements Closeable {
...
}

我们稍微关注一下Selector 的三个选择操作方法:

  • public abstract int select() throws IOException;
    /**
     * Selects a set of keys whose corresponding channels are ready for I/O
     * operations.
     * 选择一组键,它们对应的通道已经为I/O操作准备好了。
     * 
     * <p> This method performs a blocking <a href="#selop">selection
     * operation</a>.  It returns only after at least one channel is selected,
     * this selector's {@link #wakeup wakeup} method is invoked, or the current
     * thread is interrupted, whichever comes first.  </p>
     * 这个方法执行一个阻塞选择操作。!!!
     * 它只在至少一个通道被选中、此选择器的{@link #wakeup wakeup}方法被调用或当前线
     * 程被中断(以最先出现的方式)后返回。
     * 
     * @return  The number of keys, possibly zero,
     *          whose ready-operation sets were updated
     *
     * @throws  IOException
     *          If an I/O error occurs
     *
     * @throws  ClosedSelectorException
     *          If this selector is closed
     */
    public abstract int select() throws IOException;
    
  • public abstract int select(long timeout) throws IOException;
    /**
     * Selects a set of keys whose corresponding channels are ready for I/O
     * operations.
     * 选择一组键,它们对应的通道已经为I/O操作准备好了。
     * 
     * <p> This method performs a blocking <a href="#selop">selection
     * operation</a>.  It returns only after at least one channel is selected,
     * this selector's {@link #wakeup wakeup} method is invoked, the current
     * thread is interrupted, or the given timeout period expires, whichever
     * comes first.
     * 这个方法执行一个阻塞选择操作。它只在至少一个通道被选中、此选择器的
     * {@link #wakeup wakeup}方法被调用、当前线程被中断或给定的超时时
     * 间过期(以最先出现的方式)之后返回。
     * 
     * <p> This method does not offer real-time guarantees: It schedules the
     * timeout as if by invoking the {@link Object#wait(long)} method. </p>
     * 这个方法不提供实时保证:它通过调用{@link Object#wait(long)}方法来调度超时。
     * 
     * @param  timeout  If positive, block for up to <tt>timeout</tt>
     *                  milliseconds, more or less, while waiting for a
     *                  channel to become ready; if zero, block indefinitely;
     *                  must not be negative
     *
     * @return  The number of keys, possibly zero,
     *          whose ready-operation sets were updated
     *
     * @throws  IOException
     *          If an I/O error occurs
     *
     * @throws  ClosedSelectorException
     *          If this selector is closed
     *
     * @throws  IllegalArgumentException
     *          If the value of the timeout argument is negative
     */
    public abstract int select(long timeout) throws IOException;
    
  • public abstract int selectNow() throws IOException;
    /**
     * Selects a set of keys whose corresponding channels are ready for I/O
     * operations.
     * 选择一组键,它们对应的通道已经为I/O操作准备好了。
     * 
     * <p> This method performs a non-blocking <a href="#selop">selection
     * operation</a>.  If no channels have become selectable since the previous
     * selection operation then this method immediately returns zero.
     * 该方法执行一个非阻塞选择操作。如果自上一个选择操作以来没有通道成为可选择的,
     * 那么此方法将立即返回零。
     * 
     * <p> Invoking this method clears the effect of any previous invocations
     * of the {@link #wakeup wakeup} method.  </p>
     * 调用此方法将清除以前调用{@link #wakeup wakeup}方法的效果。
     * 
     * @return  The number of keys, possibly zero, whose ready-operation sets
     *          were updated by the selection operation
     *
     * @throws  IOException
     *          If an I/O error occurs
     *
     * @throws  ClosedSelectorException
     *          If this selector is closed
     */
    public abstract int selectNow() throws IOException;
    

总结:三个方法区别就是阻塞还是不阻塞,阻塞是一直阻塞,还是阻塞指定的最长时间

IO多路复用的机制区别,简单说明,后期会专门开一篇介绍:

  • select/poll:在用户态维护的连接链表,每次select需要把所有连接复制到内核态,并轮询所有的连接,在将结果返回到用户态,并再次遍历所有连接,找到就绪的连接
    • select用的是数组维护连接句柄,poll用的是链表结构维护连接句柄,所以select有大小限制
  • epoll(linux支持):连接链表维护在内核态,只收集发生事件的连接,将发生事件的连接返回到用户态去处理

Netty用的是poll方式

SelectionKey抽象类简介

看下类介绍,有些概念已经在Selector中介绍过了:

/**
 * A token representing the registration of a {@link SelectableChannel} with a
 * {@link Selector}.
 * 一个表示{@link SelectableChannel}注册到{@link Selector}的标记(令牌)。
 * 
 * <p> A selection key is created each time a channel is registered with a
 * selector.  A key remains valid until it is <i>cancelled</i> by invoking its
 * {@link #cancel cancel} method, by closing its channel, or by closing its
 * selector.  Cancelling a key does not immediately remove it from its
 * selector; it is instead added to the selector's <a
 * href="Selector.html#ks"><i>cancelled-key set</i></a> for removal during the
 * next selection operation.  The validity of a key may be tested by invoking
 * its {@link #isValid isValid} method.
 * 每次向selector注册通道时,都会创建选择键。一个键保持有效,直到它被取消了,取消可以通过调用它的
 * {@link #cancel cancel}方法,或者关闭它的通道,或者关闭它的选择器。
 * 取消一个键不会立即将其从选择器中移除;取代的是它被添加到选择器的 cancelleled -key set
 * 用于在下一次选择操作中删除。
 * 一个key的有效性可以通过调用它的{@link #isValid isValid}方法来测试。
 * 
 * <a name="opsets"></a>
 *
 * <p> A selection key contains two <i>operation sets</i> represented as
 * integer values.  Each bit of an operation set denotes a category of
 * selectable operations that are supported by the key's channel.
 * 一个选择键包含两个用整数值表示的操作集。操作集的每个二进制位表示键的通道支持的可选择操作的类别。
 * 
 * <ul>
 *
 *   <li><p> The <i>interest set</i> determines which operation categories will
 *   be tested for readiness the next time one of the selector's selection
 *   methods is invoked.  The interest set is initialized with the value given
 *   when the key is created; it may later be changed via the {@link
 *   #interestOps(int)} method. </p></li>
 *   兴趣集决定了在下一次调用选择器的选择方法时将测试哪些操作类别是否准备就绪。
 *   在创建选择键的时候,用给定的值初始化兴趣集;之后可以通过{@link #interestOps(int)}方法对其
 *   进行更改。
 * 
 *   解释:channel注册到Selector的时候会用channel.register(Selector sel, int ops)方法,
 *   ops参数就是兴趣集,是一个整数,转换成二进制后,每个二进制位代表一种操作类型,1代表关注
 *   注册完以后,之后可以通过调用该channel对应的SelectionKey的interestOps进行查询或修改
 *   
 *   <li><p> The <i>ready set</i> identifies the operation categories for which
 *   the key's channel has been detected to be ready by the key's selector.
 *   The ready set is initialized to zero when the key is created; it may later
 *   be updated by the selector during a selection operation, but it cannot be
 *   updated directly. </p></li>
 *   准备集标识了该键的通道已被键的selector检测为就绪的所有操作类别。当选择键被创建时,
 *   准备集被初始化为零;它可能会在之后的选择操作期间由选择器更新,但不能直接更新。
 * </ul>
 *
 * <p> That a selection key's ready set indicates that its channel is ready for
 * some operation category is a hint, but not a guarantee, that an operation in
 * such a category may be performed by a thread without causing the thread to
 * block.  A ready set is most likely to be accurate immediately after the
 * completion of a selection operation.  It is likely to be made inaccurate by
 * external events and by I/O operations that are invoked upon the
 * corresponding channel.
 * 选择键的准备集表明它的通道为某个操作类别准备好了,这是一种提示,但不是保证,
 * 说明此类类别中的操作可以由线程执行,而不会导致线程阻塞。准备集最有可能在选择
 * 操作立即完成后是准确的。外部事件和在相应通道上调用的I/O操作可能会使其不准确。
 * 
 * <p> This class defines all known operation-set bits, but precisely which
 * bits are supported by a given channel depends upon the type of the channel.
 * Each subclass of {@link SelectableChannel} defines an {@link
 * SelectableChannel#validOps() validOps()} method which returns a set
 * identifying just those operations that are supported by the channel.  An
 * attempt to set or test an operation-set bit that is not supported by a key's
 * channel will result in an appropriate run-time exception.
 * 这个类定义了所有已知的操作集位,但是给定通道所支持的位的精确程度取决于通道的类型。
 * {@link SelectableChannel}的每个子类都定义了一个
 * {@link SelectableChannel#validOps() validOps()}方法,该方法返回一个集合,
 * 标识通道支持的那些操作。尝试设置或测试SelectionKey的通道不支持的操作集位将导致适当的运行时异常。
 * 
 * <p> It is often necessary to associate some application-specific data with a
 * selection key, for example an object that represents the state of a
 * higher-level protocol and handles readiness notifications in order to
 * implement that protocol.  Selection keys therefore support the
 * <i>attachment</i> of a single arbitrary object to a key.  An object can be
 * attached via the {@link #attach attach} method and then later retrieved via
 * the {@link #attachment() attachment} method.
 * 通常需要将一些特定于应用程序的数据与选择键相关联,例如表示高级协议状态并处理准备就绪通知以便
 * 实现该协议的对象。因此选择键支持一个任意对象对一个键的附件。对象可以通过
 * {@link #attach attach}方法附加,然后通过{@link #attachment() attachment}方法检索。
 * 
 * <p> Selection keys are safe for use by multiple concurrent threads.  The
 * operations of reading and writing the interest set will, in general, be
 * synchronized with certain operations of the selector.  Exactly how this
 * synchronization is performed is implementation-dependent: In a naive
 * implementation, reading or writing the interest set may block indefinitely
 * if a selection operation is already in progress; in a high-performance
 * implementation, reading or writing the interest set may block briefly, if at
 * all.  In any case, a selection operation will always use the interest-set
 * value that was current at the moment that the operation began.  </p>
 * 选择键对于多个并发线程来说是安全的。对兴趣集的读写操作通常会与选择器的某些操作同步。
 * 确切地说,同步是如何执行的取决于实现:在一个幼稚的实现中,如果选择操作已经在进行,
 * 那么对兴趣集的读写操作可能会无限期地阻塞;在高性能实现中,对兴趣集的读写操作可能会短暂阻塞。
 * 在任何情况下,选择操作都将始终使用操作开始时的当前兴趣集值。
 *
 * @author Mark Reinhold
 * @author JSR-51 Expert Group
 * @since 1.4
 *
 * @see SelectableChannel
 * @see Selector
 */

public abstract class SelectionKey {
...
}

类描述中提到一个SelectionKey 包含两个用整数值表示的操作集,一个是兴趣集(表示需要关注的事件),一个是准备集(表示已经准备就绪的事件),我们看一下兴趣集中都有哪些类型的操作,SelectionKey中定义了四种类别:
在这里插入图片描述

1 << 0 :1 读操作
1<< 2 :4 写操作
1<< 3 :8 连接操作(指客户端主动连接)
1<< 4 :16 接受操作(指客户端被动接受客户端的连接)>

相加可以组合多个操作
1+4 = 5,即5代表读和写操作
1+4+8

channel注册到Selector的时候需要指定该通道感兴趣的事件,通常调用的是这个API:
java.nio.channels.SelectableChannel#register(java.nio.channels.Selector, int)

在简单看一下SelectionKey的API:
在这里插入图片描述

2. NIO 通信 14-niosocket

这里使用 NIO 实现一个简单的 C/S 通信:Client 向 Server 发送一个数据,显示在 Server端控制台。

(1) 创建工程

创建一个普通的 Maven 的 java 工程即可:14-niosocket

(2) 修改 pom

无需添加任何依赖。但需要指定编译器版本。

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

(3) 定义客户端

public class NioClient {
    public static void main(String[] args) throws Exception {
        // 创建客户端channel
        SocketChannel clientChannel = SocketChannel.open();
        // 指定channel使用非阻塞模式
        clientChannel.configureBlocking(false);
        // 指定要连接的Server地址
        InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
        // 连接Server
        if (!clientChannel.connect(serverAddr)) {   // 首次连接
            while (!clientChannel.finishConnect()) {   // 完成重连
                System.out.println("连接不上server,正在尝试连接中。。。");
                continue;
            }
        }
        // 将消息写入到channel
        clientChannel.write(ByteBuffer.wrap("hello".getBytes()));
        System.out.println("Client消息已发送");

        System.in.read();
    }
}

注意:SocketChannel的connect()和finishConnect()方法区别

  • connect()
        /**
         * Connects this channel's socket.
         * 连接此通道的套接字。
         * 
         * <p> If this channel is in non-blocking mode then an invocation of this
         * method initiates a non-blocking connection operation.  If the connection
         * is established immediately, as can happen with a local connection, then
         * this method returns <tt>true</tt>.  Otherwise this method returns
         * <tt>false</tt> and the connection operation must later be completed by
         * invoking the {@link #finishConnect finishConnect} method.
         * 如果此通道处于非阻塞模式,则调用此方法将启动一个非阻塞连接操作。
         * 如果立即建立了连接,就像本地连接一样,那么该方法返回true。
         * 否则,此方法将返回false,随后必须通过调用
         * {@link #finishConnect finishConnect}方法来完成连接操作。
         * 
         * <p> If this channel is in blocking mode then an invocation of this
         * method will block until the connection is established or an I/O error
         * occurs.
         * 如果此通道处于阻塞模式,则此方法的调用将阻塞,直到连接建立或发生I/O错误为止。
         *
         * <p> This method performs exactly the same security checks as the {@link
         * java.net.Socket} class.  That is, if a security manager has been
         * installed then this method verifies that its {@link
         * java.lang.SecurityManager#checkConnect checkConnect} method permits
         * connecting to the address and port number of the given remote endpoint.
         * 这个方法执行和{@link java.net.Socket}类完全相同的安全检查。也就是说,如果安装了
         * 安全管理器,则此方法将验证其
         * {@link java.lang.SecurityManager#checkConnect checkConnect}方法是
         * 否允许连接到给定远程端点的地址和端口号。
         * 
         * <p> This method may be invoked at any time.  If a read or write
         * operation upon this channel is invoked while an invocation of this
         * method is in progress then that operation will first block until this
         * invocation is complete.  If a connection attempt is initiated but fails,
         * that is, if an invocation of this method throws a checked exception,
         * then the channel will be closed.  </p>
         * 此方法可以在任何时候调用。如果在调用此方法时调用了该通道上的读或写操作,那么该操作将首先
         * 阻塞,直到此方法调用完成。如果一个连接请求发起但是失败了,也就是说,如果调用此方法抛出
         * 一个检查异常,则通道将被关闭。
         * ...
         */
        public abstract boolean connect(SocketAddress remote) throws IOException;
    
  • finishConnect()
    /**
     * Finishes the process of connecting a socket channel.
     * 完成连接套接字通道的过程。
     * 
     * <p> A non-blocking connection operation is initiated by placing a socket
     * channel in non-blocking mode and then invoking its {@link #connect
     * connect} method.  Once the connection is established, or the attempt has
     * failed, the socket channel will become connectable and this method may
     * be invoked to complete the connection sequence.  If the connection
     * operation failed then invoking this method will cause an appropriate
     * {@link java.io.IOException} to be thrown.
     * 非阻塞连接操作是通过将套接字通道置于非阻塞模式,然后调用其{@link #connect connect}方
     * 法来启动的。一旦建立了连接,或者尝试失败,都将会让套接字通道将变得可连接,并且可以
     * 调用此方法继续完成连接。如果连接操作失败,那么调用此方法将导致一个合适的
     * {@link java.io.IOException}抛出。
     * 
     * <p> If this channel is already connected then this method will not block
     * and will immediately return <tt>true</tt>.  If this channel is in
     * non-blocking mode then this method will return <tt>false</tt> if the
     * connection process is not yet complete.  If this channel is in blocking
     * mode then this method will block until the connection either completes
     * or fails, and will always either return <tt>true</tt> or throw a checked
     * exception describing the failure.
     * 如果这个通道已经连接成功,那么这个方法就不会阻塞,并且会立即返回true。如果此通道处
     * 于非阻塞模式,则如果连接过程尚未完成,此方法将返回false。如果此通道处于阻塞模式,
     * 则此方法将阻塞,直到连接完成或失败为止,并且总是返回true或抛出一个检查异常描述失败。
     * 
     * <p> This method may be invoked at any time.  If a read or write
     * operation upon this channel is invoked while an invocation of this
     * method is in progress then that operation will first block until this
     * invocation is complete.  If a connection attempt fails, that is, if an
     * invocation of this method throws a checked exception, then the channel
     * will be closed.  </p>
     * 此方法可以在任何时候调用。如果在调用此方法的过程中调用了该通道上的读或写操作,那么读和写
     * 这些该操作将首先阻塞,直到本方法调用完成。如果连接尝试失败,也就是说,如果调用此方法
     * 引发检查异常,则通道将被关闭。
     * ...
     */
    public abstract boolean finishConnect() throws IOException;
    

(4) 定义服务端

public class NioServer {
    public static void main(String[] args) throws Exception {
        // 创建一个服务端Channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 指定channel采用的为非阻塞模式
        serverChannel.configureBlocking(false);
        // 指定要监听的端口
        serverChannel.bind(new InetSocketAddress(8888));
        // 创建一个多路复用器selector
        Selector selector = Selector.open();
        // 将channel注册到selector,并告诉selector让其监听“接收Client连接事件”
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // select()是一个阻塞方法,若阻塞1秒的时间到了,或在阻塞期间有channel就绪,都会打破阻塞
            if (selector.select(1000) == 0) {
                System.out.println("当前没有找到就绪的channel");
                continue;
            }

            // 代码能走到这里,说明已经有channel就绪
            // 获取所有就绪的channel的SelectionKey
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 遍历所有就绪的key
            for (SelectionKey key : selectionKeys) {
                // 若当前key为OP_ACCEPT,则说明当前channel是可以接收客户端连接的。
                // 那么,这里的代码就是用于接收客户端连接的
                if (key.isAcceptable()) {
                    System.out.println("接收到Client的连接");
                    // 获取连接到Server的客户端channel,其是客户端channel在server端的代表(驻京办)
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    // 将客户端channel注册到selector,并告诉selector让其监听这个channel中是否发生了读事件
                    clientChannel.register(selector, SelectionKey.OP_READ);
                }
                // 若当前key为OP_READ,则说明当前channel中有客户端发送来的数据。
                // 那么,这里的代码就是用于读取channel中的数据的
                if (key.isReadable()) {
                    try {
                        // 创建buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        // 根据key获取其对应的channel
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        // 把channel中的数据读取到buffer
                        clientChannel.read(buffer);
                    } catch (Exception e) {
                        // 若在读取过程中发生异常,则直接取消该key,即放弃该channel
                        key.cancel();
                    }
                }

                // 删除当前处理过的key,以免重复处理
                selectionKeys.remove(key);
            } // end-for
        }

    }
}

注意这段实际上执行的时候会有问题,selectionKeys在有多个的情况下,使用for循环,直接用集合的remove(key)方法会导致并发修改异常:
在这里插入图片描述
改成迭代器的遍历方式即可:

public class NioServer2 {
    public static void main(String[] args) throws Exception {
        // 创建一个服务端Channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 指定channel采用的为非阻塞模式
        serverChannel.configureBlocking(false);
        // 指定要监听的端口
        serverChannel.bind(new InetSocketAddress(8888));
        // 创建一个多路复用器selector
        Selector selector = Selector.open();
        // 将channel注册到selector,并告诉selector让其监听“接收Client连接事件”
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // select()是一个阻塞方法,若阻塞1秒的时间到了,或在阻塞期间有channel就绪,都会打破阻塞
            if (selector.select(1000) == 0) {
                System.out.println("当前没有找到就绪的channel");
                continue;
            }

            // 代码能走到这里,说明已经有channel就绪
            // 获取所有就绪的channel的key
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            // 遍历所有就绪的key
            while (it.hasNext()) {
                SelectionKey key = it.next();
                // 若当前key为OP_ACCEPT,则说明当前channel是可以接收客户端连接的。
                // 那么,这里的代码就是用于接收客户端连接的
                if (key.isAcceptable()) {
                    System.out.println("接收到Client的连接");
                    // 获取连接到Server的客户端channel,其是客户端channel在server端的代表(驻京办)
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    // 将客户端channel注册到selector,并告诉selector让其监听这个channel中是否发生了读事件
                    clientChannel.register(selector, SelectionKey.OP_READ);
                }
                // 若当前key为OP_READ,则说明当前channel中有客户端发送来的数据。
                // 那么,这里的代码就是用于读取channel中的数据的
                if (key.isReadable()) {
                    try {
                        // 创建buffer
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        // 根据key获取其对应的channel
                        SocketChannel clientChannel = (SocketChannel) key.channel();
                        // 把channel中的数据读取到buffer
                        clientChannel.read(buffer);
                    } catch (Exception e) {
                        // 若在读取过程中发生异常,则直接取消该key,即放弃该channel
                        key.cancel();
                    }
                }

                // 删除当前正在迭代的的key,以免重复处理
                it.remove();
            }
        }

    }
}

如果只开启客户端的效果:
在这里插入图片描述

3. NIO 群聊 15-niochat

该工程实现的功能是:只要有 Client 启动、发送消息,及下线,都会广播给所有其它Client 通知。

(1) 创建工程

创建一个普通的 Maven 的 java 工程即可:15-niochat

(2) 修改 pom

无需添加任何依赖。但需要指定编译器版本。

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

(3) 定义 NioChatServerStarter

public class NioChatServerStarter {
    public static void main(String[] args) throws Exception {
        // 创建一个服务端Channel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 指定channel采用的为非阻塞模式
        serverChannel.configureBlocking(false);
        // 指定要监听的端口
        serverChannel.bind(new InetSocketAddress(8888));
        // 创建一个多路复用器selector
        Selector selector = Selector.open();
        // 将channel注册到selector
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        // 创建支持群聊的nio server
        NioChatServer chatServer = new NioChatServer();
        chatServer.enableChat(serverChannel, selector);
    }
}

(4) 定义 NioChatServer

public class NioChatServer {
    // 开启Server的群聊支持功能
    public void enableChat(ServerSocketChannel serverChannel, Selector selector) throws Exception {
        System.out.println("chatServer启动。。。");
        while (true) {
            if (selector.select(1000) == 0) {
                continue;
            }

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            for (SelectionKey key : selectionKeys) {
                // 处理客户端上线
                // 处理连接情况,只要有Client连接到服务器,就广播给所有Client通知:
                if (key.isAcceptable()) {
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    // 获取到client地址
                    String msg = clientChannel.getRemoteAddress() + "-上线了";
                    // 将上线通知广播给所有在线的其它client
                    sendMSGToOtherClientOnline(selector, clientChannel, msg);
                }

                // 处理客户端发送消息情况:
                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel)key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    clientChannel.read(buffer);
                    // 获取来自于client的消息,trim()将buffer中没有数据的内容转为的空格去掉
                    String msgFromClient = new String(buffer.array()).trim();
                    if (msgFromClient.length() > 0) {
                        // 获取到client地址
                        SocketAddress clientAddr = clientChannel.getRemoteAddress();
                        // 构成要发送给其它client的消息
                        String msgToSend = clientAddr + " say:" + msgFromClient;
                        // 处理客户端下线的情况:
                        // 下线,如果客户端直接关闭的话对于服务端来说,对应的通道
                        // 会变成可读的,但是尝试读取这个消息内容是会报错的
                        // 所以我们主动让客户端发送一个”88”代表要下线了
                        // 若client发送的是字符串"88",则表示其要下线
                        if ("88".equals(msgFromClient)) {
                            msgToSend = clientAddr + "下线";
                            // 取消当前key,即放弃其所对应的channel,
                            // 将其对应的channel从selector中去掉
                            key.cancel();
                        }
                        // 将client消息广播给所有在线的其它client
                        sendMSGToOtherClientOnline(selector, clientChannel, msgToSend);
                    }
                }  // end-if
                selectionKeys.remove(key);
            } // end-for
        }
    }
//处理发送消息逻辑:
//将client消息广播给所有在线的其它client
    private void sendMSGToOtherClientOnline(Selector selector, SocketChannel self, String msg) throws IOException {
        // 遍历所有注册到selector的channel,即所有在线的client
        for (SelectionKey key : selector.keys()) {
            SelectableChannel channel = key.channel();
            // 将消息发送给所有其它client
            // 需要判断channel类型,因为我们服务端的channel和客户端的channel注册
            // 的都是一个selector
            if (channel instanceof SocketChannel && channel != self) {
                ((SocketChannel) channel).write(ByteBuffer.wrap(msg.trim().getBytes()));
            }
        }
    }
}

Selector注册需要关注的事件后,Selector就会监控这些事件是否发生,如果事件来了表示已经就绪了,就可以选择发生该事件的通道让线程去处理,而服务端在处理SelectionKey的时候通常不需要判断key.isWritable()和key.isConnectable()这两种就绪事件:

  • 连接:如果有客户端连接我,我这就接受该连接,一接受连接就发送了接受事件,通道就会变为就绪,Selector就能选上
  • 可读:只要有客户端给我写数据,我就开始读数据,只要我一读,就发送了读事件,通道就会变为就绪,Selector就能选上

上面两个情况都是被动的,所以需要Selector帮我们监听,而write和connect是自己干的事,主动去做的,不需要监听,自己想写就写,想连就连

上面代码在处理关闭逻辑的时候,我是判断client发送的是字符串”88″,则表示其要下线,实际上直接关闭客户端也是可以的,但是要注意:
在这里插入图片描述

(5) 定义 NioChatClientStarter

public class NioChatClientStarter {
    public static void main(String[] args) throws Exception {
        // 创建客户端channel
        SocketChannel clientChannel = SocketChannel.open();
        // 指定channel使用非阻塞模式
        clientChannel.configureBlocking(false);
        // 指定要连接的Server地址
        InetSocketAddress serverAddr = new InetSocketAddress("localhost", 8888);
        // 连接Server
        if (!clientChannel.connect(serverAddr)) {   // 首次连接
            while (!clientChannel.finishConnect()) {   // 完成重连
                System.out.println("连接不上server,正在尝试连接中。。。");
                continue;
            }
        }
        // 创建群聊客户端,启动聊天功能
        NioChatClient chatClient = new NioChatClient();
        chatClient.enableChat(clientChannel);
    }
}

(6) 定义 NioChatClient

启动聊天功能,需要接受来自Server的消息,并可以像Server发送消息

我们需要启动一个线程监听来自Server的消息

public class NioChatClient {
    // 启动聊天功能
    public void enableChat(SocketChannel clientChannel) throws Exception {
        // 获取client自己的地址
        SocketAddress selfAddr = clientChannel.getLocalAddress();
        System.out.println(selfAddr + ",你已经成功上线了");

        // 创建一个线程用于不间断地接收来自于Server的消息
        new Thread() {
            @Override
            public void run() {
                // 实现不间断
                while (true) {
                    // 接收来自于server的消息
                    try {
                        // 若当前client已经关闭,则结束循环,
                        // 否则正常接收来自Server的消息
                        if (!clientChannel.isConnected()) {
                            return;
                        }
                        receiveMsgFromServer(clientChannel);
                        TimeUnit.SECONDS.sleep(1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        // 注意,该方法不能写到前面的创建线程之前,这样会导致无法接收到来自于Server的消息,
        // 因为该方法中的Scanner是阻塞的
        // 向server发送消息
        sendMsgToServer(clientChannel);
    }

    // 向server发送消息
    private void sendMsgToServer(SocketChannel clientChannel) throws Exception {
        // 接收来自于键盘的输入
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String msg = scanner.nextLine();
            // 将消息写入到channel,其中有可能是下线请求消息88
            clientChannel.write(ByteBuffer.wrap(msg.trim().getBytes()));
            // 若消息为88,则表示当前client要下线,则将该channel关闭
            if ("88".equals(msg.trim())) {
                // 关闭客户端
                clientChannel.close();
                return;
            }
        }
    }
//从服务端接受消息:
    private void receiveMsgFromServer(SocketChannel clientChannel) throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        clientChannel.read(buffer);
        String msg = new String(buffer.array()).trim();
        if (msg.length() > 0) {
            System.out.println(msg);
        }
    }
}

演示

在这里插入图片描述

 

 

Netty(六)源码解析 之简述NIO、用NIO实现Socket通信 和 群聊_netty scoket nio-CSDN博客

请登录后发表评论

    没有回复内容