Redisson功能完整指南
目录
- 分布式锁和同步工具
- 分布式数据结构
- 原子操作和计数器
- 分布式对象和缓存
- 消息和发布订阅
- 高级数据结构
- 分布式服务
- 事务和批处理
- 地理空间数据
- 时间序列数据
一、分布式锁和同步工具
1.1 RLock(可重入分布式锁)
/**
 * Inventory deduction serialized per product through a Redisson lock.
 */
@Service
public class InventoryService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Deducts {@code quantity} from the stock of {@code productId} while holding
     * the lock {@code inventory:lock:<productId>}.
     *
     * @param productId product whose stock is reduced
     * @param quantity  amount to deduct
     * @return {@code true} if the stock was sufficient and has been updated;
     *         {@code false} if the lock was not obtained, the wait was interrupted,
     *         or the stock was lower than {@code quantity}
     */
    public boolean reduceInventory(String productId, int quantity) {
        final RLock productLock = redissonClient.getLock("inventory:lock:" + productId);
        boolean reduced = false;
        try {
            // tryLock(waitTime, leaseTime, unit): give up after 10 s of waiting.
            if (!productLock.tryLock(10, 30, TimeUnit.SECONDS)) {
                return false;
            }
            final int stock = getCurrentInventory(productId);
            if (stock >= quantity) {
                updateInventory(productId, stock - quantity);
                reduced = true;
            }
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            if (productLock.isHeldByCurrentThread()) {
                productLock.unlock();
            }
        }
        return reduced;
    }
}
1.2 RReadWriteLock(分布式读写锁)
/**
 * Document access guarded by a per-document Redisson read/write lock.
 */
@Service
public class DocumentService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Reads a document under the read side of {@code document:lock:<documentId>}.
     *
     * @param documentId document to read
     * @return the result of {@code performDocumentRead}, or {@code null} when the
     *         read lock was not obtained within 10 seconds or the wait was interrupted
     */
    public String readDocument(String documentId) {
        final RLock sharedLock =
                redissonClient.getReadWriteLock("document:lock:" + documentId).readLock();
        String content = null;
        try {
            if (sharedLock.tryLock(10, TimeUnit.SECONDS)) {
                content = performDocumentRead(documentId);
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } finally {
            if (sharedLock.isHeldByCurrentThread()) {
                sharedLock.unlock();
            }
        }
        return content;
    }

    /**
     * Writes a document under the write side of {@code document:lock:<documentId>}.
     *
     * @param documentId document to update
     * @param content    new content passed to {@code performDocumentWrite}
     * @return {@code true} if the write was performed; {@code false} when the write
     *         lock was not obtained within 10 seconds or the wait was interrupted
     */
    public boolean updateDocument(String documentId, String content) {
        final RLock exclusiveLock =
                redissonClient.getReadWriteLock("document:lock:" + documentId).writeLock();
        boolean written = false;
        try {
            if (exclusiveLock.tryLock(10, TimeUnit.SECONDS)) {
                performDocumentWrite(documentId, content);
                written = true;
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } finally {
            if (exclusiveLock.isHeldByCurrentThread()) {
                exclusiveLock.unlock();
            }
        }
        return written;
    }
}
1.3 公平锁 FairLock(公平分布式锁,通过 getFairLock() 获取,返回类型为 RLock)
/**
 * Ticket booking serialized per event through a Redisson fair lock.
 */
@Service
public class TicketBookingService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Reserves one ticket of {@code eventId} for {@code userId} while holding the
     * fair lock {@code ticket:booking:<eventId>}.
     *
     * @param eventId event to book a ticket for
     * @param userId  user the ticket is reserved for
     * @return {@code true} if a ticket was reserved; {@code false} if no ticket was
     *         available, the lock was not obtained within 30 seconds, or the wait
     *         was interrupted
     */
    public boolean bookTicket(String eventId, String userId) {
        // getFairLock() is declared to return RLock; Redisson's API has no RFairLock type.
        RLock fairLock = redissonClient.getFairLock("ticket:booking:" + eventId);
        try {
            // Wait up to 30 s for the lock; lease time is 60 s.
            if (fairLock.tryLock(30, 60, TimeUnit.SECONDS)) {
                int availableTickets = getAvailableTickets(eventId);
                if (availableTickets > 0) {
                    reserveTicket(eventId, userId);
                    updateAvailableTickets(eventId, availableTickets - 1);
                    return true;
                }
                return false;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            if (fairLock.isHeldByCurrentThread()) {
                fairLock.unlock();
            }
        }
        return false;
    }
}
1.4 RSemaphore(分布式信号量)
/**
 * Download entry point throttled by the Redisson semaphore {@code download:limit}.
 */
@Service
public class DownloadService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Performs a download if a semaphore permit can be taken within 2 seconds.
     *
     * @param fileId file to download
     * @return {@code true} after {@code performDownload} completed; {@code false}
     *         when no permit was available in time or the wait was interrupted
     */
    public boolean downloadFile(String fileId) {
        final RSemaphore slots = redissonClient.getSemaphore("download:limit");
        slots.trySetPermits(5);
        try {
            if (!slots.tryAcquire(2, TimeUnit.SECONDS)) {
                return false;
            }
            try {
                performDownload(fileId);
            } finally {
                // Hand the permit back whether or not the download succeeded.
                slots.release();
            }
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
二、分布式数据结构
2.1 RMap(分布式Map)
/**
 * Stores user sessions in the Redisson map cache {@code user:sessions}, with an
 * individual time-to-live per session entry.
 *
 * <p>The previous version called {@code RMap.expire(...)} after every put. That
 * sets the TTL of the whole map key, so each save postponed expiry for every
 * session and no session expired on its own. {@code RMapCache} supports a TTL per
 * entry.
 *
 * <p>NOTE(review): data previously written to {@code user:sessions} through a plain
 * {@code RMap} is presumably not readable through {@code RMapCache}; confirm and
 * clear or migrate the old key before rollout.
 */
@Service
public class UserSessionService {

    /** Lifetime of a single session entry, in hours. */
    private static final long SESSION_TTL_HOURS = 2;

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Saves (or replaces) the session of {@code userId}; the entry expires
     * {@value #SESSION_TTL_HOURS} hours after this call.
     *
     * @param userId  key of the session
     * @param session session to store
     */
    public void saveUserSession(String userId, UserSession session) {
        sessions().put(userId, session, SESSION_TTL_HOURS, TimeUnit.HOURS);
    }

    /**
     * Looks up the session of {@code userId}.
     *
     * @param userId key of the session
     * @return the value returned by the map cache for this key
     */
    public UserSession getUserSession(String userId) {
        return sessions().get(userId);
    }

    /**
     * Removes the session of {@code userId}, if any.
     *
     * @param userId key of the session
     */
    public void removeUserSession(String userId) {
        sessions().remove(userId);
    }

    /** Returns the handle of the shared session map cache. */
    private RMapCache<String, UserSession> sessions() {
        return redissonClient.getMapCache("user:sessions");
    }
}
2.2 RQueue(分布式队列)
/**
 * Producer/consumer pair around the Redisson queue {@code orders:processing}.
 */
@Service
public class OrderProcessingService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Appends an order to the processing queue.
     *
     * @param order order to enqueue
     */
    public void submitOrder(Order order) {
        pendingOrders().offer(order);
    }

    /**
     * Scheduled with a fixed delay of 1000 ms: takes at most one order from the
     * queue per run and passes it to {@code processOrder}.
     */
    @Scheduled(fixedDelay = 1000)
    public void processOrders() {
        final Order next = pendingOrders().poll();
        if (next == null) {
            return;
        }
        processOrder(next);
    }

    /** Returns the handle of the shared order queue. */
    private RQueue<Order> pendingOrders() {
        return redissonClient.getQueue("orders:processing");
    }
}
2.3 RDelayedQueue(延时队列)
/**
 * Schedules notifications through a Redisson delayed queue and delivers them from
 * the destination queue {@code notifications:destination}.
 *
 * <p>The previous version created a new {@code RDelayedQueue} on every
 * {@code scheduleNotification} call and nowhere else. Redisson moves due elements
 * to the destination queue only while a delayed-queue object for it exists in the
 * JVM, so after a restart already-due notifications stayed undelivered until
 * something new was scheduled. This version keeps one shared instance and also
 * creates it from the poller.
 */
@Service
public class DelayedNotificationService {

    @Autowired
    private RedissonClient redissonClient;

    /** Lazily created, shared delayed-queue handle; guarded by {@code this}. */
    private RDelayedQueue<String> delayedQueue;

    /**
     * Enqueues {@code message} so that it reaches the destination queue after
     * {@code delayMinutes} minutes.
     *
     * @param message      notification text
     * @param delayMinutes delay before the message becomes available
     */
    public void scheduleNotification(String message, int delayMinutes) {
        delayedQueue().offer(message, delayMinutes, TimeUnit.MINUTES);
    }

    /**
     * Scheduled with a fixed delay of 5000 ms: takes at most one message from the
     * destination queue per run and passes it to {@code sendNotification}.
     */
    @Scheduled(fixedDelay = 5000)
    public void processNotifications() {
        // Make sure the delayed queue exists so due elements keep being transferred,
        // even if scheduleNotification has not been called since startup.
        delayedQueue();
        String message = destinationQueue().poll();
        if (message != null) {
            sendNotification(message);
        }
    }

    /** Returns the handle of the queue that receives messages once their delay elapsed. */
    private RQueue<String> destinationQueue() {
        return redissonClient.getQueue("notifications:destination");
    }

    /** Returns the shared delayed queue, creating it on first use. */
    private synchronized RDelayedQueue<String> delayedQueue() {
        if (delayedQueue == null) {
            delayedQueue = redissonClient.getDelayedQueue(destinationQueue());
        }
        return delayedQueue;
    }
}
三、原子操作和计数器
3.1 RAtomicLong(原子长整型)
/**
 * Page-view counters kept in Redisson atomic longs named {@code page:views:<pageId>}.
 */
@Service
public class StatisticsService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Adds one view to the page counter.
     *
     * @param pageId page whose counter is incremented
     * @return the counter value after the increment
     */
    public long incrementPageViews(String pageId) {
        return viewsOf(pageId).incrementAndGet();
    }

    /**
     * Reads the page counter.
     *
     * @param pageId page whose counter is read
     * @return the current counter value
     */
    public long getPageViews(String pageId) {
        return viewsOf(pageId).get();
    }

    /**
     * Sets the page counter back to zero.
     *
     * @param pageId page whose counter is reset
     */
    public void resetPageViews(String pageId) {
        viewsOf(pageId).set(0);
    }

    /** Returns the atomic counter handle for the given page. */
    private RAtomicLong viewsOf(String pageId) {
        return redissonClient.getAtomicLong("page:views:" + pageId);
    }
}
3.2 RLongAdder(长整型累加器)
/**
 * Event counters backed by Redisson {@code RLongAdder} objects named
 * {@code events:<eventType>}.
 *
 * <p>The previous version obtained a new {@code RLongAdder} on every call. Redisson
 * documents that an adder should be destroyed when it is no longer used, because
 * each instance registers a listener; creating one per call and dropping it leaks
 * those instances. This version keeps one adder per counter name and destroys them
 * in {@link #close()}.
 *
 * <p>NOTE(review): Spring is expected to invoke {@code close()} on
 * {@code AutoCloseable} singleton beans at context shutdown — confirm for the
 * Spring version in use, or call it from an explicit destroy callback.
 */
@Service
public class HighConcurrencyCounterService implements AutoCloseable {

    @Autowired
    private RedissonClient redissonClient;

    /** One adder per Redis object name, created on first use. */
    private final java.util.concurrent.ConcurrentMap<String, RLongAdder> adders =
            new java.util.concurrent.ConcurrentHashMap<>();

    /**
     * Adds one to the counter of {@code eventType}.
     *
     * @param eventType event category; used as suffix of the counter name
     */
    public void incrementCounter(String eventType) {
        adderFor(eventType).increment();
    }

    /**
     * Returns the current total of the counter of {@code eventType}.
     *
     * @param eventType event category; used as suffix of the counter name
     * @return the value reported by {@code RLongAdder.sum()}
     */
    public long getEventCount(String eventType) {
        return adderFor(eventType).sum();
    }

    /** Destroys every cached adder and empties the cache. */
    @Override
    public void close() {
        adders.values().forEach(RLongAdder::destroy);
        adders.clear();
    }

    /** Returns the cached adder for the event type, creating it on first use. */
    private RLongAdder adderFor(String eventType) {
        // Key by the full object name so a null eventType still maps to "events:null",
        // exactly as the string concatenation did before.
        final String name = "events:" + eventType;
        return adders.computeIfAbsent(name, redissonClient::getLongAdder);
    }
}
四、分布式对象和缓存
4.1 RBucket(分布式对象桶)
/**
 * Configuration values stored in Redisson buckets named {@code config:<key>}.
 */
@Service
public class ConfigurationService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Stores {@code config} under {@code key} with a time-to-live of one day.
     *
     * @param key    configuration key
     * @param config value to store
     */
    public void saveConfig(String key, Object config) {
        bucketFor(key).set(config, Duration.ofDays(1));
    }

    /**
     * Loads the value stored under {@code key} and returns it as {@code type}.
     *
     * <p>The previous version ignored {@code type} and relied on an unverified
     * generic cast, so a value of the wrong type only failed later at the caller.
     * The value is now checked against {@code type} here.
     *
     * @param key  configuration key
     * @param type expected type of the stored value
     * @param <T>  expected type of the stored value
     * @return the stored value, or {@code null} if the bucket returned {@code null}
     * @throws ClassCastException if the stored value is not an instance of {@code type}
     */
    public <T> T getConfig(String key, Class<T> type) {
        final Object stored = bucketFor(key).get();
        if (type == null || type.isPrimitive()) {
            // Class.cast cannot verify these tokens (null, or int.class for a boxed
            // Integer); keep the earlier unverified behavior for such callers.
            @SuppressWarnings("unchecked")
            final T unverified = (T) stored;
            return unverified;
        }
        return type.cast(stored);
    }

    /**
     * Tells whether a bucket exists for {@code key}.
     *
     * @param key configuration key
     * @return the result of {@code RBucket.isExists()}
     */
    public boolean configExists(String key) {
        return bucketFor(key).isExists();
    }

    /** Returns the bucket handle for the given configuration key. */
    private RBucket<Object> bucketFor(String key) {
        return redissonClient.getBucket("config:" + key);
    }
}
4.2 RMapCache(带过期时间的Map)
/**
 * User tokens kept in the Redisson map cache {@code user:tokens}, each entry with
 * its own time-to-live.
 */
@Service
public class TokenService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Stores a token for a user with the given lifetime.
     *
     * @param userId key of the entry
     * @param token  token value
     * @param expiry entry lifetime; passed to the cache in milliseconds
     */
    public void storeToken(String userId, String token, Duration expiry) {
        final long ttlMillis = expiry.toMillis();
        tokens().put(userId, token, ttlMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Looks up the token of a user.
     *
     * @param userId key of the entry
     * @return the value returned by the map cache for this key
     */
    public String getToken(String userId) {
        return tokens().get(userId);
    }

    /**
     * Removes the token of a user, if any.
     *
     * @param userId key of the entry
     */
    public void invalidateToken(String userId) {
        tokens().remove(userId);
    }

    /** Returns the handle of the shared token cache. */
    private RMapCache<String, String> tokens() {
        return redissonClient.getMapCache("user:tokens");
    }
}
五、消息和发布订阅
5.1 RTopic(主题发布订阅)
/**
 * Publishes events to Redisson topics named {@code events:<eventType>}.
 */
@Service
public class EventPublisher {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Publishes {@code eventData} on the topic of {@code eventType}.
     *
     * @param eventType suffix of the topic name
     * @param eventData payload handed to {@code RTopic.publish}
     */
    public void publishEvent(String eventType, Object eventData) {
        final String topicName = "events:" + eventType;
        redissonClient.getTopic(topicName).publish(eventData);
    }
}
/**
 * Listens on the Redisson topic {@code events:user} for String messages.
 */
@Component
public class EventSubscriber {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Registers the message listener once the bean has been constructed.
     */
    @PostConstruct
    public void subscribeToEvents() {
        final RTopic userEvents = redissonClient.getTopic("events:user");
        userEvents.addListener(String.class, this::onUserEvent);
    }

    /** Logs the received message to stdout and forwards it to {@code handleUserEvent}. */
    private void onUserEvent(CharSequence channel, String message) {
        System.out.println("收到用户事件: " + message);
        handleUserEvent(message);
    }
}
六、高级数据结构
6.1 RBloomFilter(布隆过滤器)
/**
 * Duplicate detection backed by the Redisson Bloom filter {@code duplicate:check}.
 */
@Service
public class DuplicateCheckService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Initializes the filter after construction with 1,000,000 expected insertions
     * and a false-positive probability of 0.03.
     */
    @PostConstruct
    public void initBloomFilter() {
        filter().tryInit(1000000L, 0.03);
    }

    /**
     * Checks whether the filter reports {@code item} as present.
     *
     * @param item value to test
     * @return the result of {@code RBloomFilter.contains}
     */
    public boolean mightContain(String item) {
        return filter().contains(item);
    }

    /**
     * Adds {@code item} to the filter.
     *
     * @param item value to record
     */
    public void addItem(String item) {
        filter().add(item);
    }

    /** Returns the handle of the shared Bloom filter. */
    private RBloomFilter<String> filter() {
        return redissonClient.getBloomFilter("duplicate:check");
    }
}
6.2 RRateLimiter(分布式限流器)
/**
 * Per-API-key request throttling with Redisson rate limiters named
 * {@code api:limit:<apiKey>}.
 */
@Service
public class ApiRateLimiter {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Tries to take one permit for {@code apiKey}.
     *
     * @param apiKey key whose limiter is consulted
     * @return {@code true} if a permit was granted, {@code false} otherwise
     */
    public boolean isAllowed(String apiKey) {
        final String limiterName = "api:limit:" + apiKey;
        final RRateLimiter limiter = redissonClient.getRateLimiter(limiterName);
        // Rate requested on every call: 10 permits per 1 second, OVERALL type.
        limiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);
        final boolean granted = limiter.tryAcquire(1);
        return granted;
    }
}
Redisson功能总结表
分布式锁类型对比
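| 锁类型 | 特点 | 适用场景 |
| --- | --- | --- |
| RLock | 可重入、支持超时 | 一般业务操作 |
| RReadWriteLock | 读写分离 | 读多写少场景 |
| RFairLock(通过 getFairLock() 获取,类型为 RLock) | 公平获取锁 | 避免线程饥饿 |
| RMultiLock(实现类 RedissonMultiLock) | 同时锁定多个资源 | 防止死锁的资源操作 |
| RRedLock(实现类 RedissonRedLock) | 高安全性 | 关键业务操作 |
| RSpinLock(通过 getSpinLock() 获取,类型为 RLock) | 减少线程切换 | 短时间持有的操作 |
| RFencedLock | 版本控制 | 需要检测锁有效性 |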
数据结构特点
- RMap: 分布式HashMap,可为整个Map设置过期时间(单个条目的过期时间需使用RMapCache)
- RQueue: 分布式队列(阻塞操作需使用RBlockingQueue)
- RDelayedQueue: 延时队列,适用于定时任务
- RAtomicLong: 原子计数器,支持高并发
- RBloomFilter: 布隆过滤器,快速去重检测
- RRateLimiter: 限流器,API限流控制
- RGeo: 地理位置数据,支持距离计算
- RTimeSeries: 时间序列数据,适用于监控指标
最佳实践建议
- 选择合适的锁类型:根据业务场景选择最适合的锁
- 设置合理的超时时间:避免死锁和长时间等待
- 正确释放资源:使用try-finally确保锁被释放
- 监控性能指标:关注锁的竞争情况和持有时间
- 合理使用缓存:根据数据访问模式选择缓存策略
七、分布式服务
7.1 RExecutorService(分布式执行器)
/**
 * Submits work to the Redisson executor service named "distributed:tasks".
 */
@Service
public class DistributedTaskService {
@Autowired
private RedissonClient redissonClient;
/**
 * Submits a single task that prints the name of the executing thread, calls
 * performBusinessLogic() and returns a completion message. The value returned
 * by submit(...) is discarded, so the caller does not observe the task result
 * or any failure.
 */
public void executeDistributedTask() {
RExecutorService executor = redissonClient.getExecutorService("distributed:tasks");
// NOTE(review): Redisson ships submitted tasks to worker nodes, so the task
// presumably has to be serializable. This lambda is not declared Serializable
// and it captures `this` through performBusinessLogic() — verify that submit
// accepts it; a static nested class implementing Callable and Serializable may
// be required instead.
executor.submit(() -> {
System.out.println("执行分布式任务: " + Thread.currentThread().getName());
performBusinessLogic();
return "任务完成";
});
}
}
八、事务和批处理
8.1 RBatch(批处理操作)
/**
 * Writes users to Redis in a single Redisson batch.
 */
@Service
public class BatchOperationService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Queues one map put (into {@code users}) and one counter increment
     * ({@code user:count}) per user, executes them as one batch and prints the
     * number of responses.
     *
     * @param users users to store; each is keyed by {@code user.getId()}
     */
    public void batchUpdateUsers(List<User> users) {
        RBatch batch = redissonClient.createBatch();
        // The batch-bound handles do not depend on the loop variable, so they are
        // obtained once instead of once per user.
        RMapAsync<String, User> userMapAsync = batch.getMap("users");
        RAtomicLongAsync counterAsync = batch.getAtomicLong("user:count");
        for (User user : users) {
            userMapAsync.putAsync(user.getId(), user);
            counterAsync.incrementAndGetAsync();
        }
        // Nothing is sent until execute(); the async calls above only queue commands.
        BatchResult<?> result = batch.execute();
        System.out.println("批处理完成,操作数: " + result.getResponses().size());
    }
}
九、地理空间数据
9.1 RGeo(地理空间数据结构)
/**
 * Geospatial lookups on the Redisson geo set {@code locations}.
 */
@Service
public class LocationService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Stores a location under its identifier.
     *
     * @param locationId member name
     * @param longitude  longitude of the member
     * @param latitude   latitude of the member
     */
    public void addLocation(String locationId, double longitude, double latitude) {
        locations().add(longitude, latitude, locationId);
    }

    /**
     * Finds members within {@code radius} kilometers of the given point.
     *
     * @param longitude longitude of the search center
     * @param latitude  latitude of the search center
     * @param radius    search radius in kilometers
     * @return the members returned by {@code RGeo.radius}
     */
    public List<String> findNearbyLocations(double longitude, double latitude, double radius) {
        final RGeo<String> index = locations();
        return index.radius(longitude, latitude, radius, GeoUnit.KILOMETERS);
    }

    /**
     * Computes the distance between two stored members in kilometers.
     *
     * @param location1 first member
     * @param location2 second member
     * @return the value returned by {@code RGeo.dist}
     */
    public Double getDistance(String location1, String location2) {
        final RGeo<String> index = locations();
        return index.dist(location1, location2, GeoUnit.KILOMETERS);
    }

    /** Returns the handle of the shared geo set. */
    private RGeo<String> locations() {
        return redissonClient.getGeo("locations");
    }
}
十、时间序列数据
10.1 RTimeSeries(时间序列)
/**
 * Numeric metrics stored in Redisson time series, one series per metric name.
 *
 * <p>{@code RTimeSeries<V, L>} takes the value type first and the label type
 * second. The previous declaration {@code RTimeSeries<String, Double>} made the
 * value type {@code String}, which accepts neither {@code add(long, double)} nor
 * a {@code Double} result, and {@code range(...)} returns only values rather than
 * a timestamp-to-value map.
 */
@Service
public class MetricsService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Records {@code value} for {@code metricName} at the current wall-clock time
     * in milliseconds.
     *
     * @param metricName name of the time series
     * @param value      sample to record
     */
    public void recordMetric(String metricName, double value) {
        seriesOf(metricName).add(System.currentTimeMillis(), value);
    }

    /**
     * Returns the samples of {@code metricName} with timestamps between
     * {@code fromTime} and {@code toTime}.
     *
     * @param metricName name of the time series
     * @param fromTime   start timestamp in epoch milliseconds
     * @param toTime     end timestamp in epoch milliseconds
     * @return timestamp-to-value map in the iteration order of
     *         {@code RTimeSeries.entryRange}; empty if there are no samples
     */
    public Map<Long, Double> getMetricRange(String metricName, long fromTime, long toTime) {
        final Map<Long, Double> samples = new java.util.LinkedHashMap<>();
        seriesOf(metricName)
                .entryRange(fromTime, toTime)
                .forEach(entry -> samples.put(entry.getTimestamp(), entry.getValue()));
        return samples;
    }

    /** Returns the time-series handle (Double values, String labels) for a metric. */
    private RTimeSeries<Double, String> seriesOf(String metricName) {
        return redissonClient.getTimeSeries(metricName);
    }
}
高级锁特性示例
锁的高级特性
/**
 * Demonstrates lock inspection calls and re-entrant acquisition on the Redisson
 * lock {@code advanced:lock:<resourceId>}.
 *
 * <p>Each method releases the lock only if its own {@code tryLock} call succeeded.
 * Checking {@code isHeldByCurrentThread()} alone is not enough for a re-entrant
 * lock: when an inner {@code tryLock} fails or is interrupted while an outer frame
 * still holds the lock, the check is true and {@code unlock()} would give up the
 * outer frame's hold.
 */
@Service
public class AdvancedLockService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Queries the lock state, then acquires the lock (wait up to 10 s, lease 300 s)
     * and runs a three-level re-entrant operation under it.
     *
     * @param resourceId suffix of the lock name
     */
    public void demonstrateAdvancedFeatures(String resourceId) {
        RLock lock = redissonClient.getLock("advanced:lock:" + resourceId);
        boolean acquired = false;
        try {
            // Inspection calls kept to show the API; their results are not used.
            boolean isLocked = lock.isLocked();
            boolean isHeldByCurrentThread = lock.isHeldByCurrentThread();
            long remainTimeToLive = lock.remainTimeToLive();
            acquired = lock.tryLock(10, 300, TimeUnit.SECONDS);
            if (acquired) {
                recursiveOperation(resourceId, 3);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            // The second check covers a lease that ran out before we got here.
            if (acquired && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    /**
     * Re-acquires the same lock at each recursion level, prints the depth and the
     * hold count, sleeps 100 ms and recurses until {@code depth} reaches zero.
     *
     * @param resourceId suffix of the lock name
     * @param depth      remaining recursion levels; nothing happens when {@code <= 0}
     */
    private void recursiveOperation(String resourceId, int depth) {
        if (depth <= 0) {
            return;
        }
        RLock lock = redissonClient.getLock("advanced:lock:" + resourceId);
        boolean acquired = false;
        try {
            acquired = lock.tryLock(5, TimeUnit.SECONDS);
            if (acquired) {
                System.out.println("递归深度: " + depth + ", 持有计数: " + lock.getHoldCount());
                Thread.sleep(100);
                recursiveOperation(resourceId, depth - 1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release only this level's acquisition; outer levels release their own.
            if (acquired && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
CMS系统综合应用示例
页面管理综合服务
/**
 * Combined example: updates several Redisson structures for a page while holding
 * the lock {@code page:manage:<pageId>}.
 */
@Service
public class EnhancedPageManagementService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Under the page lock (wait up to 10 s): increments the process counter, marks
     * the page active, caches its configuration for one hour, publishes a lifecycle
     * event and enqueues the page for processing. Does nothing if the lock is not
     * obtained or the wait is interrupted.
     *
     * @param pageId page being managed
     */
    public void managePageLifecycle(Long pageId) {
        RLock lock = redissonClient.getLock("page:manage:" + pageId);
        try {
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
                RAtomicLong processCount = redissonClient.getAtomicLong("page:process:count");
                processCount.incrementAndGet();

                RSet<Long> activePages = redissonClient.getSet("pages:active");
                activePages.add(pageId);

                RMapCache<String, Object> configCache = redissonClient.getMapCache("page:config");
                // RMapCache.put takes the entry TTL as (long, TimeUnit); it has no
                // java.time.Duration overload.
                configCache.put("page:" + pageId, getPageConfig(pageId), 1, TimeUnit.HOURS);

                RTopic statusTopic = redissonClient.getTopic("page:lifecycle:events");
                statusTopic.publish(Map.of("pageId", pageId, "action", "managed"));

                RQueue<Long> processQueue = redissonClient.getQueue("pages:process:queue");
                processQueue.offer(pageId);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for code further up the stack.
            Thread.currentThread().interrupt();
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}