Netty(三)高级应用 之 WebSocket 长连接、网络聊天-Java专区论坛-技术-SpringForAll社区

Netty(三)高级应用 之 WebSocket 长连接、网络聊天

Netty 高级应用

本章会通过代码实例的方式将 Netty 常见应用场景中的知识点进行讲解。

1. WebSocket 长连接

1.1 WebSocket 简介

WebSocket 是 HTML5 中的协议,是构建在 HTTP 协议之上的一个网络通信协议,其以长连接的方式实现了客户端与服务端的全双工通信

HTTP/1.1 版本协议中具有 keep-alive 属性,实现的是半双工通信。

Http协议只能由客户端向服务端发消息,然后获得响应,而WebSocket可以实现服务端主动向客户端发送消息。

  • 全双工通信(Full duplex Communication)是指在通信的任意时刻,线路上存在A到B和B到A的双向信号传输。 全双工通信允许数据同时在两个方向上传输,又称为双向同时通信,即通信的双方可以同时发送和接收数据。在全双工方式下,通信系统的每一端都设置了发送器和接收器,因此,能控制数据同时在两个方向上传送。全双工方式无需进行方向的切换,因此,没有切换操作所产生的时间延迟,这对那些不能有时间延误的交互式应用(例如远程监测和控制系统)十分有利。

  • 半双工通信(Half-duplex Communication)可以实现双向的通信,但不能在两个方向上同时进行,必须轮流交替地进行。在这种工作方式下,发送端可以转变为接收端;相应地,接收端也可以转变为发送端。但是在同一个时刻,信息只能在一个方向上传输。因此,也可以将半双工通信理解为一种切换方向的单工通信。

    例如:对讲机是日常生活中最为常见的一种半双工通信方式,手持对讲机的双方可以互相通信,但在同一个时刻,只能由一方在讲话。

1.2 需求分析

在页面上有两个左右并排的文本域,它们的中间有一个“发送”按钮。在左侧文本域中输入文本内容后,单击发送按钮,会显示到右侧文本域中。

1.3 定义工程 05-websocket

复制 01-primary 工程,在此基础上进行修改:05-websocket

1.4 定义客户端页面

在 src/main 下定义一个目录 webapp。在其中定义 html 页面。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>index</title>
</head>
<script type="text/javascript">

    // 当前页面一打开就会执行的代码
    var socket;

    if(window.WebSocket) {
        // 创建一个WebSocket连接
        socket = new WebSocket("ws://localhost:8888/some");

        // 当与服务端的ws连接创建成功后会触发onopen的执行
        socket.onopen = function (ev) {
            // 在右侧文本域中显示连接建立提示
            var ta = document.getElementById("responseText");
            ta.value = "连接已建立";
        }

        // 当接收到服务端发送的消息时会触发onmessage的执行
        socket.onmessage = function (ev) {
            // 将服务端发送来的消息在右侧文本域中显示,在原有内容基础上进行拼接
            var ta = document.getElementById("responseText");
            ta.value = ta.value + "\n" + ev.data;
        }

        // 当与服务端的ws连接断开时会触发onclose的执行
        socket.onclose = function (ev) {
            // 将连接关闭消息在右侧文本域中显示,在原有内容基础上进行拼接
            var ta = document.getElementById("responseText");
            ta.value = ta.value + "\n连接已关闭";
        }
    } else {
        alert("浏览器不支持WebSocket");
    }

    // 定义发送按钮的发送方法
    function send(msg) {
        // 若当前浏览器不支持WebSocket,则直接结束
        if(!window.WebSocket) return;

        // 若ws连接已打开,则向服务器发送消息
        if(socket.readyState == WebSocket.OPEN) {
            // 通过ws连接向服务器发送消息
            socket.send(msg);
        }
    }
</script>
<body>
    <form>
        <textarea id="message" style="width: 150px; height: 150px"></textarea>
        <input type="button" value="发送" onclick="send(this.form.message.value)">
        <textarea id="responseText" style="width: 150px; height: 150px"></textarea>
    </form>
</body>
</html>

1.5 定义服务端

(1) 定义服务端启动类

如果只是支持WebSocket,只需要添加HttpServerCodes(http协议处理器)和WebSocketServerProtocolHandler(webSocket服务端协议处理器)两个处理器即可,WebSocketServerProtocolHandler需要传入参数指定WebSocket请求路径(顺序HttpServerCodes必须在最前面)

在这里插入图片描述

如果希望WebSocket连接,无论是客户端发给服务端,还是服务端发给客户端,允许传输比较大的数据,需要再添加一些处理器:

  • ChunkedWriteHandler:大块数据Chunk处理器
  • HttpObjectAggregator:Chunk聚合处理器

在这里插入图片描述

注意,我们需要了解几个核心类:

  • ChunkedWriteHandler:
    这个类可以处理大数据流,而不用花费大量内存,也不会内存溢出
    在这里插入图片描述

  • HttpMessage:定义了http请求和http响应的共有的属性(可以理解为请求头)
    在这里插入图片描述

  • HttpContent(可以理解为请求体,但是代表的是将整个请求体拆分成一个个Trunk的块的每一块)

    • HTTP分块传输编码机制:允许HTTP把发送方要发送给接收方的数据分成chunk进行传输,chunk就是大数据块,即HttpContent
    • 什么时候用分块传输编码机制?如果客户端访问的是服务端的动态资源,并且较大,服务端可以一边生成资源,一边传输给客户端,不用等生成完再发送
    • 是HTTP协议的一种传输机制,数据分成chunk需要长连接,所以这种机制只能在HTTP 1.1版本下使用。

    在这里插入图片描述
    我们之前用的HttpRequestDecoder就是继承HttpObjectDecoder,是http的解码器,即接受请求的时候,如果请求的数据量大就会把请求拆成一个HttpMessage和多个HttpContent进行传输

  • HttpObjectAggregator:如果使用了Http分块传输编码机制,可以将块聚合成一个完整的请求

    /**
     * A {@link ChannelHandler} that aggregates an {@link HttpMessage}
     * and its following {@link HttpContent}s into a single {@link FullHttpRequest}
     * or {@link FullHttpResponse} (depending on if it used to handle requests or responses)
     * with no following {@link HttpContent}s.  It is useful when you don't want to take
     * care of HTTP messages whose transfer encoding is 'chunked'.  Insert this
     * handler after {@link HttpResponseDecoder} in the {@link ChannelPipeline} if being used to handle
     * responses, or after {@link HttpRequestDecoder} and {@link HttpResponseEncoder} in the
     * {@link ChannelPipeline} if being used to handle requests.
     * 
     * 一个{@link ChannelHandler},它将一个{@link HttpMessage}及其后面
     * 的{@link HttpContent}聚合为一个后面不再有{@link HttpContent}的
     * {@link FullHttpRequest}或{@link FullHttpResponse}(取决于它是否
     * 用于处理请求或响应)。
     * 
     * 当你不想处理传输编码为“分块”的HTTP消息时,它是有用的。如果用于处理响应,
     * 在pipline中将该处理器插入在HttpResponseDecoder之后
     * 如果用于处理请求,在pipline中插入在HttpRequestDecoder和HttpResponseEncoder之后
     * ...
     */
    public class HttpObjectAggregator extends MessageAggregator<HttpObject, HttpMessage, HttpContent, FullHttpMessage> {
    ...
    } 
    

    在这里插入图片描述

(2) 定义服务端处理器

因为收到的数据直接返回回去,所以处理器不能继承SimpleChannelInBoundHandler,因为继承这个类会自动把msg释放掉,而发送数据是异步的,可能会报错,之前讲过。

WebSocket协议发来的数据,经过处理器处理后收到的数据类型是TextWebSocketFrame,需要强转

public class SomeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String text = ((TextWebSocketFrame) msg).text();
        ctx.channel().writeAndFlush(new TextWebSocketFrame("Frome Client:" + text));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

1.6 演示,以及WebSocket 握手原理

启动测试:
在这里插入图片描述

responseHeaders中的sec-websocket-accept和RequestHeaders中的Sec-WebSocket-Key干什么用的?看下WebSokcet握手原理:
在这里插入图片描述

2. 网络聊天

该工程是对 socket 编程的一个应用。

2.1 需求分析

本例要实现一个网络群聊工具。参与聊天的客户端消息是通过服务端进行广播的。

2.2 创建工程 06-webchat

复制 02-socket 工程,在此基础上进行修改:06-webchat

2.3 定义服务端

(1) 定义服务端启动类

添加一个基于行的解码器即可。

public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                     .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加一个基于行的解码器
                            pipeline.addLast(new LineBasedFrameDecoder(2048));
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

(2) 定义服务端处理器

  • A、定义 ChannelGroup
    在这里插入图片描述

    注意:

    什么是线程驱动?
    在这里插入图片描述EventLoop继承体系:EventLoop -> OrderedEventExecutor -> EventExecutor

  • B、 重写 channelRead()
    可以直接通过group发送给所有的channel
    在这里插入图片描述
    如果希望标明自己,则需要遍历goup,找出自己的channel
    // 只要有客户端Channel给当前的服务端发送了消息,那么就会触发该方法的执行
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取到向服务器发送消息的channel
        Channel channel = ctx.channel();
        // 这里要实现将消息广播给所有group中的客户端Channel
        // 发送给自己的消息与发送给大家的消息是不一样的
        group.forEach(ch -> {
            if(ch != channel) {
                ch.writeAndFlush(channel.remoteAddress() + ":" + msg + "\n");
            } else {
                channel.writeAndFlush("me:" + msg + "\n");
            }
        });
    }
    
  • C、 重写 channel 激活与钝化方法
    如果激活了,添加到channelGourp,并广播消息通知上线
    // 只要有客户端Channel与服务端连接成功就会执行这个方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 获取到当前与服务器连接成功的channel
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "---上线");
        group.writeAndFlush(channel.remoteAddress() + "---上线\n");
        // 将当前channel添加到group中
        group.add(channel);
    }
    

    下线,不需要调用group.remove方法,因为在ChannelGroup里的Channel,如果连接断开了,会自动移除,这个由ChannelGroup自己完成,它会监测所有Channel的状态,一但channel断开连接,马上移除出group

    // 只要有客户端Channel断开与服务端的连接就会执行这个方法
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 获取到当前要断开连接的Channel
        Channel channel = ctx.channel();
        System.out.println(channel.remoteAddress() + "------下线");
        group.writeAndFlush(channel.remoteAddress() + "下线,当前在线人数:" + group.size() + "\n");
    
        // group中存放的都是Active状态的Channel,一旦某Channel的状态不再是Active,
        // group会自动将其从集合中踢出,所以,下面的语句不用写
        // remove()方法的应用场景是,将一个Active状态的channel移出group时使用
        // group.remove(channel);
    }
    
  • D、重写异常捕获方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
    

2.4 定义客户端

(1) 定义客户端启动类

添加一个行解码器,获取键盘输入写入到Channel

public class SomeClient {
    public static void main(String[] args) throws Exception {
    NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加行解码器
                        pipeline.addLast(new LineBasedFrameDecoder(2048));
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new SomeClientHandler());
                    }
                });

        ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
        // 获取键盘输入
        InputStreamReader is = new InputStreamReader(System.in, "UTF-8");
        BufferedReader br = new BufferedReader(is);
        // 将输入的内容写入到Channel
        future.channel().writeAndFlush(br.readLine() + "\r\n");
    }
}

(2) 定义客户端处理器

继承SimpleChannelInbountHandler即可,因为不需要将该数据发出去,用SimpleChannelInbountHandler可以帮我们自动释放掉msg所占的资源

public class SomeClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

演示

在这里插入图片描述

3. 读写空闲检测

当客户端与服务端的连接建立好后,它们之间就可以进行通信了。但是,若某客户端与服务端间长时间没有进行通信,而 Channel 却被长时间占用,就会形成资源浪费。Netty 提供了专门用于进行读写操作空闲检测的处理器可供使用。

3.1 创建工程 07-idle

复制 06-webchat 工程,在此基础上进行修改:07-idle

3.2 定义服务端

(1) 定义服务端启动类

public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                     .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 添加一个基于行的解码器
                            pipeline.addLast(new LineBasedFrameDecoder(2048));
                            // 若在3秒内当前服务器没有发生读操作,则会触发读操作空闲事件
                            // 若在5秒内当前服务器没有发生写操作,则会触发写操作空闲事件
                            // pipeline.addLast(new IdleStateHandler(3, 5, 0));
                            // 若在5秒内同时即发生了读又发生了写操作才不会触发all操作空闲事件
                            // 若在5秒内读与写操作有任何一项没有发生,都会触发all操作空闲事件
                            pipeline.addLast(new IdleStateHandler(0, 0, 5));
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

注意:

  • IdleStateHandler 构造需要三个参数:
    在这里插入图片描述
    • readerIdleTimeSeconds:在指定时间内没有读操作会触发一个READER_IDLE状态的事件
    • writerIdleTimeSeconds:在指定时间内没有写操作会触发一个WRITER_IDLE状态的事件
    • allIdleTimeSeconds:在指定时间内没有读或者没有写操作都会触发一个ALL_IDLE状态的事件(注意是指定时间内读写都发生才不会触发这个事件

(2) 定义服务端处理器

重写userEventTriggered即可:

@Override
// 所有“规定动作”之外的所有事件都可以通过以下方法触发
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent)evt;
        String eventDes = null;
        switch (event.state()) {
            case READER_IDLE:
                eventDes = "读空闲超时";
                break;
            case WRITER_IDLE:
                eventDes = "写空闲超时";
                break;
            case ALL_IDLE:
                eventDes = "读和写空闲都超时";
        }
        System.out.println(eventDes);
        //关闭节点,等于关闭Channel
        //我们不用关闭节点,演示效果
        //ctx.close();
    } else {
    //当前处理不了,走默认方法,触发下一个处理器的userEventTriggered方法
        super.userEventTriggered(ctx, evt);
    }
}

注意:

  • IDLE_EVENT事件一旦触发,就会触发Handler的userEventTriggered方法

3.3 定义客户端

直接使用“聊天程序”的客户端即可。

演示

在这里插入图片描述

4. 心跳机制

所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还“活着”, 以确保 TCP 连接的有效性。

4.1 需求

下面要实现的需求是:Client 端连接到 Server 端后,会循环执行一个定时任务:随机等待几秒,然后 ping 一下 Server 端,即发送一个心跳。当 Server 端在等待了指定时间后没有读取到 Client 端发送的心跳,Server 端会主动断开连接。

4.2 创建工程 08-heartBeat

复制 06-webchat 工程,在此基础上进行修改:08-heartBeat

4.3 定义服务端

(1) 定义服务端启动类

public class SomeServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                     .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            //空闲检测读,5秒内没有数据触发读空闲事件
                            pipeline.addLast(new IdleStateHandler(5, 0, 0));
                            pipeline.addLast(new SomeServerHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(8888).sync();
            System.out.println("服务器已启动");
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

(2) 定义读操作处理器

public class SomeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到Client发送的消息:" + msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    // 用于捕获当前Server中的各种事件
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                System.out.println("将要断开连接");
                ctx.close();
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

4.4 定义客户端

(1) 定义客户端启动类

注意,该类中不能关闭 Channel,不能关闭 eventLoopGroup。

public class SomeClient {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new SomeClientHandler());
                    }
                });

        ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
    }
}

(2) 定义随机发送心跳处理器

一但管道激活,就开始随机发送心跳

public class SomeClientHandler extends ChannelInboundHandlerAdapter {
    private ScheduledFuture schedule;
    private GenericFutureListener listener;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 发送心跳
        sendHeartbeat(ctx.channel());
    }

    private void sendHeartbeat(Channel channel) {
        // 生成一个[1,8)的随机数作为心跳发送的时间间隔
        int interval = new Random().nextInt(7) + 1;
        System.out.println(interval + "秒后会向Server发送心跳");

//channel.eventLoop().schedule方法可以生成一个定时任务
//该方法有三个参数:需要执行的任务,多长时间以后执行,时间单位
        schedule = channel.eventLoop().schedule(() -> {
            if (channel.isActive()) {
                System.out.println("向Server发送心跳");
                channel.writeAndFlush("~PING~");
            } else {
                System.out.println("与Server间的连接已经关闭");
            }
        }, interval, TimeUnit.SECONDS);
        //该异步定时任务只会执行一次,我们可以添加监听器监听异步任务操作,
        //一但异步任务完成马上触发监听器方法,通过监听器可以拿到异步任务的操作结果
        //我们可以让监听器一但监听到任务执行完毕,马上重新执行该任务,达到循环的效果
        
//定义一个监听器,一旦操作完成会触发监听器执行
        listener = (future) -> {
            // 再次发送心跳
            // sendHeartBeat方法执行完了出栈了,才会
            // 触发监听的sendHeartBeat,不会栈溢出
            sendHeartbeat(channel);
        };

        // 为定时任务添加监听器,一旦该异步任务结束就会触发监听器逻辑
        schedule.addListener(listener);
    }
//为了安全起见,重写一个channelInactive方法,当通道被关闭了,
// 就把定时任务的监听器取消掉,就不会再递归调用了,以免通道关闭
// 了,心跳还再一直发
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 一旦连接被关闭,则将监听器移除,这样就不会再发生心跳方法的递归调用了,以防止栈溢出
        schedule.removeListener(listener);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
/**
 * Listens to the result of a {@link Future}.  The result of the asynchronous operation is notified once this listener
 * is added by calling {@link Future#addListener(GenericFutureListener)}.
 * 
 * 侦听{@link Future,Future代表异步操作}的结果。
 * 一旦通过调用{@link Future#addListener(GenericFutureListener)}
 * 添加了该侦听器,就会被通知异步操作的结果。
 */
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
    void operationComplete(F future) throws Exception;
}

测试:
在这里插入图片描述

不用channelInactive方法,加这也可以一个意思
在这里插入图片描述

4.5 客户端重连服务端

(1) 修改处理器

在客户端修改心跳处理器。只需要添加如下内容:

要想重连需要bootstrap,所以我们把bootstrap传到发送心跳的处理器中

public class SomeClientHandler extends ChannelInboundHandlerAdapter {
    private ScheduledFuture schedule;
    private GenericFutureListener listener;
    private Bootstrap bootstrap;

//处理器中传入Bootstrap
    public SomeClientHandler(Bootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

...

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 一旦连接被关闭,则将监听器移除,这样就不会再发生心跳方法的递归调用了,以防止栈溢出
        schedule.removeListener(listener);
        System.out.println("重新连接Server...");
        //一旦连接关闭,尝试重新连接
        bootstrap.connect("localhost", 8888).sync();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

(2) 修改客户端启动类

public class SomeClient {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //处理器构造中传入Bootstrap即可
                        pipeline.addLast(new SomeClientHandler2(bootstrap));
                    }
                });

        ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
    }
}

测试:
在这里插入图片描述

Netty(三)高级应用 之 WebSocket 长连接、网络聊天、读写空闲检测、心跳机制_spring websocket 空闲检测怎么做-CSDN博客

请登录后发表评论

    没有回复内容