基于Spring Boot和WebSocket的实时聊天系统

发布于:2025-08-11 ⋅ 阅读:(32) ⋅ 点赞:(0)

一、项目架构设计

1. 技术栈组成

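| 组件 | 用途 | 版本 |
| --- | --- | --- |
| Spring Boot | 基础框架 | 2.7.x |
| Spring WebSocket(底层基于 javax.websocket) | WebSocket实现 | JSR-356 |
| Mybatis-Plus | 数据持久化 | 3.5.x |
| Redis | 缓存/消息队列 | 6.2.x |
| MongoDB | 聊天记录存储 | 5.0.x |
| MinIO | 文件存储 | 8.0.x |
| Kafka | 消息分发 | 2.8.x |
| OAuth2 | 认证授权 | 2.5.x |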

2. 系统架构图

客户端
  └─(WebSocket连接)→ Spring Boot服务
        └─→ 消息处理器
              ├─→ MySQL: 用户/关系
              ├─→ MongoDB: 聊天记录
              ├─→ Redis: 在线状态
              ├─→ MinIO: 文件存储
              └─→ Kafka: 消息分发

二、核心功能实现

1. WebSocket配置增强版

/**
 * Registers the chat WebSocket endpoint (with SockJS fallback) and tunes the
 * servlet WebSocket container limits.
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    /** Path the chat handler is mapped to. */
    private static final String CHAT_ENDPOINT = "/websocket";

    /** Upper bound for buffered text and binary messages. */
    private static final int MAX_MESSAGE_BUFFER_SIZE = 8192;

    /** Idle time after which the container closes a session (10 minutes, in milliseconds). */
    private static final long MAX_SESSION_IDLE_TIMEOUT_MS = 600_000L;

    /**
     * Maps the chat handler to {@value #CHAT_ENDPOINT}, guarded by the authentication
     * handshake interceptor.
     *
     * @param registry registry supplied by Spring
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatWebSocketHandler(), CHAT_ENDPOINT)
                // A literal "*" passed to setAllowedOrigins is rejected by the CORS
                // validation in Spring Framework 5.3+ once SockJS enables credentials;
                // origin patterns are the supported way to express a wildcard.
                // TODO: restrict this to the real front-end origins before production use.
                .setAllowedOriginPatterns("*")
                .addInterceptors(new AuthHandshakeInterceptor())
                .withSockJS();
    }

    /**
     * @return the handler that processes chat frames
     */
    @Bean
    public WebSocketHandler chatWebSocketHandler() {
        return new ChatWebSocketHandler();
    }

    /**
     * Configures buffer sizes and the idle timeout of the servlet WebSocket container.
     *
     * @return the container factory bean
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(MAX_MESSAGE_BUFFER_SIZE);
        container.setMaxBinaryMessageBufferSize(MAX_MESSAGE_BUFFER_SIZE);
        container.setMaxSessionIdleTimeout(MAX_SESSION_IDLE_TIMEOUT_MS);
        return container;
    }
}

2. 消息处理器实现

public class ChatWebSocketHandler extends TextWebSocketHandler {

    private static final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        String userId = getUserIdFromSession(session);
        sessions.put(userId, session);
        updateOnlineStatus(userId, true);
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        try {
            ChatMessage chatMessage = objectMapper.readValue(message.getPayload(), ChatMessage.class);

            switch (chatMessage.getType()) {
                case HEARTBEAT:
                    handleHeartbeat(session);
                    break;
                case SINGLE_CHAT:
                    handleSingleChat(chatMessage);
                    break;
                case GROUP_CHAT:
                    handleGroupChat(chatMessage);
                    break;
                case READ_RECEIPT:
                    handleReadReceipt(chatMessage);
                    break;
                case FILE_UPLOAD:
                    handleFileUpload(chatMessage);
                    break;
            }
        } catch (Exception e) {
            log.error("消息处理异常", e);
        }
    }

    private void handleHeartbeat(WebSocketSession session) {
        try {
            session.sendMessage(new TextMessage("{\\"type\\":\\"HEARTBEAT_RESPONSE\\"}"));
        } catch (IOException e) {
            log.error("心跳响应失败", e);
        }
    }

    // 其他处理方法...
}

三、关键业务逻辑实现

1. 消息存储设计

MySQL表结构

-- Chat messages: one row per message, unique on msg_id,
-- with a composite index on (sender_id, receiver_id).
CREATE TABLE `chat_message` (
  `id`          BIGINT      NOT NULL AUTO_INCREMENT,
  `msg_id`      VARCHAR(64) NOT NULL COMMENT '消息唯一ID',
  `sender_id`   VARCHAR(64) NOT NULL,
  `receiver_id` VARCHAR(64) NOT NULL,
  `content`     TEXT,
  `msg_type`    TINYINT     NOT NULL COMMENT '1-文本 2-图片 3-视频',
  `status`      TINYINT     DEFAULT '0' COMMENT '0-未读 1-已读',
  `created_at`  DATETIME    NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_msg_id` (`msg_id`),
  KEY `idx_sender_receiver` (`sender_id`, `receiver_id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

MongoDB文档结构

/**
 * MongoDB representation of a chat message, stored in the {@code chat_messages}
 * collection. Plain accessors are provided so the document can be populated and
 * read without relying on generated code.
 */
@Document(collection = "chat_messages")
public class ChatMessageDocument {
    @Id
    private String id;
    private String msgId;
    private String senderId;
    private String receiverId;
    private String content;
    private MessageType msgType;
    private MessageStatus status;
    private Date createdAt;
    private List<ReadReceipt> readReceipts;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsgId() {
        return msgId;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public String getSenderId() {
        return senderId;
    }

    public void setSenderId(String senderId) {
        this.senderId = senderId;
    }

    public String getReceiverId() {
        return receiverId;
    }

    public void setReceiverId(String receiverId) {
        this.receiverId = receiverId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public MessageType getMsgType() {
        return msgType;
    }

    public void setMsgType(MessageType msgType) {
        this.msgType = msgType;
    }

    public MessageStatus getStatus() {
        return status;
    }

    public void setStatus(MessageStatus status) {
        this.status = status;
    }

    public Date getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(Date createdAt) {
        this.createdAt = createdAt;
    }

    public List<ReadReceipt> getReadReceipts() {
        return readReceipts;
    }

    public void setReadReceipts(List<ReadReceipt> readReceipts) {
        this.readReceipts = readReceipts;
    }

    /** Nested document recording which user read the message and when. */
    public static class ReadReceipt {
        private String userId;
        private Date readAt;

        /** No-argument constructor so the receipt can still be instantiated reflectively. */
        public ReadReceipt() {
        }

        /**
         * @param userId id of the user who read the message
         * @param readAt time the message was read
         */
        public ReadReceipt(String userId, Date readAt) {
            this.userId = userId;
            this.readAt = readAt;
        }

        public String getUserId() {
            return userId;
        }

        public void setUserId(String userId) {
            this.userId = userId;
        }

        public Date getReadAt() {
            return readAt;
        }

        public void setReadAt(Date readAt) {
            this.readAt = readAt;
        }
    }
}

2. 消息分发流程

/**
 * Stores a chat message, delivers it to the receiver (WebSocket push when possible,
 * push notification otherwise) and forwards it to Kafka for downstream processing.
 */
@Service
@RequiredArgsConstructor
public class MessageDispatcher {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final RedisTemplate<String, String> redisTemplate;

    /**
     * Dispatches one message through storage, delivery and Kafka.
     *
     * @param message the message to dispatch
     */
    public void dispatch(ChatMessage message) {
        // Persist the message first.
        storeMessage(message);

        // The Redis online flag alone does not guarantee delivery: the receiver may have
        // no open session in this process, or the send may fail. Fall back to a push
        // notification whenever the realtime push did not actually go out.
        boolean delivered = isUserOnline(message.getReceiverId()) && realtimePush(message);
        if (!delivered) {
            pushNotification(message);
        }

        // Forward to Kafka for follow-up processing.
        kafkaTemplate.send("chat-messages", message.getMsgId(), serialize(message));
    }

    /**
     * @param userId user to check
     * @return {@code true} if the {@code user:online:<userId>} key exists in Redis
     */
    private boolean isUserOnline(String userId) {
        return redisTemplate.opsForValue().get("user:online:" + userId) != null;
    }

    /**
     * Sends the message over the receiver's WebSocket session, if one is open.
     *
     * @param message the message to push
     * @return {@code true} if the frame was handed to an open session without an I/O error
     */
    private boolean realtimePush(ChatMessage message) {
        WebSocketSession session = sessions.get(message.getReceiverId());
        if (session == null || !session.isOpen()) {
            return false;
        }
        try {
            session.sendMessage(new TextMessage(serialize(message)));
            return true;
        } catch (IOException e) {
            log.error("消息推送失败", e);
            return false;
        }
    }
}

四、高级功能实现

1. 心跳检测机制

/**
 * Runs every 30 seconds and evicts sessions whose last heartbeat is missing or
 * older than 60 seconds.
 */
@Scheduled(fixedRate = 30000)
public void checkHeartbeat() {
    long now = System.currentTimeMillis();
    sessions.forEach((userId, session) -> {
        Long lastHeartbeat = heartbeatTimestamps.get(userId);
        if (lastHeartbeat != null && now - lastHeartbeat <= 60000) {
            return; // heartbeat is recent enough, keep the session
        }
        try {
            session.close(CloseStatus.SESSION_NOT_RELIABLE);
        } catch (IOException e) {
            log.error("关闭会话失败", e);
        } finally {
            // Clean up even when close() fails; otherwise the dead session stays
            // registered and the user stays online. The two-argument remove leaves a
            // session that replaced this one in the meantime untouched.
            if (sessions.remove(userId, session)) {
                updateOnlineStatus(userId, false);
            }
        }
    });
}

2. 消息已读回执

/**
 * Marks a message as read in MySQL and MongoDB, then pushes a read-receipt
 * message if the user returned by {@code getSenderId()} is online.
 *
 * NOTE(review): the reader recorded in MongoDB is taken from getSenderId(), while
 * the notification below treats getSenderId() as the author of the original
 * message. Confirm which role sender/receiver carry on a read-receipt message.
 *
 * @param message the read-receipt message
 */
public void handleReadReceipt(ChatMessage message) {
    // MySQL: flip the message status to READ.
    chatMessageMapper.updateStatusByMsgId(message.getMsgId(), MessageStatus.READ);

    // MongoDB: append a receipt entry and set the status.
    Criteria byMsgId = Criteria.where("msgId").is(message.getMsgId());
    ReadReceipt receipt = new ReadReceipt(message.getSenderId(), new Date());
    Update markRead = new Update()
        .push("readReceipts", receipt)
        .set("status", MessageStatus.READ);
    mongoTemplate.updateFirst(Query.query(byMsgId), markRead, ChatMessageDocument.class);

    // Nothing to push when that user is offline.
    if (!isUserOnline(message.getSenderId())) {
        return;
    }

    ChatMessage receiptNotice = new ChatMessage(
        MessageType.READ_RECEIPT,
        message.getMsgId(),
        message.getReceiverId(),
        message.getSenderId()
    );
    realtimePush(receiptNotice);
}

五、性能优化方案

1. 消息批量处理

/**
 * Consumes a batch of records from the {@code chat-messages} topic and writes the
 * decoded messages to MySQL first, then to MongoDB.
 *
 * @param records the batch of Kafka records; each value is a serialized chat message
 */
@KafkaListener(topics = "chat-messages", groupId = "message-processor")
public void processMessages(List<ConsumerRecord<String, String>> records) {
    List<ChatMessage> decoded = records.stream()
        .map(ConsumerRecord::value)
        .map(payload -> deserialize(payload))
        .collect(Collectors.toList());

    // Bulk insert into MySQL.
    chatMessageMapper.batchInsert(decoded);

    // Convert after the MySQL insert, then bulk insert into MongoDB.
    List<ChatMessageDocument> mongoDocs = decoded.stream()
        .map(chatMessage -> convertToDocument(chatMessage))
        .collect(Collectors.toList());
    mongoTemplate.insertAll(mongoDocs);
}

2. Redis缓存优化

/**
 * Tracks user presence in Redis: one {@code user:online:<userId>} key per online
 * user, expiring after five minutes unless refreshed.
 */
@Service
public class UserStatusService {

    private static final String ONLINE_KEY_PREFIX = "user:online:";
    private static final long ONLINE_TTL_MINUTES = 5;

    private final RedisTemplate<String, String> redisTemplate;

    /**
     * @param redisTemplate template used for all presence keys
     */
    public UserStatusService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * @param userId user to check
     * @return {@code true} if the presence key of the user exists
     */
    public boolean isOnline(String userId) {
        return redisTemplate.opsForValue().get(onlineKey(userId)) != null;
    }

    /**
     * Sets or clears the presence key of a user.
     *
     * @param userId user whose status changes
     * @param online {@code true} to (re)write the key with its TTL, {@code false} to delete it
     */
    public void setOnline(String userId, boolean online) {
        if (online) {
            redisTemplate.opsForValue().set(
                onlineKey(userId),
                "1",
                ONLINE_TTL_MINUTES, TimeUnit.MINUTES);
        } else {
            redisTemplate.delete(onlineKey(userId));
        }
    }

    /**
     * Returns the subset of {@code userIds} that is currently online, in input order.
     *
     * <p>All keys are read with a single multi-get that goes through the template's
     * own serializers, so the keys match the ones written by {@link #setOnline}. The
     * previous pipelined variant returned the stringified EXISTS results instead of
     * user ids.
     *
     * @param userIds candidate user ids; may be {@code null} or empty
     * @return ids of the users whose presence key exists; never {@code null}
     */
    public List<String> getOnlineUsers(List<String> userIds) {
        List<String> online = new java.util.ArrayList<>();
        if (userIds == null || userIds.isEmpty()) {
            return online;
        }

        List<String> keys = userIds.stream()
            .map(UserStatusService::onlineKey)
            .collect(Collectors.toList());
        List<String> flags = redisTemplate.opsForValue().multiGet(keys);
        if (flags == null) {
            // multiGet yields null when the template runs inside a pipeline/transaction.
            return online;
        }

        // multiGet answers positionally: a null entry means the key does not exist.
        for (int i = 0; i < userIds.size() && i < flags.size(); i++) {
            if (flags.get(i) != null) {
                online.add(userIds.get(i));
            }
        }
        return online;
    }

    /** Builds the Redis presence key for a user id. */
    private static String onlineKey(String userId) {
        return ONLINE_KEY_PREFIX + userId;
    }
}

六、安全防护措施

1. OAuth2认证集成

/**
 * Builds the HTTP security chain: {@code /websocket/**} requires authentication,
 * every other request is permitted, JWT bearer tokens are validated through
 * {@link #jwtDecoder()}, and session creation is stateless.
 *
 * NOTE(review): CSRF handling is not configured here and stays at the framework default.
 *
 * @param http the security builder supplied by Spring
 * @return the assembled filter chain
 * @throws Exception if the chain cannot be built
 */
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
    // Authorization rules.
    http.authorizeHttpRequests(rules -> {
        rules.antMatchers("/websocket/**").authenticated();
        rules.anyRequest().permitAll();
    });

    // JWT resource server.
    http.oauth2ResourceServer(resourceServer ->
        resourceServer.jwt(jwtConfig -> jwtConfig.decoder(jwtDecoder())));

    // No HTTP session state.
    http.sessionManagement(sessions ->
        sessions.sessionCreationPolicy(SessionCreationPolicy.STATELESS));

    return http.build();
}

/**
 * Creates the JWT decoder that resolves its keys from the configured JWK Set URI.
 *
 * @return a Nimbus-based decoder
 */
@Bean
public JwtDecoder jwtDecoder() {
    NimbusJwtDecoder decoder = NimbusJwtDecoder.withJwkSetUri(jwkSetUri).build();
    return decoder;
}

2. WebSocket安全拦截器

public class AuthHandshakeInterceptor implements HandshakeInterceptor {

    @Override
    public boolean beforeHandshake(ServerHttpRequest request,
                                 ServerHttpResponse response,
                                 WebSocketHandler wsHandler,
                                 Map<String, Object> attributes) {
        String token = extractToken(request);
        if (token == null) {
            return false;
        }

        try {
            Jwt jwt = jwtDecoder.decode(token);
            attributes.put("userId", jwt.getSubject());
            return true;
        } catch (JwtException e) {
            return false;
        }
    }

    private String extractToken(ServerHttpRequest request) {
        // 从请求头或参数中提取token
    }
}

七、部署与监控

1. Docker Compose配置

# Local development stack for the chat service and its backing services.
version: '3.8'

services:
  # The Spring Boot chat application, built from the local Dockerfile.
  app:
    build: .
    ports:
      - "8080:8080"
    depends_on:
      - redis
      - mysql
      - mongodb
      - kafka

  redis:
    image: redis:6.2
    ports:
      - "6379:6379"

  mysql:
    image: mysql:8.0
    environment:
      # Development-only credentials; override before any shared deployment.
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: chat_db
    ports:
      - "3306:3306"

  mongodb:
    image: mongo:5.0
    ports:
      - "27017:27017"

  kafka:
    image: bitnami/kafka:2.8
    ports:
      - "9092:9092"
    environment:
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
    # The broker connects to ZooKeeper on startup, so start ZooKeeper first.
    depends_on:
      - zookeeper

  zookeeper:
    image: bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    environment:
      # The Bitnami image refuses to start unless anonymous access is explicitly
      # allowed or authentication is configured.
      ALLOW_ANONYMOUS_LOGIN: "yes"

2. Prometheus监控配置

# Prometheus scrape jobs for the chat application and its backing services.
# NOTE(review): ports 9121, 9104 and 7071 are not published by the redis, mysql and
# kafka services in the compose file; presumably metric exporters are expected to
# listen there. Confirm the exporter setup.
scrape_configs:
  # Application metrics exposed by Spring Boot Actuator.
  - job_name: "chat-app"
    metrics_path: "/actuator/prometheus"
    static_configs:
      - targets:
          - "app:8080"
        labels:
          service: "chat-service"

  - job_name: "redis"
    static_configs:
      - targets:
          - "redis:9121"

  - job_name: "mysql"
    static_configs:
      - targets:
          - "mysql:9104"

  - job_name: "kafka"
    static_configs:
      - targets:
          - "kafka:7071"

通过以上实现方案,我们构建了一个功能完善、性能优越且安全可靠的实时聊天系统。该系统不仅支持基本的聊天功能,还提供了消息存储、已读回执、文件传输等高级特性,同时具备良好的扩展性和可维护性。


网站公告

今日签到

点亮在社区的每一天
去签到