基于MQTT的实时消息推送系统设计与实现(Java后端+Vue前端)
本文档将详细介绍如何使用MQTT协议实现一个具备实时推送、消息优先级、消息撤回、重试机制和连接稳定性保障的完整消息系统。我们将以社交应用的实时通知场景为例,提供完整的Java后端和Vue前端实现方案。
1. 系统设计概述
1.1 业务场景
我们实现一个社交应用的实时通知系统,支持:
- 实时接收好友请求、评论、点赞等通知
- 按优先级处理不同类型消息(系统通知 > 私信 > 点赞)
- 消息发送者可在5分钟内撤回消息
- 网络异常时自动重试发送
- 客户端断线后自动重连并接收离线消息
1.2 系统架构
Vue前端应用
|
| MQTT over WebSocket
v
MQTT Broker (EMQX)
|
| 发布/订阅模式
v
Spring Boot后端服务
|
v
MySQL数据库 (存储消息记录)
1.3 技术选型
- MQTT Broker: EMQX 5.0(支持MQTT 5.0,高并发)
- 后端: Spring Boot 2.7.x + Eclipse Paho MQTT客户端
- 前端: Vue 3 + MQTT.js + Element Plus
- 数据库: MySQL 8.0
- 认证: JWT令牌认证
2. 环境搭建
2.1 安装EMQX Broker
# Run EMQX 5.0.24 in a detached Docker container named "emqx",
# publishing ports 1883, 8083, 8084 and 18083 on the host.
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 18083:18083 emqx/emqx:5.0.24
访问EMQX控制台:http://localhost:18083 ,默认账号密码:admin/public
2.2 创建主题结构
设计以下MQTT主题:
- user/${userId}/notifications - 用户接收通知的主题
- user/${userId}/recall - 用户接收消息撤回指令的主题
- system/broadcast - 系统广播通知
- notifications/dlq - 死信队列
3. 后端实现(Java/Spring Boot)
3.1 项目初始化
创建Spring Boot项目,添加依赖:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the MQTT notification service (Spring Boot 2.7.x, Java 11). -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>mqtt-notification-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mqtt-notification-service</name>
    <description>MQTT Notification Service with Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Spring Security: the controller and JwtConfig use
             org.springframework.security.core.Authentication / UserDetails,
             which are not on the classpath without this starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
        <!-- Spring Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <!-- MySQL Driver -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- MQTT Client -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <!-- JWT Authentication -->
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-api</artifactId>
            <version>0.11.5</version>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-impl</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-jackson</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Validation -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Lombok is compile-time only; keep it out of the repackaged jar. -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3.2 配置文件
# Server port
server.port=8080
# Database configuration
spring.datasource.url=jdbc:mysql://localhost:3306/mqtt_notification?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# JPA configuration
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect
# MQTT configuration
mqtt.broker-url=tcp://localhost:1883
# NOTE(review): a random client id combined with clean-session=false creates a new
# persistent broker session on every restart; consider a stable id per instance.
mqtt.client-id=notification-server-${random.value}
mqtt.username=admin
mqtt.password=public
# Keep-alive interval, in seconds
mqtt.keep-alive-interval=30
# Connection timeout, in SECONDS (Paho MqttConnectOptions#setConnectionTimeout), not milliseconds
mqtt.connection-timeout=30
mqtt.clean-session=false
# Base delay between publish retries, in milliseconds (doubled on every attempt)
mqtt.retry-interval=1000
mqtt.max-retry-attempts=3
# JWT configuration
# JwtConfig Base64-decodes this value, so it must be valid standard Base64 and decode to
# at least 32 bytes for HS256. Development placeholder only: generate your own for
# production, e.g. with `openssl rand -base64 48`.
jwt.secret=ReplaceThisDevOnlyBase64SecretWithOpensslRandBase64Output48Bytes
# Token lifetime in milliseconds (24 hours)
jwt.expiration=86400000
# Business configuration
app.max-recall-minutes=5
3.3 MQTT客户端配置
package com.example.mqtt.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that builds the shared Paho {@link MqttClient} from the
 * {@code mqtt.*} application properties and connects it to the broker.
 */
@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.keep-alive-interval}")
    private int keepAliveInterval;

    @Value("${mqtt.connection-timeout}")
    private int connectionTimeout;

    @Value("${mqtt.clean-session}")
    private boolean cleanSession;

    /**
     * Creates the MQTT client with in-memory persistence and connects it
     * immediately, so bean creation fails if the broker cannot be reached.
     *
     * @return a client that has already been connected to the configured broker
     * @throws MqttException if the client cannot be created or the connection fails
     */
    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttConnectOptions connectOptions = new MqttConnectOptions();

        // Credentials
        connectOptions.setUserName(username);
        connectOptions.setPassword(password.toCharArray());

        // Session and liveness settings
        connectOptions.setKeepAliveInterval(keepAliveInterval);
        connectOptions.setConnectionTimeout(connectionTimeout);
        connectOptions.setCleanSession(cleanSession);
        connectOptions.setAutomaticReconnect(true);

        MqttClient brokerClient = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
        brokerClient.connect(connectOptions);
        return brokerClient;
    }
}
3.4 数据模型设计
消息实体类
package com.example.mqtt.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
import java.time.LocalDateTime;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "notifications")
public class Notification {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(unique = true, nullable = false)
private String messageId;
@Column(nullable = false)
private String userId;
@Column(nullable = false)
private String senderId;
@Column(nullable = false)
private String title;
@Column(nullable = false, columnDefinition = "TEXT")
private String content;
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private NotificationType type;
@Column(nullable = false)
private int priority; // 0-9,9为最高优先级
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private NotificationStatus status = NotificationStatus.ACTIVE;
@Column(nullable = false)
private LocalDateTime createdAt;
private LocalDateTime recallAt;
private int retryCount = 0;
private boolean isRead = false;
public enum NotificationType {
FRIEND_REQUEST, COMMENT, LIKE, SYSTEM, MENTION, MESSAGE
}
public enum NotificationStatus {
ACTIVE, RECALLED, EXPIRED, FAILED
}
}
消息数据访问接口
package com.example.mqtt.repository;
import com.example.mqtt.model.Notification;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
/**
 * Spring Data JPA repository for {@link Notification} entities, keyed by the
 * database id. The finder methods below are query methods declared by name only.
 */
@Repository
public interface NotificationRepository extends JpaRepository<Notification, Long> {
/** Looks up a notification by its application-level message id. */
Optional<Notification> findByMessageId(String messageId);
/** Returns one page of the given user's notifications, newest first. */
Page<Notification> findByUserIdOrderByCreatedAtDesc(String userId, Pageable pageable);
/** Counts the given user's notifications that are not yet marked as read. */
long countByUserIdAndIsReadFalse(String userId);
}
3.5 MQTT消息服务实现
package com.example.mqtt.service;
import com.example.mqtt.model.Notification;
import com.example.mqtt.repository.NotificationRepository;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
public class MqttService {
private final MqttClient mqttClient;
private final NotificationRepository notificationRepository;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
@Value("${mqtt.retry-interval}")
private int retryInterval;
@Value("${mqtt.max-retry-attempts}")
private int maxRetryAttempts;
@Value("${app.max-recall-minutes}")
private int maxRecallMinutes;
@Autowired
public MqttService(MqttClient mqttClient, NotificationRepository notificationRepository) {
this.mqttClient = mqttClient;
this.notificationRepository = notificationRepository;
}
@PostConstruct
public void init() {
try {
// 订阅撤回主题,用于记录撤回操作
mqttClient.subscribe("user/+/recall", 2, (topic, message) -> {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("Received recall command: " + payload + " on topic: " + topic);
// 这里可以添加撤回记录逻辑
});
} catch (MqttException e) {
System.err.println("Failed to subscribe to recall topics: " + e.getMessage());
}
}
/**
* 发送通知消息
*/
public String sendNotification(Notification notification) {
// 生成唯一消息ID
String messageId = "notif-" + UUID.randomUUID().toString();
notification.setMessageId(messageId);
notification.setCreatedAt(LocalDateTime.now());
notification.setStatus(Notification.NotificationStatus.ACTIVE);
// 保存消息到数据库
notificationRepository.save(notification);
// 构建MQTT消息
String topic = "user/" + notification.getUserId() + "/notifications";
String payload = buildNotificationPayload(notification, messageId);
MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttMessage.setQos(2); // 确保消息恰好一次送达
// 设置MQTT 5.0属性 - 优先级和过期时间
if (mqttClient.getVersion() == MqttClient.PROTOCOL_VERSION_5) {
MqttProperties properties = new MqttProperties();
properties.setMessagePriority(notification.getPriority());
properties.setMessageExpiryInterval(86400); // 24小时后过期
mqttMessage.setProperties(properties);
}
try {
// 发送消息
mqttClient.publish(topic, mqttMessage);
System.out.println("Message sent successfully: " + messageId);
return messageId;
} catch (MqttException e) {
System.err.println("Failed to send message: " + e.getMessage());
// 启动重试机制
scheduleRetry(notification, 1);
return messageId;
}
}
/**
* 构建通知消息 payload
*/
private String buildNotificationPayload(Notification notification, String messageId) {
return String.format(
"{\"messageId\":\"%s\",\"userId\":\"%s\",\"senderId\":\"%s\",\"title\":\"%s\",\"content\":\"%s\"," +
"\"type\":\"%s\",\"priority\":%d,\"createdAt\":\"%s\"}",
messageId,
notification.getUserId(),
notification.getSenderId(),
escapeJson(notification.getTitle()),
escapeJson(notification.getContent()),
notification.getType(),
notification.getPriority(),
notification.getCreatedAt()
);
}
/**
* JSON转义
*/
private String escapeJson(String value) {
if (value == null) return "";
return value.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\b", "\\b")
.replace("\f", "\\f")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t");
}
/**
* 安排消息重试
*/
private void scheduleRetry(Notification notification, int attempt) {
if (attempt > maxRetryAttempts) {
// 达到最大重试次数,标记为失败
notification.setStatus(Notification.NotificationStatus.FAILED);
notification.setRetryCount(attempt);
notificationRepository.save(notification);
// 发送到死信队列
String dlqTopic = "notifications/dlq";
String payload = String.format(
"{\"messageId\":\"%s\",\"error\":\"Max retry attempts reached\",\"retryCount\":%d}",
notification.getMessageId(), attempt
);
try {
mqttClient.publish(dlqTopic, new MqttMessage(payload.getBytes()));
} catch (MqttException e) {
System.err.println("Failed to publish to DLQ: " + e.getMessage());
}
return;
}
// 指数退避策略计算延迟
long delay = (long) (Math.pow(2, attempt) * retryInterval);
scheduler.schedule(() -> {
try {
String topic = "user/" + notification.getUserId() + "/notifications";
String payload = buildNotificationPayload(notification, notification.getMessageId());
MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttMessage.setQos(2);
mqttClient.publish(topic, mqttMessage);
System.out.println("Message retried successfully (attempt " + attempt + "): " + notification.getMessageId());
// 更新重试计数
notification.setRetryCount(attempt);
notificationRepository.save(notification);
} catch (MqttException e) {
System.err.println("Retry attempt " + attempt + " failed: " + e.getMessage());
// 继续重试
scheduleRetry(notification, attempt + 1);
}
}, delay, TimeUnit.MILLISECONDS);
}
/**
* 撤回消息
*/
public boolean recallMessage(String messageId, String operatorId) {
// 查找消息
Notification notification = notificationRepository.findByMessageId(messageId)
.orElseThrow(() -> new IllegalArgumentException("Message not found"));
// 验证权限
if (!notification.getSenderId().equals(operatorId) && !"admin".equals(operatorId)) {
throw new SecurityException("No permission to recall this message");
}
// 检查是否在可撤回时间内
LocalDateTime now = LocalDateTime.now();
long minutesSinceCreation = java.time.Duration.between(notification.getCreatedAt(), now).toMinutes();
if (minutesSinceCreation > maxRecallMinutes) {
throw new IllegalStateException("Cannot recall message after " + maxRecallMinutes + " minutes");
}
// 更新消息状态
notification.setStatus(Notification.NotificationStatus.RECALLED);
notification.setRecallAt(now);
notificationRepository.save(notification);
// 发送撤回指令
String topic = "user/" + notification.getUserId() + "/recall";
String payload = String.format(
"{\"messageId\":\"%s\",\"operatorId\":\"%s\",\"recalledAt\":\"%s\",\"isRecall\":true}",
messageId, operatorId, now
);
try {
MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
mqttMessage.setQos(2);
mqttClient.publish(topic, mqttMessage);
return true;
} catch (MqttException e) {
System.err.println("Failed to send recall command: " + e.getMessage());
return false;
}
}
}
3.6 业务服务和控制器
通知业务服务
package com.example.mqtt.service;
import com.example.mqtt.model.Notification;
import com.example.mqtt.repository.NotificationRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
/**
 * Application-level notification operations: sending and recalling are delegated
 * to {@link MqttService}; listing, read state and unread counts go to the repository.
 */
@Service
public class NotificationService {

    private final NotificationRepository notificationRepository;
    private final MqttService mqttService;

    @Autowired
    public NotificationService(NotificationRepository notificationRepository, MqttService mqttService) {
        this.notificationRepository = notificationRepository;
        this.mqttService = mqttService;
    }

    /**
     * Builds a notification from the given values and hands it to the MQTT service.
     *
     * @return the message id returned by {@link MqttService#sendNotification}
     */
    public String createAndSendNotification(String userId, String senderId, String title,
                                            String content, Notification.NotificationType type,
                                            int priority) {
        Notification outgoing = Notification.builder()
                .userId(userId)
                .senderId(senderId)
                .title(title)
                .content(content)
                .type(type)
                .priority(priority)
                .build();
        String messageId = mqttService.sendNotification(outgoing);
        return messageId;
    }

    /**
     * Recalls a message on behalf of the given operator.
     *
     * @return the result of {@link MqttService#recallMessage}
     */
    public boolean recallNotification(String messageId, String operatorId) {
        boolean recalled = mqttService.recallMessage(messageId, operatorId);
        return recalled;
    }

    /** Returns one page of the user's notifications, newest first. */
    public Page<Notification> getUserNotifications(String userId, Pageable pageable) {
        Page<Notification> page = notificationRepository.findByUserIdOrderByCreatedAtDesc(userId, pageable);
        return page;
    }

    /**
     * Marks a message as read.
     *
     * @throws IllegalArgumentException if the message does not exist
     * @throws SecurityException        if the message belongs to another user
     */
    public void markAsRead(String messageId, String userId) {
        Notification target = notificationRepository.findByMessageId(messageId)
                .orElseThrow(() -> new IllegalArgumentException("Message not found"));

        boolean ownedByCaller = target.getUserId().equals(userId);
        if (ownedByCaller) {
            target.setRead(true);
            notificationRepository.save(target);
            return;
        }
        throw new SecurityException("No permission to mark this message as read");
    }

    /** Returns how many of the user's notifications are still unread. */
    public long getUnreadCount(String userId) {
        long unread = notificationRepository.countByUserIdAndIsReadFalse(userId);
        return unread;
    }
}
通知API控制器
package com.example.mqtt.controller;
import com.example.mqtt.model.Notification;
import com.example.mqtt.service.NotificationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.web.bind.annotation.*;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@RestController
@RequestMapping("/api/notifications")
public class NotificationController {
private final NotificationService notificationService;
@Autowired
public NotificationController(NotificationService notificationService) {
this.notificationService = notificationService;
}
/**
* 发送通知
*/
@PostMapping
public ResponseEntity<String> sendNotification(
Authentication authentication,
@Valid @RequestBody SendNotificationRequest request) {
String senderId = authentication.getName();
String messageId = notificationService.createAndSendNotification(
request.getUserId(),
senderId,
request.getTitle(),
request.getContent(),
request.getType(),
request.getPriority()
);
return ResponseEntity.ok(messageId);
}
/**
* 撤回通知
*/
@PostMapping("/{messageId}/recall")
public ResponseEntity<Boolean> recallNotification(
Authentication authentication,
@PathVariable String messageId) {
String operatorId = authentication.getName();
boolean result = notificationService.recallNotification(messageId, operatorId);
return ResponseEntity.ok(result);
}
/**
* 获取当前用户的通知列表
*/
@GetMapping
public ResponseEntity<Page<Notification>> getUserNotifications(
Authentication authentication,
Pageable pageable) {
String userId = authentication.getName();
Page<Notification> notifications = notificationService.getUserNotifications(userId, pageable);
return ResponseEntity.ok(notifications);
}
/**
* 标记消息为已读
*/
@PutMapping("/{messageId}/read")
public ResponseEntity<Void> markAsRead(
Authentication authentication,
@PathVariable String messageId) {
String userId = authentication.getName();
notificationService.markAsRead(messageId, userId);
return ResponseEntity.noContent().build();
}
/**
* 获取未读消息数量
*/
@GetMapping("/unread/count")
public ResponseEntity<Long> getUnreadCount(Authentication authentication) {
String userId = authentication.getName();
long count = notificationService.getUnreadCount(userId);
return ResponseEntity.ok(count);
}
// 请求参数模型
public static class SendNotificationRequest {
@NotBlank(message = "User ID is required")
private String userId;
@NotBlank(message = "Title is required")
private String title;
@NotBlank(message = "Content is required")
private String content;
@NotNull(message = "Type is required")
private Notification.NotificationType type;
@Min(value = 0, message = "Priority must be between 0 and 9")
private int priority = 5;
// Getters and setters
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public String getContent() { return content; }
public void setContent(String content) { this.content = content; }
public Notification.NotificationType getType() { return type; }
public void setType(Notification.NotificationType type) { this.type = type; }
public int getPriority() { return priority; }
public void setPriority(int priority) { this.priority = priority; }
}
}
3.7 JWT认证配置
package com.example.mqtt.config;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.io.Decoders;
import io.jsonwebtoken.security.Keys;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;
import java.security.Key;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
/**
 * Creates and validates HS256-signed JWTs. The signing key is obtained by
 * Base64-decoding the {@code jwt.secret} property, so that property must hold
 * a Base64-encoded value.
 */
@Component
public class JwtConfig {

    @Value("${jwt.secret}")
    private String secretKey;

    @Value("${jwt.expiration}")
    private long jwtExpiration;

    /** Issues a token for the user without any extra claims. */
    public String generateToken(UserDetails userDetails) {
        Map<String, Object> noExtraClaims = new HashMap<>();
        return generateToken(noExtraClaims, userDetails);
    }

    /** Issues a token for the user carrying the given extra claims and the configured lifetime. */
    public String generateToken(Map<String, Object> extraClaims, UserDetails userDetails) {
        return buildToken(extraClaims, userDetails, jwtExpiration);
    }

    /** Returns the subject (user name) stored in the token. */
    public String extractUsername(String token) {
        return extractClaim(token, Claims::getSubject);
    }

    /** Parses the token and applies {@code claimsResolver} to its claims. */
    public <T> T extractClaim(String token, Function<Claims, T> claimsResolver) {
        return claimsResolver.apply(extractAllClaims(token));
    }

    /** A token is valid when its subject matches the user and it has not expired. */
    public boolean isTokenValid(String token, UserDetails userDetails) {
        final String subject = extractUsername(token);
        if (!subject.equals(userDetails.getUsername())) {
            return false;
        }
        return !isTokenExpired(token);
    }

    private String buildToken(
            Map<String, Object> extraClaims,
            UserDetails userDetails,
            long expiration
    ) {
        Date issuedAt = new Date(System.currentTimeMillis());
        Date expiresAt = new Date(System.currentTimeMillis() + expiration);
        return Jwts.builder()
                .setClaims(extraClaims)
                .setSubject(userDetails.getUsername())
                .setIssuedAt(issuedAt)
                .setExpiration(expiresAt)
                .signWith(getSignInKey(), SignatureAlgorithm.HS256)
                .compact();
    }

    private boolean isTokenExpired(String token) {
        Date expiry = extractExpiration(token);
        return expiry.before(new Date());
    }

    private Date extractExpiration(String token) {
        return extractClaim(token, Claims::getExpiration);
    }

    /** Verifies the signature with the signing key and returns the claims body. */
    private Claims extractAllClaims(String token) {
        Key verificationKey = getSignInKey();
        return Jwts.parserBuilder()
                .setSigningKey(verificationKey)
                .build()
                .parseClaimsJws(token)
                .getBody();
    }

    /** Decodes the Base64 secret into an HMAC-SHA key. */
    private Key getSignInKey() {
        return Keys.hmacShaKeyFor(Decoders.BASE64.decode(secretKey));
    }
}
4. 前端实现(Vue 3)
4.1 项目初始化
# Create the Vue project
vue create mqtt-notification-client
cd mqtt-notification-client
# Install dependencies (date-fns is imported by the notification list component)
npm install mqtt element-plus axios jwt-decode date-fns
4.2 MQTT服务封装
import mqtt from 'mqtt/dist/mqtt.min'
import { ElNotification, ElMessageBox } from 'element-plus'
/**
 * Browser-side MQTT client wrapper.
 *
 * Owns exactly one MQTT.js client at a time, subscribes to the current user's
 * notification and recall topics, and reconnects with exponential backoff.
 * MQTT.js's built-in auto-reconnect is disabled (reconnectPeriod: 0) so that
 * it cannot run in parallel with the backoff logic here and produce duplicate
 * connections.
 */
class MqttService {
  constructor() {
    this.client = null
    this.userId = null
    this.clientId = null // stable for the lifetime of this user session
    this.connected = false
    this.reconnectTimeout = 1000 // initial reconnect delay (ms)
    this.maxReconnectTimeout = 30000 // upper bound for the reconnect delay (ms)
    this.reconnectTimer = null // pending reconnect timer, if any
    this.manualDisconnect = false // true after disconnect(); suppresses reconnects
    this.notificationCallback = null // called with each parsed notification
    this.recallCallback = null // called with each parsed recall command
  }

  /**
   * Initialises the MQTT connection for a user.
   * @param {string} userId user id
   * @param {function} notificationCallback called with each notification message
   * @param {function} recallCallback called with each recall message
   */
  init(userId, notificationCallback, recallCallback) {
    // Drop any existing connection before starting a new one.
    if (this.client) {
      this.disconnect()
    }
    // `clean: false` only resumes a broker session when the same client id is
    // presented again, so the id is generated once per user and then reused on
    // every reconnect. Persist it (e.g. in sessionStorage) if the session must
    // also survive page reloads.
    if (this.userId !== userId || !this.clientId) {
      this.clientId = `web-client-${userId}-${Math.random().toString(16).slice(2, 10)}`
    }
    this.userId = userId
    this.notificationCallback = notificationCallback
    this.recallCallback = recallCallback
    this.connect()
  }

  /**
   * Creates the MQTT.js client, connects to the broker and wires the event handlers.
   */
  connect() {
    if (!this.userId) {
      console.error('用户ID不能为空')
      return
    }
    if (!this.clientId) {
      this.clientId = `web-client-${this.userId}-${Math.random().toString(16).slice(2, 10)}`
    }
    this.manualDisconnect = false

    // MQTT over WebSocket endpoint
    const brokerUrl = 'ws://localhost:8083/mqtt'
    const options = {
      clientId: this.clientId,
      clean: false, // keep the session so queued messages arrive after a reconnect
      connectTimeout: 4000,
      reconnectPeriod: 0, // disable built-in reconnect; scheduleReconnect() handles it
      username: 'admin', // a real deployment should authenticate with the user's JWT
      password: 'public',
      keepalive: 30 // heartbeat every 30 seconds
    }

    const client = mqtt.connect(brokerUrl, options)
    this.client = client
    // Events from a client that has since been replaced or disconnected are ignored.
    const isCurrent = () => this.client === client

    client.on('connect', (connack) => {
      if (!isCurrent()) return
      this.connected = true
      this.reconnectTimeout = 1000 // reset the backoff
      console.log('MQTT连接成功', connack)
      ElNotification({
        title: '连接成功',
        message: '已成功连接到消息服务器',
        type: 'success',
        duration: 3000
      })
      // Subscribe to the user's own topics
      const topics = [
        `user/${this.userId}/notifications`,
        `user/${this.userId}/recall`
      ]
      client.subscribe(topics, { qos: 2 }, (err) => {
        if (!err) {
          console.log(`已订阅主题: ${topics.join(', ')}`)
        } else {
          console.error('订阅主题失败:', err)
          ElNotification({
            title: '订阅失败',
            message: '无法订阅通知主题',
            type: 'error'
          })
        }
      })
    })

    client.on('message', (topic, message) => {
      if (!isCurrent()) return
      this.handleMessage(topic, message.toString())
    })

    client.on('close', () => {
      if (!isCurrent()) return
      this.connected = false
      console.log('MQTT连接已关闭')
      this.scheduleReconnect()
    })

    client.on('error', (err) => {
      if (!isCurrent()) return
      this.connected = false
      console.error('MQTT错误:', err)
      ElNotification({
        title: '连接错误',
        message: '消息服务连接发生错误',
        type: 'error'
      })
      // Idempotent: at most one reconnect timer is ever pending.
      this.scheduleReconnect()
    })

    client.on('reconnect', () => {
      console.log('正在重连到MQTT服务器...')
    })
  }

  /**
   * Parses an incoming payload and dispatches it by topic suffix.
   * Payloads that are not valid JSON are logged and dropped.
   */
  handleMessage(topic, payload) {
    try {
      const message = JSON.parse(payload)
      if (topic.endsWith('/recall') && message.isRecall) {
        console.log('收到撤回指令:', message)
        if (this.recallCallback) {
          this.recallCallback(message)
        }
        ElNotification({
          title: '消息已撤回',
          message: '一条消息已被发送者撤回',
          type: 'info'
        })
      } else if (topic.endsWith('/notifications')) {
        console.log('收到新通知:', message)
        if (this.notificationCallback) {
          this.notificationCallback(message)
        }
        ElNotification({
          title: message.title,
          message: message.content,
          type: this.getNotificationTypeIcon(message.type),
          duration: 5000
        })
      }
    } catch (e) {
      console.error('解析消息失败:', e)
    }
  }

  /**
   * Maps a notification type to the ElNotification `type` used to display it.
   */
  getNotificationTypeIcon(type) {
    switch (type) {
      case 'SYSTEM':
        return 'warning'
      case 'FRIEND_REQUEST':
        return 'success'
      case 'MESSAGE':
        return 'info'
      default:
        return 'info'
    }
  }

  /**
   * Schedules one reconnect attempt with exponential backoff.
   * Does nothing when there is no client, the client is connected, the
   * disconnect was requested by the caller, or an attempt is already pending.
   */
  scheduleReconnect() {
    const client = this.client
    if (!client || client.connected || this.manualDisconnect || this.reconnectTimer) {
      return
    }
    const delay = this.reconnectTimeout
    ElNotification({
      title: '连接断开',
      message: `将在 ${delay / 1000} 秒后尝试重连`,
      type: 'warning'
    })
    // Double the delay for the next failure, capped at the maximum.
    this.reconnectTimeout = Math.min(delay * 2, this.maxReconnectTimeout)
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      if (this.client === client && !client.connected) {
        // Reuse the same client: same client id, same listeners, no leaked sockets.
        client.reconnect()
      }
    }, delay)
  }

  /**
   * Closes the connection on request and cancels any pending reconnect.
   */
  disconnect() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.client) {
      this.manualDisconnect = true
      this.client.end()
      this.client = null
      this.connected = false
      console.log('MQTT连接已手动断开')
    }
  }

  /**
   * Returns whether the client is currently connected.
   */
  isConnected() {
    return this.connected
  }
}
// Export a singleton instance
export default new MqttService()
4.3 API服务封装
import axios from 'axios'
import { ElMessage } from 'element-plus'
// Shared axios instance for the backend REST API
const api = axios.create({
  baseURL: 'http://localhost:8080/api',
  timeout: 10000,
  headers: { 'Content-Type': 'application/json' }
})

// Request side: attach the stored bearer token, if there is one
const attachAuthToken = (config) => {
  const token = localStorage.getItem('token')
  if (token) {
    config.headers.Authorization = `Bearer ${token}`
  }
  return config
}
const rejectRequest = (error) => Promise.reject(error)
api.interceptors.request.use(attachAuthToken, rejectRequest)

// Response side: unwrap the body on success, show a toast and re-reject on failure
const unwrapData = (response) => response.data
const reportFailure = (error) => {
  const message = error.response?.data?.error || error.message || '请求失败'
  ElMessage.error(message)
  return Promise.reject(error)
}
api.interceptors.response.use(unwrapData, reportFailure)
// Notification endpoints
export const notificationApi = {
  /** Send a notification. */
  sendNotification(data) {
    return api.post('/notifications', data)
  },

  /** Recall the notification with the given message id. */
  recallNotification(messageId) {
    const url = `/notifications/${messageId}/recall`
    return api.post(url)
  },

  /** Fetch one page of the current user's notifications (zero-based page index). */
  getUserNotifications(page = 0, size = 20) {
    const url = `/notifications?page=${page}&size=${size}`
    return api.get(url)
  },

  /** Mark the notification with the given message id as read. */
  markAsRead(messageId) {
    const url = `/notifications/${messageId}/read`
    return api.put(url)
  },

  /** Fetch the current user's unread notification count. */
  getUnreadCount() {
    return api.get('/notifications/unread/count')
  }
}
// Authentication endpoints
export const authApi = {
  /** Log in with the given credentials. */
  login(credentials) {
    return api.post('/auth/login', credentials)
  },

  /** Register a new user. */
  register(userData) {
    return api.post('/auth/register', userData)
  },

  /** Fetch the currently authenticated user. */
  getCurrentUser() {
    return api.get('/auth/me')
  }
}

export default api
4.4 通知列表组件
<template>
  <div class="notification-list">
    <el-card>
      <!-- Vue 3 slot syntax; the Vue 2 `slot="header"` attribute is ignored by Vue 3. -->
      <template #header>
        <div class="card-header">
          <h2>通知中心</h2>
          <el-badge :value="unreadCount" class="notification-badge">
            <el-button size="small" @click="markAllAsRead">
              全部标为已读
            </el-button>
          </el-badge>
        </div>
      </template>
      <el-empty
        v-if="notifications.length === 0 && !loading"
        description="暂无通知"
      ></el-empty>
      <el-skeleton
        v-if="loading"
        :rows="5"
        animated
      ></el-skeleton>
      <el-timeline v-else>
        <el-timeline-item
          v-for="notification in notifications"
          :key="notification.messageId"
          :timestamp="formatTime(notification.createdAt)"
          :icon="getIcon(notification.type)"
          :color="getColor(notification.type)"
          placement="top"
        >
          <el-card
            :class="{ 'unread-notification': !notification.read, 'recalled-notification': notification.status === 'RECALLED' }"
          >
            <div class="notification-header">
              <h3>{{ notification.title }}</h3>
              <span class="notification-type">{{ getTypeName(notification.type) }}</span>
            </div>
            <!-- A recalled notification shows a placeholder instead of its content. -->
            <div v-if="notification.status === 'RECALLED'" class="recall-message">
              此消息已被撤回
            </div>
            <div v-else class="notification-content">
              {{ notification.content }}
            </div>
            <div class="notification-actions">
              <!-- Element Plus sizes are large / default / small; "mini" is not one of them. -->
              <el-button
                size="small"
                text
                @click="markAsRead(notification.messageId)"
                v-if="!notification.read && notification.status !== 'RECALLED'"
              >
                标为已读
              </el-button>
              <el-button
                size="small"
                text
                type="danger"
                @click="handleRecall(notification.messageId)"
                v-if="canRecall(notification)"
              >
                撤回
              </el-button>
            </div>
          </el-card>
        </el-timeline-item>
      </el-timeline>
      <!-- el-pagination is 1-based while currentPage holds the backend's 0-based index. -->
      <el-pagination
        v-if="notifications.length > 0"
        class="pagination"
        :current-page="currentPage + 1"
        :page-size="pageSize"
        :total="totalNotifications"
        @current-change="handlePageChange"
        layout="prev, pager, next"
      ></el-pagination>
    </el-card>
  </div>
</template>
<script setup>
import { ref, onMounted, computed } from 'vue'
import { notificationApi } from '../services/apiService'
import { ElMessageBox, ElMessage } from 'element-plus'
import mqttService from '../services/mqttService'
import { formatDistanceToNow } from 'date-fns'
import { zhCN } from 'date-fns/locale'
// Reactive component state. currentPage is the zero-based page index sent to the backend.
const notifications = ref([])
const loading = ref(true)
const currentPage = ref(0)
const pageSize = ref(20)
const totalNotifications = ref(0)
const unreadCount = ref(0)
const userId = ref(localStorage.getItem('userId'))
// On mount: load the notification list and the unread count, then set up the MQTT listeners
onMounted(() => {
fetchNotifications()
fetchUnreadCount()
setupMqttListeners()
})
// Connect the MQTT service for the stored user; does nothing when no user id is stored
const setupMqttListeners = () => {
  const currentUser = userId.value
  if (!currentUser) {
    return
  }
  // Callbacks: new notification first, recall second
  mqttService.init(currentUser, handleNewNotification, handleMessageRecall)
}
// Load the current page of notifications; the loading flag is cleared on success and on failure
const fetchNotifications = async () => {
  loading.value = true
  try {
    const page = await notificationApi.getUserNotifications(currentPage.value, pageSize.value)
    notifications.value = page.content
    totalNotifications.value = page.totalElements
  } catch (error) {
    console.error('获取通知失败:', error)
  } finally {
    loading.value = false
  }
}
// Refresh the unread badge from the backend; a failure is only logged
const fetchUnreadCount = async () => {
  try {
    unreadCount.value = await notificationApi.getUnreadCount()
  } catch (error) {
    console.error('获取未读数量失败:', error)
  }
}
// Handle a notification pushed over MQTT.
// A message replayed from the persistent MQTT session may already be in the
// list fetched over REST; adding it again would duplicate the entry and its
// v-for key, so it is only inserted when its messageId is not yet listed.
const handleNewNotification = (newNotification) => {
  const alreadyListed = notifications.value.some(
    n => n.messageId === newNotification.messageId
  )
  if (!alreadyListed) {
    // Newest first
    notifications.value.unshift(newNotification)
  }
  // Refresh the unread badge
  fetchUnreadCount()
}
// Handle a recall command: flag the matching local notification as recalled, if it is listed
const handleMessageRecall = (recallMessage) => {
  const target = notifications.value.find(
    item => item.messageId === recallMessage.messageId
  )
  if (target !== undefined) {
    target.status = 'RECALLED'
  }
}
// Mark one notification as read on the server, then mirror the change locally
const markAsRead = async (messageId) => {
  try {
    await notificationApi.markAsRead(messageId)
    const target = notifications.value.find(item => item.messageId === messageId)
    if (target !== undefined) {
      target.read = true
    }
    fetchUnreadCount()
    ElMessage.success('已标记为已读')
  } catch (error) {
    console.error('标记已读失败:', error)
  }
}
// Mark every unread notification in the current list as read, one request at a time.
// Simplified approach; a real implementation should call a batch endpoint on the backend.
const markAllAsRead = async () => {
  try {
    const pending = notifications.value.filter(item => !item.read)
    for (let i = 0; i < pending.length; i++) {
      const item = pending[i]
      await notificationApi.markAsRead(item.messageId)
      item.read = true
    }
    fetchUnreadCount()
    ElMessage.success('全部已标为已读')
  } catch (error) {
    console.error('批量标记已读失败:', error)
  }
}
// Recall a message after the user confirms.
// ElMessageBox.confirm rejects when the dialog is cancelled or closed; that is
// a normal outcome and is handled separately so it is not logged as a failure.
const handleRecall = async (messageId) => {
  try {
    await ElMessageBox.confirm(
      '确定要撤回这条消息吗?',
      '确认撤回',
      {
        confirmButtonText: '确定',
        cancelButtonText: '取消',
        type: 'warning'
      }
    )
  } catch (dismissed) {
    // The user cancelled or closed the dialog
    return
  }

  try {
    const result = await notificationApi.recallNotification(messageId)
    if (result) {
      // Mirror the new state locally
      const index = notifications.value.findIndex(n => n.messageId === messageId)
      if (index !== -1) {
        notifications.value[index].status = 'RECALLED'
      }
      ElMessage.success('消息已撤回')
    } else {
      // The backend returns false when the recall command could not be published
      ElMessage.warning('撤回指令发送失败,请稍后重试')
    }
  } catch (error) {
    console.error('撤回消息失败:', error)
  }
}
// Pagination handler: the pager reports 1-based pages while the backend expects
// 0-based ones, so convert before reloading the list.
const handlePageChange = (page) => {
  const zeroBasedPage = page - 1
  currentPage.value = zeroBasedPage
  fetchNotifications()
}
// Render a timestamp as a relative, suffixed distance from now using the zhCN locale.
const formatTime = (dateString) => {
  const when = new Date(dateString)
  const options = { addSuffix: true, locale: zhCN }
  return formatDistanceToNow(when, options)
}
// Icon name for each notification type; unknown types fall back to the bell.
// NOTE(review): these are "el-icon-*" class-style names — confirm they resolve
// with the icon set actually installed in this project.
const ICON_BY_TYPE = new Map([
  ['SYSTEM', 'el-icon-warning'],
  ['FRIEND_REQUEST', 'el-icon-user-plus'],
  ['COMMENT', 'el-icon-comment'],
  ['LIKE', 'el-icon-heart'],
  ['MENTION', 'el-icon-at'],
  ['MESSAGE', 'el-icon-message']
])
const getIcon = (type) => {
  return ICON_BY_TYPE.has(type) ? ICON_BY_TYPE.get(type) : 'el-icon-bell'
}
// Accent colour for each notification type; every type without a dedicated
// colour uses the neutral grey.
const COLOR_BY_TYPE = new Map([
  ['SYSTEM', '#e6a23c'],
  ['FRIEND_REQUEST', '#52c41a'],
  ['MESSAGE', '#1890ff']
])
const getColor = (type) => {
  return COLOR_BY_TYPE.has(type) ? COLOR_BY_TYPE.get(type) : '#8c8c8c'
}
// Human-readable label for a notification type; a type without a label is
// returned unchanged.
const getTypeName = (type) => {
  const labels = {
    'SYSTEM': '系统通知',
    'FRIEND_REQUEST': '好友请求',
    'COMMENT': '评论',
    'LIKE': '点赞',
    'MENTION': '提及',
    'MESSAGE': '私信'
  }
  const label = labels[type]
  if (label) {
    return label
  }
  return type
}
// Decide whether the "recall" action is offered for a notification.
// A message can be recalled only when the current user sent it, it has not been
// recalled yet, and it is less than five minutes old.
// Ids are compared as strings so a numeric senderId still matches a user id that
// was stored as text, and two missing ids never count as a match.
const canRecall = (notification) => {
  const senderId = notification.senderId
  const currentUserId = userId.value
  if (senderId == null || currentUserId == null) {
    return false
  }
  const recallWindowMs = 5 * 60 * 1000
  return String(senderId) === String(currentUserId) &&
    notification.status !== 'RECALLED' &&
    new Date() - new Date(notification.createdAt) < recallWindowMs
}
</script>
<style scoped>
.card-header {
display: flex;
justify-content: space-between;
align-items: center;
}
.notification-badge {
margin-left: 10px;
}
.notification-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 10px;
}
.notification-type {
font-size: 12px;
padding: 2px 8px;
border-radius: 12px;
background-color: #f0f2f5;
}
.notification-content {
margin-bottom: 10px;
line-height: 1.6;
}
.recall-message {
margin-bottom: 10px;
padding: 10px;
background-color: #fff1f0;
border-radius: 4px;
color: #f5222d;
font-size: 14px;
}
.notification-actions {
display: flex;
justify-content: flex-end;
gap: 10px;
}
.unread-notification {
border-left: 3px solid #1890ff;
}
.recalled-notification {
opacity: 0.7;
}
.pagination {
margin-top: 20px;
text-align: center;
}
</style>
4.5 发送通知组件
<template>
<el-card>
<!-- Vue 3 ignores the legacy slot="header" attribute; named slots need v-slot (#header). -->
<template #header>
  <h2>发送通知</h2>
</template>
<el-form
ref="formRef"
:model="form"
:rules="rules"
label-width="100px"
class="send-notification-form"
>
<el-form-item label="接收用户ID" prop="userId">
<el-input v-model="form.userId" placeholder="请输入接收用户ID"></el-input>
</el-form-item>
<el-form-item label="通知类型" prop="type">
<el-select v-model="form.type" placeholder="请选择通知类型">
<el-option label="系统通知" value="SYSTEM"></el-option>
<el-option label="好友请求" value="FRIEND_REQUEST"></el-option>
<el-option label="评论" value="COMMENT"></el-option>
<el-option label="点赞" value="LIKE"></el-option>
<el-option label="提及" value="MENTION"></el-option>
<el-option label="私信" value="MESSAGE"></el-option>
</el-select>
</el-form-item>
<el-form-item label="优先级" prop="priority">
<el-slider
v-model="form.priority"
:min="0"
:max="9"
:step="1"
show-input
></el-slider>
<div class="priority-info">
<span>低</span>
<span class="priority-high">高</span>
</div>
</el-form-item>
<el-form-item label="标题" prop="title">
<el-input v-model="form.title" placeholder="请输入通知标题"></el-input>
</el-form-item>
<el-form-item label="内容" prop="content">
<el-input
v-model="form.content"
type="textarea"
:rows="4"
placeholder="请输入通知内容"
></el-input>
</el-form-item>
<el-form-item>
<el-button type="primary" @click="submitForm">发送通知</el-button>
<el-button @click="resetForm">重置</el-button>
</el-form-item>
</el-form>
</el-card>
</template>
<script setup>
import { ref, reactive } from 'vue'
import { notificationApi } from '../services/apiService'
import { ElMessage } from 'element-plus'
// Template ref of the <el-form>, used to trigger validation and reset.
const formRef = ref(null)

// Form model; priority starts in the middle of the 0-9 slider range.
const form = reactive({ userId: '', type: '', priority: 5, title: '', content: '' })

// Small builders for the two kinds of validation rule used below.
const requiredRule = (message, trigger) => ({ required: true, message, trigger })
const maxLengthRule = (max, message) => ({ max, message, trigger: 'blur' })

// Validation rules keyed by the `prop` of each form item.
const rules = reactive({
  userId: [requiredRule('请输入接收用户ID', 'blur')],
  type: [requiredRule('请选择通知类型', 'change')],
  title: [
    requiredRule('请输入通知标题', 'blur'),
    maxLengthRule(50, '标题长度不能超过50个字符')
  ],
  content: [
    requiredRule('请输入通知内容', 'blur'),
    maxLengthRule(500, '内容长度不能超过500个字符')
  ]
})
// Validate the form and, when valid, send the notification.
// Element Plus rejects validate() with the map of invalid fields (not an Error
// named "ValidationError"), so validation is settled on its own first. Invalid
// input already shows inline messages and must not raise a "send failed" toast.
const submitForm = async () => {
  if (!formRef.value) return

  const valid = await formRef.value.validate().catch(() => false)
  if (!valid) return

  try {
    const messageId = await notificationApi.sendNotification(form)
    ElMessage.success(`通知发送成功,消息ID: ${messageId}`)
    resetForm()
  } catch (error) {
    console.error('发送通知失败:', error)
    ElMessage.error('发送通知失败,请稍后重试')
  }
}
// Reset the form fields; a no-op while the form ref is not set.
const resetForm = () => {
  const formInstance = formRef.value
  if (!formInstance) {
    return
  }
  formInstance.resetFields()
}
</script>
<style scoped>
.send-notification-form {
max-width: 600px;
}
.priority-info {
display: flex;
justify-content: space-between;
margin-top: -10px;
margin-bottom: 10px;
font-size: 12px;
color: #606266;
}
.priority-high {
color: #f56c6c;
}
</style>
4.6 主页面组件
<template>
<div class="home-container">
<el-row :gutter="20">
<el-col :span="12">
<SendNotification />
</el-col>
<el-col :span="12">
<div class="connection-status">
<el-card>
  <!-- Vue 3 ignores the legacy slot="header" attribute; named slots need v-slot (#header). -->
  <template #header>
    <h2>连接状态</h2>
  </template>
  <div class="status-info">
    <!-- column is a numeric prop, so bind it instead of passing the string "1". -->
    <el-descriptions :column="1">
      <el-descriptions-item label="MQTT连接状态">
        <el-tag :type="mqttConnected ? 'success' : 'danger'">
          {{ mqttConnected ? '已连接' : '未连接' }}
        </el-tag>
      </el-descriptions-item>
      <el-descriptions-item label="当前用户">
        {{ currentUser || '未登录' }}
      </el-descriptions-item>
      <el-descriptions-item label="未读消息">
        <el-badge :value="unreadCount" class="item">
          <span>{{ unreadCount }} 条</span>
        </el-badge>
      </el-descriptions-item>
    </el-descriptions>
    <el-button
      v-if="!mqttConnected && currentUser"
      type="primary"
      @click="reconnectMqtt"
      style="margin-top: 15px;"
    >
      重新连接
    </el-button>
  </div>
</el-card>
</div>
</el-col>
</el-row>
<el-row style="margin-top: 20px;">
<el-col :span="24">
<NotificationList />
</el-col>
</el-row>
</div>
</template>
<script setup>
import { ref, onMounted, onBeforeUnmount, watchEffect } from 'vue'
import SendNotification from '../components/SendNotification.vue'
import NotificationList from '../components/NotificationList.vue'
import mqttService from '../services/mqttService'
import { authApi, notificationApi } from '../services/apiService'
// Reactive view state shown in the connection-status card.
const mqttConnected = ref(false)
const currentUser = ref('')
const unreadCount = ref(0)

// On mount: resolve the logged-in user, start watching the MQTT connection
// state and load the initial unread count, in that order.
onMounted(function initialiseHomeView() {
  checkLoginStatus()
  setupMqttStatusListener()
  fetchUnreadCount()
})
// Resolve the current user from the backend, publish the id to the view and
// persist it in localStorage under "userId". Any failure is treated as
// "not logged in / session expired" and only logged.
const checkLoginStatus = async () => {
  try {
    const { id } = await authApi.getCurrentUser()
    currentUser.value = id
    localStorage.setItem('userId', id)
  } catch (err) {
    console.log('用户未登录或会话已过期')
    // A redirect to the login page could be triggered here.
  }
}
// Handle of the interval that polls the MQTT connection state (null while idle).
let mqttStatusTimer = null

// Mirror mqttService.isConnected() into `mqttConnected` once per second.
// Calling it again while a poller is already running is a no-op, so the
// component never owns more than one interval.
const setupMqttStatusListener = () => {
  if (mqttStatusTimer !== null) {
    return
  }
  mqttStatusTimer = setInterval(() => {
    mqttConnected.value = mqttService.isConnected()
  }, 1000)
}

// Stop polling when the component goes away; otherwise the interval would keep
// firing (and keep the component's refs alive) after navigation.
onBeforeUnmount(() => {
  if (mqttStatusTimer !== null) {
    clearInterval(mqttStatusTimer)
    mqttStatusTimer = null
  }
})
// Refresh the unread counter from the backend; a failed request is only logged.
const fetchUnreadCount = async () => {
  try {
    unreadCount.value = await notificationApi.getUnreadCount()
  } catch (err) {
    console.error('获取未读数量失败:', err)
  }
}
// Manually re-establish the MQTT connection for the current user.
// The handlers that are already registered on the shared service are passed
// back in, because init() replaces them: handing over empty functions would
// silently stop the notification list from receiving pushes after a reconnect.
// NOTE(review): assumes the service exposes its registered handlers as
// `notificationCallback` / `recallCallback` — confirm against mqttService.
// When they are absent this falls back to the previous no-op behaviour.
const reconnectMqtt = () => {
  if (!currentUser.value) {
    return
  }
  const noop = () => {}
  mqttService.init(
    currentUser.value,
    mqttService.notificationCallback || noop,
    mqttService.recallCallback || noop
  )
}
// Poll the unread counter every 30 seconds.
// watchEffect ignores the value returned by its callback; teardown has to be
// registered through the onCleanup argument, which Vue runs before the effect
// re-runs and when it is stopped (for a component-level effect, on unmount).
watchEffect((onCleanup) => {
  const timer = setInterval(fetchUnreadCount, 30000)
  onCleanup(() => clearInterval(timer))
})
</script>
<style scoped>
.home-container {
padding: 20px;
}
.connection-status {
height: 100%;
}
.status-info {
padding: 10px 0;
}
</style>
5. 系统测试与验证
5.1 功能测试
- 用户A登录系统,进入通知中心
- 用户B发送通知给用户A,验证用户A能否实时收到
- 用户B撤回消息,验证用户A的界面是否显示消息已撤回
- 断开网络连接,再重新连接,验证是否能收到离线期间的消息
- 发送高优先级消息,验证是否优先展示
5.2 性能测试
- 使用工具模拟1000个并发连接,测试系统稳定性
- 连续发送10000条消息,测试消息处理能力和延迟
- 测试网络波动情况下的自动重连和消息重试机制
6. 总结
本系统基于MQTT协议实现了一个功能完整的实时消息推送系统,具有以下特点:
- 实时性:利用MQTT的发布/订阅模式和长连接特性,实现消息的实时推送
- 可靠性:通过QoS 2级别保证消息恰好一次送达,配合重试机制处理发送失败情况
- 优先级支持:MQTT协议(包括5.0)本身没有内置的消息优先级属性,本系统通过在消息中携带优先级字段(例如MQTT 5.0的用户属性User Properties或消息体中的priority字段),在应用层实现不同重要程度消息的分级处理
- 消息撤回:通过撤回指令消息实现逻辑上的消息撤回功能
- 断线重连:客户端实现自动重连机制,确保连接稳定性
- 离线消息:利用MQTT的会话保持特性(客户端需使用固定的clientId、关闭clean session,并以QoS 1或2订阅),支持客户端重连后接收离线消息
系统架构清晰,前后端分离,易于扩展和维护,可根据实际业务需求进一步扩展功能。
补充(Vue2前端)
如果前端使用的是Vue 2,可参考以下实现:
前端实现(Vue 2)
4.1 项目初始化
# 创建Vue 2项目
vue create mqtt-notification-client-vue2
# 选择Vue 2版本
# 安装依赖
cd mqtt-notification-client-vue2
npm install mqtt element-ui axios jsonwebtoken moment
4.2 MQTT服务封装
import mqtt from 'mqtt/dist/mqtt.min'
import { Notification } from 'element-ui'
/**
 * Wrapper around a single MQTT.js client used for per-user notifications.
 *
 * It connects over WebSocket, subscribes to the user's notification and recall
 * topics, forwards incoming messages to the registered callbacks and reconnects
 * with exponential back-off when the connection drops.
 *
 * Reconnection is driven only by this class: the library's own auto-reconnect
 * is switched off (reconnectPeriod: 0) and the existing client is re-opened
 * with client.reconnect(). This keeps exactly one live client, one set of
 * event handlers and one client id, which the persistent session
 * (clean: false) depends on.
 */
class MqttService {
  constructor() {
    this.client = null
    this.userId = null
    this.connected = false
    this.reconnectTimeout = 1000 // delay before the next reconnect attempt (ms)
    this.maxReconnectTimeout = 30000 // upper bound of the back-off delay (ms)
    this.notificationCallback = null // invoked with each parsed notification
    this.recallCallback = null // invoked with each parsed recall instruction
    this.reconnectTimer = null // handle of the pending reconnect timer, if any
    this.clientId = null // client id reused for every connection of clientIdUserId
    this.clientIdUserId = null // user the cached client id was generated for
  }

  /**
   * Initialise the MQTT connection for a user. An existing connection is
   * closed first.
   * @param {string} userId user id used to build the topic names
   * @param {function} notificationCallback called with each parsed notification
   * @param {function} recallCallback called with each parsed recall instruction
   */
  init(userId, notificationCallback, recallCallback) {
    this.userId = userId
    this.notificationCallback = notificationCallback
    this.recallCallback = recallCallback
    if (this.client) {
      this.disconnect()
    }
    this.connect()
  }

  /**
   * Open a connection to the broker and wire up the event handlers.
   * Does nothing (apart from logging) when no user id has been set.
   */
  connect() {
    if (!this.userId) {
      console.error('用户ID不能为空')
      return
    }
    // Never keep two live clients: drop a previous one before creating another.
    if (this.client) {
      this.disconnect()
    }

    // The broker keys the persistent session by client id, so the id is
    // generated once per user and reused for later connections made through
    // this service instance.
    if (this.clientId === null || this.clientIdUserId !== this.userId) {
      this.clientId = `web-client-${this.userId}-${Math.random().toString(16).slice(2, 10)}`
      this.clientIdUserId = this.userId
    }

    // MQTT over WebSocket endpoint
    const brokerUrl = 'ws://localhost:8083/mqtt'
    const options = {
      clientId: this.clientId,
      clean: false, // keep the session so queued messages arrive after a reconnect
      connectTimeout: 4000,
      username: 'admin', // a real deployment should authenticate with a JWT instead
      password: 'public',
      keepalive: 30, // heartbeat interval in seconds
      reconnectPeriod: 0 // disable built-in auto-reconnect; see scheduleReconnect()
    }

    const client = mqtt.connect(brokerUrl, options)
    this.client = client
    // Events of a client that has since been replaced or disconnected are ignored.
    const isCurrent = () => this.client === client

    client.on('connect', (connack) => {
      if (!isCurrent()) return
      this.connected = true
      this.reconnectTimeout = 1000 // back-off starts over after a successful connect
      console.log('MQTT连接成功', connack)
      Notification.success({
        title: '连接成功',
        message: '已成功连接到消息服务器',
        duration: 3000
      })
      // Subscribe to the user's own topics.
      const topics = [
        `user/${this.userId}/notifications`,
        `user/${this.userId}/recall`
      ]
      client.subscribe(topics, { qos: 2 }, (err) => {
        if (!err) {
          console.log(`已订阅主题: ${topics.join(', ')}`)
        } else {
          console.error('订阅主题失败:', err)
          Notification.error({
            title: '订阅失败',
            message: '无法订阅通知主题'
          })
        }
      })
    })

    client.on('message', (topic, message) => {
      if (!isCurrent()) return
      this.handleMessage(topic, message.toString())
    })

    // A closed connection is the single trigger for a reconnect attempt.
    client.on('close', () => {
      if (!isCurrent()) return
      this.connected = false
      console.log('MQTT连接已关闭')
      this.scheduleReconnect()
    })

    // Errors are reported here; reconnecting is left to the 'close' handler so
    // that one failure cannot schedule two attempts.
    client.on('error', (err) => {
      if (!isCurrent()) return
      this.connected = client.connected
      console.error('MQTT错误:', err)
      Notification.error({
        title: '连接错误',
        message: '消息服务连接发生错误'
      })
    })

    client.on('reconnect', () => {
      console.log('正在重连到MQTT服务器...')
    })
  }

  /**
   * Parse an incoming payload as JSON and dispatch it by topic: recall
   * instructions go to recallCallback, notifications to notificationCallback.
   * A toast is shown for both. Payloads that fail to parse are logged and dropped.
   */
  handleMessage(topic, payload) {
    try {
      const message = JSON.parse(payload)
      if (topic.endsWith('/recall') && message.isRecall) {
        console.log('收到撤回指令:', message)
        if (this.recallCallback) {
          this.recallCallback(message)
        }
        Notification.info({
          title: '消息已撤回',
          message: '一条消息已被发送者撤回'
        })
      } else if (topic.endsWith('/notifications')) {
        console.log('收到新通知:', message)
        if (this.notificationCallback) {
          this.notificationCallback(message)
        }
        Notification({
          title: message.title,
          message: message.content,
          type: this.getNotificationTypeIcon(message.type),
          duration: 5000
        })
      }
    } catch (e) {
      console.error('解析消息失败:', e)
    }
  }

  /**
   * Map a notification type to the toast type used to display it.
   */
  getNotificationTypeIcon(type) {
    switch (type) {
      case 'SYSTEM':
        return 'warning'
      case 'FRIEND_REQUEST':
        return 'success'
      case 'MESSAGE':
        return 'info'
      default:
        return 'info'
    }
  }

  /**
   * Schedule one reconnect attempt with exponential back-off.
   * No-op when there is no client (for example after disconnect()), when the
   * client is already connected, or when an attempt is already pending.
   */
  scheduleReconnect() {
    const client = this.client
    if (!client || client.connected || this.reconnectTimer !== null) {
      return
    }
    const delay = this.reconnectTimeout
    Notification.warning({
      title: '连接断开',
      message: `将在 ${delay / 1000} 秒后尝试重连`
    })
    // Double the delay for the following attempt, capped at the maximum.
    this.reconnectTimeout = Math.min(delay * 2, this.maxReconnectTimeout)
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      // Re-open the same client so the client id, session and handlers are reused.
      if (this.client === client && !client.connected) {
        client.reconnect()
      }
    }, delay)
  }

  /**
   * Close the connection and cancel any pending reconnect attempt.
   */
  disconnect() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    if (this.client) {
      const client = this.client
      // Detach first so the 'close' event fired by end() is ignored and does
      // not schedule a reconnect for a deliberate disconnect.
      this.client = null
      this.connected = false
      client.end()
      console.log('MQTT连接已手动断开')
    }
  }

  /**
   * Whether the service currently considers itself connected.
   */
  isConnected() {
    return this.connected
  }
}
// 导出单例实例
export default new MqttService()
4.3 API服务封装
import axios from 'axios'
import { Message } from 'element-ui'
// Shared axios instance for the backend REST API.
const api = axios.create({
  baseURL: 'http://localhost:8080/api',
  timeout: 10000,
  headers: { 'Content-Type': 'application/json' }
})

// Request interceptor: attach the bearer token from localStorage when one is stored.
const attachAuthToken = (config) => {
  const token = localStorage.getItem('token')
  if (token) {
    config.headers.Authorization = `Bearer ${token}`
  }
  return config
}
const passThroughRequestError = (error) => Promise.reject(error)
api.interceptors.request.use(attachAuthToken, passThroughRequestError)

// Response interceptor: hand callers the response body directly; on failure show
// the most specific message available and keep the promise rejected.
const unwrapBody = (response) => response.data
const reportFailure = (error) => {
  const message = error.response?.data?.error || error.message || '请求失败'
  Message.error(message)
  return Promise.reject(error)
}
api.interceptors.response.use(unwrapBody, reportFailure)
// Notification endpoints. Every call returns the promise produced by the shared
// `api` instance. Message ids are URL-encoded before being placed in a path and
// paging values are passed as query parameters, so unusual characters cannot
// change the request target.
export const notificationApi = {
  /**
   * Send a notification.
   * @param {object} data request body
   */
  sendNotification: (data) => {
    return api.post('/notifications', data)
  },
  /**
   * Recall a previously sent notification.
   * @param {string} messageId id of the message to recall
   */
  recallNotification: (messageId) => {
    return api.post(`/notifications/${encodeURIComponent(messageId)}/recall`)
  },
  /**
   * Fetch one page of the current user's notifications.
   * @param {number} page zero-based page index (default 0)
   * @param {number} size page size (default 20)
   */
  getUserNotifications: (page = 0, size = 20) => {
    return api.get('/notifications', { params: { page, size } })
  },
  /**
   * Mark a notification as read.
   * @param {string} messageId id of the message to mark
   */
  markAsRead: (messageId) => {
    return api.put(`/notifications/${encodeURIComponent(messageId)}/read`)
  },
  /**
   * Fetch the number of unread notifications.
   */
  getUnreadCount: () => {
    return api.get('/notifications/unread/count')
  }
}
// Authentication endpoints; each call returns the promise produced by `api`.
export const authApi = {
  /** Log in with the given credentials. */
  login(credentials) {
    return api.post('/auth/login', credentials)
  },
  /** Register a new user. */
  register(userData) {
    return api.post('/auth/register', userData)
  },
  /** Fetch the profile of the currently authenticated user. */
  getCurrentUser() {
    return api.get('/auth/me')
  }
}
export default api
4.4 通知列表组件
<template>
<div class="notification-list">
<el-card>
<div slot="header" class="card-header">
<h2>通知中心</h2>
<el-badge :value="unreadCount" class="notification-badge">
<el-button size="small" @click="markAllAsRead">
全部标为已读
</el-button>
</el-badge>
</div>
<el-empty
v-if="notifications.length === 0 && !loading"
description="暂无通知"
></el-empty>
<el-skeleton
v-if="loading"
:rows="5"
animated
></el-skeleton>
<el-timeline v-else>
<el-timeline-item
v-for="notification in notifications"
:key="notification.messageId"
:timestamp="formatTime(notification.createdAt)"
:icon="getIcon(notification.type)"
:color="getColor(notification.type)"
placement="top"
>
<el-card
:class="{ 'unread-notification': !notification.read, 'recalled-notification': notification.status === 'RECALLED' }"
>
<div class="notification-header">
<h3>{{ notification.title }}</h3>
<span class="notification-type">{{ getTypeName(notification.type) }}</span>
</div>
<div v-if="notification.status === 'RECALLED'" class="recall-message">
此消息已被撤回
</div>
<div v-else class="notification-content">
{{ notification.content }}
</div>
<div class="notification-actions">
<el-button
size="mini"
type="text"
@click="markAsRead(notification.messageId)"
v-if="!notification.read && notification.status !== 'RECALLED'"
>
标为已读
</el-button>
<el-button
size="mini"
type="text"
style="color: #F56C6C;"
@click="handleRecall(notification.messageId)"
v-if="canRecall(notification)"
>
撤回
</el-button>
</div>
</el-card>
</el-timeline-item>
</el-timeline>
<el-pagination
v-if="notifications.length > 0"
class="pagination"
:current-page="currentPage + 1"
:page-size="pageSize"
:total="totalNotifications"
@current-change="handlePageChange"
layout="prev, pager, next"
></el-pagination>
</el-card>
</div>
</template>
<script>
import { notificationApi } from '../services/apiService'
import { MessageBox, Message } from 'element-ui'
import mqttService from '../services/mqttService'
import moment from 'moment'
// Notification centre: lists the current user's notifications, applies live
// updates pushed over MQTT and offers "mark as read" and "recall" actions.
export default {
  name: 'NotificationList',
  data() {
    return {
      notifications: [],
      loading: true,
      currentPage: 0, // zero-based, as expected by the backend
      pageSize: 20,
      totalNotifications: 0,
      unreadCount: 0,
      userId: localStorage.getItem('userId') // string, or null when nothing is stored
    }
  },
  mounted() {
    this.fetchNotifications()
    this.fetchUnreadCount()
    this.setupMqttListeners()
  },
  methods: {
    // Register this component's handlers with the shared MQTT service.
    // Nothing is registered when no user id is known.
    setupMqttListeners() {
      if (this.userId) {
        mqttService.init(
          this.userId,
          this.handleNewNotification,
          this.handleMessageRecall
        )
      }
    },
    // Load the current page of notifications and the total count.
    fetchNotifications() {
      this.loading = true
      notificationApi.getUserNotifications(this.currentPage, this.pageSize)
        .then(response => {
          this.notifications = response.content
          this.totalNotifications = response.totalElements
        })
        .catch(error => {
          console.error('获取通知失败:', error)
        })
        .finally(() => {
          this.loading = false
        })
    },
    // Refresh the unread counter from the backend.
    fetchUnreadCount() {
      notificationApi.getUnreadCount()
        .then(count => {
          this.unreadCount = count
        })
        .catch(error => {
          console.error('获取未读数量失败:', error)
        })
    },
    // Handle a pushed notification: prepend it unless an entry with the same
    // messageId is already listed (items are keyed by messageId in the
    // template, so a duplicate would break the list rendering).
    handleNewNotification(newNotification) {
      const alreadyListed = this.notifications.some(
        n => n.messageId === newNotification.messageId
      )
      if (!alreadyListed) {
        this.notifications.unshift(newNotification)
      }
      this.fetchUnreadCount()
    },
    // Handle a pushed recall instruction: flag the matching entry as RECALLED.
    handleMessageRecall(recallMessage) {
      const index = this.notifications.findIndex(
        n => n.messageId === recallMessage.messageId
      )
      if (index !== -1) {
        // Replace the entry so the change is reactive even if `status` was not
        // present on the original object.
        this.$set(this.notifications, index, {
          ...this.notifications[index],
          status: 'RECALLED'
        })
      }
    },
    // Mark one notification as read on the backend, then mirror it locally.
    markAsRead(messageId) {
      notificationApi.markAsRead(messageId)
        .then(() => {
          const index = this.notifications.findIndex(n => n.messageId === messageId)
          if (index !== -1) {
            this.$set(this.notifications[index], 'read', true)
          }
          this.fetchUnreadCount()
          Message.success('已标记为已读')
        })
        .catch(error => {
          console.error('标记已读失败:', error)
        })
    },
    // Mark every unread notification on the current page as read.
    // Simplified: one request per notification; a real system should expose a
    // batch endpoint.
    markAllAsRead() {
      const unreadNotifications = this.notifications.filter(n => !n.read)
      Promise.all(unreadNotifications.map(notification =>
        notificationApi.markAsRead(notification.messageId)
      )).then(() => {
        unreadNotifications.forEach(notification => {
          // Look the entry up again: the list may have changed while the
          // requests were in flight.
          const localIndex = this.notifications.findIndex(
            n => n.messageId === notification.messageId
          )
          if (localIndex !== -1) {
            this.$set(this.notifications[localIndex], 'read', true)
          }
        })
        this.fetchUnreadCount()
        Message.success('全部已标为已读')
      }).catch(error => {
        console.error('批量标记已读失败:', error)
      })
    },
    // Ask for confirmation, then recall the message and mirror the result locally.
    handleRecall(messageId) {
      MessageBox.confirm(
        '确定要撤回这条消息吗?',
        '确认撤回',
        {
          confirmButtonText: '确定',
          cancelButtonText: '取消',
          type: 'warning'
        }
      ).then(() => {
        return notificationApi.recallNotification(messageId)
      }).then(result => {
        if (result) {
          const index = this.notifications.findIndex(n => n.messageId === messageId)
          if (index !== -1) {
            this.$set(this.notifications, index, {
              ...this.notifications[index],
              status: 'RECALLED'
            })
          }
          Message.success('消息已撤回')
        }
      }).catch(error => {
        // The confirm dialog rejects with 'cancel' when dismissed; that is not an error.
        if (error !== 'cancel') {
          console.error('撤回消息失败:', error)
        }
      })
    },
    // Pagination handler: the pager is 1-based, the backend 0-based.
    handlePageChange(page) {
      this.currentPage = page - 1
      this.fetchNotifications()
    },
    // Render a timestamp relative to now.
    formatTime(dateString) {
      return moment(dateString).fromNow()
    },
    // Icon class for a notification type. Only class names that exist in the
    // Element UI icon font are used; unknown types fall back to the bell.
    getIcon(type) {
      switch (type) {
        case 'SYSTEM': return 'el-icon-warning'
        case 'FRIEND_REQUEST': return 'el-icon-user'
        case 'COMMENT': return 'el-icon-chat-dot-round'
        case 'LIKE': return 'el-icon-star-on'
        case 'MENTION': return 'el-icon-chat-line-round'
        case 'MESSAGE': return 'el-icon-message'
        default: return 'el-icon-bell'
      }
    },
    // Accent colour for a notification type; other types use neutral grey.
    getColor(type) {
      switch (type) {
        case 'SYSTEM': return '#e6a23c'
        case 'FRIEND_REQUEST': return '#52c41a'
        case 'MESSAGE': return '#1890ff'
        default: return '#8c8c8c'
      }
    },
    // Human-readable label for a type; unknown types are returned unchanged.
    getTypeName(type) {
      const typeMap = {
        'SYSTEM': '系统通知',
        'FRIEND_REQUEST': '好友请求',
        'COMMENT': '评论',
        'LIKE': '点赞',
        'MENTION': '提及',
        'MESSAGE': '私信'
      }
      return typeMap[type] || type
    },
    // Whether the "recall" action is offered: the current user sent the message,
    // it has not been recalled and it is less than five minutes old.
    // The stored user id is a string, so ids are compared as strings, and two
    // missing ids never count as a match.
    canRecall(notification) {
      if (notification.senderId == null || this.userId == null) {
        return false
      }
      return String(notification.senderId) === String(this.userId) &&
        notification.status !== 'RECALLED' &&
        new Date() - new Date(notification.createdAt) < 5 * 60 * 1000
    }
  }
}
</script>
<style scoped>
.card-header {
display: flex;
justify-content: space-between;
align-items: center;
}
.notification-badge {
margin-left: 10px;
}
.notification-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 10px;
}
.notification-type {
font-size: 12px;
padding: 2px 8px;
border-radius: 12px;
background-color: #f0f2f5;
}
.notification-content {
margin-bottom: 10px;
line-height: 1.6;
}
.recall-message {
margin-bottom: 10px;
padding: 10px;
background-color: #fff1f0;
border-radius: 4px;
color: #f5222d;
font-size: 14px;
}
.notification-actions {
display: flex;
justify-content: flex-end;
gap: 10px;
}
.unread-notification {
border-left: 3px solid #1890ff;
}
.recalled-notification {
opacity: 0.7;
}
.pagination {
margin-top: 20px;
text-align: center;
}
</style>
4.5 发送通知组件
<template>
<el-card>
<div slot="header">
<h2>发送通知</h2>
</div>
<el-form
ref="form"
:model="form"
:rules="rules"
label-width="100px"
class="send-notification-form"
>
<el-form-item label="接收用户ID" prop="userId">
<el-input v-model="form.userId" placeholder="请输入接收用户ID"></el-input>
</el-form-item>
<el-form-item label="通知类型" prop="type">
<el-select v-model="form.type" placeholder="请选择通知类型">
<el-option label="系统通知" value="SYSTEM"></el-option>
<el-option label="好友请求" value="FRIEND_REQUEST"></el-option>
<el-option label="评论" value="COMMENT"></el-option>
<el-option label="点赞" value="LIKE"></el-option>
<el-option label="提及" value="MENTION"></el-option>
<el-option label="私信" value="MESSAGE"></el-option>
</el-select>
</el-form-item>
<el-form-item label="优先级" prop="priority">
<el-slider
v-model="form.priority"
:min="0"
:max="9"
:step="1"
show-input
></el-slider>
<div class="priority-info">
<span>低</span>
<span class="priority-high">高</span>
</div>
</el-form-item>
<el-form-item label="标题" prop="title">
<el-input v-model="form.title" placeholder="请输入通知标题"></el-input>
</el-form-item>
<el-form-item label="内容" prop="content">
<el-input
v-model="form.content"
type="textarea"
:rows="4"
placeholder="请输入通知内容"
></el-input>
</el-form-item>
<el-form-item>
<el-button type="primary" @click="submitForm">发送通知</el-button>
<el-button @click="resetForm">重置</el-button>
</el-form-item>
</el-form>
</el-card>
</template>
<script>
import { notificationApi } from '../services/apiService'
import { Message } from 'element-ui'
// Form component for composing and sending a notification.
export default {
  name: 'SendNotification',
  data() {
    // Form model; priority starts in the middle of the 0-9 slider range.
    const form = {
      userId: '',
      type: '',
      priority: 5,
      title: '',
      content: ''
    }
    // Validation rules keyed by the `prop` of each form item.
    const rules = {
      userId: [
        { required: true, message: '请输入接收用户ID', trigger: 'blur' }
      ],
      type: [
        { required: true, message: '请选择通知类型', trigger: 'change' }
      ],
      title: [
        { required: true, message: '请输入通知标题', trigger: 'blur' },
        { max: 50, message: '标题长度不能超过50个字符', trigger: 'blur' }
      ],
      content: [
        { required: true, message: '请输入通知内容', trigger: 'blur' },
        { max: 500, message: '内容长度不能超过500个字符', trigger: 'blur' }
      ]
    }
    return { form, rules }
  },
  methods: {
    // Validate the form; when valid, send the notification, report the outcome
    // with a toast and reset the form after a successful send.
    submitForm() {
      this.$refs.form.validate((valid) => {
        if (!valid) {
          return
        }
        const onSent = (messageId) => {
          Message.success(`通知发送成功,消息ID: ${messageId}`)
          this.resetForm()
        }
        const onFailed = (error) => {
          console.error('发送通知失败:', error)
          Message.error('发送通知失败,请稍后重试')
        }
        notificationApi.sendNotification(this.form).then(onSent).catch(onFailed)
      })
    },
    // Reset the form fields.
    resetForm() {
      this.$refs.form.resetFields()
    }
  }
}
</script>
<style scoped>
.send-notification-form {
max-width: 600px;
}
.priority-info {
display: flex;
justify-content: space-between;
margin-top: -10px;
margin-bottom: 10px;
font-size: 12px;
color: #606266;
}
.priority-high {
color: #f56c6c;
}
</style>
4.6 主页面组件
<template>
<div class="home-container">
<el-row :gutter="20">
<el-col :span="12">
<send-notification />
</el-col>
<el-col :span="12">
<div class="connection-status">
<el-card>
<div slot="header">
<h2>连接状态</h2>
</div>
<div class="status-info">
<!-- column is a numeric prop, so bind it instead of passing the string "1". -->
<el-descriptions :column="1">
  <el-descriptions-item label="MQTT连接状态">
    <el-tag :type="mqttConnected ? 'success' : 'danger'">
      {{ mqttConnected ? '已连接' : '未连接' }}
    </el-tag>
  </el-descriptions-item>
  <el-descriptions-item label="当前用户">
    {{ currentUser || '未登录' }}
  </el-descriptions-item>
  <el-descriptions-item label="未读消息">
    <el-badge :value="unreadCount" class="item">
      <span>{{ unreadCount }} 条</span>
    </el-badge>
  </el-descriptions-item>
</el-descriptions>
<el-button
v-if="!mqttConnected && currentUser"
type="primary"
@click="reconnectMqtt"
style="margin-top: 15px;"
>
重新连接
</el-button>
</div>
</el-card>
</div>
</el-col>
</el-row>
<el-row style="margin-top: 20px;">
<el-col :span="24">
<notification-list />
</el-col>
</el-row>
</div>
</template>
<script>
import SendNotification from '../components/SendNotification.vue'
import NotificationList from '../components/NotificationList.vue'
import mqttService from '../services/mqttService'
import { authApi, notificationApi } from '../services/apiService'
// Home view: hosts the send form and the notification list, and shows the MQTT
// connection state, the current user and the unread counter.
export default {
  name: 'Home',
  components: {
    SendNotification,
    NotificationList
  },
  data() {
    return {
      mqttConnected: false,
      currentUser: '',
      unreadCount: 0,
      statusCheckInterval: null, // handle of the 1 s connection-state poller
      unreadCountInterval: null // handle of the 30 s unread-count poller
    }
  },
  mounted() {
    this.checkLoginStatus()
    this.setupMqttStatusListener()
    this.fetchUnreadCount()
    // Refresh the unread counter every 30 seconds. The handle is kept so the
    // timer can be stopped when the component is destroyed.
    this.unreadCountInterval = setInterval(() => {
      this.fetchUnreadCount()
    }, 30000)
  },
  beforeDestroy() {
    // Stop both pollers; otherwise they keep firing after navigation.
    if (this.statusCheckInterval) {
      clearInterval(this.statusCheckInterval)
      this.statusCheckInterval = null
    }
    if (this.unreadCountInterval) {
      clearInterval(this.unreadCountInterval)
      this.unreadCountInterval = null
    }
  },
  methods: {
    // Resolve the current user, publish the id to the view and persist it in
    // localStorage under "userId". Failures are treated as "not logged in".
    checkLoginStatus() {
      authApi.getCurrentUser()
        .then(user => {
          this.currentUser = user.id
          localStorage.setItem('userId', user.id)
        })
        .catch(error => {
          console.log('用户未登录或会话已过期')
          // A redirect to the login page could be triggered here.
        })
    },
    // Mirror mqttService.isConnected() into `mqttConnected` once per second.
    setupMqttStatusListener() {
      this.statusCheckInterval = setInterval(() => {
        this.mqttConnected = mqttService.isConnected()
      }, 1000)
    },
    // Refresh the unread counter from the backend.
    fetchUnreadCount() {
      notificationApi.getUnreadCount()
        .then(count => {
          this.unreadCount = count
        })
        .catch(error => {
          console.error('获取未读数量失败:', error)
        })
    },
    // Manually re-establish the MQTT connection for the current user.
    // init() replaces the callbacks stored on the shared service, so the ones
    // already registered (by the notification list) are passed back in;
    // handing over empty functions would stop the list from receiving pushes.
    reconnectMqtt() {
      if (!this.currentUser) {
        return
      }
      const noop = () => {}
      mqttService.init(
        this.currentUser,
        mqttService.notificationCallback || noop,
        mqttService.recallCallback || noop
      )
    }
  }
}
</script>
<style scoped>
.home-container {
padding: 20px;
}
.connection-status {
height: 100%;
}
.status-info {
padding: 10px 0;
}
</style>
4.7 入口文件配置
import Vue from 'vue'
import App from './App.vue'
import router from './router'
import ElementUI from 'element-ui'
import 'element-ui/lib/theme-chalk/index.css'
// Silence the production-mode hint and register Element UI globally.
Vue.config.productionTip = false
Vue.use(ElementUI)

// Create the root instance with the router and mount it on #app.
const rootOptions = {
  router,
  render: (createElement) => createElement(App)
}
new Vue(rootOptions).$mount('#app')
4.8 路由配置
import Vue from 'vue'
import VueRouter from 'vue-router'
import Home from '../views/Home.vue'
Vue.use(VueRouter)

// Home view, loaded eagerly.
const homeRoute = { path: '/', name: 'Home', component: Home }

// Login view, loaded lazily on first navigation.
const loginRoute = {
  path: '/login',
  name: 'Login',
  component: () => import('../views/Login.vue')
}

// History-mode router rooted at the configured base URL.
const router = new VueRouter({
  mode: 'history',
  base: process.env.BASE_URL,
  routes: [homeRoute, loginRoute]
})
export default router
主要改动说明
从Vue3迁移到Vue2的主要变化包括:
- Composition API → Options API:将Vue3的 `setup()` 语法转换为Vue2的选项式API,使用 `data()`、`methods`、`mounted` 等选项。
- 响应式系统变化:
  - 使用 `this.$set` 进行响应式数据更新,替代Vue3的 `ref` 和 `reactive`
  - 移除 `watchEffect`,使用 `setInterval` 实现定时任务
- 组件注册方式:在Vue2中需要显式注册组件,通过 `components` 选项。
- Element UI替代Element Plus:
  - 导入和使用Element UI而非Element Plus
  - 调整部分组件的属性和事件(如 `type="text"` 替代 `text` 属性)
- 路由配置:Vue2使用Vue Router 3.x版本,与Vue3使用的4.x版本在配置上略有差异。
- 工具库替换:使用 `moment.js` 替代 `date-fns` 处理日期格式化,更符合Vue2生态习惯。
- 模板语法调整:保持基本一致,但移除了Vue3特有的语法如 `v-model` 参数。
这个Vue2版本的实现与之前的Vue3版本功能完全一致,包括实时消息推送、消息优先级、消息撤回、重试机制和连接稳定性保障等核心功能,同时保持了相同的UI风格和用户体验。