redisson功能完整指南

发布于:2025-09-04 ⋅ 阅读:(25) ⋅ 点赞:(0)

Redisson功能完整指南

目录

  1. 分布式锁和同步工具
  2. 分布式数据结构
  3. 原子操作和计数器
  4. 分布式对象和缓存
  5. 消息和发布订阅
  6. 高级数据结构
  7. 分布式服务
  8. 事务和批处理
  9. 地理空间数据
  10. 时间序列数据

一、分布式锁和同步工具

1.1 RLock(可重入分布式锁)

@Service
public class InventoryService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public boolean reduceInventory(String productId, int quantity) {
        RLock lock = redissonClient.getLock("inventory:lock:" + productId);
        try {
            // 尝试获取锁,最多等待10秒,锁30秒后自动释放
            if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
                // 执行库存扣减逻辑
                int currentInventory = getCurrentInventory(productId);
                if (currentInventory >= quantity) {
                    updateInventory(productId, currentInventory - quantity);
                    return true;
                }
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
        return false;
    }
}

1.2 RReadWriteLock(分布式读写锁)

@Service
public class DocumentService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    // 读操作 - 多个线程可以同时读
    public String readDocument(String documentId) {
        RReadWriteLock rwLock = redissonClient.getReadWriteLock("document:lock:" + documentId);
        RLock readLock = rwLock.readLock();
        
        try {
            if (readLock.tryLock(10, TimeUnit.SECONDS)) {
                // 多个读操作可以并发执行
                return performDocumentRead(documentId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (readLock.isHeldByCurrentThread()) {
                readLock.unlock();
            }
        }
        return null;
    }
    
    // 写操作 - 独占锁,写时不允许读
    public boolean updateDocument(String documentId, String content) {
        RReadWriteLock rwLock = redissonClient.getReadWriteLock("document:lock:" + documentId);
        RLock writeLock = rwLock.writeLock();
        
        try {
            if (writeLock.tryLock(10, TimeUnit.SECONDS)) {
                // 写操作是独占的,执行时会阻塞所有读写操作
                performDocumentWrite(documentId, content);
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (writeLock.isHeldByCurrentThread()) {
                writeLock.unlock();
            }
        }
        return false;
    }
}

1.3 RFairLock(公平分布式锁)

@Service
public class TicketBookingService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    // 公平锁确保按照请求顺序获取锁,避免线程饥饿
    public boolean bookTicket(String eventId, String userId) {
        RFairLock fairLock = redissonClient.getFairLock("ticket:booking:" + eventId);
        
        try {
            // 公平锁会按照请求的时间顺序分配锁
            if (fairLock.tryLock(30, 60, TimeUnit.SECONDS)) {
                // 检查票务余量
                int availableTickets = getAvailableTickets(eventId);
                if (availableTickets > 0) {
                    // 预订票务
                    reserveTicket(eventId, userId);
                    updateAvailableTickets(eventId, availableTickets - 1);
                    return true;
                }
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (fairLock.isHeldByCurrentThread()) {
                fairLock.unlock();
            }
        }
        return false;
    }
}

1.4 RSemaphore(分布式信号量)

@Service
public class DownloadService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public boolean downloadFile(String fileId) {
        RSemaphore semaphore = redissonClient.getSemaphore("download:limit");
        semaphore.trySetPermits(5); // 最多允许5个并发下载
        
        try {
            if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
                try {
                    // 执行文件下载逻辑
                    performDownload(fileId);
                    return true;
                } finally {
                    semaphore.release();
                }
            }
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

二、分布式数据结构

2.1 RMap(分布式Map)

@Service
public class UserSessionService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void saveUserSession(String userId, UserSession session) {
        RMap<String, UserSession> sessionMap = redissonClient.getMap("user:sessions");
        sessionMap.put(userId, session);
        sessionMap.expire(Duration.ofHours(2)); // 2小时过期
    }
    
    public UserSession getUserSession(String userId) {
        RMap<String, UserSession> sessionMap = redissonClient.getMap("user:sessions");
        return sessionMap.get(userId);
    }
    
    public void removeUserSession(String userId) {
        RMap<String, UserSession> sessionMap = redissonClient.getMap("user:sessions");
        sessionMap.remove(userId);
    }
}

2.2 RQueue(分布式队列)

@Service
public class OrderProcessingService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    // 生产者:添加订单到队列
    public void submitOrder(Order order) {
        RQueue<Order> orderQueue = redissonClient.getQueue("orders:processing");
        orderQueue.offer(order);
    }
    
    // 消费者:处理订单队列
    @Scheduled(fixedDelay = 1000)
    public void processOrders() {
        RQueue<Order> orderQueue = redissonClient.getQueue("orders:processing");
        Order order = orderQueue.poll();
        if (order != null) {
            processOrder(order);
        }
    }
}

2.3 RDelayedQueue(延时队列)

@Service
public class DelayedNotificationService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void scheduleNotification(String message, int delayMinutes) {
        RQueue<String> destinationQueue = redissonClient.getQueue("notifications:destination");
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(destinationQueue);
        
        // 延时指定分钟后发送通知
        delayedQueue.offer(message, delayMinutes, TimeUnit.MINUTES);
    }
    
    @Scheduled(fixedDelay = 5000)
    public void processNotifications() {
        RQueue<String> destinationQueue = redissonClient.getQueue("notifications:destination");
        String message = destinationQueue.poll();
        if (message != null) {
            sendNotification(message);
        }
    }
}

三、原子操作和计数器

3.1 RAtomicLong(原子长整型)

@Service
public class StatisticsService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public long incrementPageViews(String pageId) {
        RAtomicLong pageViews = redissonClient.getAtomicLong("page:views:" + pageId);
        return pageViews.incrementAndGet();
    }
    
    public long getPageViews(String pageId) {
        RAtomicLong pageViews = redissonClient.getAtomicLong("page:views:" + pageId);
        return pageViews.get();
    }
    
    public void resetPageViews(String pageId) {
        RAtomicLong pageViews = redissonClient.getAtomicLong("page:views:" + pageId);
        pageViews.set(0);
    }
}

3.2 RLongAdder(长整型累加器)

@Service
public class HighConcurrencyCounterService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void incrementCounter(String eventType) {
        RLongAdder counter = redissonClient.getLongAdder("events:" + eventType);
        counter.increment();
    }
    
    public long getEventCount(String eventType) {
        RLongAdder counter = redissonClient.getLongAdder("events:" + eventType);
        return counter.sum();
    }
}

四、分布式对象和缓存

4.1 RBucket(分布式对象桶)

@Service
public class ConfigurationService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void saveConfig(String key, Object config) {
        RBucket<Object> bucket = redissonClient.getBucket("config:" + key);
        bucket.set(config, Duration.ofDays(1)); // 1天过期
    }
    
    @SuppressWarnings("unchecked")
    public <T> T getConfig(String key, Class<T> type) {
        RBucket<T> bucket = redissonClient.getBucket("config:" + key);
        return bucket.get();
    }
    
    public boolean configExists(String key) {
        RBucket<Object> bucket = redissonClient.getBucket("config:" + key);
        return bucket.isExists();
    }
}

4.2 RMapCache(带过期时间的Map)

@Service
public class TokenService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void storeToken(String userId, String token, Duration expiry) {
        RMapCache<String, String> tokenCache = redissonClient.getMapCache("user:tokens");
        tokenCache.put(userId, token, expiry.toMillis(), TimeUnit.MILLISECONDS);
    }
    
    public String getToken(String userId) {
        RMapCache<String, String> tokenCache = redissonClient.getMapCache("user:tokens");
        return tokenCache.get(userId);
    }
    
    public void invalidateToken(String userId) {
        RMapCache<String, String> tokenCache = redissonClient.getMapCache("user:tokens");
        tokenCache.remove(userId);
    }
}

五、消息和发布订阅

5.1 RTopic(主题发布订阅)

@Service
public class EventPublisher {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void publishEvent(String eventType, Object eventData) {
        RTopic topic = redissonClient.getTopic("events:" + eventType);
        topic.publish(eventData);
    }
}

@Component
public class EventSubscriber {
    
    @Autowired
    private RedissonClient redissonClient;
    
    @PostConstruct
    public void subscribeToEvents() {
        RTopic topic = redissonClient.getTopic("events:user");
        topic.addListener(String.class, (channel, message) -> {
            System.out.println("收到用户事件: " + message);
            handleUserEvent(message);
        });
    }
}

六、高级数据结构

6.1 RBloomFilter(布隆过滤器)

@Service
public class DuplicateCheckService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    @PostConstruct
    public void initBloomFilter() {
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("duplicate:check");
        // 初始化布隆过滤器:预期插入100万个元素,错误率0.03
        bloomFilter.tryInit(1000000L, 0.03);
    }
    
    public boolean mightContain(String item) {
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("duplicate:check");
        return bloomFilter.contains(item);
    }
    
    public void addItem(String item) {
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("duplicate:check");
        bloomFilter.add(item);
    }
}

6.2 RRateLimiter(分布式限流器)

@Service
public class ApiRateLimiter {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public boolean isAllowed(String apiKey) {
        RRateLimiter rateLimiter = redissonClient.getRateLimiter("api:limit:" + apiKey);
        // 设置每秒最多10个请求
        rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);
        
        // 尝试获取1个许可
        return rateLimiter.tryAcquire(1);
    }
}

Redisson功能总结表

分布式锁类型对比

锁类型 特点 适用场景
RLock 可重入、支持超时 一般业务操作
RReadWriteLock 读写分离 读多写少场景
RFairLock 公平获取锁 避免线程饥饿
RMultiLock 同时锁定多个资源 防止死锁的资源操作
RRedLock 高安全性 关键业务操作
RSpinLock 减少线程切换 短时间持有的操作
RFencedLock 版本控制 需要检测锁有效性

数据结构特点

  • RMap: 分布式HashMap,支持过期时间
  • RQueue: 分布式队列,支持阻塞操作
  • RDelayedQueue: 延时队列,适用于定时任务
  • RAtomicLong: 原子计数器,支持高并发
  • RBloomFilter: 布隆过滤器,快速去重检测
  • RRateLimiter: 限流器,API限流控制
  • RGeo: 地理位置数据,支持距离计算
  • RTimeSeries: 时间序列数据,适用于监控指标

最佳实践建议

  1. 选择合适的锁类型:根据业务场景选择最适合的锁
  2. 设置合理的超时时间:避免死锁和长时间等待
  3. 正确释放资源:使用try-finally确保锁被释放
  4. 监控性能指标:关注锁的竞争情况和持有时间
  5. 合理使用缓存:根据数据访问模式选择缓存策略

七、分布式服务

7.1 RExecutorService(分布式执行器)

@Service
public class DistributedTaskService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void executeDistributedTask() {
        RExecutorService executor = redissonClient.getExecutorService("distributed:tasks");
        
        // 提交任务到分布式执行器
        executor.submit(() -> {
            System.out.println("执行分布式任务: " + Thread.currentThread().getName());
            // 执行实际的业务逻辑
            performBusinessLogic();
            return "任务完成";
        });
    }
}

八、事务和批处理

8.1 RBatch(批处理操作)

@Service
public class BatchOperationService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void batchUpdateUsers(List<User> users) {
        RBatch batch = redissonClient.createBatch();
        
        for (User user : users) {
            RMapAsync<String, User> userMapAsync = batch.getMap("users");
            userMapAsync.putAsync(user.getId(), user);
            
            RAtomicLongAsync counterAsync = batch.getAtomicLong("user:count");
            counterAsync.incrementAndGetAsync();
        }
        
        // 批量执行所有操作
        BatchResult<?> result = batch.execute();
        System.out.println("批处理完成,操作数: " + result.getResponses().size());
    }
}

九、地理空间数据

9.1 RGeo(地理空间数据结构)

@Service
public class LocationService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void addLocation(String locationId, double longitude, double latitude) {
        RGeo<String> geo = redissonClient.getGeo("locations");
        geo.add(longitude, latitude, locationId);
    }
    
    public List<String> findNearbyLocations(double longitude, double latitude, double radius) {
        RGeo<String> geo = redissonClient.getGeo("locations");
        return geo.radius(longitude, latitude, radius, GeoUnit.KILOMETERS);
    }
    
    public Double getDistance(String location1, String location2) {
        RGeo<String> geo = redissonClient.getGeo("locations");
        return geo.dist(location1, location2, GeoUnit.KILOMETERS);
    }
}

十、时间序列数据

10.1 RTimeSeries(时间序列)

@Service
public class MetricsService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    public void recordMetric(String metricName, double value) {
        RTimeSeries<String, Double> timeSeries = redissonClient.getTimeSeries(metricName);
        timeSeries.add(System.currentTimeMillis(), value);
    }
    
    public Map<Long, Double> getMetricRange(String metricName, long fromTime, long toTime) {
        RTimeSeries<String, Double> timeSeries = redissonClient.getTimeSeries(metricName);
        return timeSeries.range(fromTime, toTime);
    }
}

高级锁特性示例

锁的高级特性

@Service
public class AdvancedLockService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    // 展示锁的高级特性:可重入、超时、异步等
    public void demonstrateAdvancedFeatures(String resourceId) {
        RLock lock = redissonClient.getLock("advanced:lock:" + resourceId);
        
        try {
            // 1. 检查锁状态
            boolean isLocked = lock.isLocked();
            boolean isHeldByCurrentThread = lock.isHeldByCurrentThread();
            
            // 2. 获取锁信息
            long remainTimeToLive = lock.remainTimeToLive();
            
            // 3. 尝试获取锁并设置自动续期
            if (lock.tryLock(10, 300, TimeUnit.SECONDS)) {
                
                // 4. 锁的可重入特性
                recursiveOperation(resourceId, 3);
                
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // 6. 安全释放锁
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    
    // 递归操作展示锁的可重入性
    private void recursiveOperation(String resourceId, int depth) {
        if (depth <= 0) return;
        
        RLock lock = redissonClient.getLock("advanced:lock:" + resourceId);
        
        try {
            // 同一线程可以多次获取同一个锁(可重入)
            if (lock.tryLock(5, TimeUnit.SECONDS)) {
                System.out.println("递归深度: " + depth + ", 持有计数: " + lock.getHoldCount());
                
                // 执行一些操作
                Thread.sleep(100);
                
                // 递归调用
                recursiveOperation(resourceId, depth - 1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

CMS系统综合应用示例

页面管理综合服务

@Service
public class EnhancedPageManagementService {
    
    @Autowired
    private RedissonClient redissonClient;
    
    // 综合页面管理:使用多种Redisson功能
    public void managePageLifecycle(Long pageId) {
        // 1. 使用分布式锁确保操作原子性
        RLock lock = redissonClient.getLock("page:manage:" + pageId);
        
        try {
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
                // 2. 增加页面处理计数
                RAtomicLong processCount = redissonClient.getAtomicLong("page:process:count");
                processCount.incrementAndGet();
                
                // 3. 更新页面状态到活跃集合
                RSet<Long> activePages = redissonClient.getSet("pages:active");
                activePages.add(pageId);
                
                // 4. 缓存页面配置
                RMapCache<String, Object> configCache = redissonClient.getMapCache("page:config");
                configCache.put("page:" + pageId, getPageConfig(pageId), Duration.ofHours(1));
                
                // 5. 发布页面变更事件
                RTopic statusTopic = redissonClient.getTopic("page:lifecycle:events");
                statusTopic.publish(Map.of("pageId", pageId, "action", "managed"));
                
                // 6. 添加到处理队列
                RQueue<Long> processQueue = redissonClient.getQueue("pages:process:queue");
                processQueue.offer(pageId);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到