雪花算法自动生成workId

Twitter 的雪花算法大部分人应该都知道,用于生成有序的数值类型的 id。算法的原理也很简单,通过划分了 64 位比特:1 位符号位固定为 0 表示正数,41 位时间戳,5 位数据中心 id,5 位机器 id,12 位有序序号。其中 5 位数据中心 id 和 5 位机器 id,接下来我将合并成 10 位 workid 来做讨论,含义是一样的,总共可以分配 1024 台机器用于生成相同业务的主键。

为啥要区分 workid?

同一种业务他的主键一定是唯一的。如商品服务,则商品表内的主键肯定是不能重复的。但如果发布多个商品服务实例,此时 id 如果是有商品服务自己生成,则不区分 workid,那么在相同毫秒下如果出现并发请求且请求到了不同的实例,理论上是会出现 id 重复的。虽然概率很小,但是在一些集中时间高并发的业务是很有可能出现的,我们还是应该要避免这种问题。

怎么解决

  1. 手动配置
    单机情况下,我们一般在实现该算法的时候,可能就直接指定了 workid 为一个固定的整数(0~1023)。或者集群环境下,有一台或多台服务专门提供生成主键的服务,此时也可以通过手动分配(硬编码或者配置文件)的方式来指定 workid。

    但是随着容器化自动部署,共用配置中心,相同的服务用的是同一套配置中心,很难通过手动的配置方式来指定 workid。即使可以做到,对于后期的运维也是很麻烦的事情。

  2. 美团 leaf
    市面上,当前有美团的开源项目 leaf,将业务服务的 ip+port 添加到 zookeeper 上,通过 zookeeper 的分布式一致性以及它的持久性节点,来避免抢占相同的 workid,同时 leaf 还通过定时任务上传业务服务的时间信息,来避免时钟回拨的问题,可以仔细阅读下代码,这块代码还是很好理解的:https://github.com/Meituan-Dianping/Leaf

    但是我司在 k8s 上部署服务的时候,并没有指定 ip,每次发布 ip 都是动态分配的,也就意味着相同业务实例它的 workid 会随着发布次数的变多,就会超过 1024 个,有人也提了 issue:https://github.com/Meituan-Dianping/Leaf/issues/84,有热心市民也给出了解决方案:https://github.com/NotFound9/Leaf

自己实现

基于 zookeeper 的临时节点的特性,即客户端断开连接,就会删除节点。可以实现一个自动生成不重复的 workId 的功能。

  1. 参考美团的 snowflake 算法,我把 workId 给暴露出去,同时提供了 init 和 destroy 方法,用于容器内zk 客户端的初始化和销毁
  2. 创建了一个 ZKSnowflake 继承 Snowflake
  3. 在 init 方法中,通过分布式锁先锁住对应业务类型的节点,用于该业务类型多实例发布的时候避免抢占。
  4. 遍历该业务节点下的所有节点,然后再 0~1023 这个范围内遍历,选出还没有被使用的节点,将这个下标作为 workid,并创建到 zk 中作为一个临时节点。
  5. zk 客户端增加监听,对重连的事件,进行重新获取 workid,期间内不允许生成 id。(该类情况可以根据业务的实际场景来判断,其实如果都是指定时间发布的,后续不做修改也可以,只要在按批发布的时候区分好 workid 即可)
public class ZkSnowflake extends Snowflake {
    public static final String ROOT = "/snowflake/";

    public static final String LOCK = "/lock/";

    private String zkAddress;

    private String lockPath;

    private String groupPath;

    private CuratorFramework client;

    private volatile boolean prepare = false;

    
    public synchronized long nextId() {
        if (!prepare) {
            throw new RuntimeException("workId未装载完毕");
        }
        return super.nextId();
    }

    public ZkSnowflake(String zkAddress, String groupName) {
        this.zkAddress = zkAddress;
        this.lockPath = LOCK + groupName;
        this.groupPath = ROOT + groupName;
    }

    
    public void init() {
        client = CuratorFrameworkFactory.builder().connectString(zkAddress).sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
        client.start();

        genWorkId(client);

        client.getConnectionStateListenable().addListener((cli, connectionState) -> {
            if (connectionState == ConnectionState.RECONNECTED) {
                log.error("zk重连");
                genWorkId(cli);
            } else if (connectionState == ConnectionState.LOST) {
                log.error("zk失连");
            }
        });

        log.info("snowflake启动成功,workId:{}", workerId);
    }

    
    public void destroy() {
        client.close();
        log.info("zk客户端关闭");
    }


    private void genWorkId(CuratorFramework client) {
        prepare = false;
        InterProcessMutex lock = null;
        try {
            lock = new InterProcessMutex(client, lockPath);
            lock.acquire();

            Stat stat = client.checkExists().forPath(groupPath);
            if (stat == null) {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(groupPath);
            }
            List<String> children = client.getChildren().forPath(groupPath);
            for (long workerId = minWorkerId; workerId <= maxWorkerId; workerId++) {
                String workerIdStr = String.valueOf(workerId);
                if (!children.contains(workerIdStr)) {
                    String nodePath = groupPath + "/" + workerIdStr;
                    client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(nodePath);
                    this.workerId = workerId;
                    prepare = true;
                    break;
                }
            }

            if (!prepare) {
                throw new RuntimeException("workId已超范围无法分配");
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (lock != null) {
                    lock.release();
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        }
    }
}

总结

相比于美团的,自己实现的方法比较简单,但是没法解决时钟回拨的问题(因为无法知道该服务实例上一次生成 id 的时间戳),也可能会遇到 zk 丢失节点(临时节点)等一些问题。

网上也有基于 redis 的方案,等有空了再整合下。

后续再学习下有没有更好的方案~~

请登录后发表评论

    没有回复内容