Netty(五) 高级应用 之 手写 RPC 框架-Java专区论坛-技术-SpringForAll社区

Netty(五) 高级应用 之 手写 RPC 框架

1. 手写 RPC 框架

1.1 RPC 简介

RPC,Remote Procedure Call,远程过程调用,是一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议。在 OSI 网络通信模型中,RPC 跨越了传输层(第四层,传输协议 TCP/UDP,即通过 ip+port 进行通信)和应用层(第七层,传输协议有 HTTP、HTTPS、FTP 等)。RPC 使得开发分布式系统应用变得更加容易。

Socket,就是提供一套api,按照api开发,就是满足TCP协议

RPC 采用 C/S 模式。请求程序就是 Client,而服务提供程序就是 Server。首先,Client 发送一个带有请求参数的调用请求到 Server,然后等待响应。在 Server 端,进程一直处于睡眠状态直到接收到 Client 的调用请求。当一个调用请求到达,Server 会根据请求参数进行计算,并将计算结果发送给 Client,然后等待下一个调用请求。Client 接收到响应信息,即获取到调用结果,然后根据情况继续发出下一次调用。

1.2 RPC 框架具体需求

我们这里要定义一个 RPC 框架,这个框架提供给用户后,用户只需要按照使用步骤就可以完成 RPC 远程调用。我们现在给出用户对于该 RPC 框架的使用步骤:

  • 用户需要将业务接口通知到 Server 与 Client,因为业务接口是服务名称。
  • 用户只需将业务接口的实现类写入到 Server 端的指定包下,那么这个包下的实现类就会被 Server 发布。
  • Client 端只需根据业务接口名就可获取到 Server 端发布的服务提供者,然后就可以调用到远程 Server 端的实现类方法的执行。

1.3 定义 api 工程 10-rpc-api

该 api 工程中用于存放业务接口、常量类、工具类等将来服务端与客户端均会使用到的一个接口与类。

(1) 创建工程

创建一个普通的 Maven 的 Java 工程:10-rpc-api

(2) 导入依赖

仅导入 lombok 依赖即可。

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
</dependencies>

(3) 定义业务接口

// 业务接口
public interface SomeService {
    String hello(String name);
}

(4) 定义常量类

@Data
public class Invocation implements Serializable {
    /**
     * 接口名,即微服务名称
     */
    private String className;
    /**
     * 要远程调用的方法名
     */
    private String methodName;
    /**
     * 参数类型列表
     */
    private Class<?>[] paramTypes;
    /**
     * 参数值列表
     */
    private Object[] paramValues;

}

1.4 定义服务端工程 10-rpc-server

(1) 创建工程

创建一个普通的 Maven 的 Java 工程:10-rpc-server

(2) 导入依赖

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<!-- netty-all 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<!--API 工程依赖-->
<dependency>
<groupId>com.abc</groupId>
<artifactId>10-rpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--lombok 依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
</dependencies>

(3) 定义业务接口实现类

public class SomeServiceImpl implements SomeService {
    @Override
    public String hello(String name) {
        return name + "欢迎你";
    }
}

(4) 定义服务器类

定义一个注册表,map结构,key为业务接口名,即服务名称,value为实现类
定义一个集合,用来存储指定包中的业务接口的实现类名

public class RpcServer {
 // 注册表
 private Map<String, Object> registerMap = new HashMap<>();
 // 用于存放指定包中的业务接口的实现类名
 private List<String> classCache = new ArrayList<>();
}
  • A、定义 publish():发布服务,将指定包中的业务接口实现类实例写入到注册表
    public class RpcServer {
        // 注册表
        private Map<String, Object> registerMap = new HashMap<>();
        // 用于存放指定包中的业务接口的实现类名
        private List<String> classCache = new ArrayList<>();
    
        // 发布服务:将指定包中的业务接口实现类实例写入到注册表
        public void publish(String basePackage) throws Exception {
            // 将指定包中的业务接口实现类名写入到classCache中
            cacheClassCache(basePackage);
            // 将指定包中的业务接口实现类实例写入到注册表
            doRegister();
        }
    
  • B、 定义 cacheClassCache():将指定包中的业务接口实现类类名写入到classCache中
    public class RpcServer {
    ...
        // 用于存放指定包中的业务接口的实现类名
        private List<String> classCache = new ArrayList<>();
    ...
    
        // 将指定包中的业务接口实现类名写入到classCache中
        private void cacheClassCache(String basePackage) {
            // 获取指定包目录中的资源
            URL resource = this.getClass().getClassLoader()
                    // com.abc.service  =>  com/abc/service
                    .getResource(basePackage.replaceAll("\\.", "/"));
    
            // 增强程序健壮性,若指定的目录中没有资源,则直接返回
            if (resource == null) {
                return;
            }
    
            File dir = new File(resource.getFile());
            // 遍历指定目录中的所有文件
            for (File file : dir.listFiles()) {
                if (file.isDirectory()) {
                    // 若当前file为目录,则递归
                    cacheClassCache(basePackage + "." + file.getName());
                } else if (file.getName().endsWith(".class")) {
                    // 去掉文件名后的.class后辍
                    String fileName = file.getName().replace(".class", "").trim();
                    // 将类的全限定性类名写入到classCache
                    classCache.add(basePackage + "." + fileName);
                }
            }
            // System.out.println(classCache);
        }   
    }
    
  • C、 定义 doRegister():将指定包中的业务接口实现类实例写入到注册表
    public class RpcServer {
        // 注册表
        private Map<String, Object> registerMap = new HashMap<>();
        // 用于存放指定包中的业务接口的实现类名
        private List<String> classCache = new ArrayList<>();
    ...
        // 将指定包中的业务接口实现类实例写入到注册表
        // 注册表是一个map
        // key为业务接口名,即微服务名称
        // value为该业务接口对应的实现类实例
        private void doRegister() throws Exception {
            if (classCache.size() == 0) {
                return;
            }
    
            for (String className : classCache) {
                // 将当前遍历的类加载到内存
                Class<?> clazz = Class.forName(className);
                registerMap.put(clazz.getInterfaces()[0].getName(), clazz.newInstance());
            }
        }
    }
    

    注意,一个实现类只能实现一个接口(包括Dubbo也是这么玩的)

  • D、定义 starter():启动服务器
    public class RpcServer {
    ...
        // 启动服务器
        public void start() throws InterruptedException {
            EventLoopGroup parentGroup = new NioEventLoopGroup();
            EventLoopGroup childGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(parentGroup, childGroup)
                        // 用于指定当Server的连接请求处理线程全被占用时,
                        // 临时存放已经完成了三次握手的请求的队列的长度。
                        // 默认是50
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        // 指定使用心跳机制来保证TCP长连接的存活性
                        // 这个心跳机制是操作系统级别实现的
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                //编码器
                                pipeline.addLast(new ObjectEncoder());
                                //解码器
                                pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                        ClassResolvers.cacheDisabled(null)));
                                pipeline.addLast(new RpcServerHandler(registerMap));
                            }
                        });
                //监听端口,启动Server
                ChannelFuture future = bootstrap.bind(8888).sync();
                System.out.println("服务端已启动,监听的端口为:8888");
                future.channel().closeFuture().sync();
            } finally {
                parentGroup.shutdownGracefully();
                childGroup.shutdownGracefully();
            }
        }
    }
    
    • TCP,SO_KEEPALIVE参数:连接一旦建立好了,通过心跳,默认能保持2个小时
    • ObjectDecoder
      在这里插入图片描述
      类解析器(底层即时类加载器)作用:将传过来的二进制请求参数数据,转换成对应的类的实例,需要类加载器加载类并实例化
    • ClassResolvers:类解析器,如果传null,会使用默认的类加载器
      在这里插入图片描述

(5) 定义服务端处理器

处理器需要做的事:解析Client发送来的msg,然后从registerMap注册表中查看是否有对应的接口

因为需要注册表,我么可以通过构造传进来

// ChannelInboundHandlerAdapter:不会自动释放msg
// SimpleChannelInboundHandler:会自动释放msg
public class RpcServerHandler extends SimpleChannelInboundHandler<Invocation> {

    private Map<String, Object> registerMap;
//通过构造传入注册表
    public RpcServerHandler(Map<String, Object> registerMap) {
        this.registerMap = registerMap;
    }

    // 解析Client发送来的msg,然后从registerMap注册表中查看是否有对应的接口
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Invocation msg) throws Exception {
        Object result = "没有该提供者,或没有该方法";
        if (registerMap.containsKey(msg.getClassName())) {
            // 从注册表中获取接口对应的实现类实例
            Object invoker = registerMap.get(msg.getClassName());
            result = invoker.getClass().getMethod(msg.getMethodName(), msg.getParamTypes())
                    .invoke(invoker, msg.getParamValues());
        }
        // 将运算结果返回给client
        ctx.writeAndFlush(result);
        ctx.close();
    }
//异常处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

(6) 定义服务器启动类

public class RpcStarter {
    public static void main(String[] args) throws Exception {
        RpcServer server = new RpcServer();
        server.publish("com.abc.service");
        server.start();
    }
}

1.5 定义客户端工程 10-rpc-client

(1) 创建工程

创建一个普通的 Maven 的 Java 工程:10-rpc-client

(2) 导入依赖

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- netty-all 依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>
<!--API 工程依赖-->
<dependency>
<groupId>com.abc</groupId>
<artifactId>10-rpc-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--lombok 依赖-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
</dependencies>

(3) 定义动态代理类

除了cglib和jdk动态代理,还有javassist:可以动态编译,对现有class文件直接操作,也可以以实现动态代理

在客户端创建一个动态代理类,用于动态代理服务端的提供者对象。

public class RpcProxy {

    public static <T> T create(Class<?> clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
                new Class[]{clazz},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    //注意!!需要排除Object类的方法
                        // 若调用的是Object的方法,则直接进行本地调用
                        if (Object.class.equals(method.getDeclaringClass())) {
                            return method.invoke(this, args);
                        }
                        // 远程调用在这里发生
                        return rpcInvoke(clazz, method, args);
                    }
                });
    }

//rpcInvoke远程调用方法:
    private static Object rpcInvoke(Class<?> clazz, Method method, Object[] args) throws InterruptedException {
    //在rpcInvoke里前面,new一个客户端处理器
        RpcClientHandler handler = new RpcClientHandler();
        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    // Nagle算法开关
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            //将处理器放到pipline
                            pipeline.addLast(handler);
                        }
                    });
            ChannelFuture future = bootstrap.connect("localhost", 8888).sync();

            // 形成远程调用的参数实例
            Invocation invocation = new Invocation();
            invocation.setClassName(clazz.getName());
            invocation.setMethodName(method.getName());
            invocation.setParamTypes(method.getParameterTypes());
            invocation.setParamValues(args);

            // 将参数实例发送给Server
            future.channel().writeAndFlush(invocation).sync();

            future.channel().closeFuture().sync();
        } finally {
            loopGroup.shutdownGracefully();
        }
        //最后通过handler获取结果
        return handler.getResult();
    }
}

注意:

  • ChannelOption.TCP_NODELAY:TCP标准参数,在传输的时候,默认让传输的帧(数据块)尽可能的大,可以提高传输效率,如何做到?默认用一种Nagle算法保证尽可能大的数据块进行传输。NODELAY默认是false,即延迟,等到数据块足够大才发送,true则不延迟,一有数据马上发
    在这里插入图片描述

(4) 定义客户端处理器

public class RpcClientHandler extends SimpleChannelInboundHandler<Object> {
    private Object result;
    public Object getResult() {
        return result;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        this.result = msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

(5) 定义消费者类

public class RpcConsumer {
    public static void main(String[] args) {
        SomeService service = RpcProxy.create(SomeService.class);
        System.out.println(service.hello("kkb"));
        System.out.println(service.hashCode());
    }
}

演示

在这里插入图片描述

2. 手写 Dubbo 框架

2.1 原理

Dubbo 框架本身就是一个 RPC 框架,不同的是,消费者要连接的服务端 IP 与端口号不是硬编码在服务端的,而是从 zk 中读取到的。那么 zk 中的这些信息是从哪里来的呢?

一个 Dubbo 应用中会存在很多服务提供者与消费者。每个提供者都是一个 Netty Server,其会对外暴露自己所在主机的 IP 与 Port。每个消费者都是一个 Netty Client,其会通过连接相应主机的 IP 与 Port 来获取相应的服务。

服务提供者的 IP 与 Port 是如何对外暴露的呢?其会为自己所提供的服务起一个服务名称,一般为业务接口名。然后将该服务名称与对应提供者主机的 IP 与 Port 相绑定,注册到zk 中。

具体的注册步骤:

Step1:在 zk 中创建一个 Dubbo 的持久根节点,例如/mydubbo
Step2:以服务名称为节点名称在/mydubbo 节点下创建一个持久节点,
  例如: /mydubbo/com.abc.service.SomeService
Step3:在服务名称节点下创建临时节点,节点名称为提供者主机的 ip:port,(真实Dubbo不只这些元数据信息)
   例如: /mydubbo/com.abc.service.SomeService/127.0.0.1:8888

服务消费者会从服务注册中心 zk 中查找自己所需要的服务名称,一般为业务接口名,然后获取到该服务名称对应的所有提供者主机信息,并通过负载均衡方式选取一个主机进行连接,获取相应服务。

具体消费过程:

Step1:从 zk 的/mydubbo 节点下找到要消费的服务名称对应的节点
Step2:为该服务名称节点添加子节点列表变更的 watcher 监听
Step3:获取该服务名称节点下的所有子节点,即该服务对应的所有提供者主机的 ip:port
Step4:通过负载均衡选择一个 server
Step5:消费者将其调用信息发送给该 server
step6:接收提供者主机返回的运算结果

2.2 新增需求

当客户端通过负载均衡策略选择了某一提供者主机后,我们这里新增了一个需求:提供者主机中提供同一服务名称(接口名)的实现类有多个。这样,消费者可以指定其要调用的实现类。若消费者没有指定要调用的实现类,其会调用到注册到中第一个注册的实现类。

一个接口多个实现类的实现机制和真正Dubbo实现的是不一样的

为了实现提供者端业务接口可以有多个实现类供客户端选择,这里要求实现类名必须是一个前辍 prefix 后是业务接口名。这样,消费者在进行消费时,可以通过前辍来指定要调用的是哪个实现类。

2.3 定义 aip 工程 11-dubbo-api

(1) 创建工程

复制 10-rpc-api,在其基础上进行修改:11-dubbo-api

(2) 修改 Invocation 类

新增prefix,前缀属性。

/**
 * RPC远程调用信息
 */
@Data
public class Invocation implements Serializable {
    /**
     * 接口名
     */
    private String className;
    /**
     * 要远程调用的方法名
     */
    private String methodName;
    /**
     * 方法参数类型列表
     */
    private Class<?>[] paramTypes;
    /**
     * 方法参数值
     */
    private Object[] paramValues;
    /**
     * 要调用的业务接口实现类的功能性前辍
     */
    private String prefix;

}
(3) 增加常量类

保持原有的业务接口类及 Invocation 类,再增加一个 zk 相关的常量类。

/**
 * zk相关常量
 */
public class ZKConstant {
    /**
     * zk集群地址
     */
    public static final String ZK_CLUSTER = "zkOS:2181";
    /**
     * dubbo在zk中的根节点路径
     */
    public static final String ZK_DUBBO_ROOT_PATH = "/mydubbo";
}

2.4 定义服务端工程 11-dubbo-server

(1) 创建工程

复制 10-rpc-server 工程,在其基础上进行修改:11-dubbo-server

(2) 添加依赖

由于当前工程要作为 zk 的客户端对 zk 进行操作,所以这里导入了 Curator 依赖。

<!--curator 依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
(3) 接口实现类

将原来的实现类删除,添加两个新的实现类。

public class AlipaySomeService implements SomeService {
    @Override
    public String hello(String name) {
        return name + "欢迎你  AlipaySomeService";
    }
}

public class WechatSomeService implements SomeService {
    @Override
    public String hello(String name) {
        return name + "欢迎你  WechatSomeService";
    }
}
(4) 定义注册中心注册规范
/**
 * 注册规范
 */
public interface RegistryCenter {
    /**
     *  注册到注册中心
     * @param serviceName  服务名称,一般是接口名
     * @param serviceAddress  提供者的ip:port
     */
    void register(String serviceName, String serviceAddress) throws Exception;
}
(5) 定义 zk 注册中心实现

注意:

  • ExponentialBackoffRetry(1000,10):每重试一次,sleep1秒,最多重试10次
// 注册到zk
public class ZKRegistryCenter implements RegistryCenter {
    private CuratorFramework client;

    public ZKRegistryCenter() {
        // 创建并初始化zk客户端
        client = CuratorFrameworkFactory.builder()
                // 指定要连接的zk集群地址
                // ZK地址统一用spi里面配置的常量,因为客户端也要用
                .connectString(ZKConstant.ZK_CLUSTER)
                // 指定连接超时
                .connectionTimeoutMs(10000)
                // 指定会话超时
                .sessionTimeoutMs(4000)
                // 指定重试策略:每重试一次,sleep 1秒,最多重试10次
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        // 启动zk客户端
        client.start();
    }
    @Override
//实现registry方法:
    public void register(String serviceName, String serviceAddress) throws Exception {
        // 要创建的服务名称对应的节点路径
        // 先直接创建好持久根节点/mybubbo 和/mybubbo/xxxService,都是持久节点,可以同时创建
        //(需要判空,如果已经存在会报错)
        String servicePath = ZKConstant.ZK_DUBBO_ROOT_PATH + "/" + serviceName;
        if (client.checkExists().forPath(servicePath) == null) {
            client.create()
                    // 若父节点不存在,则会自动创建
                    .creatingParentsIfNeeded()
                    // 指定要创建的是持久节点
                    .withMode(CreateMode.PERSISTENT)
                    // 指定要创建的节点名称
                    .forPath(servicePath);
        }
        // 要创建的主机对应的节点路径
        String hostPath = servicePath + "/" + serviceAddress;
        if (client.checkExists().forPath(hostPath) == null) {
            client.create()
                    .withMode(CreateMode.EPHEMERAL)  // 临时节点
                    .forPath(hostPath);
        }
    }
}
(6) 修改服务器类

A、修改 publish()方法

public class RpcServer {
    // 注册表
    private Map<String, Object> registerMap = new HashMap<>();
    // 用于存放指定包中的业务接口的实现类名
    private List<String> classCache = new ArrayList<>();
    // 主机地址
    private String serviceAddress;
    // 业务接口实现类所在包
    private String basePackage;
    
//新增主机地址,注册到ZK时用,也可以放到publish方法实现赋值,这里直接用构造了
    public RpcServer(String serviceAddress) {
        this.serviceAddress = serviceAddress;
    }

    // 发布服务:将指定包中的业务接口实现类实例写入到注册表
    public void publish(String basePackage) throws Exception {
    // publish时,将发布的包名维护到成员变量,后面服务端处理器需要知道包名,找到实现类
        this.basePackage = basePackage;
        // 将指定包中的业务接口实现类名写入到classCache中
        cacheClassCache(basePackage);
        // 将指定包中的业务接口实现类实例写入到注册表
        doRegister();
    }

    // 将指定包中的业务接口实现类名写入到classCache中
    private void cacheClassCache(String basePackage) {
    //没有变化
        ...
    }

// registerMap:之前注册表维护的是接口名,现在改成实现类类名
    // 增加注册到注册中心的方法,注册的是接口名
    private void doRegister() throws Exception {
        if (classCache.size() == 0) {
            return;
        }

        for (String className : classCache) {
            // 将当前遍历的类加载到内存
            Class<?> clazz = Class.forName(className);
            // 将“实现类名-提供者实例”写入到注册表
            registerMap.put(className, clazz.newInstance());

            // 将“业务接口-主机地址”注册到zk
            new ZKRegistryCenter().register(clazz.getInterfaces()[0].getName(), serviceAddress);
        }
    }

    // 启动服务器
    public void start() throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(parentGroup, childGroup)
                    // 用于指定当Server的连接请求处理线程全被占用时,
                    // 临时存放已经完成了三次握手的请求的队列的长度。
                    // 默认是50
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // 指定使用心跳机制来保证TCP长连接的存活性
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            //注册表,和发布的包名传入处理器
                            pipeline.addLast(new RpcServerHandler(registerMap, basePackage));
                        }
                    });
                    
//地址不是写死了,通过构造传入本机地址
            String ip = serviceAddress.split(":")[0];
            String port = serviceAddress.split(":")[1];

            ChannelFuture future = bootstrap.bind(ip, Integer.valueOf(port)).sync();
            System.out.println("服务端已启动,监听的端口为:" + port);
            future.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}
(7) 修改服务器启动类
public class RpcStarter {
    public static void main(String[] args) throws Exception {
        RpcServer server = new RpcServer("127.0.0.1:8888");
        server.publish("com.abc.service");
        server.start();
    }
}
(8) 修改服务端处理器

注意

  • 通过构造将前面传来的发布包名保存,因为实现类有可能和接口的包不一样
  • 现在一个接口有多个实现类,所以需要我们按照规则拼接需要调用的实现类名
    basePackage + Prefix + InterFaceName
  • 如果没有传Prefix,则用查询到的接口的第一个实现类
// ChannelInboundHandlerAdapter:不会自动释放msg
// SimpleChannelInboundHandler:会自动释放msg
public class RpcServerHandler extends SimpleChannelInboundHandler<Invocation> {

    private Map<String, Object> registerMap;
    private String basePackage;
    
//通过构造将前面传来的发布包名保存,因为实现类有可能和接口的包不一样
    public RpcServerHandler(Map<String, Object> registerMap, String basePackage) {
        this.registerMap = registerMap;
        this.basePackage = basePackage;
    }

    // 解析Client发送来的msg,然后从registerMap注册表中查看是否有对应的接口
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Invocation msg) throws Exception {
        Object result = "没有该提供者,或没有该方法";

        // prefix:Alipay
        // 接口名:com.abc.service.SomeServicve
        // 获取接口名
        String interfaceName = msg.getClassName();
        // 获取接口的简单类名
        String simpleInterfaceName = interfaceName
                .substring(interfaceName.lastIndexOf(".") + 1);
        // 拼接指定实现类的类名,即registerMap的key
        String key = basePackage + "." + msg.getPrefix() + simpleInterfaceName;

        // 若消费者没有指定要调用的前辍,则使用第一个该接口的实现类
        if (StringUtil.isNullOrEmpty(msg.getPrefix())) {
            // 遍历所有实现类名
            for (String rkey : registerMap.keySet()) {
                // 若当前遍历的实现类名以简单接口名结尾,则选定这个类名
                if (rkey.endsWith(simpleInterfaceName)) {
                    key = rkey;
                    break;
                }
            }
        }

        if (registerMap.containsKey(key)) {
            // 从注册表中获取接口对应的实现类实例
            Object invoker = registerMap.get(key);
            result = invoker.getClass().getMethod(msg.getMethodName(), msg.getParamTypes())
                    .invoke(invoker, msg.getParamValues());
        }
        // 将运算结果返回给client
        ctx.writeAndFlush(result);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

2.5 定义客户端工程 11-dubbo-client

(1) 创建工程

复制 10-rpc-client 工程,在其基础上进行修改:11-dubbo-client

(2) 添加依赖

由于当前工程要作为 zk 的客户端对 zk 进行操作,所以这里新增了 Curator 依赖。

<!--curator 依赖-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
(3) 未发生变化的类

复制来的工程所包含的类中,客户端处理器类 RpcClientHandler 未发生变化,可以直接使用。

(4) 定义负载均衡接口
/**
 * 负载均衡
 */
public interface LoadBalance {
    String choose(List<String> servers);
}
(5) 定义随机负载均衡器
// 随机负载均衡
public class RandomLoadBalance implements LoadBalance {

    @Override
    public String choose(List<String> servers) {
        return servers.get(new Random().nextInt(servers.size()));
    }
}

我们实现的负载均衡很简单,实际上Bubbo实现的非常负载,且功能强大

(6) 定义服务发现规范
/**
 * 服务发现
 */
public interface ServiceDiscovery {
    /**
     *
     * @param serviceName  服务名称,即接口名
     * @return  返回经过负载均衡后的server
     */
    String discovery(String serviceName) throws Exception;
}
(7) 定义 zk 服务发现实现类

注意

  • 服务发现方法中需要监听子节点列表变化,用
    在这里插入图片描述
// 从zk中进行服务发现
public class ZKServiceDiscovery implements ServiceDiscovery {
    private CuratorFramework client;
    
    //存储指定服务的提供者列表
    private List<String> servers;
    
//构造中初始化ZK客户端:
    public ZKServiceDiscovery() {
        // 创建并初始化zk客户端
        client = CuratorFrameworkFactory.builder()
                // 指定要连接的zk集群地址
                .connectString(ZKConstant.ZK_CLUSTER)
                // 指定连接超时
                .connectionTimeoutMs(10000)
                // 指定会话超时
                .sessionTimeoutMs(4000)
                // 指定重试策略:每重试一次,sleep 1秒,最多重试10次
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        // 启动zk客户端
        client.start();
    }

    @Override
    //服务发现:
    public String discovery(String serviceName) throws Exception {
        // 要获取的服务在zk中的路径
        String servicePath = ZKConstant.ZK_DUBBO_ROOT_PATH + "/" + serviceName;
        // 获取到指定节点的所有子节点列表
        servers = client.getChildren()
                .usingWatcher((CuratorWatcher) event -> {
                    servers = client.getChildren().forPath(servicePath);
                })
                .forPath(servicePath);
        if (servers.size() == 0) {
            return null;
        }
        // 负载均衡选择一个主机
        return new RandomLoadBalance().choose(servers);
    }
}

我们这边写的Watcher其实没用,因为我们客户端Demo执行完进程直接结束了

(8) 修改消费者类 RpcConsumer
public class RpcConsumer {
    public static void main(String[] args) {
        SomeService service = RpcProxy.create(SomeService.class, "Wechat");
        System.out.println(service.hello("kkb"));
        System.out.println(service.hashCode());
    }
}
(9) 修改 RpcProxy

变化:

  • create方法多加了一个前缀参数:
  • rpcInvoke新增了服务发现功能
public class RpcProxy {

//新增一个prefix前缀参数
    public static <T> T create(Class<?> clazz, String prefix) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
                new Class[]{clazz},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 若调用的是Object的方法,则直接进行本地调用
                        if (Object.class.equals(method.getDeclaringClass())) {
                            return method.invoke(this, args);
                        }
                        // 远程调用在这里发生
                        // 同样新增一个前缀参数
                        return rpcInvoke(clazz, method, args, prefix);
                    }
                });
    }

//远程调用新增了服务发现的功能
    private static Object rpcInvoke(Class<?> clazz, Method method, Object[] args, String prefix) throws Exception {
        RpcClientHandler handler = new RpcClientHandler();
        NioEventLoopGroup loopGroup = new NioEventLoopGroup();
        
//进行服务发现,查询指定服务的提供者地址
        ServiceDiscovery discovery = new ZKServiceDiscovery();
        String serverAddress = discovery.discovery(clazz.getName());
        if (serverAddress == null) {
            return null;
        }

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(loopGroup)
                    .channel(NioSocketChannel.class)
                    // Nagle算法开关
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ObjectEncoder());
                            pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(handler);
                        }
                    });

            String ip = serverAddress.split(":")[0];
            String port = serverAddress.split(":")[1];
            ChannelFuture future = bootstrap.connect(ip, Integer.valueOf(port)).sync();

            // 形成远程调用的参数实例
            Invocation invocation = new Invocation();
            invocation.setClassName(clazz.getName());
            invocation.setMethodName(method.getName());
            invocation.setParamTypes(method.getParameterTypes());
            invocation.setParamValues(args);
            // 将前缀传过去
            invocation.setPrefix(prefix);

            // 将参数实例发送给Server
            future.channel().writeAndFlush(invocation).sync();

            future.channel().closeFuture().sync();
        } finally {
            loopGroup.shutdownGracefully();
        }
        return handler.getResult() + " : " + serverAddress;
    }
}

3. Server 监听多端口

3.1 创建工程 13-multiports

复制 02-socket 工程,在此基础上进行修改。仅需修改服务端即可,客户端代码不用动:13-multiports

3.2 修改 SomeServer 类

// 定义服务端启动类
public class SomeServer {
    private List<ChannelFuture> futures = new ArrayList<>();
    private EventLoopGroup parentGroup = new NioEventLoopGroup();
    private EventLoopGroup childGroup = new NioEventLoopGroup();

    public void start(List<Integer> ports) throws Exception {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(parentGroup, childGroup)
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new SomeServerHandler());
                    }
                });

        for (Integer port : ports) {
            // 生成一个future
            // bing时又调用了sync,实际上监听没意义了。
            // 肯定是按顺序一个一个绑定的
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("服务器正在启动中。。。");
            future.addListener(f -> {
                if (f.isSuccess()) {
                    System.out.println("服务器已启动,监听的端口为:" + port);
                }
            });
            // 将所有生成的future添加到集合中
            futures.add(future);
        }
    }

    // 关闭所有Channel
    public void closeAllChannel() {
        System.out.println("所有Channel已经全部关闭");
        for (ChannelFuture future : futures) {
            future.channel().close();
        }
        parentGroup.shutdownGracefully();
        childGroup.shutdownGracefully();
    }
}

3.3 修改 SomeServerHandler 类

public class SomeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 显示客户端的ip与port
        System.out.print(ctx.channel().remoteAddress() + ",");
        // 显示当前Server的ip与port,就是当前监听的端口号
        System.out.println(ctx.channel().localAddress() + "," + msg);
        // 向客户端发送数据
        ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());
        TimeUnit.MILLISECONDS.sleep(500);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

}

3.4 定义 ServerStarter 类

public class ServerStarter {
    public static void main(String[] args) throws Exception {
        List<Integer> ports = new ArrayList<>();
        ports.add(7777);
        ports.add(8888);
        ports.add(9999);

        SomeServer server = new SomeServer();
        // 启动服务器,按照指定端口号进行监听
        server.start(ports);

        // 30秒后关闭所有channel
        TimeUnit.SECONDS.sleep(60);
        server.closeAllChannel();
    }
}

演示

在这里插入图片描述
在这里插入图片描述

演示这个目的是为了后面介绍源码的时候会涉及到,所以要知道同一个ServerBootstrap,Server端是可以监听多个端口的。

 

Netty(五) 高级应用 之 手写 RPC 框架 、 手写 Dubbo 框架 、Server端监听多个端口_rpc 请求监听-CSDN博客

请登录后发表评论

    没有回复内容