【Redis】Redis 生成唯一 id

发布于:2024-12-18 ⋅ 阅读:(69) ⋅ 点赞:(0)

每个订单业务都需要有一个唯一的id,如果使用数据库自增id就会暴露规律,同时id会有一个最大的阈值,万一订单超过这个阈值,那就会出现问题。因此我们可以封装一个全局ID生成器,可以适用于分布式系统生成唯一ID,一般需要满足唯一性、高可用性、递增性、安全性、高性能。有以下几种策略能够生成唯一ID。

1. UUID

这个直接利用JDK自带的工具类UUID工具类就能生成了。这种生成策略生成的其实是16进制的一长串的数值,因为这一长串是十六进制,因此它返回的结果其实是字符串结构,并且也不是单调递增的一种特性。因此虽然可以做唯一ID,但是并不够友好,没有满足之前我们所说的哪些特性,因此这种用的比较少。

2. Redis 自增

Redis满足上述的几个特性,而且是整体单调递增的,数值的长度不超过long类型,因为它是个数值类型,存储大小也不会太大。

3. snowflake 算法

雪花算法是世界上知名的全局唯一id生成策略,采用long类型的64位数字,原理和今天的Redis自增原理差别不大,只是雪花算法的自增是当前机器的自增,需要维护机器ID。同时,对时钟依赖度高,如果时间不准确,可能会出现异常。

4. 数据库自增

数据库自增不是将表id设置为自增,而是单独用一张表记录做自增。需要自增时就要从这张单独的表获取id进行自增。

这里将演示redis自增生成唯一id。为了增加ID的安全性,不直接使用Redis自增的数值,而是拼接一些其他信息。

如图所示,ID分为三部分。

符号位:最高位表示符号位,1bit,永远为0。

时间戳:31bit,以秒为单位,可以使用 69 年。

序列号:32bit,秒内计数器,支持每秒产生2^32个不同的ID。

实现

/**
 * Redis id 自增
 * */
@Component
public class RedisIdWork {

    /*开始时间戳,这个是使用main方法测试出来的。基于2020年1月1日至写下此代码的时间戳
    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
        long second = now.toEpochSecond(ZoneOffset.UTC);
        System.out.println(second);       // 1640995200
    }*/

    private static final long BEGIN_TIMESTAMP = 1640995200; //  基于2020年1月1日至写下此代码的时间戳
    private static final long COUNT_BITS = 32;    // 位数
    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWork(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    // 方法的返回值是Long,因为按照之前的策略,最终的id是一个64位的数字就是对应了Java的Long
    // 参数keyPrefix:生成策略是基于redis的自增长,redis的自增肯定是需要有一个key,然后值不断自增。
    // 不同的业务肯定有不同的key,大家不能都去用同一个自增长,因此这里需要有前缀去区分不同的业务,例如订单业务可以传order过来
    public long nextId(String keyPrefix) {
        // 1. 生成时间戳。此刻时间减去开始时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        // 2. 生成序列号
        // 2.1.获取当前日期,精确到天yyyyMMdd。这里用冒号分开,因为在redis中如果你的key用冒号分隔,这样在redis中就是分层级的
        // 当我要统计某天、某月、某年的订单量的时候,就可以很方便的利用前缀统计,例如统计一个月就可以使用yyyy:MM作为前缀
        String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2 自增长
         /*
         icr表示自增长,然后拼上业务(keyPrefix),但是到这key还不能结束。
         因为自增的值也会越来越大,而redis单个key的自增长对应的数值是有一个上限的,2的64次方。
         我们的key的策略里面,真正用来记录序列号的只有32个比特位,而redis是64比特位,超过64位很难,但是超过32位还是有可能的
         所以尽管是同一个业务,也不能使用同一个key,否则就很有可能会超过上限。
         办法:在业务前缀的后面,拼上一个时间戳,这个时间戳精确到天。这样的好处是将来如果想统计这一天一共下了多少单,那么直接看key的日期对应的值就行了,因此它还有一个统计效果
         注意:这里可能会报黄,说你这里拆箱可能会有空指针。但实际上这个方法并不会导致空指针,因为如果这个key不存在,它就会自动取给你创建一个key,并且从0开始,第一次自增长之后就是1了,所以你根本不用管这个警告
         */
        Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + data);
        // 3. 拼接id返回

        return timestamp << COUNT_BITS | count;
    }

}

测试

@SpringBootTest
class HmDianPingApplicationTests {

    @Resource
    private RedisIdWork redisIdWorker;

    // 线程池,创建500个线程
    private ExecutorService es = Executors.newFixedThreadPool(500);


    @Test
    void testIdWorker() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(300);

        // 线程任务
        Runnable task = () -> {
            for (int i = 0; i < 100; i++) {
                long id = redisIdWorker.nextId("order");
                System.out.println("id = " + id);
            }
        };
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 300; i++) {
            es.submit(task); // 提交300次,每个任务生成100个自增长id,因此一共30000个
        }
        long end = System.currentTimeMillis(); // 由于线程池是异步的,因此这里计时其实是没有意义的,因此这里会借助于CountDownLatch
        System.out.println("time = " + (end - begin));
    }

}

因为使用线程池是异步的,因此异步线程还没执行完毕,主线程已经执行完毕了,所以测量运行时间是没有意义的。这里要使用CountDownLatch,它有countDown 和 await 两个方法可以帮我们测量运行时间。

await 是阻塞方法,由于异异步线程没执行完,主线程就开始执行了,所以要阻塞主线程方法,等待异步线程执行完毕后再执行主线程方法。那主线程需要阻塞到什么时候呢?当CountDownLatch内部维护的变量为0时,开始让主线程方法执行。我们让异步线程和变量绑定,每调用一次countDown,内部变量就减1,当内部变量为0时,说明异步线程都执行完毕了,此时await就不在阻塞主线程,这样就能够计算出系统运行时间。

@SpringBootTest
class HmDianPingApplicationTests {

    @Resource
    private RedisIdWork redisIdWorker;

    // 线程池,创建500个线程
    private ExecutorService es = Executors.newFixedThreadPool(500);


    @Test
    void testIdWorker() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(300);

        // 线程任务
        Runnable task = () -> {
            for (int i = 0; i < 100; i++) {
                long id = redisIdWorker.nextId("order");
                System.out.println("id = " + id);
            }
            latch.countDown();
        };
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 300; i++) {
            es.submit(task); // 提交300次,每个任务生成100个自增长id,因此一共30000个
        }
        latch.await();
        long end = System.currentTimeMillis(); // 由于线程池是异步的,因此这里计时其实是没有意义的,因此这里会借助于CountDownLatch
        System.out.println("time = " + (end - begin));
    }

}

结果


网站公告

今日签到

点亮在社区的每一天
去签到