延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

发布于:2024-05-03 ⋅ 阅读:(37) ⋅ 点赞:(0)

一、接着上文

RDelayedQueue作为redisson封装的一个分布式延迟队列,直接拿来使用还是比较简单的。

本文主要包括以下几部分:

  • 保存至延迟队列(生产者)
  • 读取延迟队列(消费者)
  • 从延迟队列移除任务

在这里插入图片描述

二、redission配置


import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Redisson配置类
 *
 * @author xxx
 */
@Configuration
public class RedissonConfig {
    @Value("${spring.application.name}")
    private String serverName;

    @Bean
    public RedissonClient redissonClient(RedisProperties redisProperties) {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
        singleServerConfig.setPassword(redisProperties.getPassword());
        singleServerConfig.setKeepAlive(true);
        singleServerConfig.setDatabase(redisProperties.getDatabase());
        singleServerConfig.setConnectionMinimumIdleSize(2);
        singleServerConfig.setConnectionPoolSize(4);
        singleServerConfig.setClientName(serverName);
        return Redisson.create(config);
    }
}
spring:
  application:
    name: delay-task-service
  redis:
    host: 192.168.8.18
    port: 6379
    database: 0
    timeout: 3000

三、保存至延迟队列(生产者)

作为延迟任务的生产者,你需要根据预期的回调时间,计算出delay延迟时间。

伪代码见下:

public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

private final RedissonClient redissonClient;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

long delay = DateUtil.between(event.getNotifyDate(), new DateTime(), DateUnit.SECOND);

delayedQueue.offer(event.getTransNo(), delay < 0 ? 1 : delay, TimeUnit.SECONDS);

四、读取延迟队列(消费者)

    public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

    private final RedissonClient redissonClient;
    
    @PostConstruct
    public void init() {
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())
                .execute(() -> {
                    while (true) {
                        try {
                            RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDISSON_QUEUE_NAME);
                            RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

                            String transNo = blockingDeque.take();

                            if (null == transNo) {
                                return;
                            }

                            if (log.isInfoEnabled()) {
                                log.info("开始执行延迟队列中的任务,transNo={}", transNo);
                            }
                            // 异步执行你的操作
                            notifyTaskService.handleTask(transNo, null);
                        } catch (Exception e) {
                            log.error("延时队列的任务执行出现异常", e);
                        }
                    }
                });
    }

五、从延迟队列移除任务

public static final  String REDISSON_QUEUE_NAME = "DelayTaskQueue";

private final RedissonClient redissonClient;

RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);

delayedQueue.remove(transNo);

六、总结

本文主要是摘要一些源码,仅供参考。

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务