雪花算法生成SnowflakeID

发布于:2024-08-11 ⋅ 阅读:(142) ⋅ 点赞:(0)

雪花算法 Snowflake ID

雪花算法(Snowflake Algorithm)是一种分布式ID生成算法,最初由Twitter开发并使用,用于在分布式系统中生成全局唯一且有序的64位整数ID。这种算法生成的ID通常称为雪花ID(Snowflake ID)。

雪花ID具有以下特点:

  1. 全局唯一性:由于算法设计考虑了机器标识、时间戳和序列号,确保了在分布式环境下生成的ID是唯一的。
  2. 有序性:在相同节点上生成的ID,在大多数情况下,时间戳部分是递增的,这有助于排序和数据分片。
  3. 可预测性低:虽然ID是有序的,但通过时间戳、机器标识和序列号的组合,使得单个ID难以被预测。
  4. 性能高效:算法实现简单,生成ID的过程不需要网络通信或数据库查询,因此性能高。

雪花算法的ID结构如下:

  • 1位:符号位,始终为0,表示正数。
  • 41位:时间戳(毫秒精度),从某个固定的时间点开始计算,如2010年1月1日0时0分0秒。
  • 10位:工作机器ID,可以标识不同的服务器节点。
  • 12位:序列号,用于同一毫秒内生成的不同ID。
    SnowflakeID

雪花算法的工作流程如下:

  1. 时间戳:获取当前时间戳,减去起始时间戳,得到一个相对时间戳。这个相对时间戳占用41位。
  2. 工作机器ID:每个服务器有一个唯一的ID,这个ID需要事先配置好,并且保证在一个集群中是唯一的。这个ID占用10位,最多支持1024个不同的服务器。
  3. 序列号:对于同一个毫秒内的多个请求,使用序列号来区分,序列号每增加一次就加1,直到达到最大值12位全满(即4096),然后重置为0,等待下一个毫秒到来。
  4. 拼接ID:将上述三部分按位拼接起来,形成最终的64位整数ID。

需要注意的是,如果在某一毫秒内生成的ID数量超过了序列号的最大值(4096),则需要等待下一个毫秒才能继续生成新的ID,这是雪花算法的一个潜在的限制。

雪花算法在实际应用中可能需要考虑时间回拨、服务器重启、服务器ID冲突等问题,因此在实现时需要加入适当的容错机制和异常处理逻辑。

时钟回拨抛异常版本
public class SnowflakeIdGenerator {
	/** 最大工作机器ID所占的位数 */
	private final long workerIdBits = 5L;
	/** 数据中心ID所占的位数 */
	private final long datacenterIdBits = 5L;
	/** 支持的最大工作机器ID,结果是31*/
	private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
	/** 支持的最大数据中心ID,结果是31 */
	private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
	/** 序列所占的位数 */
	private final long sequenceBits = 12L;
	/** 工作机器ID向左移12位 */
	private final long workerIdShift = sequenceBits;
	/** 数据中心ID向左移17位(12+5) */
	private final long datacenterIdShift = sequenceBits + workerIdBits;
	/** 时间截向左移22位(5+5+12) */
	private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
	/** 支持的最大序列,这里为4095*/
	private final long sequenceMask = -1L ^ (-1L << sequenceBits);
	/** 开始时间截 (2021-10-01) */
	private long twepoch = 1633017600000L;
	/** 工作机器ID(0~31) */
	private long workerId;
	/** 数据中心ID(0~31) */
	private long datacenterId;
	/** 毫秒内序列(0~4095) */
	private long sequence = 0L;
	/** 上次生成ID的时间截 */
	private long lastTimestamp = -1L;

	/**
	 * 构造函数
	 * 
	 * @param workerId     工作机器ID (0~31)
	 * @param datacenterId 数据中心ID (0~31)
	 */
	public SnowflakeIdGenerator(long workerId, long datacenterId) {
		if (workerId > maxWorkerId || workerId < 0) {
			throw new IllegalArgumentException(
					String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
		}
		if (datacenterId > maxDatacenterId || datacenterId < 0) {
			throw new IllegalArgumentException(
					String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
		}
		this.workerId = workerId;
		this.datacenterId = datacenterId;
	}

	public SnowflakeIdGenerator(long workerId, long datacenterId, long epochTime) {
		if (workerId > maxWorkerId || workerId < 0) {
			throw new IllegalArgumentException(
					String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
		}
		if (datacenterId > maxDatacenterId || datacenterId < 0) {
			throw new IllegalArgumentException(
					String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
		}
		this.workerId = workerId;
		this.datacenterId = datacenterId;
		this.twepoch = epochTime;
	}

	/**
	 * 获得下一个ID (该方法是线程安全的)
	 * 
	 * @return SnowflakeId
	 */
	public synchronized long nextId() {
		long timestamp = timeGen();

		// 如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
		if (timestamp < lastTimestamp) {
			throw new RuntimeException(String.format(
					"Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
		}
		// 如果是同一时间生成的,则进行毫秒内序列
		if (lastTimestamp == timestamp) {
			sequence = (sequence + 1) & sequenceMask;
			// 毫秒内序列溢出
			if (sequence == 0) {
				// 阻塞到下一个毫秒,获得新的时间戳
				timestamp = tilNextMillis(lastTimestamp);
			}
		}
		// 时间戳改变,毫秒内序列重置
		else {
			sequence = 0L;
		}
		// 上次生成ID的时间截
		lastTimestamp = timestamp;
		// 移位并通过或运算拼到一起组成64位的ID
		return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift)
				| (workerId << workerIdShift) | sequence;
	}

	/**
	 * 阻塞到下一个毫秒,直到获得新的时间戳
	 * 
	 * @param lastTimestamp 上次生成ID的时间截
	 * @return 当前时间戳
	 */
	protected long tilNextMillis(long lastTimestamp) {
		long timestamp = timeGen();
		while (timestamp <= lastTimestamp) {
			timestamp = timeGen();
		}
		return timestamp;
	}

	/**
	 * 返回以毫秒为单位的当前时间
	 * 
	 * @return 当前时间(毫秒)
	 */
	protected long timeGen() {
		return System.currentTimeMillis();
	}

	/** 测试 */
	public static void main(String[] args) {
		SnowflakeIdGenerator idWorker = new SnowflakeIdGenerator(0, 0);
		for (int i = 0; i < 1000; i++) {
			long id = idWorker.nextId();
			System.out.println(Long.toBinaryString(id));
			System.out.println(id);
		}
	}
}
时钟回拨等待模式

这里提供另外的一种代码风格的编写,基本逻辑和上面代码是一样的,WORKER_ID_BITS占用10位就是上面代码的【工作机器ID】5位加【数据中心ID】5位

package com.sanjunweb.mp;

import java.util.Calendar;
import java.util.Properties;

public final class SnowflakeShardingKeyGenerator {
	/** 开始时间截 */
	public static final long EPOCH;
	/** 序列号在SnowflakeID中占的位数 */
	private static final long SEQUENCE_BITS = 12L;
	/** 工作机器ID所占的位数 */
	private static final long WORKER_ID_BITS = 10L;
	/** 序列号最大值,这里为4095 */
	private static final long SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1;
	/** 工作机器ID向左移12位 */
	private static final long WORKER_ID_LEFT_SHIFT_BITS = SEQUENCE_BITS;
	/** 时间截向左移22位(12+10) */
	private static final long TIMESTAMP_LEFT_SHIFT_BITS = WORKER_ID_LEFT_SHIFT_BITS + WORKER_ID_BITS;
	/** 工作机器ID最大值,这里为1024 */
	private static final long WORKER_ID_MAX_VALUE = 1L << WORKER_ID_BITS;
	/** 工作机器ID */
	private static final long WORKER_ID = 0;
	/** 默认的序列号步长 */
	private static final int DEFAULT_VIBRATION_VALUE = 1;
	/** 允许最大时钟回拨 */
	private static final int MAX_TOLERATE_TIME_DIFFERENCE_MILLISECONDS = 10;
	/** 新的一毫秒开始时,序列号起始偏移量 */
	private int sequenceOffset = -1;
	/** 序列号偏移量 */
	private long sequence;
	/** 上一次生成SnowflakeID的毫秒时间戳 */
	private long lastMilliseconds;
	/** 获取当前系统时间 */
	private static TimeService timeService = new TimeService();
	/** 属性配置 */
	private Properties properties = new Properties();

	static {
		// 初始化一个开始时间戳
		Calendar calendar = Calendar.getInstance();
		calendar.set(2016, Calendar.NOVEMBER, 1);
		calendar.set(Calendar.HOUR_OF_DAY, 0);
		calendar.set(Calendar.MINUTE, 0);
		calendar.set(Calendar.SECOND, 0);
		calendar.set(Calendar.MILLISECOND, 0);
		EPOCH = calendar.getTimeInMillis();
	}

	/**
	 * 获取SnowflakeID
	 * 
	 * @return
	 * @throws InterruptedException
	 */
	public synchronized Comparable<?> generateKey() throws InterruptedException {
		long currentMilliseconds = timeService.getCurrentMillis();
		if (waitTolerateTimeDifferenceIfNeed(currentMilliseconds)) {
			currentMilliseconds = timeService.getCurrentMillis();
		}
		if (lastMilliseconds == currentMilliseconds) {
			if (0L == (sequence = (sequence + 1) & SEQUENCE_MASK)) {
				currentMilliseconds = waitUntilNextTime(currentMilliseconds);
			}
		} else {
			vibrateSequenceOffset();
			sequence = sequenceOffset;
		}
		lastMilliseconds = currentMilliseconds;
		return ((currentMilliseconds - EPOCH) << TIMESTAMP_LEFT_SHIFT_BITS)
				| (getWorkerId() << WORKER_ID_LEFT_SHIFT_BITS) | sequence;
	}

	/**
	 * 如果存在时钟回拨,检测回拨时间,如果范围合适则等待,不合适抛出异常
	 * 
	 * @param currentMilliseconds
	 * @return
	 * @throws InterruptedException
	 */
	private boolean waitTolerateTimeDifferenceIfNeed(final long currentMilliseconds) throws InterruptedException {
		if (lastMilliseconds <= currentMilliseconds) {
			return false;
		}
		long timeDifferenceMilliseconds = lastMilliseconds - currentMilliseconds;
		checkState(timeDifferenceMilliseconds < getMaxTolerateTimeDifferenceMilliseconds(),
				"Clock is moving backwards, last time is %d milliseconds, current time is %d milliseconds",
				lastMilliseconds, currentMilliseconds);
		Thread.sleep(timeDifferenceMilliseconds);
		return true;
	}

	/**
	 * 判断状态
	 * 
	 * @param expression
	 * @param errorMessageTemplate
	 * @param errorMessageArgs
	 */
	public static void checkState(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
		if (!expression) {
			throw new IllegalStateException(String.format(errorMessageTemplate, errorMessageArgs));
		}
	}

	/**
	 * 获取工作机器ID
	 * 
	 * @return
	 */
	private long getWorkerId() {
		long result = Long.valueOf(properties.getProperty("worker.id", String.valueOf(WORKER_ID)));
		checkArgument(result >= 0L && result < WORKER_ID_MAX_VALUE,
				String.format("worker Id can't be greater than %d or less than 0", WORKER_ID_MAX_VALUE));
		return result;
	}

	/**
	 * 获取最大时钟回拨毫秒
	 * 
	 * @return
	 */
	private int getMaxTolerateTimeDifferenceMilliseconds() {
		return Integer.valueOf(properties.getProperty("max.tolerate.time.difference.milliseconds",
				String.valueOf(MAX_TOLERATE_TIME_DIFFERENCE_MILLISECONDS)));
	}

	/**
	 * 等待下一个时间戳
	 * 
	 * @param lastTime
	 * @return
	 */
	private long waitUntilNextTime(final long lastTime) {
		long result = timeService.getCurrentMillis();
		while (result <= lastTime) {
			result = timeService.getCurrentMillis();
		}
		return result;
	}

	/**
	 * 在新的一毫秒开始时,序列号不是简单地从0开始,而是从一个起始偏移量开始
	 */
	private void vibrateSequenceOffset() {
		sequenceOffset = sequenceOffset >= getMaxVibrationOffset() ? 0 : sequenceOffset + 1;
	}

	/**
	 * 获取序列号的最大步长
	 * <p>
	 * 要使 sequenceOffset 能够取到更大的范围,你需要确保 max.vibration.offset 属性被正确设置
	 * 
	 * @return
	 */
	private int getMaxVibrationOffset() {
		int result = Integer
				.parseInt(properties.getProperty("max.vibration.offset", String.valueOf(DEFAULT_VIBRATION_VALUE)));
		checkArgument(result >= 0 && result <= SEQUENCE_MASK, "Illegal max vibration offset");
		return result;
	}

	/**
	 * 校验参数
	 * 
	 * @param expression
	 * @param errorMessage
	 */
	public static void checkArgument(boolean expression, Object errorMessage) {
		if (!expression) {
			throw new IllegalArgumentException(String.valueOf(errorMessage));
		}
	}

	/**
	 * 获取当前系统时间
	 * 
	 * @author dyw
	 * @date 2024年8月11日
	 */
	public static class TimeService {
		public long getCurrentMillis() {
			return System.currentTimeMillis();
		}
	}
}

循环使用序列号,不在新的时间周期重置,而是在当序列号增加到最大值时重置

private void vibrateSequenceOffset() {
	sequenceOffset = sequenceOffset >= getMaxVibrationOffset() ? 0 : sequenceOffset + 1;
}
  1. 避免碰撞:在高并发环境下,如果所有操作都在每个新周期开始时重置为0,那么理论上所有在同一时间戳内启动的操作都可能尝试使用相同的序列号,从而导致ID碰撞。而通过延续上一周期的序列号,可以减少这种同时期的碰撞概率。
  2. 提高分布均匀性:连续使用序列号可以更均匀地分布ID的生成。如果总是从0开始,可能会造成某些特定范围内的ID被过度使用,而其他范围则较少被触及。连续递增有助于确保整个序列号空间的均匀利用。
  3. 简化实现和维护:连续递增的逻辑较为简单,易于理解和实现,也减少了代码中的边界条件处理。例如,如果在达到最大值后需要特殊处理来重置序列号,那么这将引入额外的复杂性和潜在的错误点。
  4. 优化性能:在一些场景下,连续递增序列号可以减少不必要的计算和存储开销。例如,如果序列号需要存储在一个共享资源中(如数据库),连续递增可以减少对共享资源的访问频率,从而提高系统的整体性能。
  5. 适应性:连续递增的序列号能够更好地适应动态变化的工作负载。如果工作负载突然增加,连续递增可以更快地分配更多的序列号,而不需要等待下一个时间周期开始。
关于时钟回拨问题

由于各种潜在因素,机器的本地时钟可能会变得不准确。为了校准这些时间偏差,网络提供了NTP(网络时间协议)服务。然而,在进行时间校准时,时钟的跳跃或回拨问题可能会随之出现。雪花算法由于其强烈依赖于机器时钟,因此难以避免受到时钟回拨的影响,这可能导致生成重复的ID。在原标准实现中,当时钟回拨发生时,系统会直接抛出异常并短暂停止对外服务。然而,这种处理方式在实际生产环境中是不可接受的。因此,需要探索有效的解决方案来最小化时钟回拨带来的影响。以下是两种可能的解决思路:

1、摆脱对机器时钟的依赖:可以定义一个初始时间戳,并在需要时自增该时间戳,而不是跟随机器时钟的增加。那么,何时自增这个时间戳呢?一个合理的策略是,当序列号增加到最大值时,将时间戳加1。这种方法能够充分利用序列号,也可以充分利用未来时间,预先生成一些ID并存储在缓存中,特别适用于流量较大的场景。然而,在流量较小的情况下,可能会出现时间断层或滞后的现象。如果对从ID中解析出来的时间戳有特定的利用需求(例如用于记录事件发生的时间),那么这个缺点可能会成为问题。但如果时间戳的利用意义不大,那么这个缺点也就可以忽略不计了。

2、优化对机器时钟的依赖:如果仍然选择依赖机器时钟,那么对于小范围的时钟回拨(例如几十毫秒),可以选择等待时钟恢复正常。此外,如果流量不大,可以考虑缓存前几百毫秒或几秒的序列号。当时钟回拨发生时,可以从缓存中获取并自增序列号,从而有效地应对时钟回拨问题。这种方法在保持对机器时钟依赖的同时,降低了时钟回拨对系统的影响。


网站公告

今日签到

点亮在社区的每一天
去签到