WebSocket集成方案对比

发布于:2025-05-12 ⋅ 阅读:(9) ⋅ 点赞:(0)

WebSocket集成方案对比与实战

架构选型全景图

JavaEE标准
Spring生态
响应式编程
轻量级库
多协议支持
高性能NIO
WebSocket实现方案
技术栈
Javax API
Spring WebMVC
Spring WebFlux
Java-WebSocket
Socket.IO
Netty

一、Javax原生WebSocket API

核心实现代码

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@ServerEndpoint("/ws/javax")
public class JavaxWebSocketEndpoint {
    private static final Set<Session> sessions = new CopyOnWriteArraySet<>();

    @OnOpen
    public void onOpen(Session session) {
        sessions.add(session);
        System.out.println("New connection: " + session.getId());
    }

    @OnMessage
    public void onMessage(String message, Session sender) {
        sessions.parallelStream()
            .filter(Session::isOpen)
            .forEach(session -> {
                try {
                    session.getBasicRemote().sendText("Echo: " + message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session);
        System.out.println("Connection closed: " + session.getId());
    }
}


技术特点
✅ 原生JavaEE标准支持(JSR-356)
✅ 无需额外依赖
⚠️ 需手动处理线程安全
⚠️ 不支持协议自动升级

二、Spring WebMVC集成方案

Maven依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置与实现

@Configuration
@EnableWebSocket
public class WebMvcWebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(webSocketHandler(), "/ws/spring")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins("*");
    }

    @Bean
    public WebSocketHandler webSocketHandler() {
        return new TextWebSocketHandler() {
            private final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();

            @Override
            public void afterConnectionEstablished(WebSocketSession session) {
                sessions.add(session);
            }

            @Override
            protected void handleTextMessage(WebSocketSession session, TextMessage message) {
                sessions.forEach(s -> {
                    try {
                        s.sendMessage(new TextMessage("Processed: " + message.getPayload()));
                    } catch (IOException e) {
                        // 异常处理
                    }
                });
            }
        };
    }
}

进阶特性

  • 消息转换器(JSON/Protobuf)
  • STOMP子协议支持
  • 与Spring Security集成

三、Spring WebFlux响应式方案

响应式端点

@Configuration
@Slf4j
public class BusinessWebSocketConfig {

    // 自定义业务处理器
    @Component
    public static class BusinessProcessor {
        private final ReactiveRedisTemplate<String, String> redisTemplate;

        public BusinessProcessor(ReactiveRedisTemplate<String, String> redisTemplate) {
            this.redisTemplate = redisTemplate;
        }

        // 示例业务处理:消息校验+存储Redis+生成响应
        public Mono<String> processMessage(WebSocketMessage message) {
            String payload = message.getPayloadAsText();
            
            return Mono.just(payload)
                .filter(msg -> !msg.isBlank())        // 空消息过滤
                .switchIfEmpty(Mono.error(new IllegalArgumentException("空消息")))
                .flatMap(msg -> 
                    redisTemplate.opsForList().leftPush("ws:message:queue", msg)  // 存储到Redis队列
                        .thenReturn("ACK: " + msg)    // 生成响应消息
                )
                .timeout(Duration.ofSeconds(2))       // 超时控制
                .onErrorResume(ex -> {
                    log.error("处理失败: {}", ex.getMessage());
                    return Mono.just("ERROR: " + ex.getMessage());
                });
        }
    }

    @Bean
    public HandlerMapping handlerMapping(BusinessProcessor processor) {
        Map<String, WebSocketHandler> handlers = new HashMap<>();
        
        handlers.put("/ws/business", session -> {
            // 输入流背压配置
            Flux<WebSocketMessage> inputStream = session.receive()
                .onBackpressureBuffer(2000, 
                    BufferOverflowStrategy.DROP_OLDEST)
                .doOnNext(msg -> 
                    Metrics.counter("websocket.receive.count").increment())
                .publishOn(Schedulers.boundedElastic());  // 切换到弹性线程池

            // 业务处理管道
            return session.send(
                inputStream
                    .delayElements(Duration.ofMillis(50)) // 流速控制
                    .concatMap(processor::processMessage) // 业务处理(保证顺序)
                    .map(resp -> session.textMessage(resp))
                    .doOnError(ex -> log.error("发送异常", ex))
                    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            );
        });

        return new SimpleUrlHandlerMapping(handlers, -1);
    }

}

Client WebSocketSession BusinessProcessor Redis 发送消息 消息入队(背压缓冲) 空消息过滤 存储消息到队列 返回存储结果 生成响应消息 返回处理结果 返回错误信息 alt [处理异常] Client WebSocketSession BusinessProcessor Redis

选择策略建议

  • 实时聊天系统‌:采用DROP_OLDEST策略+500ms延迟均衡体验
  • 金融交易系统‌:使用ERROR策略+重试队列保证数据完整性
  • 物联网数据采集‌:结合publishOn与delayElements实现阶梯式降速

四、Java-WebSocket独立库

服务端实现

import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;

public class JavaWebSocketServer extends WebSocketServer {
    
    public JavaWebSocketServer(int port) {
        super(new InetSocketAddress(port));
    }

    @Override
    public void onOpen(WebSocket conn, ClientHandshake handshake) {
        System.out.println("New client: " + conn.getRemoteSocketAddress());
    }

    @Override
    public void onMessage(WebSocket conn, String message) {
        broadcast("Broadcast: " + message);
    }

    public static void main(String[] args) {
        new JavaWebSocketServer(9001).run();
    }
}

客户端连接

const ws = new WebSocket('ws://localhost:9001');
ws.onmessage = (event) => console.log('Received:', event.data);

五、Socket.IO集成方案

服务端配置(基于Netty)

@Configuration
public class SocketIOConfig {

    @Bean
    public SocketIOServer socketIOServer() {
        Configuration config = new Configuration();
        config.setHostname("localhost");
        config.setPort(9092);
        
        SocketIOServer server = new SocketIOServer(config);
        
        server.addConnectListener(client -> {
            client.sendEvent("welcome", "Connected to Socket.IO");
        });
        
        server.addEventListener("chat", String.class, 
            (client, data, ack) -> server.getBroadcastOperations().sendEvent("message", data));
        
        return server;
    }
}

客户端适配

import { io } from "socket.io-client";

const socket = io("http://localhost:9092");
socket.on("welcome", data => console.log(data));
socket.emit("chat", "Hello Socket.IO");

六、Netty原生实现

完整服务端代码

public class NettyWebSocketServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            new ServerBootstrap()
                .group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new HttpServerCodec());
                        pipeline.addLast(new ChunkedWriteHandler());
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                        pipeline.addLast(new TextWebSocketFrameHandler());
                    }
                })
                .bind(8080).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
            ctx.writeAndFlush(new TextWebSocketFrame("NETTY: " + msg.text()));
        }
    }
}

技术方案对比矩阵

特性 Javax WebMVC WebFlux Java-WebSocket Socket.IO Netty
协议支持 WS WS/STOMP RSocket WS WS+Polling 自定义
最大连接数 1万 5万 10万+ 3万 5万 100万+
内存消耗 极低
学习曲线 简单 中等 较高 简单 中等 陡峭
集群支持 需扩展 需扩展 原生支持 需扩展 需扩展 需扩展
生产就绪度 ☆☆ ☆☆☆☆ ☆☆☆☆ ☆☆☆ ☆☆☆☆ ☆☆☆☆☆

最佳实践指南

  • 中小型项目‌:优先选择Spring WebMVC方案
  • 高并发场景‌:WebFlux或Netty方案
  • 多协议需求‌:Socket.IO支持降级通信
  • 资源受限环境‌:Java-WebSocket轻量级方案
  • 需要精细控制‌:直接使用Netty底层API

通过本文您可以快速掌握不同场景下的WebSocket技术选型,建议结合实际业务需求进行性能测试后确定最终方案。


网站公告

今日签到

点亮在社区的每一天
去签到