netty udp创建服务端+客户端

发布于:2024-07-11 ⋅ 阅读:(24) ⋅ 点赞:(0)

一.udp创建服务端

/**
 * udp 服务器 
 */
@Slf4j
@Component
public class UdpServer {
    /**
     * 创建服务端
     */
    @Async
    public void bind(int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(65535))
                    .handler(new ChannelInitializer<DatagramChannel>() {
                        @Override
                        public void initChannel(DatagramChannel ch) {
                            ChannelPipeline cp = ch.pipeline();
                            cp.addLast(new ServerHandler(port));
                        }
                    });
            Channel serverChannel = b.bind(port).sync().channel();
            log.info("UdpServer start success...");
            serverChannel.closeFuture().await();
        } catch (Exception e) {
            log.error("UdpServer start fall!");
        } finally {
            group.shutdownGracefully();
        }

    }

    private class ServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

        private int port;// 当前 端口

        public ServerHandler(int port) {
            this.port = port;
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            log.error(cause.getMessage());
            cause.printStackTrace();
        }

        /**
         * 接收消息
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
            // 1.获取数据内容,它是一个ByteBuf
            ByteBuf content = packet.content();
 			String request = content.toString(CharsetUtil.UTF_8);

            // 2.你可以使用ByteBuf的API来读取数据
            //byte[] bytes = new byte[content.readableBytes()];
            //content.readBytes(bytes);
            //String request = new String(bytes, StandardCharsets.UTF_8);

            InetSocketAddress senderAddress = packet.sender();
            log.info("{} ---> {}:{}", senderAddress.getAddress().getHostAddress(), this.port, request);
           
        }
    }

}

二.udp创建客户端

下面展示一些 有些地方赖得改了,当是记录

@Slf4j
@Component
public class UdpClient {
  
    /**
     * 发送udp,等待对方回复
     *
     * @param ip
     * @param port
     * @param format str,hex
     * @param msg
     * @return
     */
    public String sendData(String ip, int port, String format, Object msg) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        EventLoopGroup group = new NioEventLoopGroup();
        Channel channel = null;
        Response r = new Response();
        try {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(InetAddress.getByName(ip), port);
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioDatagramChannel.class)
                    .handler(new SimpleChannelInboundHandler() {
                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            log.error("exceptionCaught ->" + cause.getMessage());
                            cause.printStackTrace();
                            ctx.close();
                        }

                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                            // 处理接收到的数据
                            ByteBuf byteBuf = ((DatagramPacket)msg).content();
                            String response = null;
                            if (format.equals("str")) {
                                response = byteBuf.toString(StandardCharsets.UTF_8);
                            } else {
                                response = ByteBufUtil.hexDump(byteBuf);
                            }
                            log.info("response msg: " + response);
                            r.setMsg(response);
                            ctx.close();
                        }

                    });
            ChannelFuture future = b.bind(0).sync(); // 绑定端口0以获取随机可用端口
            channel = future.channel();
            log.info("{}  <- {}", ip, msg.toString());
            Channel finalChannel = channel;
            Future<?> executorServiceFuture = executorService.schedule(() -> {
                // 检查Channel是否仍然是活动状态
                if (finalChannel.isActive()) {
                    finalChannel.close();
                }
            }, 35, TimeUnit.SECONDS);
            ByteBuf byteBuf = null;
            if (format.equals("str")) {
                byteBuf = Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8); // 将消息内容转换为ByteBuf
            } else {
                byte[] bytes = hexString2Bytes(msg.toString());// 将16进制字符串转换为字节数组
                byteBuf = Unpooled.wrappedBuffer(bytes); // 使用字节数组创建ByteBuf
            }
            DatagramPacket requestPacket = new DatagramPacket(byteBuf, inetSocketAddress);
            channel.writeAndFlush(requestPacket);// 发送
            channel.closeFuture().await();//异步等待,通道关闭后会往下执行
            executorServiceFuture.cancel(true); // 立刻中断
            return r.getMsg();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            executorService.shutdown(); // 清理资源
            if (channel != null) channel.close();
            group.shutdownGracefully();
        }
    }

    /**
     * 发送udp,不等待回复
     * @param ip
     * @param port
     * @param msg
     */
    public void sendDataNoReply(String ip, int port, Object msg) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new SimpleChannelInboundHandler() {
                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                            cause.printStackTrace();
                            ctx.close();
                        }
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
                            // 处理接收到的数据
                        }
                    });
            Channel channel = bootstrap.bind(0).sync().channel();
            // 发送数据到指定的地址和端口
            InetSocketAddress address = new InetSocketAddress(ip, port);
            ByteBuf buffer = Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8);
            DatagramPacket packet = new DatagramPacket(buffer, address);
            channel.writeAndFlush(packet);

            // 等待一段时间以确保数据发送完成
            //Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("发送udp数据失败:", e);
            throw new RuntimeException("发送数据失败或连接不上");
        } finally {
            group.shutdownGracefully();
        }
    }


    class Response {
        private String msg;

        public Response() {
        }

        public String getMsg() {
            return msg;
        }

        public void setMsg(String msg) {
            this.msg = msg;
        }
    }

    public static byte[] hexString2Bytes(String src) {
        byte[] bytes = new byte[src.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            int index = i * 2;
            int j = Integer.parseInt(src.substring(index, index + 2), 16);
            bytes[i] = (byte) j;
        }
        return bytes;
    }

}