大纲
1.库存分桶上线
2.库存分桶下线
3.库存分桶扩容
4.库存预警
1.库存分桶上线
(1)使用入口
(2)具体实现
(1)使用入口
当库存充⾜后,可针对下线的分桶进⾏再次上线并分配库存。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private TairLock tairLock;
...
//分桶上线接口
@Override
public void bucketOnline(InventorOnlineRequest request) {
//1.验证入参必填
checkInventorOnlineParams(request);
//2.注意这里需要锁定中心桶库存
String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();
String value = SnowflakeIdWorker.getCode();
boolean lock = tairLock.tryLock(key, value);
if (lock) {
try {
//3.获取中心桶的剩余库存,并校验是否可上线分桶
Integer residueNum = checkBucketOnlineNum(key);
//4.构建新的分桶元数据信息,并写入
writeBucketCache(request, residueNum);
} catch (Exception e) {
log.error(e.getStackTrace().toString());
} finally {
tairLock.unlock(key, value);
}
} else {
throw new BaseBizException("请求繁忙,稍后再重试!");
}
}
...
}
(2)具体实现
步骤一:校验⼊参必填,即指定的商品SKU以及需要上线的分桶编号
步骤二:校验中⼼桶的剩余库存,否则上线⼀个空库存分桶毫⽆意义
步骤三:接着判断本地缓存列表⾥是否还存在下线的分桶可供上线
步骤四:当存在可上线的分桶,需要构建新的元数据信息
步骤五:写入数据到远程缓存中并更新本地缓存的分桶元数据信息
步骤一:校验⼊参必填,即指定的商品SKU以及需要上线的分桶编号。可上线的分桶编号由远程缓存保存⼀份,具体上线⼏个由调⽤⽅决定。
步骤二:校验中⼼桶的剩余库存,否则上线⼀个空库存分桶毫⽆意义。因为空库存分桶⼜会⻢上下线,这⾥需要先获取对应中⼼桶的剩余库存,验证是否可以上线分桶。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private TairCache tairCache;
...
//校验中心桶是否可上线分桶,并返回对应的可分配库存
private Integer checkBucketOnlineNum(String key) {
//验证缓存是否存在
String residueNum = tairCache.get(key);
if (StringUtils.isEmpty(residueNum)) {
return 0;
}
//返回具体的库存
return Integer.valueOf(residueNum);
}
...
}
步骤三:接着判断本地缓存列表⾥是否还存在下线的分桶可供上线。如果本地缓存列表⾥不存在下线的分桶,则不再处理。如果本地缓存列表⾥存在下线的分桶,则计算出中⼼桶可⽤库存可以分发⼏个分桶上线,以及分发的库存信息。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//构建新的分桶元数据信息
//@param request 分桶上线对象
//@param residueNum 中心桶剩余库存
private void writeBucketCache(InventorOnlineRequest request, Integer residueNum) {
String key = request.getSellerId() + request.getSkuId();
//5.获取本地存储的分桶元数据信息
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
if (!Objects.isNull(bucketLocalCache)) {
//6.获取当前可上线的分桶列表信息以及具体上线库存
List<BucketCacheBO> bucketCacheBOList = buildBucketList(
request.getBucketNoList(),
bucketLocalCache.getAvailableList(),
bucketLocalCache.getUndistributedList(),
bucketLocalCache.getInventoryBucketConfig(),
residueNum
);
//当前可上线的分桶为空,直接返回
if (CollectionUtils.isEmpty(bucketCacheBOList)) {
return;
}
//7.构建返回新的分桶元数据模型返回
buildBucketLocalCache(bucketLocalCache, bucketCacheBOList, residueNum);
//8.写入数据到远程缓存中并更新本地缓存的分桶元数据信息
writeBucketLocalCache(bucketLocalCache, bucketCacheBOList);
}
}
...
}
步骤四:当存在可上线的分桶,需要构建新的元数据信息。这⾥因为是上线,先操作缓存的库存增加,再增加上线分桶可路由配置,暂时不考虑延迟处理。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//构建新的分桶元数据模型
//@param bucketLocalCache 本地分桶元数据信息
//@param bucketCacheBOList 上线的分桶数据列表
//@param residueNum 中心桶剩余库存
private void buildBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList, Integer residueNum) {
//获取本次上线的库存信息
Integer inventoryNum = 0;
for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {
inventoryNum = inventoryNum + bucketCacheBO.getBucketNum();
}
//填充中心桶剩余库存
residueNum = residueNum - inventoryNum;
bucketLocalCache.setResidueNum(residueNum);
//添加新上线的分桶列表
bucketLocalCache.getAvailableList().addAll(bucketCacheBOList);
Map<String, BucketCacheBO> bucketCacheMap = bucketCacheBOList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList().stream().filter(bucketCacheBO ->
//在上线的分桶列表,需要移除掉
!bucketCacheMap.containsKey(bucketCacheBO.getBucketNo())).collect(Collectors.toList()
);
//从不可用的分桶列表重移除
bucketLocalCache.setUndistributedList(undistributedList);
}
...
}
步骤五:写入数据到远程缓存中并更新本地缓存的分桶元数据信息
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//写入数据到远程缓存中并更新本地缓存的分桶元数据信息
//@param bucketLocalCache 分桶元数据信息
//@param bucketCacheBOList 上线的分桶信息
private void writeBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList) {
String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
//中心桶被扣减掉的库存(上线的分桶库存总和)
Integer centerInventoryNum = bucketCacheBOList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
//中心桶的库存扣减信息
tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, centerInventoryNum);
//1.先更新分桶的上线缓存处理操作
for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {
tairCache.incr(bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());
}
//2.处理分桶列表的更新,待中心桶库存以及上线分桶库存更新完成,更新远程和本地的分桶列表
inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);
}
...
}
2.库存分桶下线
(1)使用入口
(2)具体实现
(1)使用入口
当后台选择下线部分分桶或者分桶库存不⾜而触发下线阈值时会调⽤。即在进行异步分桶扩容时,发现中心桶剩余库存为0时,会检查是否下线。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private TairLock tairLock;
...
//分桶下线接口
@Override
public void bucketOffline(InventorOfflineRequest request) {
long start = System.currentTimeMillis();
//1.验证入参必填
checkInventorOfflineParams(request);
//2.过滤只有一个分桶的无效请求
Boolean isOffline = checkBucketOffline(request);
if (isOffline) {
//注意这里需要锁定中心桶库存
String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + request.getSellerId() + request.getSkuId();
String value = SnowflakeIdWorker.getCode();
boolean lock = tairLock.tryLock(key, value);
if (lock) {
try {
//3.先将准备下线的分桶库存从本地和远程列表中移除至不可用列表,避免新的请求进来
updateBucket(request);
} catch (Exception e) {
e.printStackTrace();
} finally {
tairLock.unlock(key, value);
}
} else {
throw new BaseBizException("请求繁忙,稍后重试!");
}
log.info("分桶下线处理时间,request:{}, lock:{}, time:{}", JSON.toJSONString(request), lock, System.currentTimeMillis() - start);
}
}
...
}
(2)具体实现
步骤一:校验⼊参必填,指定的商品SKU以及需要下线的分桶编号。
步骤二:在竞争锁之前,先判断⼀下是否⽆需处理。过滤只有一个分桶的无效请求,避免⼤量⽆效请求竞争锁带来的开销。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//判断待下线分桶是否可以下线
private Boolean checkBucketOffline(InventorOfflineRequest request) {
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
//可用分桶缓存列表大于1
if (!Objects.isNull(bucketLocalCache) && bucketLocalCache.getAvailableList().size() > 1) {
//并且待下线的分桶在可用分桶列表中
return bucketLocalCache.getAvailableList().stream().anyMatch(bucketCache -> request.getBucketNoList().contains(bucketCache.getBucketNo()));
}
return false;
}
...
}
步骤三:先把下线的分桶从可发分桶列表中移除,包括本地缓存的列表,从而可以避免后续请求再次路由到该分桶。
待改进:应该先扣减该分桶的远程缓存,再移除所在机器的本地缓存。因为如果先移除本机器的本地缓存后,但其他机器的本地缓存没有移除。这时会造成各机器本地缓存不一致,有些请求还是会到该分桶进行扣减。
先更新分桶库存缓存,再更新本地分桶元数据缓存及远程元数据缓存,可以避免不同机器的本地分桶元数据缓存不一致。比如更新了本地缓存的机器不会路由到该分桶,而没更新本地缓存的机器依然路由到了该分桶。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//将准备下线的分桶列表,先从本地缓存以及远程缓存列表里面移除
private void updateBucket(InventorOfflineRequest request) {
//获取本地和远程的分桶列表
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
if (!Objects.isNull(bucketLocalCache)) {
//过滤返回下线的分桶确实存在于存活的分桶列表上
Map<String, BucketCacheBO> bucketCacheMap = bucketLocalCache.getAvailableList().stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
//过滤已不存在远程缓存的列表
List<String> bucketCacheList = request.getBucketNoList().stream().filter(bucketCacheMap::containsKey).collect(Collectors.toList());
//过滤后,有可下线的分桶缓存
if (!CollectionUtils.isEmpty(bucketCacheList)) {
//分桶最少也需要保留一个
if (bucketLocalCache.getAvailableList().size() > 1) {
//先移除缓存的分桶列表,避免新的请求访问影响真实库存
updateBucketCache(bucketCacheList, bucketLocalCache);
}
}
}
}
...
}
步骤四:触发库存回收,将当前下线的分桶还存在的库存都回退到中⼼桶。这⾥需要注意的是更新下线后的元数据,此时分桶还可能在被扣减库存。需要扣减请求路由不到该分桶才可以对该分桶内的库存进⾏回源中⼼桶。这⾥默认采取的是延迟1秒,具体最优时间可由压测得出。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private InventoryBucketCache inventoryBucketCache;
...
//移除本地分桶的对应分桶列表以及远程的分桶列表
//@param bucketCacheList 下线的分桶列表
//@param bucketCache 远程缓存元数据信息
private void updateBucketCache(List<String> bucketCacheList, BucketLocalCache bucketCache) {
String key = bucketCache.getSellerId() + bucketCache.getSkuId();
//1.获取到本地的缓存列表
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
//2.填充下线的分桶到不可用列表中
for (String bucketNo : bucketCacheList) {
bucketLocalCache.getUndistributedList().add(new BucketCacheBO(bucketNo));
}
//过滤返还上线的分桶列表
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList().stream().filter(bucketCacheBO -> !bucketCacheList.contains(bucketCacheBO.getBucketNo())).collect(Collectors.toList());
bucketLocalCache.setAvailableList(availableList);
//3.从本地缓存里面更新
inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
//4.覆盖远程的分桶元数据信息
tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);
log.info("下线分桶,分桶元数据信息:{}", JSONObject.toJSONString(bucketLocalCache));
//发送清空下线分桶库存的消息
bucketClearProducer.sendBucketClear(new BucketClearRequest(bucketCache.getSkuId(), bucketCache.getSellerId(), bucketCacheList, 0));
}
...
}
//清空分桶库存的消息队列
@Component
public class BucketClearProducer {
@Autowired
private DefaultProducer defaultProducer;
//清空分桶库存的消息,MQ生产
public void sendBucketClear(BucketClearRequest bucketClearRequest) {
//发送清空分桶库存消息,延迟1秒,留给更多的时间给正在扣减该分桶的线程处理
defaultProducer.sendMessage(RocketMqConstant.BUCKET_CLEAR_TOPIC, JSONObject.toJSONString(bucketClearRequest), 1, "清空分桶");
}
}
步骤五:接收延迟消息,开始处理分桶下线。同时处理完成分桶下线时,还需检测⼀下库存是否触发预警机制。如触发了预警通知,则需要发出消息进行异步处理。
//处理清空分桶库存的消息
@Component
public class BucketClearListener implements MessageListenerConcurrently {
@Autowired
private InventoryBucketService inventoryBucketService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
String msg = new String(messageExt.getBody());
log.info("执行分桶下线清空库存,消息内容:{}", msg);
BucketClearRequest bucketClearRequest = JsonUtil.json2Object(msg, BucketClearRequest.class);
inventoryBucketService.bucketClear(bucketClearRequest);
}
} catch (Exception e) {
log.error("consume error, 清空分桶库存失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//清空分桶库存,分桶库存放回中央库存
@Override
public void bucketClear(BucketClearRequest request) {
long start = System.currentTimeMillis();
String key = TairInventoryConstant.SELLER_BUCKET_PREFIX + request.getSellerId() + request.getSkuId();
String bucketCache = tairCache.get(key);
if (!StringUtils.isEmpty(bucketCache)) {
BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
updateBucketInventory(request.getBucketNoList(), bucketLocalCache);
}
log.info("清空下线分桶库存,request:{},时间:{}", JSON.toJSONString(request), System.currentTimeMillis() - start);
//商品库存值预警
warningProductInventory(bucketCache);
}
...
}
步骤六:清理下线分桶库存逻辑处理
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//将分桶的缓存库存返回给中心桶库存上
private void updateBucketInventory(List<String> bucketCacheList, BucketLocalCache bucketLocalCache) {
//中心桶的库存key
String key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
Integer inventoryNum = 0;
//下线的分桶列表
List<String> undistributedList = bucketLocalCache.getUndistributedList().stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());
//只处理已经下线的分桶
bucketCacheList = bucketCacheList.stream().filter(undistributedList::contains).collect(Collectors.toList());
if (CollectionUtils.isEmpty(bucketCacheList)) {
return;
}
for (String bucketNo : bucketCacheList) {
//先获取下线的分桶实际剩余库存
String bucketNum = tairCache.get(bucketNo);
//当分桶的库存大于0的时候才处理
if (!StringUtils.isEmpty(bucketNum) && Integer.valueOf(bucketNum) > 0) {
//清理下线的分桶库存,设置为0
Integer result = tairCache.decr(bucketNo, Integer.parseInt(bucketNum));
if (result >= 0) {
log.error("下线分桶,bucketNo:{},desc:{}", bucketNo, bucketNum);
inventoryNum = inventoryNum + Integer.parseInt(bucketNum);
} else {
log.error("分桶已下线,bucketNo:{}", bucketNo);
}
}
}
if (inventoryNum > 0) {
//将下线的剩余库存加至 中心桶库存上
Integer incr = tairCache.incr(key, inventoryNum);
log.error("回源中心桶,inventoryNum:{}, after value :{}", inventoryNum, incr);
}
}
...
}
3.库存分桶扩容
(1)使用入口
(2)具体实现
(1)使用入口
每次对库存分桶缓存扣减库存时,都会检查是否需要进行分桶扩容。当分桶的剩余库存小于回源比例时,就会触发发送消息进行异步分桶扩容。
@Component
public class BucketCapacityListener implements MessageListenerConcurrently {
@Autowired
private InventoryBucketService inventoryBucketService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
String msg = new String(messageExt.getBody());
log.info("执行分桶库存扩容,消息内容:{}", msg);
BucketCapacity bucketCapacity = JsonUtil.json2Object(msg, BucketCapacity.class);
inventoryBucketService.bucketCapacity(bucketCapacity);
}
} catch (Exception e) {
log.error("consume error, 分桶库存扩容失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
@Resource
private TairLock tairLock;
...
//分桶扩容接口
@Override
public void bucketCapacity(BucketCapacity bucketCapacity) {
//先锁住中心桶库存,避免此时库存发生变化
String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
String value = SnowflakeIdWorker.getCode();
//1.校验是否已经无需扩容了,如果是则快速结束
BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);
if (!bucketCapacityContext.getIsCapacity()) {
return;
}
//获取分布式锁来进行扩容处理
boolean lock = tairLock.tryLock(key, value);
if (lock) {
try {
//再次校验是否需要扩容,此处不允许并发
bucketCapacityContext = checkBucketCapacity(bucketCapacity);
if (bucketCapacityContext.getIsCapacity()) {
//2.获取中心桶库存的库存
Integer residueNum = getCenterStock(bucketCapacity);
//3.可以扩容,计算出可回源的库存进行处理
if (residueNum > 0) {
backSourceInventory(residueNum, bucketCapacityContext);
} else {
//4.中心桶无库存,检查是否触发下线
checkBucketOffline(bucketCapacity);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
tairLock.unlock(key, value);
}
} else {
throw new BaseBizException("请求繁忙,稍后重试!");
}
}
...
}
(2)具体实现
步骤一:验证当前扩容分桶是否已经被处理
步骤二:获取中⼼桶缓存的剩余库存,校验是否存在库存
步骤三:对分桶进行扩容
步骤四:获取扩容后的预估库存深度
步骤五:刷新分桶元数据缓存
步骤六:将计算好的分桶库存和中⼼桶库存进⾏增加和扣减操作
步骤一:验证当前扩容分桶是否已经被处理。如果库存⽆需处理,则快速结束,避免因为争抢锁影响性能。只有需要扩容的才上锁进⾏后续校验,并且上锁后需要再次校验。Double Check可以避免部分⽆效请求。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//校验本次请求是否还需要执行扩容处理
private BucketCapacityContext checkBucketCapacity(BucketCapacity bucketCapacity) {
String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
//1.获取远程的分桶缓存
Integer residueNum = getBucketInventoryNum(bucketCapacity.getBucketNo());
//2.获取缓存元数据信息
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
//3.校验是否还需要执行扩容
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();
for (BucketCacheBO bucketCacheBO : availableList) {
//具体使用的是哪个分桶进行扣减库存
if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {
//触发回源比例的百分比
Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();
//当前分桶的分配总库存
Integer bucketNum = bucketCacheBO.getBucketNum();
int backSourceNum = bucketNum * backSourceProportion / 100;
//回源比例的库存 大于剩余的库存,触发异步扩容
return new BucketCapacityContext(residueNum, backSourceNum > residueNum, bucketCapacity);
}
}
//如果不在可用列表里面,则意味已下线,快速结束掉
return new BucketCapacityContext(residueNum, false, bucketCapacity);
}
...
}
步骤二:获取中⼼桶缓存的剩余库存,校验是否存在库存。有库存才扩容,⽆库存则验证是否触发下线阈值。如果触发下线阈值,则发送消息进行通知,异步处理分桶下线。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//返回中心桶库存,如返回中心库存大于0则允许处理
private Integer getCenterStock(BucketCapacity bucketCapacity) {
String key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
String centreCacheNum = tairCache.get(key);
if (!StringUtils.isEmpty(centreCacheNum)) {
return Integer.valueOf(centreCacheNum);
}
return 0;
}
...
}
步骤三:对分桶进行扩容。分桶需要扩容多少库存,需要注意尽量保证每个分桶的库存尽可能均匀。如果中心桶库存超过最大深度库存,则直接以配置的回源步长增长库存,否则汇总当前分桶的实际库存深度。也就是根据当前的可⽤分桶列表、中⼼桶库存、总的可⽤库存深度,计算出平均的⼀个可分配库存数量,从而避免每个分桶扩容的库存不均匀(最⼩值必须超过最⼩库存深度)。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//回源库存到分桶上
//@param residueNum 中心桶库存
//@param bucketCapacityContext 扩容上下文对象
private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {
//首先需要当前分桶的库存,其次还需要获取目前分桶的可发库存深度(第一次初始化的时候分配的库存)
//根据当初分配的库存深度以及最大库存深度以及中心桶库存,得出均匀到目前支持可用的分桶均匀分配库存大概数量
//同时根据本次同步的库存数量刷新分桶的实际库存深度
BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();
//先获取本地的分桶元数据信息,获取当前分桶的总发放上限
String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
Integer inventoryNum = 0;
//获取实际配置的最大可用库存深度
Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
BucketCacheBO bucketCache = null;
for (BucketCacheBO bucketCacheBO : availableList) {
if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {
bucketCache = bucketCacheBO;
break;
}
}
//这里没有匹配到分桶,则该分桶已被下线,不处理后续流程
if (Objects.isNull(bucketCache)) {
return;
}
//3.中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存
if (residueNum > maxBucketNum) {
inventoryNum = inventoryBucketConfig.getBackSourceStep();
} else {
inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);
}
//4.获取扩容后的预估库存深度
Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);
//5.更新分桶元数据相关信息,注意需要判断当前分桶的库存深度是否真实发生变化,如无变化则不需要更新
refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);
log.info("本次分桶:{},回源库存:{}", bucketCapacity.getBucketNo(), inventoryNum);
//6.回源分桶的库存
Integer incr = tairCache.incr(bucketCapacity.getBucketNo(), inventoryNum);
//7.扣减中心桶库存
Integer decr = tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, inventoryNum);
log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCapacity.getBucketNo(), inventoryNum, incr, decr);
}
...
//计算出均匀后的每个分桶实际分配的库存值
//@param maxBucketNum 最大的库存深度
//@param inventoryBucketConfig 分桶配置
//@param residueNum 中心桶剩余库存
//@param bucketCache 扩容分桶
private Integer calcEvenInventoryNum(Integer maxBucketNum, InventoryBucketConfigDO inventoryBucketConfig, Integer residueNum, BucketCacheBO bucketCache) {
//获取当前扩容的分桶深度
Integer bucketDepthNum = bucketCache.getBucketNum();
//得到扩容的分桶深度 和当前全部可用分桶的库存深度,计算占比
//根据占比计算出回源的步长,注意最小深度,如果计算后的步长小于最小库存深度,则默认取最小库存深度
BigDecimal proportion = new BigDecimal(bucketDepthNum).divide(new BigDecimal(maxBucketNum), 6, BigDecimal.ROUND_DOWN);
//根据比例计算出可分配的库存
BigDecimal allotNum = new BigDecimal(residueNum).multiply(proportion).setScale(0, BigDecimal.ROUND_DOWN);
if (allotNum.compareTo(new BigDecimal(inventoryBucketConfig.getMinDepthNum())) < 0) {
allotNum = new BigDecimal(inventoryBucketConfig.getMinDepthNum());
}
//当最小深度都已无法满足剩余库存,则以实际剩余库存扩容
if (new BigDecimal(residueNum).compareTo(allotNum) < 0) {
return residueNum;
}
//得到扩容的库存值
return allotNum.intValue();
}
...
}
步骤四:获取扩容后的预估库存深度,此时分桶的库存深度发⽣变化,如果扩容的库存深度超过当时分配的库存深度,且未超过最⼤库存深度,则以当前分配的实际库存更新当前分桶库存深度。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//返回目前扩容后的库存深度,库存深度只允许增长不允许减少
//@param inventoryNum 步长扩容库存
//@param inventoryBucketConfig 分桶配置信息
//@param bucketCache 分桶信息
private Integer getMaxDepthNum(Integer inventoryNum, InventoryBucketConfigDO inventoryBucketConfig, BucketCacheBO bucketCache, BucketCapacityContext bucketCapacityContext) {
//获取当前分桶的实际库存,实际库存和真实库存会有差异,但是这里只是计算一个大概库存深度,无需精确
Integer residueNum = bucketCapacityContext.getResidueNum();
//预估出实际库存深度,当前分桶库存 + 步长增长库存
Integer maxBucketNum = residueNum + inventoryNum;
if (bucketCache.getBucketNum() > maxBucketNum) {
return bucketCache.getBucketNum();
}
log.info("前分桶的实际库存:{},预估的实际库存深度:{}", residueNum, maxBucketNum);
//实际库存深度,不能超过配置的最大库存深度,同理,最小深度也不能小于最小的库存深度
if (inventoryBucketConfig.getMaxDepthNum() < maxBucketNum) {
return inventoryBucketConfig.getMaxDepthNum();
}
return maxBucketNum;
}
...
}
步骤五:刷新分桶元数据缓存
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//刷新分桶元数据缓存
//@param maxDepthNum 分桶最大库存深度
//@param bucketLocalCache 分桶元数据信息
//@param bucketNo 分桶编号
private void refreshBucketCache(Integer maxDepthNum, BucketLocalCache bucketLocalCache, String bucketNo, Integer inventoryNum) {
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
for (BucketCacheBO bucketCacheBO : availableList) {
if (bucketCacheBO.getBucketNo().equals(bucketNo)) {
//每次库存具体深度变化都要更细,否则很容易触发 回源的比例
bucketCacheBO.setBucketNum(maxDepthNum);
bucketCacheBO.setAllotNum(inventoryNum + (Objects.isNull(bucketCacheBO.getAllotNum()) ? 0 : bucketCacheBO.getAllotNum()));
break;
}
}
String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
//1.刷新本地缓存
inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
//2.刷新远程缓存
tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);
}
...
}
步骤六:将计算好的分桶库存和中⼼桶库存进⾏增加和扣减操作。
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//回源库存到分桶上
//@param residueNum 中心桶库存
//@param bucketCapacityContext 扩容上下文对象
private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {
BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();
//先获取本地的分桶元数据信息,获取当前分桶的总发放上限
String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
Integer inventoryNum = 0;
//获取实际配置的最大可用库存深度
Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
BucketCacheBO bucketCache = null;
for (BucketCacheBO bucketCacheBO : availableList) {
if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {
bucketCache = bucketCacheBO;
break;
}
}
//这里没有匹配到分桶,则该分桶已被下线,不处理后续流程
if (Objects.isNull(bucketCache)) {
return;
}
//3.中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存
if (residueNum > maxBucketNum) {
inventoryNum = inventoryBucketConfig.getBackSourceStep();
} else {
inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);
}
//4.获取扩容后的预估库存深度
Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);
//5.更新分桶元数据相关信息,注意需要判断当前分桶的库存深度是否真实发生变化,如无变化则不需要更新
refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);
log.info("本次分桶:{},回源库存:{}", bucketCapacity.getBucketNo(), inventoryNum);
//6.回源分桶的库存
Integer incr = tairCache.incr(bucketCapacity.getBucketNo(), inventoryNum);
//7.扣减中心桶库存
Integer decr = tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, inventoryNum);
log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCapacity.getBucketNo(), inventoryNum, incr, decr);
}
...
}
4.库存预警
(1)库存系统发送库存预警消息
(2)商品可采可补可售系统消费预警消息
(1)库存系统发送库存预警消息
进行分桶扩容时,如果发现中心桶没有库存,就会触发检查是否要下线。在对分桶库存进行下线时,会清空分桶库存,并且进行库存预警。
//处理清空分桶库存的消息
@Component
public class BucketClearListener implements MessageListenerConcurrently {
@Autowired
private InventoryBucketService inventoryBucketService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
String msg = new String(messageExt.getBody());
log.info("执行分桶下线清空库存,消息内容:{}", msg);
BucketClearRequest bucketClearRequest = JsonUtil.json2Object(msg, BucketClearRequest.class);
inventoryBucketService.bucketClear(bucketClearRequest);
}
} catch (Exception e) {
log.error("consume error, 清空分桶库存失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
@Service
public class InventoryBucketServiceImpl implements InventoryBucketService {
...
//清空分桶库存,分桶库存放回中央库存
@Override
public void bucketClear(BucketClearRequest request) {
long start = System.currentTimeMillis();
String key = TairInventoryConstant.SELLER_BUCKET_PREFIX + request.getSellerId() + request.getSkuId();
String bucketCache = tairCache.get(key);
if (!StringUtils.isEmpty(bucketCache)) {
BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
updateBucketInventory(request.getBucketNoList(), bucketLocalCache);
}
log.info("清空下线分桶库存,request:{},时间:{}", JSON.toJSONString(request), System.currentTimeMillis() - start);
//商品库存值预警
warningProductInventory(bucketCache);
}
//对商品的库存发生变化进行预警处理
private void warningProductInventory(String bucketCache) {
//1.批量获取一下可用的缓存分桶列表编号
BucketLocalCache bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
//2.批量获取汇总商品剩余库存(分桶下线代表中心桶库存已经没有了,不校验中心桶库存)
List<String> cacheKeyList = availableList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());
List<String> productInventoryList = tairCache.mget(cacheKeyList);
//3.检测卖家单个商品的总库存 是否触发最小值预警或者百分比预警,是则异步消息通知供需服务
Integer sumInventoryNum = 0;
for (int i = 0; i < productInventoryList.size(); i++) {
String inventoryNum = productInventoryList.get(i);
if (StringUtils.isNotEmpty(inventoryNum)) {
sumInventoryNum = sumInventoryNum + Integer.valueOf(inventoryNum);
}
}
Boolean isWarning = false;
//如果实际库存值,小于预警值,或者总库存触发比例阈值,异步消息通知
if (sumInventoryNum < warningInventoryNum) {
isWarning = true;
}
//未触发最小库存预警,检测是否触发最小比例预警
if (!isWarning) {
//总的库存深度,不仅仅要看可用分桶的库存深度,还要看下线的库存深度,从而计算出一个当时实际分配的库存深度,计算出一个预警值
int sumBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList();
if (!CollectionUtils.isEmpty(undistributedList)) {
sumBucketNum = sumBucketNum + undistributedList.stream().mapToInt(cacheBO -> Objects.isNull(cacheBO.getBucketNum()) ? 0 : cacheBO.getBucketNum()).sum();
}
log.info("总的库存深度:{}", sumBucketNum);
//预警比例
BigDecimal warningProportion = new BigDecimal(proportion).divide(new BigDecimal(100), 3, BigDecimal.ROUND_DOWN);
//库存占比
BigDecimal inventoryProportion = new BigDecimal(sumInventoryNum).divide(new BigDecimal(sumBucketNum), 6, BigDecimal.ROUND_HALF_UP);
//配置的预警比例,大于分配的实际库存深度和已剩的库存占比
if (warningProportion.compareTo(inventoryProportion) > 0) {
isWarning = true;
}
}
//异步消息通知预警
if (isWarning) {
WarningInventoryDTO warningInventoryDTO = inventoryConverter.converterDTO(bucketLocalCache);
warningInventoryProducer.sendWarningInventory(warningInventoryDTO);
}
}
...
}
@Component
public class WarningInventoryProducer {
@Autowired
private DefaultProducer defaultProducer;
//库存预警的消息 MQ生产
public void sendWarningInventory(WarningInventoryDTO warningInventoryDTO) {
//发送分库存预警消息
defaultProducer.sendMessage(RocketMqConstant.WARNING_INVENTORY_TOPIC, JSONObject.toJSONString(warningInventoryDTO), "库存预警");
}
}
(2)商品可采可补可售系统消费预警消息
@Component
public class WarningInventoryListener implements MessageListenerConcurrently {
@Resource
private WarningInventoryService warningInventoryService;
@Resource
private InventoryRemote inventoryRemote;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
for (MessageExt messageExt : list) {
String msg = new String(messageExt.getBody());
log.info("执行库存预警,消息内容:{}", msg);
WarningInventoryDTO warningInventoryDTO = JsonUtil.json2Object(msg, WarningInventoryDTO.class);
//1.入预警缓存
JsonResult jsonResult = warningInventoryService.warningInventory(warningInventoryDTO);
//2.通知库存消息预警,只有有效的才通知
if (jsonResult.getSuccess()) {
inventoryRemote.warningInventoryMessage(warningInventoryDTO);
}
}
} catch (Exception e) {
log.error("consume error, 库存预警失败", e);
//本次消费失败,下次重新消费
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
@Service
public class WarningInventoryServiceImpl implements WarningInventoryService {
@Autowired
private RedisReadWriteManager redisReadWriteManager;
@Autowired
private RedisManagerRepository redisManagerRepository;
@Override
public JsonResult warningInventory(WarningInventoryDTO warningInventoryDTO) {
List<Long> sellerIdList = Arrays.asList(warningInventoryDTO.getSellerId());
//1.查询缓存,key为卖家ID,value为对应的sku列表
Map<Long, List<String>> redisSetMap = redisReadWriteManager.getRedisSortedSet(sellerIdList, AbstractRedisKeyConstants::getWarningInventoryZsetKey);
//2.获取这个卖家下的库存预警商品列表
List<String> skuList = redisSetMap.get(warningInventoryDTO.getSellerId());
//3.返回缓存操作的模型对象
Map<String, RedisSortedSetCache> redisSortedSetCacheMap = redisManagerRepository.diffWarningInventory(skuList, warningInventoryDTO, AbstractRedisKeyConstants::getWarningInventoryZsetKey);
if (redisSortedSetCacheMap.size() > 0) {
//4.执行数据缓存更新
redisReadWriteManager.flushIncrSortedSetMap(redisSortedSetCacheMap);
return JsonResult.buildSuccess();
}
return JsonResult.buildError("重复库存预警通知");
}
}