Design and Implementation of an MQTT-Based Real-Time Message Push System (Java Backend + Vue Frontend)

Published: 2025-09-11

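This article walks through building a complete messaging system on the MQTT protocol, covering real-time push, message priority, message recall, a retry mechanism, and connection-stability safeguards. Using real-time notifications in a social application as the running example, it provides both a Java backend and a Vue frontend implementation.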

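1. System Design Overview

1.1 Business Scenario

We implement a real-time notification system for a social application that supports:

  • Receiving notifications such as friend requests, comments, and likes in real time
  • Handling message types by priority (system notifications > private messages > likes)
  • Letting the sender recall a message within 5 minutes
  • Automatically retrying sends when the network fails
  • Automatically reconnecting after a client disconnects and receiving the messages queued while it was offline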
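
MQTT itself does not reorder messages by priority, so the "system notifications > private messages > likes" rule has to be applied by whichever side buffers the messages. The following is a minimal sketch of one way to do that with a priority queue. The class `PendingNotifications`, its methods, and the concrete priority values in the usage note are assumptions made for illustration and are not part of the implementation described in this article.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/** Hypothetical in-memory ordering of pending notifications (illustration only). */
final class PendingNotifications {

    /** One queued notification; a higher priority value means more urgent (0-9). */
    static final class Pending {
        final String type;
        final int priority;
        final long sequence; // arrival order, keeps equal priorities first-in, first-out

        Pending(String type, int priority, long sequence) {
            this.type = type;
            this.priority = priority;
            this.sequence = sequence;
        }
    }

    // Highest priority first; ties are broken by arrival order.
    private final PriorityQueue<Pending> queue = new PriorityQueue<>(
            Comparator.comparingInt((Pending p) -> p.priority).reversed()
                    .thenComparingLong(p -> p.sequence));
    private long nextSequence = 0;

    void add(String type, int priority) {
        queue.add(new Pending(type, priority, nextSequence++));
    }

    /** Returns the type of the most urgent pending notification, or null when empty. */
    String pollType() {
        Pending next = queue.poll();
        return next == null ? null : next.type;
    }
}
```

For example, after adding a LIKE with priority 1, a SYSTEM notification with priority 9, and a MESSAGE with priority 5, successive calls to `pollType()` return the SYSTEM entry first and the LIKE entry last.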

1.2 System Architecture

Vue frontend application
    |
    | MQTT over WebSocket
    v
MQTT Broker (EMQX)
    |
    | publish/subscribe
    v
Spring Boot backend service
    |
    v
MySQL database (stores message records)

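1.3 Technology Choices

  • MQTT Broker: EMQX 5.0 (supports MQTT 5.0, high concurrency)
  • Backend: Spring Boot 2.7.x + Eclipse Paho MQTT client (the mqttv3 artifact used below speaks MQTT 3.1.1)
  • Frontend: Vue 3 + MQTT.js + Element Plus
  • Database: MySQL 8.0
  • Authentication: JWT tokens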

2. Environment Setup

2.1 Installing the EMQX Broker

# Install EMQX with Docker
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 18083:18083 emqx/emqx:5.0.24

Open the EMQX dashboard at http://localhost:18083. The default credentials are admin/public.

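2.2 Topic Structure

The system uses the following MQTT topics:

  • user/${userId}/notifications - topic on which a user receives notifications
  • user/${userId}/recall - topic on which a user receives message-recall commands
  • system/broadcast - system-wide broadcast notifications
  • notifications/dlq - dead-letter topic for messages that exhausted their retries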
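
Because the per-user topics are built by string concatenation, it can help to keep the topic names in one place and to reject user IDs that contain MQTT's level separator or wildcards. The sketch below shows one possible helper. The class name `NotificationTopics` and its methods are hypothetical and do not appear in the code later in this article.

```java
import java.util.Objects;

/** Hypothetical helper that centralizes the topic names listed above. */
final class NotificationTopics {

    static final String BROADCAST = "system/broadcast";
    static final String DEAD_LETTER = "notifications/dlq";

    private NotificationTopics() {
    }

    /** Topic on which a single user receives notifications. */
    static String notifications(String userId) {
        return "user/" + requireSafe(userId) + "/notifications";
    }

    /** Topic on which a single user receives recall commands. */
    static String recall(String userId) {
        return "user/" + requireSafe(userId) + "/recall";
    }

    /** Rejects IDs that would change the structure of the topic. */
    private static String requireSafe(String userId) {
        Objects.requireNonNull(userId, "userId");
        if (userId.isEmpty() || userId.contains("/")
                || userId.contains("+") || userId.contains("#")) {
            throw new IllegalArgumentException(
                    "userId must not be empty or contain '/', '+' or '#'");
        }
        return userId;
    }
}
```

With this helper, `NotificationTopics.notifications("42")` yields `user/42/notifications`, while an ID such as `a/b` is rejected before it can reach the broker.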

3. Backend Implementation (Java/Spring Boot)

3.1 Project Initialization

Create a Spring Boot project and add the dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/>
    </parent>
    
    <groupId>com.example</groupId>
    <artifactId>mqtt-notification-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    
    <name>mqtt-notification-service</name>
    <description>MQTT Notification Service with Spring Boot</description>
    
    <properties>
        <java.version>11</java.version>
    </properties>
    
    <dependencies>
        <!-- Spring Boot Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- Spring Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        
        <!-- MySQL Driver -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        
        <!-- MQTT Client -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        
        <!-- JWT Authentication -->
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-api</artifactId>
            <version>0.11.5</version>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-impl</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt-jackson</artifactId>
            <version>0.11.5</version>
            <scope>runtime</scope>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        
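        <!-- Validation -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>

        <!-- Spring Security (provides Authentication and UserDetails, used by the controller and JwtConfig) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>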
        
        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3.2 Configuration File

# Server port
server.port=8080

# Database configuration
spring.datasource.url=jdbc:mysql://localhost:3306/mqtt_notification?useSSL=false&serverTimezone=UTC&allowPublicKeyRetrieval=true
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# JPA configuration
spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect

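# MQTT configuration
mqtt.broker-url=tcp://localhost:1883
# Keep the client ID stable: with clean-session=false the broker resumes a session only for the same ID
mqtt.client-id=notification-server-1
mqtt.username=admin
mqtt.password=public
# Paho expects the keep-alive interval and the connection timeout in seconds
mqtt.keep-alive-interval=30
mqtt.connection-timeout=30
mqtt.clean-session=false
# Base retry delay in milliseconds
mqtt.retry-interval=1000
mqtt.max-retry-attempts=3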

# JWT configuration
jwt.secret=your-secret-key-should-be-very-long-and-secure-for-production-use
jwt.expiration=86400000

# Application settings
app.max-recall-minutes=5
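
The `mqtt.retry-interval` and `mqtt.max-retry-attempts` settings drive an exponential backoff in which the delay before attempt n is the base interval multiplied by 2^n. The sketch below computes that schedule in isolation. The class `RetrySchedule` and the upper bound `maxDelayMs` are assumptions added for illustration; the service code in this article applies no upper bound.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that computes exponential-backoff delays (illustration only). */
final class RetrySchedule {

    private RetrySchedule() {
    }

    /** Delay before the given 1-based attempt: retryIntervalMs * 2^attempt, capped at maxDelayMs. */
    static long delayMillis(int attempt, long retryIntervalMs, long maxDelayMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Bound the exponent so the product stays well inside the range of long
        // for realistic base intervals.
        long factor = 1L << Math.min(attempt, 20);
        return Math.min(retryIntervalMs * factor, maxDelayMs);
    }

    /** Delays for attempts 1..maxAttempts, in order. */
    static List<Long> schedule(int maxAttempts, long retryIntervalMs, long maxDelayMs) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            delays.add(delayMillis(attempt, retryIntervalMs, maxDelayMs));
        }
        return delays;
    }
}
```

With the values configured above (a 1000 ms base interval and 3 attempts), the retries are delayed by 2000 ms, 4000 ms, and 8000 ms respectively.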

3.3 MQTT Client Configuration

package com.example.mqtt.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.keep-alive-interval}")
    private int keepAliveInterval;

    @Value("${mqtt.connection-timeout}")
    private int connectionTimeout;

    @Value("${mqtt.clean-session}")
    private boolean cleanSession;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        // Build the connection options
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setKeepAliveInterval(keepAliveInterval); // seconds
        options.setConnectionTimeout(connectionTimeout); // seconds
        options.setCleanSession(cleanSession);
        options.setAutomaticReconnect(true); // reconnect automatically after a lost connection
        
        // Create the MQTT client
        MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
        client.connect(options);
        
        return client;
    }
}

3.4 Data Model

Notification entity
package com.example.mqtt.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.time.LocalDateTime;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Entity
@Table(name = "notifications")
public class Notification {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(unique = true, nullable = false)
    private String messageId;

    @Column(nullable = false)
    private String userId;

    @Column(nullable = false)
    private String senderId;

    @Column(nullable = false)
    private String title;

    @Column(nullable = false, columnDefinition = "TEXT")
    private String content;

    @Column(nullable = false)
    @Enumerated(EnumType.STRING)
    private NotificationType type;

    @Column(nullable = false)
    private int priority; // 0-9, where 9 is the highest priority

    @Builder.Default // without this, Lombok's builder ignores the initializer and leaves status null
    @Column(nullable = false)
    @Enumerated(EnumType.STRING)
    private NotificationStatus status = NotificationStatus.ACTIVE;

    @Column(nullable = false)
    private LocalDateTime createdAt;

    private LocalDateTime recallAt;

    private int retryCount = 0;

    private boolean isRead = false;

    public enum NotificationType {
        FRIEND_REQUEST, COMMENT, LIKE, SYSTEM, MENTION, MESSAGE
    }

    public enum NotificationStatus {
        ACTIVE, RECALLED, EXPIRED, FAILED
    }
}

Notification repository
package com.example.mqtt.repository;

import com.example.mqtt.model.Notification;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

import java.util.Optional;

@Repository
public interface NotificationRepository extends JpaRepository<Notification, Long> {

    Optional<Notification> findByMessageId(String messageId);
    
    Page<Notification> findByUserIdOrderByCreatedAtDesc(String userId, Pageable pageable);
    
    long countByUserIdAndIsReadFalse(String userId);
}

3.5 MQTT Messaging Service

package com.example.mqtt.service;

import com.example.mqtt.model.Notification;
import com.example.mqtt.repository.NotificationRepository;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Service
public class MqttService {

    private final MqttClient mqttClient;
    private final NotificationRepository notificationRepository;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

    @Value("${mqtt.retry-interval}")
    private int retryInterval;

    @Value("${mqtt.max-retry-attempts}")
    private int maxRetryAttempts;

    @Value("${app.max-recall-minutes}")
    private int maxRecallMinutes;

    @Autowired
    public MqttService(MqttClient mqttClient, NotificationRepository notificationRepository) {
        this.mqttClient = mqttClient;
        this.notificationRepository = notificationRepository;
    }

    @PostConstruct
    public void init() {
        try {
            // Subscribe to the recall topics so that recall operations can be recorded
            mqttClient.subscribe("user/+/recall", 2, (topic, message) -> {
                String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
                System.out.println("Received recall command: " + payload + " on topic: " + topic);
                // Recall-audit logic can be added here
            });
        } catch (MqttException e) {
            System.err.println("Failed to subscribe to recall topics: " + e.getMessage());
        }
    }

    /**
     * Sends a notification message.
     */
    public String sendNotification(Notification notification) {
        // Generate a unique message ID
        String messageId = "notif-" + UUID.randomUUID().toString();
        notification.setMessageId(messageId);
        notification.setCreatedAt(LocalDateTime.now());
        notification.setStatus(Notification.NotificationStatus.ACTIVE);
        
        // Persist the message to the database
        notificationRepository.save(notification);
        
        // Build the MQTT message
        String topic = "user/" + notification.getUserId() + "/notifications";
        String payload = buildNotificationPayload(notification, messageId);
        
        MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
        mqttMessage.setQos(2); // QoS 2: exactly-once delivery between this client and the broker
        
        // The Paho mqttv3 client speaks MQTT 3.1.1, so MQTT 5.0 properties such as
        // Message Expiry Interval cannot be set here. MQTT also defines no priority
        // property; the priority travels in the JSON payload and is applied by the consumer.
        
        try {
            // Publish the message
            mqttClient.publish(topic, mqttMessage);
            System.out.println("Message sent successfully: " + messageId);
            return messageId;
        } catch (MqttException e) {
            System.err.println("Failed to send message: " + e.getMessage());
            // Start the retry mechanism
            scheduleRetry(notification, 1);
            return messageId;
        }
    }

    /**
     * Builds the JSON payload for a notification.
     */
    private String buildNotificationPayload(Notification notification, String messageId) {
        return String.format(
            "{\"messageId\":\"%s\",\"userId\":\"%s\",\"senderId\":\"%s\",\"title\":\"%s\",\"content\":\"%s\"," +
            "\"type\":\"%s\",\"priority\":%d,\"createdAt\":\"%s\"}",
            messageId,
            notification.getUserId(),
            notification.getSenderId(),
            escapeJson(notification.getTitle()),
            escapeJson(notification.getContent()),
            notification.getType(),
            notification.getPriority(),
            notification.getCreatedAt()
        );
    }

    /**
     * Escapes a string for embedding in JSON.
     */
    private String escapeJson(String value) {
        if (value == null) return "";
        return value.replace("\\", "\\\\")
                    .replace("\"", "\\\"")
                    .replace("\b", "\\b")
                    .replace("\f", "\\f")
                    .replace("\n", "\\n")
                    .replace("\r", "\\r")
                    .replace("\t", "\\t");
    }

    /**
     * Schedules a retry for a message that failed to send.
     */
    private void scheduleRetry(Notification notification, int attempt) {
        if (attempt > maxRetryAttempts) {
            // Retries exhausted: mark the message as failed.
            // attempt - 1 is the number of retries actually performed.
            notification.setStatus(Notification.NotificationStatus.FAILED);
            notification.setRetryCount(attempt - 1);
            notificationRepository.save(notification);
            
            // Publish to the dead-letter topic
            String dlqTopic = "notifications/dlq";
            String payload = String.format(
                "{\"messageId\":\"%s\",\"error\":\"Max retry attempts reached\",\"retryCount\":%d}",
                notification.getMessageId(), attempt - 1
            );
            
            try {
                mqttClient.publish(dlqTopic, new MqttMessage(payload.getBytes(StandardCharsets.UTF_8)));
            } catch (MqttException e) {
                System.err.println("Failed to publish to DLQ: " + e.getMessage());
            }
            return;
        }
        
        // Exponential backoff: retryInterval * 2^attempt milliseconds
        long delay = (long) (Math.pow(2, attempt) * retryInterval);
        
        scheduler.schedule(() -> {
            try {
                String topic = "user/" + notification.getUserId() + "/notifications";
                String payload = buildNotificationPayload(notification, notification.getMessageId());
                
                MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
                mqttMessage.setQos(2);
                
                mqttClient.publish(topic, mqttMessage);
                System.out.println("Message retried successfully (attempt " + attempt + "): " + notification.getMessageId());
                
                // Record the retry count
                notification.setRetryCount(attempt);
                notificationRepository.save(notification);
            } catch (MqttException e) {
                System.err.println("Retry attempt " + attempt + " failed: " + e.getMessage());
                // Schedule the next attempt
                scheduleRetry(notification, attempt + 1);
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Recalls a message.
     */
    public boolean recallMessage(String messageId, String operatorId) {
        // Look up the message
        Notification notification = notificationRepository.findByMessageId(messageId)
                .orElseThrow(() -> new IllegalArgumentException("Message not found"));
        
        // Check permission
        if (!notification.getSenderId().equals(operatorId) && !"admin".equals(operatorId)) {
            throw new SecurityException("No permission to recall this message");
        }
        
        // Check the recall window. Comparing full durations avoids the truncation of
        // toMinutes(), which would still allow a recall at 5 min 59 s.
        LocalDateTime now = LocalDateTime.now();
        java.time.Duration elapsed = java.time.Duration.between(notification.getCreatedAt(), now);
        
        if (elapsed.compareTo(java.time.Duration.ofMinutes(maxRecallMinutes)) > 0) {
            throw new IllegalStateException("Cannot recall message after " + maxRecallMinutes + " minutes");
        }
        
        // Update the message status
        notification.setStatus(Notification.NotificationStatus.RECALLED);
        notification.setRecallAt(now);
        notificationRepository.save(notification);
        
        // Publish the recall command
        String topic = "user/" + notification.getUserId() + "/recall";
        String payload = String.format(
            "{\"messageId\":\"%s\",\"operatorId\":\"%s\",\"recalledAt\":\"%s\",\"isRecall\":true}",
            messageId, operatorId, now
        );
        
        try {
            MqttMessage mqttMessage = new MqttMessage(payload.getBytes(StandardCharsets.UTF_8));
            mqttMessage.setQos(2);
            mqttClient.publish(topic, mqttMessage);
            return true;
        } catch (MqttException e) {
            System.err.println("Failed to send recall command: " + e.getMessage());
            return false;
        }
    }
}
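
The recall rule (a message may be recalled only within a fixed window after it was created) is easy to get subtly wrong when the elapsed time is rounded to whole minutes. The following sketch isolates the check so that the boundary cases are explicit. The class `RecallWindow` is hypothetical, and it uses `Instant` rather than the `LocalDateTime` of the entity, which is an assumption made here to keep the comparison independent of time zones.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical, side-effect-free version of the recall-window check. */
final class RecallWindow {

    private RecallWindow() {
    }

    /**
     * Returns true when 'now' is no later than 'createdAt + window'.
     * A creation time in the future is treated as not recallable.
     */
    static boolean canRecall(Instant createdAt, Instant now, Duration window) {
        Duration age = Duration.between(createdAt, now);
        return !age.isNegative() && age.compareTo(window) <= 0;
    }
}
```

With a 5-minute window, a message that is 4 minutes 59 seconds old can be recalled, and one that is 5 minutes 1 second old cannot. A check based on whole minutes would accept both.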

3.6 Business Service and Controller

Notification business service
package com.example.mqtt.service;

import com.example.mqtt.model.Notification;
import com.example.mqtt.repository.NotificationRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;

@Service
public class NotificationService {

    private final NotificationRepository notificationRepository;
    private final MqttService mqttService;

    @Autowired
    public NotificationService(NotificationRepository notificationRepository, MqttService mqttService) {
        this.notificationRepository = notificationRepository;
        this.mqttService = mqttService;
    }

    /**
     * Creates and sends a notification.
     */
    public String createAndSendNotification(String userId, String senderId, String title, 
                                           String content, Notification.NotificationType type, 
                                           int priority) {
        Notification notification = Notification.builder()
                .userId(userId)
                .senderId(senderId)
                .title(title)
                .content(content)
                .type(type)
                .priority(priority)
                .build();
                
        return mqttService.sendNotification(notification);
    }

    /**
     * Recalls a message.
     */
    public boolean recallNotification(String messageId, String operatorId) {
        return mqttService.recallMessage(messageId, operatorId);
    }

    /**
     * Returns a page of the user's notifications.
     */
    public Page<Notification> getUserNotifications(String userId, Pageable pageable) {
        return notificationRepository.findByUserIdOrderByCreatedAtDesc(userId, pageable);
    }

    /**
     * Marks a message as read.
     */
    public void markAsRead(String messageId, String userId) {
        Notification notification = notificationRepository.findByMessageId(messageId)
                .orElseThrow(() -> new IllegalArgumentException("Message not found"));
                
        if (!notification.getUserId().equals(userId)) {
            throw new SecurityException("No permission to mark this message as read");
        }
        
        notification.setRead(true);
        notificationRepository.save(notification);
    }

    /**
     * Returns the number of unread messages.
     */
    public long getUnreadCount(String userId) {
        return notificationRepository.countByUserIdAndIsReadFalse(userId);
    }
}

Notification API controller
package com.example.mqtt.controller;

import com.example.mqtt.model.Notification;
import com.example.mqtt.service.NotificationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.Authentication;
import org.springframework.web.bind.annotation.*;

import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

@RestController
@RequestMapping("/api/notifications")
public class NotificationController {

    private final NotificationService notificationService;

    @Autowired
    public NotificationController(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    /**
     * Sends a notification.
     */
    @PostMapping
    public ResponseEntity<String> sendNotification(
            Authentication authentication,
            @Valid @RequestBody SendNotificationRequest request) {
        
        String senderId = authentication.getName();
        String messageId = notificationService.createAndSendNotification(
                request.getUserId(),
                senderId,
                request.getTitle(),
                request.getContent(),
                request.getType(),
                request.getPriority()
        );
        
        return ResponseEntity.ok(messageId);
    }

    /**
     * Recalls a notification.
     */
    @PostMapping("/{messageId}/recall")
    public ResponseEntity<Boolean> recallNotification(
            Authentication authentication,
            @PathVariable String messageId) {
        
        String operatorId = authentication.getName();
        boolean result = notificationService.recallNotification(messageId, operatorId);
        return ResponseEntity.ok(result);
    }

    /**
     * Returns the current user's notifications.
     */
    @GetMapping
    public ResponseEntity<Page<Notification>> getUserNotifications(
            Authentication authentication,
            Pageable pageable) {
        
        String userId = authentication.getName();
        Page<Notification> notifications = notificationService.getUserNotifications(userId, pageable);
        return ResponseEntity.ok(notifications);
    }

    /**
     * Marks a message as read.
     */
    @PutMapping("/{messageId}/read")
    public ResponseEntity<Void> markAsRead(
            Authentication authentication,
            @PathVariable String messageId) {
        
        String userId = authentication.getName();
        notificationService.markAsRead(messageId, userId);
        return ResponseEntity.noContent().build();
    }

    /**
     * Returns the number of unread messages.
     */
    @GetMapping("/unread/count")
    public ResponseEntity<Long> getUnreadCount(Authentication authentication) {
        String userId = authentication.getName();
        long count = notificationService.getUnreadCount(userId);
        return ResponseEntity.ok(count);
    }

    // Request body model
    public static class SendNotificationRequest {
        @NotBlank(message = "User ID is required")
        private String userId;
        
        @NotBlank(message = "Title is required")
        private String title;
        
        @NotBlank(message = "Content is required")
        private String content;
        
        @NotNull(message = "Type is required")
        private Notification.NotificationType type;
        
        @Min(value = 0, message = "Priority must be between 0 and 9")
        @javax.validation.constraints.Max(value = 9, message = "Priority must be between 0 and 9")
        private int priority = 5;

        // Getters and setters
        public String getUserId() { return userId; }
        public void setUserId(String userId) { this.userId = userId; }
        public String getTitle() { return title; }
        public void setTitle(String title) { this.title = title; }
        public String getContent() { return content; }
        public void setContent(String content) { this.content = content; }
        public Notification.NotificationType getType() { return type; }
        public void setType(Notification.NotificationType type) { this.type = type; }
        public int getPriority() { return priority; }
        public void setPriority(int priority) { this.priority = priority; }
    }
}

3.7 JWT Authentication Configuration

package com.example.mqtt.config;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.io.Decoders;
import io.jsonwebtoken.security.Keys;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Component;

import java.security.Key;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

@Component
public class JwtConfig {

    @Value("${jwt.secret}")
    private String secretKey;

    @Value("${jwt.expiration}")
    private long jwtExpiration;

    public String extractUsername(String token) {
        return extractClaim(token, Claims::getSubject);
    }

    public <T> T extractClaim(String token, Function<Claims, T> claimsResolver) {
        final Claims claims = extractAllClaims(token);
        return claimsResolver.apply(claims);
    }

    public String generateToken(UserDetails userDetails) {
        return generateToken(new HashMap<>(), userDetails);
    }

    public String generateToken(Map<String, Object> extraClaims, UserDetails userDetails) {
        return buildToken(extraClaims, userDetails, jwtExpiration);
    }

    private String buildToken(
            Map<String, Object> extraClaims,
            UserDetails userDetails,
            long expiration
    ) {
        return Jwts
                .builder()
                .setClaims(extraClaims)
                .setSubject(userDetails.getUsername())
                .setIssuedAt(new Date(System.currentTimeMillis()))
                .setExpiration(new Date(System.currentTimeMillis() + expiration))
                .signWith(getSignInKey(), SignatureAlgorithm.HS256)
                .compact();
    }

    public boolean isTokenValid(String token, UserDetails userDetails) {
        final String username = extractUsername(token);
        return (username.equals(userDetails.getUsername())) && !isTokenExpired(token);
    }

    private boolean isTokenExpired(String token) {
        return extractExpiration(token).before(new Date());
    }

    private Date extractExpiration(String token) {
        return extractClaim(token, Claims::getExpiration);
    }

    private Claims extractAllClaims(String token) {
        return Jwts
                .parserBuilder()
                .setSigningKey(getSignInKey())
                .build()
                .parseClaimsJws(token)
                .getBody();
    }

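    private Key getSignInKey() {
        // jwt.secret is a plain-text string, not Base64 (it contains '-'), so use its raw
        // UTF-8 bytes. HS256 requires a key of at least 32 bytes.
        byte[] keyBytes = secretKey.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        return Keys.hmacShaKeyFor(keyBytes);
    }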
}
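
To make it clearer what signing with HS256 involves, the sketch below computes and checks an HMAC-SHA256 signature using only the JDK. It is an illustration of the underlying operation, not a replacement for the JJWT library used above. The class `Hs256Sketch` and its method names are hypothetical, and treating the secret as UTF-8 text is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical JDK-only illustration of the HS256 signature step of a JWT. */
final class Hs256Sketch {

    private Hs256Sketch() {
    }

    /** Returns the base64url-encoded HMAC-SHA256 of the signing input ("header.payload"). */
    static String sign(String signingInput, String secret) {
        byte[] key = secret.getBytes(StandardCharsets.UTF_8);
        if (key.length < 32) {
            throw new IllegalArgumentException("HS256 needs a key of at least 32 bytes");
        }
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            byte[] signature = mac.doFinal(signingInput.getBytes(StandardCharsets.UTF_8));
            return Base64.getUrlEncoder().withoutPadding().encodeToString(signature);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 is unavailable", e);
        }
    }

    /** Recomputes the signature and compares it in constant time. */
    static boolean verify(String signingInput, String signature, String secret) {
        byte[] expected = sign(signingInput, secret).getBytes(StandardCharsets.US_ASCII);
        byte[] actual = signature.getBytes(StandardCharsets.US_ASCII);
        return MessageDigest.isEqual(expected, actual);
    }
}
```

Changing a single character of the signing input makes `verify` return false, which is what prevents a client from editing the claims of a token it was issued.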

4. Frontend Implementation (Vue 3)

4.1 Project Initialization

# Create the Vue project
vue create mqtt-notification-client
cd mqtt-notification-client

# Install dependencies
npm install mqtt element-plus axios jwt-decode

4.2 MQTT Service Wrapper

import mqtt from 'mqtt/dist/mqtt.min'
import { ElNotification } from 'element-plus'

class MqttService {
  constructor() {
    this.client = null
    this.userId = null
    this.connected = false
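    this.reconnectTimeout = 1000 // initial reconnect delay (ms)
    this.maxReconnectTimeout = 30000 // maximum reconnect delay (ms)
    this.notificationCallback = null // callback invoked for incoming notifications
    this.recallCallback = null // callback invoked for incoming recall commands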
  }

  /**
   * Initializes the MQTT connection.
   * @param {string} userId user ID
   * @param {function} notificationCallback callback for notifications
   * @param {function} recallCallback callback for recall commands
   */
  init(userId, notificationCallback, recallCallback) {
    this.userId = userId
    this.notificationCallback = notificationCallback
    this.recallCallback = recallCallback

    // Disconnect first if a client already exists
    if (this.client) {
      this.disconnect()
    }

    this.connect()
  }

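  /**
   * Connects to the MQTT broker.
   */
  connect() {
    if (!this.userId) {
      console.error('User ID must not be empty')
      return
    }

    // Use a stable client ID: with clean: false the broker can only resume the session
    // (and deliver queued offline messages) when the same ID reconnects.
    // Note that a second tab using the same ID will take over the connection.
    const clientId = `web-client-${this.userId}`
    
    // MQTT over WebSocket endpoint
    const brokerUrl = 'ws://localhost:8083/mqtt'
    
    // Connection options
    const options = {
      clientId,
      clean: false, // keep the session so offline messages arrive after reconnecting
      connectTimeout: 4000,
      username: 'admin', // use a JWT token in a real deployment
      password: 'public',
      keepalive: 30, // 30-second heartbeat
      reconnectPeriod: 0 // disable MQTT.js's built-in reconnect; scheduleReconnect() handles it
    }

    // Connect to the broker
    this.client = mqtt.connect(brokerUrl, options)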

    // Connection established
    this.client.on('connect', (connack) => {
      this.connected = true
      this.reconnectTimeout = 1000 // reset the reconnect delay
      
      console.log('MQTT connected', connack)
      ElNotification({
        title: 'Connected',
        message: 'Connected to the message server',
        type: 'success',
        duration: 3000
      })

      // Subscribe to the user's own topics
      const topics = [
        `user/${this.userId}/notifications`,
        `user/${this.userId}/recall`
      ]
      
      this.client.subscribe(topics, { qos: 2 }, (err) => {
        if (!err) {
          console.log(`Subscribed to topics: ${topics.join(', ')}`)
        } else {
          console.error('Failed to subscribe:', err)
          ElNotification({
            title: 'Subscription failed',
            message: 'Unable to subscribe to the notification topics',
            type: 'error'
          })
        }
      })
    })

    // Incoming message
    this.client.on('message', (topic, message) => {
      this.handleMessage(topic, message.toString())
    })

    // Connection closed
    this.client.on('close', () => {
      this.connected = false
      console.log('MQTT connection closed')
      this.scheduleReconnect()
    })

    // Connection error
    this.client.on('error', (err) => {
      this.connected = false
      console.error('MQTT error:', err)
      
      ElNotification({
        title: 'Connection error',
        message: 'The message service connection failed',
        type: 'error'
      })
      
      this.scheduleReconnect()
    })

    // Reconnecting
    this.client.on('reconnect', () => {
      console.log('Reconnecting to the MQTT server...')
    })
  }

  /**
   * Handles an incoming message.
   */
  handleMessage(topic, payload) {
    try {
      const message = JSON.parse(payload)
      
      if (topic.endsWith('/recall') && message.isRecall) {
        // Recall command
        console.log('Received recall command:', message)
        if (this.recallCallback) {
          this.recallCallback(message)
        }
        
        ElNotification({
          title: 'Message recalled',
          message: 'A message was recalled by its sender',
          type: 'info'
        })
      } else if (topic.endsWith('/notifications')) {
        // Notification
        console.log('Received new notification:', message)
        if (this.notificationCallback) {
          this.notificationCallback(message)
        }
        
        // Show the notification
        ElNotification({
          title: message.title,
          message: message.content,
          type: this.getNotificationTypeIcon(message.type),
          duration: 5000
        })
      }
    } catch (e) {
      console.error('Failed to parse message:', e)
    }
  }

  /**
   * Maps a notification type to the Element Plus notification style (icon) to display.
   */
  getNotificationTypeIcon(type) {
    switch (type) {
      case 'SYSTEM':
        return 'warning'
      case 'FRIEND_REQUEST':
        return 'success'
      case 'MESSAGE':
        return 'info'
      default:
        return 'info'
    }
  }

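  /**
   * Schedules a reconnect attempt.
   */
  scheduleReconnect() {
    // Skip after a manual disconnect, while connected, or when a retry is already pending
    if (!this.client || this.client.connected || this.reconnectTimer) {
      return
    }

    const delay = this.reconnectTimeout
    ElNotification({
      title: 'Disconnected',
      message: `Reconnecting in ${delay / 1000} seconds`,
      type: 'warning'
    })

    // Exponential backoff: double the delay for the next attempt, up to the maximum
    this.reconnectTimeout = Math.min(delay * 2, this.maxReconnectTimeout)
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      if (!this.connected && this.client) {
        this.client.end(true) // close the stale client before creating a new one
        this.connect()
      }
    }, delay)
  }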

  /**
   * Disconnects from the broker.
   */
  disconnect() {
    if (this.client) {
      this.client.end()
      this.client = null
      this.connected = false
      console.log('MQTT connection closed manually')
    }
  }

  /**
   * Check the connection state
   */
  isConnected() {
    return this.connected
  }
}

// Export a singleton instance
export default new MqttService()
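
The reconnect delays produced by `scheduleReconnect` can be examined in isolation. The sketch below is only an illustration: `backoffDelays` is a hypothetical helper, not part of MQTT.js or of the service above, and it assumes the same 1000 ms initial delay and 30000 ms cap used in the constructor.

```javascript
// Hypothetical helper: list the first `attempts` reconnect delays (ms)
// for exponential backoff that starts at `initial` and is capped at `max`.
function backoffDelays(initial, max, attempts) {
  const delays = []
  let delay = initial
  for (let i = 0; i < attempts; i++) {
    delays.push(delay)
    // Double the delay for the next attempt, never exceeding the cap
    delay = Math.min(delay * 2, max)
  }
  return delays
}

console.log(backoffDelays(1000, 30000, 7))
// → [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The cap keeps a long outage from pushing the wait beyond 30 seconds, while the doubling avoids hammering the broker right after it goes down.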

4.3 API Service Wrapper

import axios from 'axios'
import { ElMessage } from 'element-plus'

// Create the axios instance
const api = axios.create({
  baseURL: 'http://localhost:8080/api',
  timeout: 10000,
  headers: {
    'Content-Type': 'application/json'
  }
})

// Request interceptor: attach the auth token
api.interceptors.request.use(
  (config) => {
    const token = localStorage.getItem('token')
    if (token) {
      config.headers.Authorization = `Bearer ${token}`
    }
    return config
  },
  (error) => {
    return Promise.reject(error)
  }
)

// Response interceptor: unwrap the response body and report errors
api.interceptors.response.use(
  (response) => {
    return response.data
  },
  (error) => {
    const message = error.response?.data?.error || error.message || '请求失败'
    ElMessage.error(message)
    return Promise.reject(error)
  }
)

// Notification APIs
export const notificationApi = {
  /**
   * Send a notification
   */
  sendNotification: (data) => {
    return api.post('/notifications', data)
  },

  /**
   * Recall a notification
   */
  recallNotification: (messageId) => {
    return api.post(`/notifications/${messageId}/recall`)
  },

  /**
   * Get the current user's notifications (page is zero-based)
   */
  getUserNotifications: (page = 0, size = 20) => {
    return api.get('/notifications', { params: { page, size } })
  },

  /**
   * Mark a message as read
   */
  markAsRead: (messageId) => {
    return api.put(`/notifications/${messageId}/read`)
  },

  /**
   * Get the number of unread messages
   */
  getUnreadCount: () => {
    return api.get('/notifications/unread/count')
  }
}

// Authentication APIs
export const authApi = {
  /**
   * Log in
   */
  login: (credentials) => {
    return api.post('/auth/login', credentials)
  },

  /**
   * Register a user
   */
  register: (userData) => {
    return api.post('/auth/register', userData)
  },

  /**
   * Get the current user's profile
   */
  getCurrentUser: () => {
    return api.get('/auth/me')
  }
}

export default api
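
The fallback chain in the response interceptor can be pulled out into a small function so that it is easy to check on its own. This is a sketch only: `extractErrorMessage` is a hypothetical name, and the `error.response.data.error` shape is simply the one the interceptor above already assumes the backend returns.

```javascript
// Hypothetical helper mirroring the interceptor's fallback chain:
// server-provided message first, then the Axios error message, then a default.
function extractErrorMessage(error, fallback = 'Request failed') {
  return error?.response?.data?.error || error?.message || fallback
}

console.log(extractErrorMessage({ response: { data: { error: 'Forbidden' } } }))
console.log(extractErrorMessage({ message: 'Network Error' }))
```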

4.4 Notification List Component

<template>
  <div class="notification-list">
    <el-card>
      <template #header>
        <div class="card-header">
          <h2>通知中心</h2>
          <el-badge :value="unreadCount" class="notification-badge">
            <el-button size="small" @click="markAllAsRead">
              全部标为已读
            </el-button>
          </el-badge>
        </div>
      </template>

      <el-empty 
        v-if="notifications.length === 0 && !loading"
        description="暂无通知"
      ></el-empty>

      <el-skeleton 
        v-if="loading"
        :rows="5"
        animated
      ></el-skeleton>

      <el-timeline v-else>
        <el-timeline-item 
          v-for="notification in notifications" 
          :key="notification.messageId"
          :timestamp="formatTime(notification.createdAt)"
          :icon="getIcon(notification.type)"
          :color="getColor(notification.type)"
          placement="top"
        >
          <el-card 
            :class="{ 'unread-notification': !notification.read, 'recalled-notification': notification.status === 'RECALLED' }"
          >
            <div class="notification-header">
              <h3>{{ notification.title }}</h3>
              <span class="notification-type">{{ getTypeName(notification.type) }}</span>
            </div>
            
            <div v-if="notification.status === 'RECALLED'" class="recall-message">
              此消息已被撤回
            </div>
            <div v-else class="notification-content">
              {{ notification.content }}
            </div>
            
            <div class="notification-actions">
              <el-button 
                size="small" 
                text 
                @click="markAsRead(notification.messageId)"
                v-if="!notification.read && notification.status !== 'RECALLED'"
              >
                标为已读
              </el-button>
              
              <el-button 
                size="small" 
                text 
                type="danger"
                @click="handleRecall(notification.messageId)"
                v-if="canRecall(notification)"
              >
                撤回
              </el-button>
            </div>
          </el-card>
        </el-timeline-item>
      </el-timeline>

      <el-pagination
        v-if="notifications.length > 0"
        class="pagination"
        :current-page="currentPage + 1"
        :page-size="pageSize"
        :total="totalNotifications"
        @current-change="handlePageChange"
        layout="prev, pager, next"
      ></el-pagination>
    </el-card>
  </div>
</template>

<script setup>
import { ref, onMounted } from 'vue'
import { notificationApi } from '../services/apiService'
import { ElMessageBox, ElMessage } from 'element-plus'
import mqttService from '../services/mqttService'
import { formatDistanceToNow } from 'date-fns'
import { zhCN } from 'date-fns/locale'

// State
const notifications = ref([])
const loading = ref(true)
const currentPage = ref(0)
const pageSize = ref(20)
const totalNotifications = ref(0)
const unreadCount = ref(0)
const userId = ref(localStorage.getItem('userId'))

// Initialization
onMounted(() => {
  fetchNotifications()
  fetchUnreadCount()
  setupMqttListeners()
})

// Register the MQTT listeners
const setupMqttListeners = () => {
  if (userId.value) {
    mqttService.init(
      userId.value,
      handleNewNotification, // Handle new notifications
      handleMessageRecall    // Handle message recalls
    )
  }
}

// Fetch the notification list
const fetchNotifications = async () => {
  try {
    loading.value = true
    const response = await notificationApi.getUserNotifications(currentPage.value, pageSize.value)
    notifications.value = response.content
    totalNotifications.value = response.totalElements
  } catch (error) {
    console.error('获取通知失败:', error)
  } finally {
    loading.value = false
  }
}

// Fetch the unread count
const fetchUnreadCount = async () => {
  try {
    const count = await notificationApi.getUnreadCount()
    unreadCount.value = count
  } catch (error) {
    console.error('获取未读数量失败:', error)
  }
}

// Handle a new notification
const handleNewNotification = (newNotification) => {
  // Prepend it to the list
  notifications.value.unshift(newNotification)
  // Refresh the unread count
  fetchUnreadCount()
}

// Handle a message recall
const handleMessageRecall = (recallMessage) => {
  // Mark the message as recalled locally
  const index = notifications.value.findIndex(
    n => n.messageId === recallMessage.messageId
  )
  
  if (index !== -1) {
    notifications.value[index].status = 'RECALLED'
  }
}

// Mark a message as read
const markAsRead = async (messageId) => {
  try {
    await notificationApi.markAsRead(messageId)
    // Update the local state
    const index = notifications.value.findIndex(n => n.messageId === messageId)
    if (index !== -1) {
      notifications.value[index].read = true
    }
    fetchUnreadCount()
    ElMessage.success('已标记为已读')
  } catch (error) {
    console.error('标记已读失败:', error)
  }
}

// Mark all as read
const markAllAsRead = async () => {
  try {
    // Simplified: a real implementation should call a batch endpoint on the backend
    const unreadNotifications = notifications.value.filter(n => !n.read)
    for (const notification of unreadNotifications) {
      await notificationApi.markAsRead(notification.messageId)
      notification.read = true
    }
    fetchUnreadCount()
    ElMessage.success('全部已标为已读')
  } catch (error) {
    console.error('批量标记已读失败:', error)
  }
}

// Recall a message
const handleRecall = async (messageId) => {
  try {
    // ElMessageBox.confirm rejects with 'cancel' (or 'close') when the user dismisses the dialog
    await ElMessageBox.confirm(
      '确定要撤回这条消息吗?',
      '确认撤回',
      {
        confirmButtonText: '确定',
        cancelButtonText: '取消',
        type: 'warning'
      }
    )
    
    const result = await notificationApi.recallNotification(messageId)
    if (result) {
      // Update the local state
      const index = notifications.value.findIndex(n => n.messageId === messageId)
      if (index !== -1) {
        notifications.value[index].status = 'RECALLED'
      }
      ElMessage.success('消息已撤回')
    }
  } catch (error) {
    // Dismissing the dialog is not an error
    if (error !== 'cancel' && error !== 'close') {
      console.error('撤回消息失败:', error)
    }
  }
}

// Page change
const handlePageChange = (page) => {
  currentPage.value = page - 1 // The backend uses zero-based page numbers
  fetchNotifications()
}

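// Format the time as a relative string
const formatTime = (dateString) => {
  return formatDistanceToNow(new Date(dateString), { 
    addSuffix: true,
    locale: zhCN
  })
}

// Pick an icon by type.
// Element Plus ships icons as components rather than "el-icon-*" CSS classes,
// so this assumes the @element-plus/icons-vue package is installed.
import { Warning, User, ChatDotRound, ChatLineRound, Star, Message, Bell } from '@element-plus/icons-vue'

const getIcon = (type) => {
  switch (type) {
    case 'SYSTEM': return Warning
    case 'FRIEND_REQUEST': return User
    case 'COMMENT': return ChatDotRound
    case 'LIKE': return Star
    case 'MENTION': return ChatLineRound
    case 'MESSAGE': return Message
    default: return Bell
  }
}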

// Pick a color by type
const getColor = (type) => {
  switch (type) {
    case 'SYSTEM': return '#e6a23c'
    case 'FRIEND_REQUEST': return '#52c41a'
    case 'MESSAGE': return '#1890ff'
    default: return '#8c8c8c'
  }
}

// Get the display name of a type
const getTypeName = (type) => {
  const typeMap = {
    'SYSTEM': '系统通知',
    'FRIEND_REQUEST': '好友请求',
    'COMMENT': '评论',
    'LIKE': '点赞',
    'MENTION': '提及',
    'MESSAGE': '私信'
  }
  return typeMap[type] || type
}

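// Whether the notification can still be recalled
const canRecall = (notification) => {
  // Only the sender can recall, and only a message that has not been recalled yet.
  // userId comes from localStorage and is always a string, so compare as strings.
  return String(notification.senderId) === String(userId.value) && 
         notification.status !== 'RECALLED' &&
         // Recall is allowed within 5 minutes
         new Date() - new Date(notification.createdAt) < 5 * 60 * 1000
}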
</script>

<style scoped>
.card-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
}

.notification-badge {
  margin-left: 10px;
}

.notification-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  margin-bottom: 10px;
}

.notification-type {
  font-size: 12px;
  padding: 2px 8px;
  border-radius: 12px;
  background-color: #f0f2f5;
}

.notification-content {
  margin-bottom: 10px;
  line-height: 1.6;
}

.recall-message {
  margin-bottom: 10px;
  padding: 10px;
  background-color: #fff1f0;
  border-radius: 4px;
  color: #f5222d;
  font-size: 14px;
}

.notification-actions {
  display: flex;
  justify-content: flex-end;
  gap: 10px;
}

.unread-notification {
  border-left: 3px solid #1890ff;
}

.recalled-notification {
  opacity: 0.7;
}

.pagination {
  margin-top: 20px;
  text-align: center;
}
</style>
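
The 5-minute recall rule in `canRecall` is easier to verify when the current time is passed in rather than read from the clock. The following is a minimal sketch under that assumption; the `now` parameter and the `RECALL_WINDOW_MS` constant are introduced here for illustration and do not appear in the component.

```javascript
const RECALL_WINDOW_MS = 5 * 60 * 1000 // 5 minutes

// Hypothetical pure version of the component's check; `now` (ms since epoch)
// is injected so the rule can be tested without waiting for real time to pass.
function canRecall(notification, currentUserId, now = Date.now()) {
  const isSender = String(notification.senderId) === String(currentUserId)
  const notRecalled = notification.status !== 'RECALLED'
  const age = now - new Date(notification.createdAt).getTime()
  return isSender && notRecalled && age < RECALL_WINDOW_MS
}

const sentAt = new Date('2025-01-01T00:00:00Z').getTime()
const sample = { senderId: 42, status: 'SENT', createdAt: '2025-01-01T00:00:00Z' }
console.log(canRecall(sample, '42', sentAt + 4 * 60 * 1000)) // → true
console.log(canRecall(sample, '42', sentAt + 5 * 60 * 1000)) // → false
```

A check like this only decides whether the button is shown; the backend would presumably need to enforce the same window when it receives the recall request.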

4.5 Send Notification Component

<template>
  <el-card>
    <template #header>
      <h2>发送通知</h2>
    </template>

    <el-form 
      ref="formRef" 
      :model="form" 
      :rules="rules" 
      label-width="100px"
      class="send-notification-form"
    >
      <el-form-item label="接收用户ID" prop="userId">
        <el-input v-model="form.userId" placeholder="请输入接收用户ID"></el-input>
      </el-form-item>
      
      <el-form-item label="通知类型" prop="type">
        <el-select v-model="form.type" placeholder="请选择通知类型">
          <el-option label="系统通知" value="SYSTEM"></el-option>
          <el-option label="好友请求" value="FRIEND_REQUEST"></el-option>
          <el-option label="评论" value="COMMENT"></el-option>
          <el-option label="点赞" value="LIKE"></el-option>
          <el-option label="提及" value="MENTION"></el-option>
          <el-option label="私信" value="MESSAGE"></el-option>
        </el-select>
      </el-form-item>
      
      <el-form-item label="优先级" prop="priority">
        <el-slider 
          v-model="form.priority" 
          :min="0" 
          :max="9" 
          :step="1"
          show-input
        ></el-slider>
        <div class="priority-info">
          <span>低优先级</span>
          <span class="priority-high">高优先级</span>
        </div>
      </el-form-item>
      
      <el-form-item label="标题" prop="title">
        <el-input v-model="form.title" placeholder="请输入通知标题"></el-input>
      </el-form-item>
      
      <el-form-item label="内容" prop="content">
        <el-input 
          v-model="form.content" 
          type="textarea" 
          :rows="4" 
          placeholder="请输入通知内容"
        ></el-input>
      </el-form-item>
      
      <el-form-item>
        <el-button type="primary" @click="submitForm">发送通知</el-button>
        <el-button @click="resetForm">重置</el-button>
      </el-form-item>
    </el-form>
  </el-card>
</template>

<script setup>
import { ref, reactive } from 'vue'
import { notificationApi } from '../services/apiService'
import { ElMessage } from 'element-plus'

// Form ref
const formRef = ref(null)

// Form data
const form = reactive({
  userId: '',
  type: '',
  priority: 5,
  title: '',
  content: ''
})

// Validation rules
const rules = reactive({
  userId: [
    { required: true, message: '请输入接收用户ID', trigger: 'blur' }
  ],
  type: [
    { required: true, message: '请选择通知类型', trigger: 'change' }
  ],
  title: [
    { required: true, message: '请输入通知标题', trigger: 'blur' },
    { max: 50, message: '标题长度不能超过50个字符', trigger: 'blur' }
  ],
  content: [
    { required: true, message: '请输入通知内容', trigger: 'blur' },
    { max: 500, message: '内容长度不能超过500个字符', trigger: 'blur' }
  ]
})

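// Submit the form
const submitForm = async () => {
  if (!formRef.value) return

  // validate() rejects with the invalid fields when validation fails;
  // the messages are already shown next to the fields, so just stop here
  const valid = await formRef.value.validate().catch(() => false)
  if (!valid) return

  try {
    // Send the notification
    const messageId = await notificationApi.sendNotification(form)
    ElMessage.success(`通知发送成功,消息ID: ${messageId}`)
    resetForm()
  } catch (error) {
    // The response interceptor has already displayed the error message
    console.error('发送通知失败:', error)
  }
}

// Reset the form
const resetForm = () => {
  if (formRef.value) {
    formRef.value.resetFields()
  }
}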
</script>

<style scoped>
.send-notification-form {
  max-width: 600px;
}

.priority-info {
  display: flex;
  justify-content: space-between;
  margin-top: -10px;
  margin-bottom: 10px;
  font-size: 12px;
  color: #606266;
}

.priority-high {
  color: #f56c6c;
}
</style>
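
The form rules above can also be expressed as a plain function, which is handy for checking a payload outside the component. This is a sketch, not the component's actual validation path: `validateNotification` and its error strings are hypothetical, and the 0 to 9 priority range is taken from the slider's `min` and `max`.

```javascript
// Hypothetical validator mirroring the form rules: required fields,
// title up to 50 characters, content up to 500, priority an integer in 0..9.
function validateNotification(form) {
  const errors = {}
  if (!form.userId) errors.userId = 'required'
  if (!form.type) errors.type = 'required'
  if (!form.title) errors.title = 'required'
  else if (form.title.length > 50) errors.title = 'max 50 characters'
  if (!form.content) errors.content = 'required'
  else if (form.content.length > 500) errors.content = 'max 500 characters'
  if (!Number.isInteger(form.priority) || form.priority < 0 || form.priority > 9) {
    errors.priority = 'integer between 0 and 9'
  }
  return errors
}

const draft = { userId: 'u2', type: 'LIKE', priority: 5, title: 'Hi', content: 'Someone liked your post' }
console.log(Object.keys(validateNotification(draft)).length === 0) // → true
```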

4.6 Home Page Component

<template>
  <div class="home-container">
    <el-row :gutter="20">
      <el-col :span="12">
        <SendNotification />
      </el-col>
      <el-col :span="12">
        <div class="connection-status">
          <el-card>
            <template #header>
              <h2>连接状态</h2>
            </template>
            <div class="status-info">
              <el-descriptions :column="1">
                <el-descriptions-item label="MQTT连接状态">
                  <el-tag :type="mqttConnected ? 'success' : 'danger'">
                    {{ mqttConnected ? '已连接' : '未连接' }}
                  </el-tag>
                </el-descriptions-item>
                <el-descriptions-item label="当前用户">
                  {{ currentUser || '未登录' }}
                </el-descriptions-item>
                <el-descriptions-item label="未读消息">
                  <el-badge :value="unreadCount" class="item">
                    <span>{{ unreadCount }} 条</span>
                  </el-badge>
                </el-descriptions-item>
              </el-descriptions>
              
              <el-button 
                v-if="!mqttConnected && currentUser" 
                type="primary" 
                @click="reconnectMqtt"
                style="margin-top: 15px;"
              >
                重新连接
              </el-button>
            </div>
          </el-card>
        </div>
      </el-col>
    </el-row>
    
    <el-row style="margin-top: 20px;">
      <el-col :span="24">
        <NotificationList />
      </el-col>
    </el-row>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue'
import SendNotification from '../components/SendNotification.vue'
import NotificationList from '../components/NotificationList.vue'
import mqttService from '../services/mqttService'
import { authApi, notificationApi } from '../services/apiService'

// State
const mqttConnected = ref(false)
const currentUser = ref('')
const unreadCount = ref(0)

// Timer handles, cleared when the component is unmounted
let statusTimer = null
let unreadTimer = null

// Initialization
onMounted(() => {
  checkLoginStatus()
  fetchUnreadCount()
  // Poll the MQTT connection state once per second
  statusTimer = setInterval(() => {
    mqttConnected.value = mqttService.isConnected()
  }, 1000)
  // Refresh the unread count every 30 seconds
  unreadTimer = setInterval(fetchUnreadCount, 30000)
})

// A function returned from watchEffect is not a cleanup hook, so the timers are cleared here
onUnmounted(() => {
  clearInterval(statusTimer)
  clearInterval(unreadTimer)
})

// Check the login status
const checkLoginStatus = async () => {
  try {
    const user = await authApi.getCurrentUser()
    currentUser.value = user.id
    localStorage.setItem('userId', user.id)
  } catch (error) {
    console.log('用户未登录或会话已过期')
    // Redirect to the login page here if needed
  }
}

// Fetch the unread count
const fetchUnreadCount = async () => {
  try {
    const count = await notificationApi.getUnreadCount()
    unreadCount.value = count
  } catch (error) {
    console.error('获取未读数量失败:', error)
  }
}

// Reconnect to MQTT, keeping the callbacks already registered by NotificationList
// (passing empty callbacks would silently stop the list from updating)
const reconnectMqtt = () => {
  if (currentUser.value) {
    mqttService.init(
      currentUser.value,
      mqttService.notificationCallback,
      mqttService.recallCallback
    )
  }
}
</script>

<style scoped>
.home-container {
  padding: 20px;
}

.connection-status {
  height: 100%;
}

.status-info {
  padding: 10px 0;
}
</style>
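
The home page polls `isConnected()` once per second. An alternative is to let the service push state changes to whoever is interested. The sketch below shows that idea with a tiny observable; `ConnectionStatus`, `onChange`, and `set` are hypothetical names and are not part of the `MqttService` class shown earlier.

```javascript
// Hypothetical observable for the connection state.
class ConnectionStatus {
  constructor() {
    this.connected = false
    this.listeners = new Set()
  }

  // Register a listener; it is called immediately with the current state.
  // Returns a function that removes the listener again.
  onChange(listener) {
    this.listeners.add(listener)
    listener(this.connected)
    return () => this.listeners.delete(listener)
  }

  // Update the state and notify listeners only when it actually changes.
  set(connected) {
    if (connected === this.connected) return
    this.connected = connected
    this.listeners.forEach((listener) => listener(connected))
  }
}

const status = new ConnectionStatus()
const stop = status.onChange((connected) => console.log('connected:', connected))
status.set(true)  // would be called from the 'connect' handler
status.set(false) // would be called from the 'close' handler
stop()
```

In a component, the function returned by `onChange` would be called on unmount, in the same place where an interval would otherwise be cleared.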

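5. System Testing and Verification

5.1 Functional Testing

  1. User A logs in and opens the notification center.
  2. User B sends a notification to user A; verify that user A receives it in real time.
  3. User B recalls the message; verify that user A's interface shows the message as recalled.
  4. Disconnect the network and reconnect; verify that messages sent while offline are received.
  5. Send a high-priority message; verify that it is displayed first.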
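
For step 5, "displayed first" needs a concrete ordering to test against. One possible ordering is sketched below; it assumes that a larger `priority` value means higher priority (the slider runs from 0 to 9) and that ties are broken by `createdAt`, newest first. `sortByPriority` is a hypothetical helper and is not part of the components above.

```javascript
// Hypothetical ordering: higher priority first, then newest first.
// Returns a new array and leaves the input untouched.
function sortByPriority(notifications) {
  return [...notifications].sort((a, b) => {
    if (b.priority !== a.priority) return b.priority - a.priority
    return new Date(b.createdAt) - new Date(a.createdAt)
  })
}

const inbox = [
  { messageId: 'a', priority: 1, createdAt: '2025-01-01T00:00:00Z' },
  { messageId: 'b', priority: 9, createdAt: '2025-01-01T00:00:00Z' },
  { messageId: 'c', priority: 9, createdAt: '2025-01-02T00:00:00Z' }
]
console.log(sortByPriority(inbox).map((n) => n.messageId)) // → ['c', 'b', 'a']
```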

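5.2 Performance Testing

  • Use a load-testing tool to simulate 1000 concurrent connections and check system stability.
  • Send 10000 messages in a row and measure message throughput and latency.
  • Test automatic reconnection and message retry under unstable network conditions.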
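
When measuring latency, percentiles are usually more informative than the average. The helper below is a minimal sketch using the nearest-rank method; `percentile` is a hypothetical function, and the sample values are made up purely to show the call.

```javascript
// Hypothetical helper: nearest-rank percentile of a list of latencies (ms).
function percentile(values, p) {
  if (values.length === 0) throw new Error('no samples')
  const sorted = [...values].sort((a, b) => a - b)
  // Nearest rank: the smallest value with at least p% of samples at or below it
  const rank = Math.ceil((p / 100) * sorted.length)
  return sorted[Math.max(rank, 1) - 1]
}

// Made-up latencies, for illustration only
const latencies = [12, 5, 30, 8, 20, 15, 9, 11, 40, 7]
console.log(percentile(latencies, 50)) // → 11
console.log(percentile(latencies, 95)) // → 40
```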

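6. Summary

This system implements a full-featured real-time message push system based on the MQTT protocol, with the following characteristics:

  1. Real-time delivery: MQTT's publish/subscribe model over a long-lived connection pushes messages as soon as they are published.
  2. Reliability: QoS 2 provides exactly-once delivery between each client and the broker, and a retry mechanism handles failed sends.
  3. Priority support: each message carries a priority value (0 to 9) so that messages of different importance can be handled in tiers. MQTT itself does not define a message priority field, so the value has to travel in the payload or in an MQTT 5.0 User Property.
  4. Message recall: a recall instruction message implements recall at the application level.
  5. Reconnection: the client reconnects automatically with exponential backoff to keep the connection stable.
  6. Offline messages: MQTT persistent sessions (clean: false with a stable client ID) let a client receive messages published while it was offline once it reconnects.

The architecture is clear, with the frontend and backend separated, so it is easy to extend and maintain as business requirements evolve.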

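Supplement (Vue 2 Frontend)

If the frontend uses Vue 2, the implementation below can be used as a reference.

Frontend Implementation (Vue 2)

4.1 Project Initialization

# Create a Vue 2 project
vue create mqtt-notification-client-vue2
# Select the Vue 2 preset

# Install dependencies
cd mqtt-notification-client-vue2
npm install mqtt element-ui axios jsonwebtoken moment

4.2 MQTT Service Wrapper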

import mqtt from 'mqtt/dist/mqtt.min'
import { Notification } from 'element-ui'

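class MqttService {
  constructor() {
    this.client = null
    this.userId = null
    this.connected = false
    this.reconnectTimeout = 1000 // Initial reconnect delay (ms)
    this.maxReconnectTimeout = 30000 // Maximum reconnect delay (ms)
    this.notificationCallback = null // Callback for incoming notifications
    this.recallCallback = null // Callback for incoming recalls
  }

  /**
   * Initialize the MQTT connection
   * @param {string} userId User ID
   * @param {function} notificationCallback Notification callback
   * @param {function} recallCallback Recall callback
   */
  init(userId, notificationCallback, recallCallback) {
    this.userId = userId
    this.notificationCallback = notificationCallback
    this.recallCallback = recallCallback

    // Disconnect first if a client already exists
    if (this.client) {
      this.disconnect()
    }

    this.connect()
  }

  /**
   * Connect to the MQTT broker
   */
  connect() {
    if (!this.userId) {
      console.error('用户ID不能为空')
      return
    }

    // A persistent session (clean: false) is keyed by the client ID, so the ID must
    // stay the same across reconnects. A random suffix would start a new session
    // each time and offline messages would never be delivered.
    // Caveat: two tabs using the same ID will disconnect each other.
    const clientId = `web-client-${this.userId}`
    
    // MQTT over WebSocket endpoint
    const brokerUrl = 'ws://localhost:8083/mqtt'
    
    // Connection options
    const options = {
      clientId,
      clean: false, // Keep the session so offline messages arrive after reconnecting
      connectTimeout: 4000,
      username: 'admin', // Use a JWT token in a real environment
      password: 'public',
      keepalive: 30, // 30-second heartbeat
      reconnectPeriod: 0 // Disable MQTT.js auto-reconnect; scheduleReconnect() handles it
    }

    // Connect to the broker
    this.client = mqtt.connect(brokerUrl, options)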

    // Connected
    this.client.on('connect', (connack) => {
      this.connected = true
      this.reconnectTimeout = 1000 // Reset the reconnect delay
      
      console.log('MQTT连接成功', connack)
      Notification.success({
        title: '连接成功',
        message: '已成功连接到消息服务器',
        duration: 3000
      })

      // Subscribe to the user's own topics
      const topics = [
        `user/${this.userId}/notifications`,
        `user/${this.userId}/recall`
      ]
      
      this.client.subscribe(topics, { qos: 2 }, (err) => {
        if (!err) {
          console.log(`已订阅主题: ${topics.join(', ')}`)
        } else {
          console.error('订阅主题失败:', err)
          Notification.error({
            title: '订阅失败',
            message: '无法订阅通知主题'
          })
        }
      })
    })

    // Incoming message
    this.client.on('message', (topic, message) => {
      this.handleMessage(topic, message.toString())
    })

    // Connection closed
    this.client.on('close', () => {
      this.connected = false
      console.log('MQTT连接已关闭')
      this.scheduleReconnect()
    })

    // Connection error
    this.client.on('error', (err) => {
      this.connected = false
      console.error('MQTT错误:', err)
      
      Notification.error({
        title: '连接错误',
        message: '消息服务连接发生错误'
      })
      
      this.scheduleReconnect()
    })

    // Reconnecting
    this.client.on('reconnect', () => {
      console.log('正在重连到MQTT服务器...')
    })
  }

  /**
   * Handle an incoming message
   */
  handleMessage(topic, payload) {
    try {
      const message = JSON.parse(payload)
      
      if (topic.endsWith('/recall') && message.isRecall) {
        // Handle a recall instruction
        console.log('收到撤回指令:', message)
        if (this.recallCallback) {
          this.recallCallback(message)
        }
        
        Notification.info({
          title: '消息已撤回',
          message: '一条消息已被发送者撤回'
        })
      } else if (topic.endsWith('/notifications')) {
        // Handle a notification message
        console.log('收到新通知:', message)
        if (this.notificationCallback) {
          this.notificationCallback(message)
        }
        
        // Show the notification
        Notification({
          title: message.title,
          message: message.content,
          type: this.getNotificationTypeIcon(message.type),
          duration: 5000
        })
      }
    } catch (e) {
      console.error('解析消息失败:', e)
    }
  }

  /**
   * Map the notification type to a Notification type (which determines the icon)
   */
  getNotificationTypeIcon(type) {
    switch (type) {
      case 'SYSTEM':
        return 'warning'
      case 'FRIEND_REQUEST':
        return 'success'
      case 'MESSAGE':
        return 'info'
      default:
        return 'info'
    }
  }

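  /**
   * Schedule a reconnect with exponential backoff
   */
  scheduleReconnect() {
    // Nothing to do after a manual disconnect (client is null), while connected,
    // or when a reconnect is already pending ('close' and 'error' may both fire)
    if (!this.client || this.connected || this.reconnectTimer) return

    const delay = this.reconnectTimeout
    Notification.warning({
      title: '连接断开',
      message: `将在 ${delay / 1000} 秒后尝试重连`
    })

    // Exponential backoff: double the delay for the next attempt, capped at the maximum
    this.reconnectTimeout = Math.min(delay * 2, this.maxReconnectTimeout)
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      if (this.client && !this.connected) {
        this.connect()
      }
    }, delay)
  }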

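  /**
   * Disconnect from the broker
   */
  disconnect() {
    // Cancel any pending reconnect attempt
    clearTimeout(this.reconnectTimer)
    this.reconnectTimer = null
    if (this.client) {
      this.client.end()
      this.client = null
      this.connected = false
      console.log('MQTT连接已手动断开')
    }
  }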

  /**
   * Check the connection state
   */
  isConnected() {
    return this.connected
  }
}

// Export a singleton instance
export default new MqttService()
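
The routing performed in `handleMessage` (topic suffix plus the `isRecall` flag) can be separated from the UI side effects, which makes it straightforward to test. The following is a sketch only: `classifyMessage` and the `kind` labels are hypothetical and do not exist in the service above.

```javascript
// Hypothetical helper: decide which handler a raw MQTT message belongs to,
// without touching the UI. `payload` is the message body as a string.
function classifyMessage(topic, payload) {
  let message
  try {
    message = JSON.parse(payload)
  } catch (e) {
    return { kind: 'invalid', message: null }
  }
  if (topic.endsWith('/recall') && message?.isRecall) {
    return { kind: 'recall', message }
  }
  if (topic.endsWith('/notifications')) {
    return { kind: 'notification', message }
  }
  return { kind: 'ignored', message }
}

console.log(classifyMessage('user/1/recall', '{"isRecall":true,"messageId":"m1"}').kind)
console.log(classifyMessage('user/1/notifications', '{"title":"hi"}').kind)
console.log(classifyMessage('user/1/notifications', 'not json').kind)
```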

4.3 API Service Wrapper

import axios from 'axios'
import { Message } from 'element-ui'

// Create the axios instance
const api = axios.create({
  baseURL: 'http://localhost:8080/api',
  timeout: 10000,
  headers: {
    'Content-Type': 'application/json'
  }
})

// Request interceptor: attach the auth token
api.interceptors.request.use(
  (config) => {
    const token = localStorage.getItem('token')
    if (token) {
      config.headers.Authorization = `Bearer ${token}`
    }
    return config
  },
  (error) => {
    return Promise.reject(error)
  }
)

// Response interceptor: unwrap the response body and report errors
api.interceptors.response.use(
  (response) => {
    return response.data
  },
  (error) => {
    const message = error.response?.data?.error || error.message || '请求失败'
    Message.error(message)
    return Promise.reject(error)
  }
)

// Notification APIs
export const notificationApi = {
  /**
   * Send a notification
   */
  sendNotification: (data) => {
    return api.post('/notifications', data)
  },

  /**
   * Recall a notification
   */
  recallNotification: (messageId) => {
    return api.post(`/notifications/${messageId}/recall`)
  },

  /**
   * Get the current user's notifications (page is zero-based)
   */
  getUserNotifications: (page = 0, size = 20) => {
    return api.get('/notifications', { params: { page, size } })
  },

  /**
   * Mark a message as read
   */
  markAsRead: (messageId) => {
    return api.put(`/notifications/${messageId}/read`)
  },

  /**
   * Get the number of unread messages
   */
  getUnreadCount: () => {
    return api.get('/notifications/unread/count')
  }
}

// Authentication APIs
export const authApi = {
  /**
   * Log in
   */
  login: (credentials) => {
    return api.post('/auth/login', credentials)
  },

  /**
   * Register a user
   */
  register: (userData) => {
    return api.post('/auth/register', userData)
  },

  /**
   * Get the current user's profile
   */
  getCurrentUser: () => {
    return api.get('/auth/me')
  }
}

export default api

4.4 Notification List Component

<template>
  <div class="notification-list">
    <el-card>
      <div slot="header" class="card-header">
        <h2>通知中心</h2>
        <el-badge :value="unreadCount" class="notification-badge">
          <el-button size="small" @click="markAllAsRead">
            全部标为已读
          </el-button>
        </el-badge>
      </div>

      <el-empty 
        v-if="notifications.length === 0 && !loading"
        description="暂无通知"
      ></el-empty>

      <el-skeleton 
        v-if="loading"
        :rows="5"
        animated
      ></el-skeleton>

      <el-timeline v-else>
        <el-timeline-item 
          v-for="notification in notifications" 
          :key="notification.messageId"
          :timestamp="formatTime(notification.createdAt)"
          :icon="getIcon(notification.type)"
          :color="getColor(notification.type)"
          placement="top"
        >
          <el-card 
            :class="{ 'unread-notification': !notification.read, 'recalled-notification': notification.status === 'RECALLED' }"
          >
            <div class="notification-header">
              <h3>{{ notification.title }}</h3>
              <span class="notification-type">{{ getTypeName(notification.type) }}</span>
            </div>
            
            <div v-if="notification.status === 'RECALLED'" class="recall-message">
              此消息已被撤回
            </div>
            <div v-else class="notification-content">
              {{ notification.content }}
            </div>
            
            <div class="notification-actions">
              <el-button 
                size="mini" 
                type="text" 
                @click="markAsRead(notification.messageId)"
                v-if="!notification.read && notification.status !== 'RECALLED'"
              >
                标为已读
              </el-button>
              
              <el-button 
                size="mini" 
                type="text" 
                style="color: #F56C6C;"
                @click="handleRecall(notification.messageId)"
                v-if="canRecall(notification)"
              >
                撤回
              </el-button>
            </div>
          </el-card>
        </el-timeline-item>
      </el-timeline>

      <el-pagination
        v-if="notifications.length > 0"
        class="pagination"
        :current-page="currentPage + 1"
        :page-size="pageSize"
        :total="totalNotifications"
        @current-change="handlePageChange"
        layout="prev, pager, next"
      ></el-pagination>
    </el-card>
  </div>
</template>

<script>
import { notificationApi } from '../services/apiService'
import { MessageBox, Message } from 'element-ui'
import mqttService from '../services/mqttService'
import moment from 'moment'

export default {
  name: 'NotificationList',
  data() {
    return {
      notifications: [],
      loading: true,
      currentPage: 0,
      pageSize: 20,
      totalNotifications: 0,
      unreadCount: 0,
      userId: localStorage.getItem('userId')
    }
  },
  mounted() {
    this.fetchNotifications()
    this.fetchUnreadCount()
    this.setupMqttListeners()
  },
  methods: {
    setupMqttListeners() {
      if (this.userId) {
        mqttService.init(
          this.userId,
          this.handleNewNotification, // Handle new notifications
          this.handleMessageRecall    // Handle message recalls
        )
      }
    },

    fetchNotifications() {
      this.loading = true
      notificationApi.getUserNotifications(this.currentPage, this.pageSize)
        .then(response => {
          this.notifications = response.content
          this.totalNotifications = response.totalElements
        })
        .catch(error => {
          console.error('获取通知失败:', error)
        })
        .finally(() => {
          this.loading = false
        })
    },

    fetchUnreadCount() {
      notificationApi.getUnreadCount()
        .then(count => {
          this.unreadCount = count
        })
        .catch(error => {
          console.error('获取未读数量失败:', error)
        })
    },

    handleNewNotification(newNotification) {
      // Prepend it to the list
      this.notifications.unshift(newNotification)
      // Refresh the unread count
      this.fetchUnreadCount()
    },

    handleMessageRecall(recallMessage) {
      // Mark the message as recalled locally
      const index = this.notifications.findIndex(
        n => n.messageId === recallMessage.messageId
      )
      
      if (index !== -1) {
        this.$set(this.notifications, index, {
          ...this.notifications[index],
          status: 'RECALLED'
        })
      }
    },

    markAsRead(messageId) {
      notificationApi.markAsRead(messageId)
        .then(() => {
          // Update the local state
          const index = this.notifications.findIndex(n => n.messageId === messageId)
          if (index !== -1) {
            this.$set(this.notifications[index], 'read', true)
          }
          this.fetchUnreadCount()
          Message.success('已标记为已读')
        })
        .catch(error => {
          console.error('标记已读失败:', error)
        })
    },

    markAllAsRead() {
      // Simplified: a real implementation should call a batch endpoint on the backend
      const unreadNotifications = this.notifications.filter(n => !n.read)
      
      Promise.all(unreadNotifications.map(notification => 
        notificationApi.markAsRead(notification.messageId)
      )).then(() => {
        unreadNotifications.forEach(notification => {
          const localIndex = this.notifications.findIndex(
            n => n.messageId === notification.messageId
          )
          if (localIndex !== -1) {
            this.$set(this.notifications[localIndex], 'read', true)
          }
        })
        this.fetchUnreadCount()
        Message.success('全部已标为已读')
      }).catch(error => {
        console.error('批量标记已读失败:', error)
      })
    },

    handleRecall(messageId) {
      MessageBox.confirm(
        '确定要撤回这条消息吗?',
        '确认撤回',
        {
          confirmButtonText: '确定',
          cancelButtonText: '取消',
          type: 'warning'
        }
      ).then(() => {
        return notificationApi.recallNotification(messageId)
      }).then(result => {
        if (result) {
          // Update the local state
          const index = this.notifications.findIndex(n => n.messageId === messageId)
          if (index !== -1) {
            this.$set(this.notifications, index, {
              ...this.notifications[index],
              status: 'RECALLED'
            })
          }
          Message.success('消息已撤回')
        }
      }).catch(error => {
        if (error !== 'cancel') { // Ignore the case where the user cancelled
          console.error('撤回消息失败:', error)
        }
      })
    },

    handlePageChange(page) {
      this.currentPage = page - 1 // The backend uses zero-based page numbers
      this.fetchNotifications()
    },

    formatTime(dateString) {
      return moment(dateString).fromNow()
    },

    getIcon(type) {
      // Class names from the Element UI icon set
      switch (type) {
        case 'SYSTEM': return 'el-icon-warning'
        case 'FRIEND_REQUEST': return 'el-icon-user'
        case 'COMMENT': return 'el-icon-chat-dot-round'
        case 'LIKE': return 'el-icon-star-on'
        case 'MENTION': return 'el-icon-chat-line-round'
        case 'MESSAGE': return 'el-icon-message'
        default: return 'el-icon-bell'
      }
    },

    getColor(type) {
      switch (type) {
        case 'SYSTEM': return '#e6a23c'
        case 'FRIEND_REQUEST': return '#52c41a'
        case 'MESSAGE': return '#1890ff'
        default: return '#8c8c8c'
      }
    },

    getTypeName(type) {
      const typeMap = {
        'SYSTEM': '系统通知',
        'FRIEND_REQUEST': '好友请求',
        'COMMENT': '评论',
        'LIKE': '点赞',
        'MENTION': '提及',
        'MESSAGE': '私信'
      }
      return typeMap[type] || type
    },

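    canRecall(notification) {
      // Only the sender can recall, and only a message that has not been recalled yet.
      // userId comes from localStorage and is always a string, so compare as strings.
      return String(notification.senderId) === String(this.userId) && 
             notification.status !== 'RECALLED' &&
             // Recall is allowed within 5 minutes
             new Date() - new Date(notification.createdAt) < 5 * 60 * 1000
    }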
  }
}
</script>

<style scoped>
.card-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
}

.notification-badge {
  margin-left: 10px;
}

.notification-header {
  display: flex;
  justify-content: space-between;
  align-items: center;
  margin-bottom: 10px;
}

.notification-type {
  font-size: 12px;
  padding: 2px 8px;
  border-radius: 12px;
  background-color: #f0f2f5;
}

.notification-content {
  margin-bottom: 10px;
  line-height: 1.6;
}

.recall-message {
  margin-bottom: 10px;
  padding: 10px;
  background-color: #fff1f0;
  border-radius: 4px;
  color: #f5222d;
  font-size: 14px;
}

.notification-actions {
  display: flex;
  justify-content: flex-end;
  gap: 10px;
}

.unread-notification {
  border-left: 3px solid #1890ff;
}

.recalled-notification {
  opacity: 0.7;
}

.pagination {
  margin-top: 20px;
  text-align: center;
}
</style>
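
The `canRecall` check in the list component reads the wall clock and `this.userId` directly, which makes it awkward to unit test. Below is a minimal sketch of the same rule as a pure function. The names `isRecallable` and `RECALL_WINDOW_MS`, the injectable `now` parameter, and the `'SENT'` status value are introduced here for illustration and are not part of the original code.

```javascript
// Hypothetical helper: mirrors the component's canRecall rule as a pure function.
const RECALL_WINDOW_MS = 5 * 60 * 1000 // 5-minute recall window from the requirements

function isRecallable(notification, currentUserId, now = Date.now()) {
  const createdAt = new Date(notification.createdAt).getTime()
  if (Number.isNaN(createdAt)) return false // unparseable timestamp: do not offer recall
  return notification.senderId === currentUserId &&
    notification.status !== 'RECALLED' &&
    now - createdAt < RECALL_WINDOW_MS
}

// Usage with fixed timestamps so the result does not depend on the wall clock
const sentAt = Date.parse('2025-01-01T00:00:00Z')
const sample = { senderId: 'u1', status: 'SENT', createdAt: '2025-01-01T00:00:00Z' }
console.log(isRecallable(sample, 'u1', sentAt + 4 * 60 * 1000)) // inside the window
console.log(isRecallable(sample, 'u1', sentAt + 6 * 60 * 1000)) // past the window
```

Inside the component, `canRecall` could then delegate to `isRecallable(notification, this.userId)`.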

4.5 Send Notification Component

<template>
  <el-card>
    <div slot="header">
      <h2>发送通知</h2>
    </div>

    <el-form 
      ref="form" 
      :model="form" 
      :rules="rules" 
      label-width="100px"
      class="send-notification-form"
    >
      <el-form-item label="接收用户ID" prop="userId">
        <el-input v-model="form.userId" placeholder="请输入接收用户ID"></el-input>
      </el-form-item>
      
      <el-form-item label="通知类型" prop="type">
        <el-select v-model="form.type" placeholder="请选择通知类型">
          <el-option label="系统通知" value="SYSTEM"></el-option>
          <el-option label="好友请求" value="FRIEND_REQUEST"></el-option>
          <el-option label="评论" value="COMMENT"></el-option>
          <el-option label="点赞" value="LIKE"></el-option>
          <el-option label="提及" value="MENTION"></el-option>
          <el-option label="私信" value="MESSAGE"></el-option>
        </el-select>
      </el-form-item>
      
      <el-form-item label="优先级" prop="priority">
        <el-slider 
          v-model="form.priority" 
          :min="0" 
          :max="9" 
          :step="1"
          show-input
        ></el-slider>
        <div class="priority-info">
          <span>低优先级</span>
          <span class="priority-high">高优先级</span>
        </div>
      </el-form-item>
      
      <el-form-item label="标题" prop="title">
        <el-input v-model="form.title" placeholder="请输入通知标题"></el-input>
      </el-form-item>
      
      <el-form-item label="内容" prop="content">
        <el-input 
          v-model="form.content" 
          type="textarea" 
          :rows="4" 
          placeholder="请输入通知内容"
        ></el-input>
      </el-form-item>
      
      <el-form-item>
        <el-button type="primary" @click="submitForm">发送通知</el-button>
        <el-button @click="resetForm">重置</el-button>
      </el-form-item>
    </el-form>
  </el-card>
</template>

<script>
import { notificationApi } from '../services/apiService'
import { Message } from 'element-ui'

export default {
  name: 'SendNotification',
  data() {
    return {
      form: {
        userId: '',
        type: '',
        priority: 5,
        title: '',
        content: ''
      },
      rules: {
        userId: [
          { required: true, message: '请输入接收用户ID', trigger: 'blur' }
        ],
        type: [
          { required: true, message: '请选择通知类型', trigger: 'change' }
        ],
        title: [
          { required: true, message: '请输入通知标题', trigger: 'blur' },
          { max: 50, message: '标题长度不能超过50个字符', trigger: 'blur' }
        ],
        content: [
          { required: true, message: '请输入通知内容', trigger: 'blur' },
          { max: 500, message: '内容长度不能超过500个字符', trigger: 'blur' }
        ]
      }
    }
  },
  methods: {
    submitForm() {
      this.$refs.form.validate((valid) => {
        if (valid) {
          notificationApi.sendNotification(this.form)
            .then(messageId => {
              Message.success(`通知发送成功,消息ID: ${messageId}`)
              this.resetForm()
            })
            .catch(error => {
              console.error('发送通知失败:', error)
              Message.error('发送通知失败,请稍后重试')
            })
        }
      })
    },
    resetForm() {
      this.$refs.form.resetFields()
    }
  }
}
</script>

<style scoped>
.send-notification-form {
  max-width: 600px;
}

.priority-info {
  display: flex;
  justify-content: space-between;
  margin-top: -10px;
  margin-bottom: 10px;
  font-size: 12px;
  color: #606266;
}

.priority-high {
  color: #f56c6c;
}
</style>
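
The form above starts every notification at priority 5, while the requirements rank system notifications above private messages and private messages above likes. One way to reflect that ordering is to pick a default slider value when the type changes. The sketch below is only an illustration. The numeric values, the helper names `defaultPriorityFor` and `clampPriority`, and the assumption that a larger number means more urgent are not taken from the original code.

```javascript
// Hypothetical defaults: the numbers are illustrative, and "larger = more urgent"
// is an assumption about how the backend interprets the 0-9 priority value.
const DEFAULT_PRIORITY_BY_TYPE = {
  SYSTEM: 9,
  MESSAGE: 7,
  FRIEND_REQUEST: 5,
  MENTION: 5,
  COMMENT: 4,
  LIKE: 2
}
const FALLBACK_PRIORITY = 5 // matches the form's initial slider value

function defaultPriorityFor(type) {
  const known = Object.prototype.hasOwnProperty.call(DEFAULT_PRIORITY_BY_TYPE, type)
  return known ? DEFAULT_PRIORITY_BY_TYPE[type] : FALLBACK_PRIORITY
}

// Clamp any user-entered value into the slider's 0-9 range before sending
function clampPriority(value) {
  const n = Math.round(Number(value))
  if (Number.isNaN(n)) return FALLBACK_PRIORITY
  return Math.min(9, Math.max(0, n))
}

console.log(defaultPriorityFor('SYSTEM'), defaultPriorityFor('LIKE'), defaultPriorityFor('UNKNOWN'))
console.log(clampPriority(12), clampPriority(-3), clampPriority('abc'))
```

In the component, a watcher on `form.type` could assign `this.form.priority = defaultPriorityFor(newType)`, leaving the slider free for manual adjustment afterwards.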

4.6 Home Page Component

<template>
  <div class="home-container">
    <el-row :gutter="20">
      <el-col :span="12">
        <send-notification />
      </el-col>
      <el-col :span="12">
        <div class="connection-status">
          <el-card>
            <div slot="header">
              <h2>连接状态</h2>
            </div>
            <div class="status-info">
              <el-descriptions :column="1">
                <el-descriptions-item label="MQTT连接状态">
                  <el-tag :type="mqttConnected ? 'success' : 'danger'">
                    {{ mqttConnected ? '已连接' : '未连接' }}
                  </el-tag>
                </el-descriptions-item>
                <el-descriptions-item label="当前用户">
                  {{ currentUser || '未登录' }}
                </el-descriptions-item>
                <el-descriptions-item label="未读消息">
                  <el-badge :value="unreadCount" class="item">
                    <span>{{ unreadCount }} 条</span>
                  </el-badge>
                </el-descriptions-item>
              </el-descriptions>
              
              <el-button 
                v-if="!mqttConnected && currentUser" 
                type="primary" 
                @click="reconnectMqtt"
                style="margin-top: 15px;"
              >
                重新连接
              </el-button>
            </div>
          </el-card>
        </div>
      </el-col>
    </el-row>
    
    <el-row style="margin-top: 20px;">
      <el-col :span="24">
        <notification-list />
      </el-col>
    </el-row>
  </div>
</template>

<script>
import SendNotification from '../components/SendNotification.vue'
import NotificationList from '../components/NotificationList.vue'
import mqttService from '../services/mqttService'
import { authApi, notificationApi } from '../services/apiService'

export default {
  name: 'Home',
  components: {
    SendNotification,
    NotificationList
  },
  data() {
    return {
      mqttConnected: false,
      currentUser: '',
      unreadCount: 0,
      statusCheckInterval: null
    }
  },
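  mounted() {
    this.checkLoginStatus()
    this.setupMqttStatusListener()
    this.fetchUnreadCount()

    // Refresh the unread count every 30 seconds; keep the timer ID so it can be cleared
    this.unreadCountInterval = setInterval(() => {
      this.fetchUnreadCount()
    }, 30000)
  },
  beforeDestroy() {
    // Clear both timers so they stop running after the component is destroyed
    if (this.statusCheckInterval) {
      clearInterval(this.statusCheckInterval)
    }
    if (this.unreadCountInterval) {
      clearInterval(this.unreadCountInterval)
    }
  },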
  methods: {
    checkLoginStatus() {
      authApi.getCurrentUser()
        .then(user => {
          this.currentUser = user.id
          localStorage.setItem('userId', user.id)
        })
        .catch(() => {
          console.log('用户未登录或会话已过期')
          // Not logged in or session expired; this is where a redirect to the login page could go
        })
    },

    setupMqttStatusListener() {
      // Poll the connection state once per second
      this.statusCheckInterval = setInterval(() => {
        this.mqttConnected = mqttService.isConnected()
      }, 1000)
    },

    fetchUnreadCount() {
      notificationApi.getUnreadCount()
        .then(count => {
          this.unreadCount = count
        })
        .catch(error => {
          console.error('获取未读数量失败:', error)
        })
    },

    reconnectMqtt() {
      if (this.currentUser) {
        mqttService.init(this.currentUser, () => {}, () => {})
      }
    }
  }
}
</script>

<style scoped>
.home-container {
  padding: 20px;
}

.connection-status {
  height: 100%;
}

.status-info {
  padding: 10px 0;
}
</style>
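
The home page polls `mqttService.isConnected()` once per second. MQTT.js clients emit `connect`, `close`, and `offline` events, so the status could instead be pushed to the component when it changes. The sketch below illustrates the idea under some assumptions. `trackConnection` is a hypothetical helper, the original `mqttService` is not shown exposing its client, and a stub client stands in for the real one so the example runs without a broker.

```javascript
// Hypothetical helper: mirrors MQTT.js connection events into a callback.
// `client` only needs an on(event, handler) method, which MQTT.js clients provide.
function trackConnection(client, onChange) {
  client.on('connect', () => onChange(true))
  client.on('close', () => onChange(false))
  client.on('offline', () => onChange(false))
}

// Minimal stand-in for an MQTT.js client so the sketch is runnable on its own
function createStubClient() {
  const handlers = {}
  return {
    on(event, handler) {
      if (!handlers[event]) handlers[event] = []
      handlers[event].push(handler)
    },
    emit(event) {
      const list = handlers[event] || []
      list.forEach(handler => handler())
    }
  }
}

const stub = createStubClient()
const seen = []
trackConnection(stub, connected => seen.push(connected))
stub.emit('connect') // broker connection established
stub.emit('close')   // connection dropped
console.log(seen)
```

In `Home.vue`, the callback would set `this.mqttConnected = connected`, which would make the one-second status timer unnecessary.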

4.7 Entry File Configuration

import Vue from 'vue'
import App from './App.vue'
import router from './router'
import ElementUI from 'element-ui'
import 'element-ui/lib/theme-chalk/index.css'

// Register Element UI globally
Vue.use(ElementUI)

Vue.config.productionTip = false

new Vue({
  router,
  render: h => h(App)
}).$mount('#app')

4.8 Router Configuration

import Vue from 'vue'
import VueRouter from 'vue-router'
import Home from '../views/Home.vue'

Vue.use(VueRouter)

const routes = [
  {
    path: '/',
    name: 'Home',
    component: Home
  },
  {
    path: '/login',
    name: 'Login',
    // Lazy-load the login component
    component: () => import('../views/Login.vue')
  }
]

const router = new VueRouter({
  mode: 'history',
  base: process.env.BASE_URL,
  routes
})

export default router
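
The router defines a `/login` route, but `checkLoginStatus` in the home page leaves the redirect as a comment. Vue Router 3 supports global guards through `router.beforeEach((to, from, next) => ...)`, which is one place such a redirect could live. The sketch below keeps the guard logic in a plain function so it can run without vue-router. `createAuthGuard`, the `isLoggedIn` callback, and the `redirect` query parameter are assumptions made for illustration.

```javascript
// Hypothetical guard factory; `isLoggedIn` is supplied by the caller.
function createAuthGuard(isLoggedIn) {
  return function authGuard(to, from, next) {
    if (to.path === '/login' || isLoggedIn()) {
      next() // allow the navigation
    } else {
      // send the user to the login page and remember where they were going
      next({ path: '/login', query: { redirect: to.fullPath } })
    }
  }
}

// Simulated navigation so the sketch runs without vue-router
function simulate(guard, to) {
  let result
  guard(to, { path: '/' }, arg => { result = arg === undefined ? 'allowed' : arg })
  return result
}

const guestGuard = createAuthGuard(() => false)
console.log(simulate(guestGuard, { path: '/', fullPath: '/' }))
console.log(simulate(guestGuard, { path: '/login', fullPath: '/login' }))
```

It could be registered with `router.beforeEach(createAuthGuard(() => Boolean(localStorage.getItem('userId'))))`, since the home page stores `userId` in `localStorage`. A stored ID is only a weak hint, so the backend still has to validate the session.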

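Summary of Main Changes

The main changes in migrating from Vue 3 to Vue 2 are:

  1. Composition API → Options API: the Vue 3 setup() syntax is converted to the Vue 2 Options API, using options such as data(), methods, and mounted.

  2. Reactivity system changes

    • Use this.$set for reactive updates (for example, replacing an array element by index), in place of Vue 3's ref and reactive
    • Remove watchEffect and use setInterval for periodic tasks

  3. Component registration: in Vue 2, components must be registered explicitly through the components option.

  4. Element UI replaces Element Plus

    • Import and use Element UI instead of Element Plus
    • Adjust some component attributes and events (for example, type="text" replaces the text attribute)

  5. Router configuration: Vue 2 uses Vue Router 3.x, whose configuration differs slightly from the 4.x version used with Vue 3.

  6. Utility library replacement: moment.js replaces date-fns for date formatting, a choice commonly seen in Vue 2 projects.

  7. Template syntax adjustments: the templates stay largely the same, but Vue 3-only syntax such as v-model arguments is removed.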
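
The reason `this.$set` appears in the Vue 2 code is that Vue 2 makes data reactive by converting existing properties into getters and setters with `Object.defineProperty`. Properties added later, and array elements assigned by index, are therefore not tracked unless they go through `this.$set`. The following is a simplified model of that behaviour and not Vue's actual implementation. `defineReactive`, `observe`, and `set` are illustrative names.

```javascript
// Simplified model of Vue 2 style reactivity (illustrative, not Vue's real source)
function defineReactive(target, key, value, notify) {
  Object.defineProperty(target, key, {
    enumerable: true,
    configurable: true,
    get() { return value },
    set(next) {
      value = next
      notify(key) // in Vue this would trigger dependent watchers and re-rendering
    }
  })
}

function observe(target, notify) {
  Object.keys(target).forEach(key => defineReactive(target, key, target[key], notify))
  return target
}

// Counterpart of this.$set: define the new property as reactive, then notify once
function set(target, key, value, notify) {
  defineReactive(target, key, value, notify)
  notify(key)
}

const changes = []
const record = key => changes.push(key)
const notification = observe({ status: 'SENT' }, record)

notification.status = 'RECALLED'          // tracked: the property existed when observed
notification.recalledAt = 'now'           // not tracked: added after observe()
set(notification, 'readAt', 'now', record) // tracked because it goes through set()
notification.readAt = 'later'             // tracked from now on

console.log(changes)
```

In this model, the plain assignment to `recalledAt` never reaches `changes`. That is the same reason the list component replaces an array element with `this.$set(this.notifications, index, ...)` and not `this.notifications[index] = ...`.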

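This Vue 2 version provides the same functionality as the earlier Vue 3 version, including real-time push, message priority, message recall, the retry mechanism, and connection stability safeguards, and it keeps the same UI style and user experience.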

