前言
本地部署参考 这里
MQ部分
代码举例
阿里云
依赖
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.8.4.Final</version>
</dependency>
阿里云配置类(自定义的)
public class RmqTopicVO {
/**
* 主题类型
**/
private String type;
/**
* 主题
**/
private String topic;
private String tag;
/**
* 是否延时发送
**/
private Boolean delayed;
/**
* 延时小时数
**/
private List<Long> hours;
/**
* 延时分钟数
**/
private List<Long> minutes;
/**
* 延时秒数
**/
private List<Long> seconds;
/**
* 延时毫秒数
**/
private List<Long> millis;
/**
* 重试次数
**/
private Integer retryTimes;
}
生产者代码
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ProducerBean buildProducer() {
ProducerBean producer = new ProducerBean();
producer.setProperties(getMqPropertie());
return producer;
}
消费者代码
public class RmqServerConfig {
private String accessKey;
private String secretKey;
private String nameSrvAddr;
private String groupId;
private int threadNums = 20;
private List<RmqTopicVO> topics;
@Bean(initMethod = "start", destroyMethod = "shutdown")
public ConsumerBean buildConsumer() {
ConsumerBean consumerBean = new ConsumerBean();
//配置文件
Properties properties = getMqPropertie();
properties.setProperty(PropertyKeyConst.GROUP_ID, groupId);
//将消费者线程数固定为20个 20为默认值
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, threadNums+"");
consumerBean.setProperties(properties);
//订阅关系
if (topics == null || topics.isEmpty()){
log.error("没有配置RMQ订阅主题");
return null;
}
Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(topics.size());
//订阅多个设置topic
topics.forEach(topic ->{
Subscription subscription = new Subscription();
subscription.setTopic(topic.getTopic());
subscription.setExpression(topic.getTag());
//这里将注入到spring的消息监听处理类获取到,并放到topic订阅列表中
subscriptionTable.put(subscription, SpringContextUtil.getBean(topic.getType()));
log.info("注册RMQ:{}", topic.getType());
});
consumerBean.setSubscriptionTable(subscriptionTable);
return consumerBean;
}
public Properties getMqPropertie() {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
return properties;
}
}
消息监听类(举例)
@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListener {
private final MyService myService;
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("收到通知, body:{}", body);
myService.test(body);
return Action.CommitMessage;
} catch (Exception e) {
log.error("mq发生异常", e);
return Action.ReconsumeLater;
}
}
}
本地部署版
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.8</version>
</dependency>
生产者代码
@Value("${rocketmq.name-server}")
private String myNameSrvAddr;
@Value("${rocketmq.producer.access-key}")
private String myAccessKey;
@Value("${rocketmq.producer.secret-key}")
private String mySecretKey;
@Value("${spring.application.name}")
private String projectName;
@Bean( initMethod = "start", destroyMethod = "shutdown")
public DefaultMQProducer mqProducer() {
// 1. 创建带认证信息的 RPCHook
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));
DefaultMQProducer producer = new DefaultMQProducer(null, aclHook, true, null);
// 本地部署客户端要求每个producer有唯一的producerGroup
String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 5);
producer.setProducerGroup(projectName + "_" + suffix);
producer.setNamesrvAddr(myNameSrvAddr);
producer.setVipChannelEnabled(false);
return producer;
}
消费者代码
@Value("${rocketmq.name-server}")
private String myNameSrvAddr;
@Value("${rocketmq.consumer.access-key}")
private String myAccessKey;
@Value("${rocketmq.consumer.secret-key}")
private String mySecretKey;
private List<DefaultMQPushConsumer> consumers;
@Value("${spring.application.name}")
private String projectName;
@Bean
public List<DefaultMQPushConsumer> buildConsumer() {
List<DefaultMQPushConsumer> consumers = new ArrayList<>(topics.size());
for (RmqTopicVO topicVO : topics) {
String topic = topicVO.getTopic();
if (!StringUtils.hasText(topic)){
continue;
}
MessageListenerConcurrently topicListenerBean = SpringContextUtil.getBean(topicVO.getType());
if (Objects.isNull(topicListenerBean)){
log.error("topic:[ {} ] no listener object", topic);
continue;
}
String tag = StringUtils.hasText(topicVO.getTag()) ? topicVO.getTag() : "*";
try {
AclClientRPCHook aclHook = new AclClientRPCHook(new SessionCredentials(myAccessKey, mySecretKey));
String groupName = String.join("_", topic, projectName, "group");
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName, aclHook, new AllocateMessageQueueAveragely(), true, null);
consumer.setNamesrvAddr(myNameSrvAddr);
consumer.subscribe(topic, tag);
consumer.setConsumeMessageBatchMaxSize(1);
consumer.setConsumeThreadMin(threadNums);
consumer.setConsumeThreadMax(threadNums);
//最大重试次数
consumer.setMaxReconsumeTimes(3);
consumer.registerMessageListener(topicListenerBean);
consumer.start();
consumers.add(consumer);
}catch (Exception e){
log.error("创建topic:[ {} ]消费者异常",topic, e);
}
}
// 保存引用
this.consumers = consumers;
return consumers;
}
@PreDestroy
public void destroy() {
log.info("进入销毁消费者组流程");
if (consumers != null) {
for (DefaultMQPushConsumer consumer : consumers) {
try {
consumer.shutdown();
} catch (Exception e) {
// 日志记录
log.info("关闭消费者组异常", e);
}
}
}
}
消息监听类(举例)
@Slf4j
@Component
@RequiredArgsConstructor
public class MyListener implements MessageListenerConcurrently {
private final MyService myService;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
boolean success = onMessage(msg);
if (!success) {
// 只要有一条消息处理失败,整个批次返回RECONSUME_LATER
// 这样这一批消息会被整体重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 所有消息都成功处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
private boolean onMessage(MessageExt message) {
try {
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("收到通知,body:{}", body);
myService.test(body);
} catch (Exception e) {
log.error("mq发生异常, topic: {}, msgKey: {}", message.getTopic(), message.getKeys(), e);
// 抛出异常,RocketMQ会自动重试
return false;
}
return true;
}
}
问题点
- 生产者和消费者组一定不能共用,要不然会出现各种问题,如果你没出现问题,那可能是你测试的不够多
- 关于服务消费同一消息时,要设置不同组,MQ默认是集群消费,不是广播消费。
- 消费时,为了避免批量消费中有一个错误导致所有回滚,可以设置每批次拉取一条消息
- 消息延迟问题,rocektmq 4系列,只有延迟级别,不能设置具体的时间(5系列支持)
RocketMQ-MQTT部分
由于系统需要知道设备上下线,处理业务,所以基于1.0.1版本修改了代码
修改部分
1.心跳检测日志输出到指定文件(这个自己看着改就好,应该很简单)
2.给某个topic发送设备上下线消息
3.记录设备连接日志,方便运维
给某个topic发送设备上下线消息
增加参数
在service.conf
配置文件中添加设备上下线通知topic
connectionNotifyTopic=
,默认iot (具体看下面相关代码)
消息类:
public class ClientStatusEventVO implements Serializable {
public static final String CONNECT_TAG = "connect";
public static final String DISCONNECT_TAG = "disconnect";
public static final String TCP_CLEAN_TAG = "tcpclean";
/**
* 每个TCP连接的唯一标识
*/
private String channelId;
/**
* 具体设备
*/
private String clientId;
/**
* 事件类型:connect/disconnect/tcpclean
*/
private String eventType;
/**
* 时间
*/
private Long time;
/**
* 客户端使用的公网出口IP地址
*/
private String clientIp;
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public void setEventType(String eventType) {
this.eventType = eventType;
}
public void setTime(Long time) {
this.time = time;
}
public void setClientIp(String clientIp) {
this.clientIp = clientIp;
}
public String getChannelId() {
return channelId;
}
public String getClientId() {
return clientId;
}
public String getEventType() {
return eventType;
}
public Long getTime() {
return time;
}
public String getClientIp() {
return clientIp;
}
@Override
public String toString() {
return "ClientStatusEventVO{" +
"channelId='" + channelId + '\'' +
", clientId='" + clientId + '\'' +
", eventType='" + eventType + '\'' +
", time=" + time +
", clientIp='" + clientIp + '\'' +
'}';
}
}
发送消息类
在mqtt-ds中创建
@Component
public class NotifyConnectionManager {
private static final Logger logger = LoggerFactory.getLogger(NotifyConnectionManager.class);
private static final String PRODUCER_GROUP_NAME = "iot_producer_group";
@Resource
private ServiceConf serviceConf;
private DefaultMQProducer defaultMQProducer;
@PostConstruct
public void init() throws MQClientException {
String suffix = UUID.randomUUID().toString().replace("-", "").substring(0, 8);
String groupName = PRODUCER_GROUP_NAME + "_" + suffix;
defaultMQProducer = MqFactory.buildDefaultMQProducer(groupName, serviceConf.getProperties());
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);
defaultMQProducer.start();
logger.info("NotifyConnectionManager initialize successfully. groupName: {}", groupName);
}
public void putMessageToMq(String tag, String msgStr){
String notifyTopic = serviceConf.getProperties().getProperty("connectionNotifyTopic", "iot");
String msgKey = UUID.randomUUID().toString().replace("-", "");
Message rmqMsg = new Message(
notifyTopic,
tag,
msgKey,
// 消息体
msgStr.getBytes(StandardCharsets.UTF_8)
);
try {
defaultMQProducer.send(rmqMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
logger.info("Send notify msg successfully, topic:{}, tag:{}, msgKey:{}, msg:{}, result:{}", notifyTopic, tag, msgKey, msgStr, sendResult);
}
@Override
public void onException(Throwable throwable) {
logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, throwable);
}
});
}catch (Exception e){
logger.error("Send notify msg failed, topic:{}, msgKey:{}, msg:{}", notifyTopic, msgKey, msgStr, e);
}
}
}
上面三个类中,分别表示连接异常,已连接,连接断开。
ConnectHandler:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// create link successfully , send message to mq
String clientId = ChannelInfo.getClientId(ctx.channel());
String channelId = ChannelInfo.getId(ctx.channel());
String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
long timeStamp = System.currentTimeMillis();
ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
clientStatusEventVO.setChannelId(channelId);
clientStatusEventVO.setClientId(clientId);
clientStatusEventVO.setClientIp(remoteIp);
clientStatusEventVO.setEventType(ClientStatusEventVO.TCP_CLEAN_TAG);
clientStatusEventVO.setTime(timeStamp);
String jsonString = JSON.toJSONString(clientStatusEventVO);
// send connection details to topic, default is iot
notifyConnectionManager.putMessageToMq(ClientStatusEventVO.TCP_CLEAN_TAG, jsonString);
logger.info(" [ConnectHandler] [tcpclean]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
if (cause.getMessage() == null || !simpleExceptions.contains(cause.getMessage())) {
logger.error("exceptionCaught {}", ctx.channel(), cause);
}
channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.SERVER, cause.getMessage());
}
MqttConnectHandler:
public void doHandler(ChannelHandlerContext ctx, MqttConnectMessage connectMessage, HookResult upstreamHookResult) {
final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
final MqttConnectPayload payload = connectMessage.payload();
Channel channel = ctx.channel();
ChannelInfo.setKeepLive(channel, variableHeader.keepAliveTimeSeconds());
ChannelInfo.setClientId(channel, connectMessage.payload().clientIdentifier());
ChannelInfo.setCleanSessionFlag(channel, variableHeader.isCleanSession());
String remark = upstreamHookResult.getRemark();
if (!upstreamHookResult.isSuccess()) {
byte connAckCode = (byte) upstreamHookResult.getSubCode();
MqttConnectReturnCode mqttConnectReturnCode = MqttConnectReturnCode.valueOf(connAckCode);
channel.writeAndFlush(MqttMessageFactory.buildConnAckMessage(mqttConnectReturnCode));
channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, remark);
return;
}
CompletableFuture<Void> future = new CompletableFuture<>();
ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_CONNECT, future);
// use 'scheduler' to separate two i/o: 'ack to client' and 'session-load from rocketmq'
scheduler.schedule(() -> {
if (!future.isDone()) {
future.complete(null);
}
}, 1, TimeUnit.SECONDS);
try {
MqttConnAckMessage mqttConnAckMessage = MqttMessageFactory.buildConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED);
future.thenAccept(aVoid -> {
if (!channel.isActive()) {
return;
}
ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_CONNECT);
channel.writeAndFlush(mqttConnAckMessage);
});
sessionLoop.loadSession(ChannelInfo.getClientId(channel), channel);
// save will message
WillMessage willMessage = null;
if (variableHeader.isWillFlag()) {
if (payload.willTopic() == null || payload.willMessageInBytes() == null) {
logger.error("Will message and will topic can not be empty");
channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "Will message and will topic can not be empty");
return;
}
willMessage = new WillMessage(payload.willTopic(), payload.willMessageInBytes(), variableHeader.isWillRetain(), variableHeader.willQos());
sessionLoop.addWillMessage(channel, willMessage);
}
// create link successfully , send message to mq
String clientId = ChannelInfo.getClientId(ctx.channel());
String channelId = ChannelInfo.getId(ctx.channel());
String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
long timeStamp = System.currentTimeMillis();
ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
clientStatusEventVO.setChannelId(channelId);
clientStatusEventVO.setClientId(clientId);
clientStatusEventVO.setClientIp(remoteIp);
clientStatusEventVO.setEventType(ClientStatusEventVO.CONNECT_TAG);
clientStatusEventVO.setTime(timeStamp);
String jsonString = JSON.toJSONString(clientStatusEventVO);
// send connection details to topic, default is iot
notifyConnectionManager.putMessageToMq(ClientStatusEventVO.CONNECT_TAG, jsonString);
logger.info(" [MqttConnectHandler] [Connect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
} catch (Exception e) {
logger.error("Connect:{}", payload.clientIdentifier(), e);
channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "ConnectException");
}
}
MqttDisconnectHandler:
public void doHandler(ChannelHandlerContext ctx, MqttMessage mqttMessage, HookResult upstreamHookResult) {
// disconnect manually, send message to mq
String clientId = ChannelInfo.getClientId(ctx.channel());
String channelId = ChannelInfo.getId(ctx.channel());
String remoteIp = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress();
long timeStamp = System.currentTimeMillis();
ClientStatusEventVO clientStatusEventVO = new ClientStatusEventVO();
clientStatusEventVO.setChannelId(channelId);
clientStatusEventVO.setClientId(clientId);
clientStatusEventVO.setClientIp(remoteIp);
clientStatusEventVO.setEventType(ClientStatusEventVO.DISCONNECT_TAG);
clientStatusEventVO.setTime(timeStamp);
String jsonString = JSON.toJSONString(clientStatusEventVO);
// send connection details to topic, default is iot
notifyConnectionManager.putMessageToMq(ClientStatusEventVO.DISCONNECT_TAG, jsonString);
logger.info(" [MqttDisconnectHandler] [Disconnect]: channelId:{}, clientId: {}, remoteIp:{}", channelId, clientId, remoteIp);
channelManager.closeConnect(ctx.channel(), ChannelCloseFrom.CLIENT, "disconnect");
}
这样,就实现了 阿里云的MQTT的disconnect,connect,tcpclean。
效果展示: