Twitter 的雪花算法大部分人应该都知道,用于生成有序的数值类型的 id。算法的原理也很简单,通过划分了 64 位比特:1 位符号位固定为 0 表示正数,41 位时间戳,5 位数据中心 id,5 位机器 id,12 位有序序号。其中 5 位数据中心 id 和 5 位机器 id,接下来我将合并成 10 位 workid 来做讨论,含义是一样的,总共可以分配 1024 台机器用于生成相同业务的主键。
为啥要区分 workid?
同一种业务他的主键一定是唯一的。如商品服务,则商品表内的主键肯定是不能重复的。但如果发布多个商品服务实例,此时 id 如果是有商品服务自己生成,则不区分 workid,那么在相同毫秒下如果出现并发请求且请求到了不同的实例,理论上是会出现 id 重复的。虽然概率很小,但是在一些集中时间高并发的业务是很有可能出现的,我们还是应该要避免这种问题。
怎么解决
-
手动配置
单机情况下,我们一般在实现该算法的时候,可能就直接指定了 workid 为一个固定的整数(0~1023)。或者集群环境下,有一台或多台服务专门提供生成主键的服务,此时也可以通过手动分配(硬编码或者配置文件)的方式来指定 workid。但是随着容器化自动部署,共用配置中心,相同的服务用的是同一套配置中心,很难通过手动的配置方式来指定 workid。即使可以做到,对于后期的运维也是很麻烦的事情。
-
美团 leaf
市面上,当前有美团的开源项目 leaf,将业务服务的 ip+port 添加到 zookeeper 上,通过 zookeeper 的分布式一致性以及它的持久性节点,来避免抢占相同的 workid,同时 leaf 还通过定时任务上传业务服务的时间信息,来避免时钟回拨的问题,可以仔细阅读下代码,这块代码还是很好理解的:https://github.com/Meituan-Dianping/Leaf但是我司在 k8s 上部署服务的时候,并没有指定 ip,每次发布 ip 都是动态分配的,也就意味着相同业务实例它的 workid 会随着发布次数的变多,就会超过 1024 个,有人也提了 issue:https://github.com/Meituan-Dianping/Leaf/issues/84,有热心市民也给出了解决方案:https://github.com/NotFound9/Leaf。
自己实现
基于 zookeeper 的临时节点的特性,即客户端断开连接,就会删除节点。可以实现一个自动生成不重复的 workId 的功能。
- 参考美团的 snowflake 算法,我把 workId 给暴露出去,同时提供了 init 和 destroy 方法,用于容器内zk 客户端的初始化和销毁
- 创建了一个 ZKSnowflake 继承 Snowflake
- 在 init 方法中,通过分布式锁先锁住对应业务类型的节点,用于该业务类型多实例发布的时候避免抢占。
- 遍历该业务节点下的所有节点,然后再 0~1023 这个范围内遍历,选出还没有被使用的节点,将这个下标作为 workid,并创建到 zk 中作为一个临时节点。
- zk 客户端增加监听,对重连的事件,进行重新获取 workid,期间内不允许生成 id。(该类情况可以根据业务的实际场景来判断,其实如果都是指定时间发布的,后续不做修改也可以,只要在按批发布的时候区分好 workid 即可)
public class ZkSnowflake extends Snowflake {
public static final String ROOT = "/snowflake/";
public static final String LOCK = "/lock/";
private String zkAddress;
private String lockPath;
private String groupPath;
private CuratorFramework client;
private volatile boolean prepare = false;
public synchronized long nextId() {
if (!prepare) {
throw new RuntimeException("workId未装载完毕");
}
return super.nextId();
}
public ZkSnowflake(String zkAddress, String groupName) {
this.zkAddress = zkAddress;
this.lockPath = LOCK + groupName;
this.groupPath = ROOT + groupName;
}
public void init() {
client = CuratorFrameworkFactory.builder().connectString(zkAddress).sessionTimeoutMs(5000).connectionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
client.start();
genWorkId(client);
client.getConnectionStateListenable().addListener((cli, connectionState) -> {
if (connectionState == ConnectionState.RECONNECTED) {
log.error("zk重连");
genWorkId(cli);
} else if (connectionState == ConnectionState.LOST) {
log.error("zk失连");
}
});
log.info("snowflake启动成功,workId:{}", workerId);
}
public void destroy() {
client.close();
log.info("zk客户端关闭");
}
private void genWorkId(CuratorFramework client) {
prepare = false;
InterProcessMutex lock = null;
try {
lock = new InterProcessMutex(client, lockPath);
lock.acquire();
Stat stat = client.checkExists().forPath(groupPath);
if (stat == null) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(groupPath);
}
List<String> children = client.getChildren().forPath(groupPath);
for (long workerId = minWorkerId; workerId <= maxWorkerId; workerId++) {
String workerIdStr = String.valueOf(workerId);
if (!children.contains(workerIdStr)) {
String nodePath = groupPath + "/" + workerIdStr;
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(nodePath);
this.workerId = workerId;
prepare = true;
break;
}
}
if (!prepare) {
throw new RuntimeException("workId已超范围无法分配");
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
if (lock != null) {
lock.release();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}
总结
相比于美团的,自己实现的方法比较简单,但是没法解决时钟回拨的问题(因为无法知道该服务实例上一次生成 id 的时间戳),也可能会遇到 zk 丢失节点(临时节点)等一些问题。
网上也有基于 redis 的方案,等有空了再整合下。
后续再学习下有没有更好的方案~~
没有回复内容