02-netty基础-java四种IO模型

发布于:2025-07-23 ⋅ 阅读:(14) ⋅ 点赞:(0)

1 阻塞IO(Blocking IO)

1.1 工作机制

从应用程序发起调用到内核空间准备好数据、拷贝数据到用户空间,然后将数据返回给应用程序,这期间应用程序这块都是阻塞的,无法响应其他请求。

  • 工作机制:在进行 IO 操作时(如读取数据),线程会被挂起,进入等待状态,直到数据准备好并读取完成后才会继续执行后续代码。
  • 特点:实现简单,但在等待期间线程无法处理其他任务,导致资源浪费,适用于连接数少且 IO 操作耗时短的场景。
  • 示例场景:传统的 Java IO 操作,如InputStream.read()方法调用时,如果没有数据可读,线程会一直阻塞。

socket交互的流程可以查看上一篇文章: 01-netty基础-socket-CSDN博客

1.2 代码实现 

1.2.1 服务端代码

1.2.1.1 方式一单线程

处理完一个客户端请求,然后在处理下一个客户端请求

package com.bonnie.bio;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 阻塞io服务端
 * 当一个客户端连接上来后,未处理完成,那么其他客户端是无法连接上来的;
 * 相当于串行执行,前一个执行完成才能轮到下一个执行
 */
public class BlockingServer {

    public static void main(String[] args) throws IOException {
        // 第一步:首先通过ServerSocket来监听端口,我们知道,每个进程都有一个唯一的端口
        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            try {
                // 通过accept方法阻塞调用,直到有客户端的连接过来,就会返回Socket
                Socket socket = serverSocket.accept();
                // 获取socket的输入流
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                int port = socket.getPort();
                System.out.println("客户端的端口号:"+ port);
                // 获取客户端的数据,这个地方是一个阻塞的io,阻塞到直到数据读取完成
                String cliStr = bufferedReader.readLine();
                System.out.println("收到客户端的数据:"+ cliStr);

                // 获取socket的输出流
                BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                // 给客户端会写数据  这块结尾一定要使用\n,结束标志
                bufferedWriter.write("ok\n");
                // 刷新
                bufferedWriter.flush();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}
1.2.1.2 方式二线程池

来一个客户端的请求,开启一个新线程,从而可以达到同时处理多个请求;
因为accept方法会阻塞等待客户端的连接,导致一个线程只能处理一个连接;如果想要处理多个连接,就要使用线程池来处理连接,但是这个是非常消耗线程的,线程是非常宝贵的资源,除非是机器性能很好,一般不建议采用

package com.bonnie.bio;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 阻塞io服务端
 * 当一个客户端连接上来后,未处理完成,那么其他客户端是无法连接上来的;
 * 使用多线程,将接收到的客户端请求放入都到线程池中,进而看到多个客户端可以同时连接和处理的现象
 */
public class ThreadBlockingServer {

    static ExecutorService executorService = Executors.newCachedThreadPool();

    public static void main(String[] args) throws IOException {
        // 第一步:首先通过ServerSocket来监听端口,我们知道,每个进程都有一个唯一的端口
        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            // 通过accept方法阻塞调用,直到有客户端的连接过来,就会返回Socket
            Socket socket = serverSocket.accept();
            // 接收到客户端的请求,将请求放到线程池中,一个客户端一个线程,【创建线程消耗资源消耗时间、线程资源也比较珍贵】
            executorService.execute(()-> {
                try {
                    // 获取socket的输入流
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    int port = socket.getPort();
                    System.out.println("客户端的端口号:"+ port);
                    // 获取客户端的数据,这个地方是一个阻塞的io,阻塞到直到数据读取完成
                    String cliStr = bufferedReader.readLine();
                    System.out.println("收到客户端的数据:"+ cliStr);

                    // 获取socket的输出流
                    BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
                    // 给客户端会写数据
                    bufferedWriter.write("ok\n");
                    // 刷新
                    bufferedWriter.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }

}

12.2 客户端代码

package com.bonnie.bio;

import java.io.*;
import java.net.Socket;

/**
 * 阻塞io客户端
 */
public class BlockingClient {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("127.0.0.1", 8080);

        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        // 这块结尾一定要使用\n,结束标志
        bufferedWriter.write("你好我是客户端 \n");
        bufferedWriter.flush();

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        String s = bufferedReader.readLine();
        System.out.println("服务端写回的数据: " + s);
    }

}

1.2.3 码云位置

git地址: https://gitee.com/huyanqiu6666/netty.git    分支: 250721-io

2 非阻塞IO(No Blocking IO)

2.1 工作机制

阻塞IO:发起系统调用后,直到内核有数据才会返回数据,在这个期间,线程一直阻塞。
非阻塞IO:发起系统调用后,无论内核中数据是否准备好,都不再阻塞应用线程,而是反复轮询直到数据准备好。下图就是描述了非阻塞IO的流程

  • 工作机制:线程在发起 IO 操作后会立即返回一个状态值(如-1表示数据未准备好),线程不会被阻塞,可以继续执行其他任务。之后线程需要不断轮询检查 IO 操作的状态,直到数据准备好。
  • 特点:线程在等待期间可以处理其他任务,提高了资源利用率,但频繁的轮询会消耗 CPU 资源。
  • 示例场景:在 Java 中,可以通过设置socket.setSoTimeout(1000)将 Socket 设置为非阻塞模式,然后循环调用read()方法检查数据是否就绪

2.2 代码实现

2.2.1 服务端代码

package com.bonnie.noblocking;


import org.apache.commons.compress.utils.Lists;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * 非阻塞IO: 一个线程可以处理多个连接
 * 定时轮询:询问客户端是否有数据进来,每次都要询问,消耗时间,消耗资源,
 */
public class NoBlockingServer {

    static List<SocketChannel> clients = Lists.newArrayList();

    public static void main(String[] args) throws IOException {
        // 得到一个serverSocketChannel管道,这个就等同于serverSocket,只不过这个是支持异步并且可以同时读写
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 我们想要socket为非阻塞,通过设置该值为false就是为非阻塞
        serverSocketChannel.configureBlocking(Boolean.FALSE);
        // 绑定端口
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        while (true) {
            try {
                // 接收客户端的请求,调用accept,由于设置成非阻塞了,所以accept将不会阻塞在这里等客户端的连接过来
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    // 同时也设置socketChannel为非阻塞,因为原来我们读取数据read方法也是阻塞的
                    socketChannel.configureBlocking(Boolean.FALSE);
                    clients.add(socketChannel);
                    System.out.println("客户端端口:" + socketChannel.socket().getPort());
                } else {
                    Thread.sleep(3 * 1000);
                    System.out.println("没有连接,请等待!!!");
                }
                // 主线程处理多个客户端的连接  假设有10个客户端
                for (SocketChannel client : clients) {
                    // channel中的数据都是先读取到buffer中,也都先写入到buffer中,所以定义一个ByteBuffer
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    // 数据读取到缓冲区,由于上面设置了非阻塞,此时的read将不会阻塞
                    // 一直循环调用read,看是否有数据存在===> 调用10次read===>就是一次系统调用,10次系统调用, 消耗时间消耗资源
                    int num = client.read(byteBuffer);
                    if (num>0) {
                        System.out.println("收到客户端数据:" + new String(byteBuffer.array(), StandardCharsets.UTF_8));
                        socketChannel.write(ByteBuffer.wrap("你好我是服务端\n".getBytes(StandardCharsets.UTF_8)));
                    } else {
                        System.out.println("等待客户端写数据!!!");
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

2.2.2 客户端代码

package com.bonnie.noblocking;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class NoBlockingClient {

    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel socketChannel = SocketChannel.open();
        // 连接服务器
        socketChannel.connect(new InetSocketAddress("127.0.0.1",8080));
        // 发送消息到服务端
        socketChannel.write(ByteBuffer.wrap("你好我是客户端\n".getBytes(StandardCharsets.UTF_8)));

        // 接收服务端消息
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        int num = socketChannel.read(byteBuffer);
        if (num>0) {
            // 设置读取到末尾,并且重置位置
            byteBuffer.flip();
            System.out.println("服务端写回的数据: " + new String(byteBuffer.array(), StandardCharsets.UTF_8));
        }
    }

}

2.3 码云位置

git地址: https://gitee.com/huyanqiu6666/netty.git    分支: 250721-io

2.4 存在的问题

如下图:多个客户端访问服务端,就看到一个服务端的一个线程可以同时处理多个请求,,由于是非阻塞的,所以每个客户端都会去调用read,看数据是否准备好,进而会导致很多无用的系统调用,非常的浪费资源;如果有1W个客户端只有1个客户端准备好了,资源会造成极大的浪费。

 2.5 如何解决

可以采用如下图的方式,使用多路复用器,这种方式可以监听到有数据到来的IO,然后触发下一个请求;由原来的轮询所有找出有数据的IO,变成了只监听有数据的IO,性能得到了大大的提升。

多路复用(Multiplexing)是一种让单个实体能同时管理多个资源的技术方案。在 IO 编程的范畴内,多路复用指的是由单个线程借助 Selector(选择器)来监管多个 IO 通道(像网络连接这类),一旦某个通道有 IO 事件(例如数据可读)发生,就能及时对其进行处理。

工作原理

多路复用的运行机制主要包含以下几个步骤:

 
  1. 注册通道:把所有需要监控的 IO 通道都注册到 Selector 上,并且为每个通道指定想要监控的事件类型,比如读事件或者写事件。
  2. 阻塞等待:Selector 会进入阻塞状态,一直等到至少有一个注册的通道发生了 IO 事件。
  3. 事件分发:当有 IO 事件出现时,Selector 会返回发生事件的通道集合,随后线程会对这些事件进行处理。

应用场景

多路复用技术在以下场景中尤为适用:

 
  • 高并发连接:在需要处理大量并发连接的场景下,比如聊天服务器、Web 服务器等,多路复用技术能够充分发挥其优势。
  • 连接活跃度低:当大量连接处于空闲状态,只是偶尔有 IO 操作时,多路复用技术可以高效地管理这些连接。
  • 资源受限环境:在系统资源有限的情况下,无法为每个连接都分配一个独立的线程,此时多路复用技术就成为了理想的选择。

3 NIO(New IO)

  • 工作机制:基于 Selector(选择器)和 Channel(通道)实现。多个 Channel 可以注册到一个 Selector 上,Selector 会不断轮询这些 Channel,当某个 Channel 有数据就绪时,会通知线程进行处理。
  • 特点:单线程可以处理多个连接,避免了频繁创建和销毁线程的开销,适用于连接数多但 IO 操作轻量的场景(如聊天服务器)。
  • 示例场景:Java NIO 包中的SelectorSocketChannelServerSocketChannel的组合使用。

3.1 工作机制

3.2 代码实现

3.2.1 服务端代码

package com.bonnie.newio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 多路复用
 */
public class NewIoServer {

    static Selector selector;

    public static void main(String[] args) throws IOException {
        // 得到一个多路复用器
        selector = Selector.open();
        // 获取一个管道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 设置为非阻塞
        serverSocketChannel.configureBlocking(Boolean.FALSE);
        serverSocketChannel.socket().bind(new InetSocketAddress(8080));
        /**
         * 把连接事件注册到多路复用器上,通过注册不同事件处理不同的任务,
         * 把serverSocketChannel注册到selector上,主要是当连接到来的时候,
         * 由于一个Accpet事件
         */
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 该方法阻塞,只有当有事件到来时就不会阻塞了  === 底层:多路复用
            selector.select();
            // 获取所有事件,事件都被封装成SelectionKey
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                // 获取到相应的事件key
                SelectionKey key = iterator.next();
                // 拿到后要删除,防止再次调用
                iterator.remove();
                // 连接事件
                if (key.isAcceptable()) {
                    handleAccept(key);
                }
                // 读的就绪事件
                else if (key.isReadable()) {
                    handlesRead(key);
                }
            }
        }
    }

    private static void handleAccept(SelectionKey selectionKey) throws IOException {
        // 从selector中获取serverSocketChannel,因为当初把serverSocketChannel注册到selector上,并且注册的accept事件
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        // 能到这里,一定有客户端连接过来,所以一定会连接
        SocketChannel socketChannel = serverSocketChannel.accept();
        // 设置为非阻塞
        socketChannel.configureBlocking(Boolean.FALSE);
        // 给客户端会写数据
        socketChannel.write(ByteBuffer.wrap("hello client. newio Server".getBytes()));
        // 注册read事件,等while循环再次获取read事件,然后读取socketChannel中的数据
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private static void handlesRead(SelectionKey selectionKey) throws IOException {
        // 从selector中获取serverSocketChannel
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        socketChannel.read(byteBuffer);
        System.out.println("server receive msg:"+new String(byteBuffer.array()));
    }


}

3.2.1 客户端代码

package com.bonnie.newio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 多路复用
 */
public class NewIoClient {

    static Selector selector;

    public static void main(String[] args) throws IOException {
        selector = Selector.open();
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(Boolean.FALSE);
        socketChannel.connect(new InetSocketAddress("localhost", 8080));
        // 连接事件
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                // 连接事件
                if (selectionKey.isConnectable()) {
                    handleConnect(selectionKey);
                }
                // 读的就绪事件
                else if (selectionKey.isReadable()) {
                    handleReadable(selectionKey);
                }
            }
        }
    }

    private static void handleConnect(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        // 是否完成了连接,没有则建立连接
        if (socketChannel.isConnectionPending()) {
            // 建立连接
            socketChannel.finishConnect();
        }
        // 设置为非阻塞
        socketChannel.configureBlocking(Boolean.FALSE);
        // 给服务端写数据
        socketChannel.write(ByteBuffer.wrap("hello server. I am newio client".getBytes()));
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    private static void handleReadable(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        socketChannel.read(byteBuffer);
        System.out.println("client receive msg:"+new String(byteBuffer.array()));
    }


}

3.3 码云位置

git地址: https://gitee.com/huyanqiu6666/netty.git    分支: 250721-io

4 AIO

无论是否准备好数据,都直接返回,然后可以执行其他的任务,当数据准备完毕后,主动推送数据到应用程序。

  • 工作机制:基于事件和回调机制。当发起 IO 操作时,线程会继续执行后续代码,IO 操作完成后会通过回调函数通知线程处理结果。
  • 特点:真正的异步 IO,线程不需要关注 IO 操作的过程,只需处理结果,效率最高,适用于连接数多且 IO 操作耗时长的场景(如文件传输)。
  • 示例场景:Java 7 引入的AsynchronousFileChannelAsynchronousSocketChannel

3.2 代码实现

3.2.1 服务端代码

package com.bonnie.aio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * 异步IO-服务端
 */
public class AIOServer {

    public static void main(String[] args) throws Exception {
        // 创建一个serverChannel并绑定8080端口
        AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(8080));
        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
                try {
                    // 打印线程的名字
                    System.out.println("2--"+ Thread.currentThread().getName());
                    System.out.println(socketChannel.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // socketChannel异步的读取数据到buffer中
                    socketChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer buffer) {
                            // 打印线程的名字
                            System.out.println("3--"+ Thread.currentThread().getName());
                            buffer.flip();
                            System.out.println(new String(buffer.array(), 0, result));
                            socketChannel.write(ByteBuffer.wrap("helloClient".getBytes()));
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            exc.printStackTrace();;
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        System.out.println("1--"+ Thread.currentThread().getName());
        Thread.sleep(Integer.MAX_VALUE);
    }

}

3.2.1 客户端代码

package com.bonnie.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOClient {

    private final AsynchronousSocketChannel client;

    public AIOClient() throws IOException {
        client = AsynchronousSocketChannel.open();
    }

    public static void main(String[] args) throws Exception{
        new AIOClient().connect("localhost", 8080);
    }

    private void connect(String host, int port) {
        // 客户端向服务端发起连接
        client.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                try {
                    client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
                    System.out.println("已发送到服务端");
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        final ByteBuffer bb = ByteBuffer.allocate(1024);
        // 客户端接收服务端的数据,获取的数据写入到bb中
        client.read(bb, null, new CompletionHandler<Integer, Object>() {
            @Override
            public void completed(Integer result, Object attachment) {
                // 服务端返回数据的长度result
                System.out.println("I/O操作完成:"+result);
                System.out.println("获取反馈:"+ new String(bb.array()));
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        });

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}

3.3 码云位置

git地址: https://gitee.com/huyanqiu6666/netty.git    分支: 250721-io


网站公告

今日签到

点亮在社区的每一天
去签到