Netty入门 (一)概述(核心概念、执行流程)-Java专区论坛-技术-SpringForAll社区

Netty入门 (一)概述(核心概念、执行流程)

Netty 入门

1. Netty 概述

1.1 Netty 简介

Netty 官网上可以看到最权威的介绍:
在这里插入图片描述

  • Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能服务器和客户端。
  • Netty 是一个 NIO 客户机-服务器框架,它支持快速、简单地开发网络应用程序,如服务器和客户机。它大大简化了网络编程,如 TCP 和 UDP 套接字服务器。
  • “快速和简单”并不意味着生成的应用程序将受到可维护性或性能问题的影响。Netty经过精心设计,并积累了许多协议(如 ftp、smtp、http)的实施经验,以及各种二进制和基于文本的遗留协议。因此,Netty 成功地找到了一种方法,在不妥协的情况下实现了易于开发、性能、稳定性和灵活性。

1.2 谁在使用 Netty

Dubbo、zk、RocketMQ、ElasticSearch、Spring5(对 HTTP 协议的实现)、GRpc、Spark 等大型开源项目都在使用 Netty 作为底层通讯框架。

1.3 Netty 中的核心概念

(1) Channel

  • 管道,其是对 Socket 的封装,其包含了一组 API,大大简化了直接与 Socket 进行操作的复杂性。

(2) EventLoopGroup

  • EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。
  • Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。
  • 一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。(一个线程可以处理多个Channel)

(3) ServerBootStrap

  • 启动类,用于配置整个 Netty 代码,将各个组件关联起来。服务端使用的是 ServerBootStrap,而客户端使用的是则 BootStrap。

(4) ChannelHandler 与 ChannelPipeline

  • ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。

(5) ChannelFuture

  • Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的 addListener()方法为该异步操作添加监听器,为其注册回调:当结果出来后马上调用执行。
  • Netty 的异步编程模型都是建立在 Future 与回调概念之上的。

1.4 Netty 执行流程

在这里插入图片描述

2. Demo1:使用Netty响应Http请求

通过该程序达到的目的是,对 Netty 编程的基本结构及流程有所了解。
该程序是通过 Netty 实现 HTTP 请求的处理,即接收 HTTP 请求,返回 HTTP 响应。

2.1 创建工程 01-primary

创建一个普通的 Maven 的 Java 工程:01-primary

2.2 导入依赖

仅导入一个 netty-all 依赖即可。

<dependencies>
<!-- netty-all 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<!--lombok 依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
</dependencies>

2.3 定义服务器启动类

该服务器就是用于创建并初始化服务器启动对象 ServerBootStrap。

// 服务器启动类
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {

        // 用于处理客户端连接请求,将请求发送给childGroup中的eventLoop
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        // 用于处理客户端请求
        EventLoopGroup childGroup = new NioEventLoopGroup();

        try {
            // 用户启动ServerChannel
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)  // 指定eventLoopGroup
                    .channel(NioServerSocketChannel.class)  // 指定使用NIO进行通信
                    .childHandler(new SomeChannelInitializer());   // 指定childGroup中的eventLoop所绑定的线程所要处理的处理器

            // 指定当前服务器所监听的端口号
            // bind()方法的执行是异步的
            // sync()方法会使bind()操作与后续的代码的执行由异步变为了同步
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器启动成功。监听的端口号为:8888");
            // 关闭Channel
            // closeFuture()的执行是异步的。
            // 当Channel调用了close()方法并关闭成功后才会触发closeFuture()方法的执行
            future.channel().closeFuture().sync();
        } finally {
            // 优雅关闭
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

注意点:

  • ServerBootstrap.handler指定的处理器是让parentGroup里面的线程绑定用的,childHandler指定的处理器是让childGroup里面的线程绑定用的(先简单理解,以后源码分析会探究)
  • 并不是只有NIO,还可以用Oio(Oio已经过时了),但是EventLoopGroup和Channel通道类型要换成io.netty.channel.oio.OioEventLoopGroup和io.netty.channel.socket.oio.OioServerSocketChannel,一般EventLoopGroup和Channel通道类型要配对使用
  • NioEventLoopGroup的实现使用了Nio的Selector(事件分发器)和Channel(以后源码分析会探究)
    在这里插入图片描述
  • bootstrap.bind方法():该方法会创建一个Channel,并绑定一个端口,且该方法是异步的,会先返回一个异步操作结果对象
  • future.channel().clostFuture():该方法并不是一个关闭方法!而是返回了一个通道关闭操作的异步结果对象,当通道关闭的时候这个对象会被通知,并且这个方法返回的总是同一个实例
    在这里插入图片描述即当其他线程调用这个channel的close()方法后,我们可以通过这个异步操作结果对象获得关闭操作的结果等信息
    在这里插入图片描述
  • ChannelFuture:只要一个方法的返回值是它,说明这个方法的执行就是异步的
    /**
     * The result of an asynchronous {@link Channel} I/O operation.
     * 异步{@link Channel} I/O操作的结果。
     * <p>
     * All I/O operations in Netty are asynchronous.  It means any I/O calls will
     * return immediately with no guarantee that the requested I/O operation has
     * been completed at the end of the call.  Instead, you will be returned with
     * a {@link ChannelFuture} instance which gives you the information about the
     * result or status of the I/O operation.
     * 所有的I/O操作在Netty是异步的。这意味着任何I/O调用将立即返回,
     * 而不能保证所请求的I/O操作已经在调用结束时完成。相反,你将返回
     * 一个{@link ChannelFuture}实例,它给你关于I/O操作的结果或状态的信息。
     * ...
     */
     public interface ChannelFuture extends Future<Void> {...}
    
  • .sync():ChannelFuture对象的sync方法可以让异步变为同步,即让当前线程阻塞,直到这个操作执行结束才被唤醒

2.4 定义管道初始化器

定义的管道初始化器是通过bootstrap.childHandler使用的 :

ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)  // 指定eventLoopGroup
                    .channel(NioServerSocketChannel.class)  // 指定使用NIO进行通信
                    .childHandler(new SomeChannelInitializer()); 

定义我们的管道初始化器:

// 管道初始化器
// 当前类的实例在pipeline初始化完毕后就会被GC
public class SomeChannelInitializer extends ChannelInitializer<SocketChannel> {

    // 当Channel初始创建完毕后就会触发该方法的执行,用于初始化Channel
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // 从Channel中获取pipeline
        ChannelPipeline pipeline = ch.pipeline();
        // 将HttpServerCodec处理器放入到pipeline的最后
        // HttpServerCodec是什么?是HttpRequestDecoder与HttpResponseEncoder的复合体
        // HttpRequestDecoder:http请求解码器,将Channel中的ByteBuf数据解码为HttpRequest对象
        // HttpResponseEncoder:http响应编码器,将HttpResponse对象编码为将要在Channel中发送的ByteBuf数据
        pipeline.addLast(new HttpServerCodec());
        // 将自再定义的处理器放入到Pipeline的最后
        pipeline.addLast(new SomeServerHandler());
    }
}

注意:

  • ChannelInitializer继承体系
    ChannelInitializer -> ChannelInboundHandlerAdapter -> ChannelInboundHandlerAdapter -> ChannelHandlerAdapter -> ChannelHandler
  • 添加处理器常用方法addLast,可以为处理器命名,方便后续对处理器获取操作
    io.netty.channel.ChannelPipeline#addLast(java.lang.String, io.netty.channel.ChannelHandler)
    (如果不指定处理器名会自动生成一个名字,后期源码分析会看到)
  • HttpServerCodec是HttpRequestDecoder和HttpResponseEncoder的复合体,可以轻易的实现服务端的HTTP
    在这里插入图片描述

2.5 定义服务端处理器

在管道初始化器中,通过pipeline添加我们自定义的处理器:

pipeline.addLast(new SomeServerHandler());

自定义的处理器继承ChannelInboundHandlerAdapter,重写channelRead方法,通道一但收到消息就会触发该方法:
在这里插入图片描述
重写exceptionCaught方法,通道一但发送异常就会触发该方法:
一但发生异常我们就关闭通道
在这里插入图片描述
我们启动先简单测试一下:
在这里插入图片描述
在这里插入图片描述

可以看到一次请求收到消息有两种,我们需要处理的是DefaultHttpRequest

注意:

  • 这个DefaultHttpRequest的继承体系:DefaultHttpRequest -> HttpRequest
    这个HttpRequest是Netty定义的,和Servlet的HttpRequest不是一个东西,没有任何关系

先简单打印一些信息:
在这里插入图片描述
在这里插入图片描述

接着我们希望给客户端响应

注意:

创建响应对象:

// 构造response的响应体
ByteBuf body = Unpooled.copiedBuffer("hello netty world", CharsetUtil.UTF_8);
// 生成响应对象
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);
// 获取到response的头部后进行初始化
HttpHeaders headers = response.headers();
headers.set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
headers.set(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());

将响应对象放入Channel:

注意:

  • ctx.write只是写入到缓存,ctx.flush才会将缓存内容放入Channel
    所以一般我们直接调用ctx.writeAndFlush
// 将响应对象写入到Channel
// ctx.write(response);
// ctx.flush();
ctx.writeAndFlush(response);

我们可以添加一个监听器,一但响应客户端结束,马上关闭Channel

// 将响应对象写入到Channel
// ctx.write(response);
// ctx.flush();
// ctx.writeAndFlush(response);
// ctx.channel().close();
ctx.writeAndFlush(response)
        // 添加channel关闭监听器
        .addListener(ChannelFutureListener.CLOSE);
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {

    /**
     * A {@link ChannelFutureListener} that closes the {@link Channel} which is
     * associated with the specified {@link ChannelFuture}.
     */
    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
    ...
}

测试:
在这里插入图片描述

发现服务端多处理了一个请求/favicon.ico
在这里插入图片描述
在这里插入图片描述

注意:

  • SpringMVC就不会出现这个问题,已经帮我们处理过了

我们可以过滤不处理/facicon.ico请求

if(msg instanceof HttpRequest) {
    HttpRequest request = (HttpRequest) msg;
    System.out.println("请求方式:" + request.method().name());
    System.out.println("请求URI:" + request.uri());

    if("/favicon.ico".equals(request.uri())) {
        System.out.println("不处理/favicon.ico请求");
        return;
    }

在这里插入图片描述

总结

  • 可以使用匿名内部类代替ChannelInitializer,因为实例本身意义不大,就只用一次
    // 用户启动ServerChannel
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(parentGroup, childGroup)  // 指定eventLoopGroup
            .channel(NioServerSocketChannel.class)  // 指定使用NIO进行通信
            .childHandler(new ChannelInitializer<SocketChannel>() {
    
                // 当Channel初始创建完毕后就会触发该方法的执行,用于初始化Channel
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // 从Channel中获取pipeline
                    ChannelPipeline pipeline = ch.pipeline();
                    // 将HttpServerCodec处理器放入到pipeline的最后
                    // HttpServerCodec是什么?是HttpRequestDecoder与HttpResponseEncoder的复合体
                    // HttpRequestDecoder:http请求解码器,将Channel中的ByteBuf数据解码为HttpRequest对象
                    // HttpResponseEncoder:http响应编码器,将HttpResponse对象编码为将要在Channel中发送的ByteBuf数据
                    pipeline.addLast(new HttpServerCodec());
                    // 将自再定义的处理器放入到Pipeline的最后
                    pipeline.addLast(new SomeServerHandler());
                }
            });   // 指定childGroup中的eventLoop所绑定的线程所要处理的处理器
    
    // 指定当前服务器所监听的端口号
    // bind()方法的执行是异步的
    // sync()方法会使bind()操作与后续的代码的执行由异步变为了同步
    ChannelFuture future = bootstrap.bind(8888).sync();
    
  • 自定义处理器整体代码
    // 自定义服务端处理器
    // 需求:用户提交一个请求后,在浏览器上就会看到hello netty world
    public class SomeServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         *  当Channel中有来自于客户端的数据时就会触发该方法的执行
         * @param ctx  上下文对象
         * @param msg   就是来自于客户端的数据
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            System.out.println("-------------- " + ctx.channel());
    
    
            System.out.println("msg = " + msg.getClass());
            System.out.println("客户端地址 = " + ctx.channel().remoteAddress());
    
            if(msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                System.out.println("请求方式:" + request.method().name());
                System.out.println("请求URI:" + request.uri());
    
                if("/favicon.ico".equals(request.uri())) {
                    System.out.println("不处理/favicon.ico请求");
                    return;
                }
    
                // 构造response的响应体
                ByteBuf body = Unpooled.copiedBuffer("hello netty world", CharsetUtil.UTF_8);
                // 生成响应对象
                DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);
                // 获取到response的头部后进行初始化
                HttpHeaders headers = response.headers();
                headers.set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                headers.set(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());
    
                // 将响应对象写入到Channel
                // ctx.write(response);
                // ctx.flush();
                // ctx.writeAndFlush(response);
                // ctx.channel().close();
                ctx.writeAndFlush(response)
                        // 添加channel关闭监听器
                        .addListener(ChannelFutureListener.CLOSE);
            }
        }
    
        /**
         *  当Channel中的数据在处理过程中出现异常时会触发该方法的执行
         * @param ctx  上下文
         * @param cause  发生的异常对象
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            // 关闭Channel
            ctx.close();
        }
    }
    

3. Demo2:Socket编程,C/S通信

前面的工程是一个仅存在服务端的 HTTP 请求的服务器,而 Netty 中最为最见的是 C/S构架的 Socket 代码。所以下面我们就来看一个 Netty 的 Socket 通信代码。

3.1 创建工程 02-socket

创建一个普通的 Maven 的 Java 工程:02-socket

本例要实现的功能是:客户端连接上服务端后,其马上会向服务端发送一个数据。服务端在接收到数据后,会马上向客户端也回复一个数据。客户端每收到服务端的一个数据后,便会再向服务端发送一个数据。而服务端每收到客户端的一个数据后,便会再向客户端发送一个数据。如此反复,无穷匮也。

3.2 定义服务端

3.2.1 定义服务端启动类

之前我们写的是Http请求响应的服务器,现在我们写到服务器和客户端之间是通过字符串通信的,所以我们添加StringDecoder和StringEncoder

// 定义服务端启动类
public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    // 指定要创建Channel类型
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 获取channel中的Pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            // StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为String
                            pipeline.addLast(new StringDecoder());
                            // StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBuf
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

3.2.2 定义服务端处理器

ChannelInitializer中添加自定义处理器

pipeline.addLast(new SomeServerHandler());
public class SomeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("channel = " + ctx.channel());

        // 将来自于客户端的数据显示在服务端控制台
        System.out.println(ctx.channel().remoteAddress() + "," + msg);
        // 向客户端发送数据
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}

3.3 定义客户端

3.3.1 定义客户端启动类

注意:

  • 客户端只需要一个NioEventLoopGroup
    服务端要两个是因为一个专门用来处理客户端的连接请求,一个处理连接以后具体干活的请求
    服务端会被很多客户端连接,需要处理很多连接请求,而客户端自己只需要处理一个连接请求
  • 客户端使用的是BootStrap,服务端用的是ServerBootStrap
  • 客户端使用的通道类型是NioSocketChannel
public class SomeClient {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

3.3.2 定义客户端处理器

public class SomeClientHandler extends SimpleChannelInboundHandler<String> {

    // msg的消息类型与类中的泛型类型是一致的
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "," + msg);
        ctx.channel().writeAndFlush("from client:" + LocalDateTime.now());
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端和服务端都需要等待对方发送消息后才会回复,所以需要有一个人先发送消息,我们让客户端在连接成功时先发送消息

  • channelActive()方法:当Channel被激活后会触发该方法的执行
public class SomeClientHandler extends SimpleChannelInboundHandler<String> {
...
    // 当Channel被激活后会触发该方法的执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("from client:begin talking");
    }
}

测试:
先启动服务端
在这里插入图片描述

启动客户端后
在这里插入图片描述

在这里插入图片描述

3.4 SimpleChannelInboundHandler和ChannelInboundHandlerAdapter的区别

首先SimpleChannelInboundHandler是继承ChannelInboundHandlerAdapter的。
当管道有数据时,会触发ChannelInboundHandlerAdapter.channelRead()方法

public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {
...
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
            //区别就在这!!!
                ReferenceCountUtil.release(msg);
            }
        }
    }
...
}

SimpleChannelInboundHandler中的channelRead()方法会自动释放接收到的来自于对方的msg所占有的所有资源。

ChannelInboundHandlerAdapter 中的 channelRead()方法不会自动释放接收到的来自于对方的msg

  • 若对方没有向自己发送数据,则自定义处理器建议继承自ChannelInboundHandlerAdapter。因为若继承自 SimpleChannelInboundHandler 则必须重写channelRead0()方法。而重写该方法的目的是对来自于对方的数据进行处理。因为对方根本就没有发送数据,所以也就没有必要重写 channelRead0()方法。
  • 若对方向自己发送了数据,而自己又需要将该数据再发送给对方,则自定义处理器建议继承自ChannelInboundHandlerAdapter。因为 write()方法的执行是异步的,且SimpleChannelInboundHandler 中的 channelRead()方法会自动释放掉来自于对方的 msg。若 write()方法中正在处理 msg,而此时 SimpleChannelInboundHandler 中的 channelRead()方法执行完毕了,将 msg 给释放了。此时就会报错。
    在这里插入图片描述

PS:关于这个释放正常启动一般也没什么问题,我们看一下释放的代码:

//io.netty.util.ReferenceCountUtil#release(java.lang.Object)
public static boolean release(Object msg) {
//只有消息被封装成了ReferenceCounted,才需要释放
//而据我了解,只有通过Netty配置,开启了一种“Debug”模式,才会将消息封装成这个对象
//所以一般不会用,后期源码分析会看到
    if (msg instanceof ReferenceCounted) {
        return ((ReferenceCounted) msg).release();
    }
    return false;
}

Netty入门 (一)概述(核心概念、执行流程)、两个Demo(使用Netty,响应Http请求、Socket编程 C/S通信)_netty 实现http和socket-CSDN博客

请登录后发表评论