阻塞模式
客户端代码
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
// sc.write(Charset.defaultCharset().encode("helloworld")); // 可以在debug下发数据
System.out.println("waiting...");
}
}
服务器代码
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 使用nio来理解阻塞模式(单线程)
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while(true) {
// 4. accept建立与客户端连接,SocketChannel用来与客户端之间通信
log.debug("connecting...");
SocketChannel sc = ssc.accept(); // 阻塞方法,没有连接建立时,线程停止运行
log.debug("connected...{}", sc);
channels.add(sc);
// 5. 接收客户端发送的数据
for(SocketChannel channel : channels) {
log.debug("before read..., {}", channel);
channel.read(buffer); // 阻塞方法,没有数据发送时,线程停止运行
buffer.flip(); // 切换为读模式
debugRead(buffer); // 读取数据
buffer.clear(); // 切换为写模式
log.debug("after read..., {}", channel);
}
}
}
}
ssc.accept() 和 channel.read(buffer) 都是阻塞方法,如果没有建立连接或者没有数据过来时,线程都会阻塞等待。
因此如果同一个客户端再发第二次数据,并不会收到第二次发送的数据,因为此时没有新的连接建立,代码已经被阻塞在ssc.accept()这里了。
非阻塞模式
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 设置ServerSocketChannel为非阻塞模式
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while(true) {
// 4. accept建立与客户端连接,SocketChannel用来与客户端之间通信
SocketChannel sc = ssc.accept(); // 非阻塞,线程还会继续运行,如果没有连接建立,sc是null
if(sc != null) {
log.debug("connected...{}", sc);
ssc.configureBlocking(false); // 设置SocketChannel为非阻塞模式
channels.add(sc);
}
// 5. 接收客户端发送的数据
for(SocketChannel channel : channels) {
int read = channel.read(buffer);// 非阻塞,线程仍然会继续运行,如果没有读到数据,read()返回0
if(read > 0) {
buffer.flip(); // 切换为读模式
debugRead(buffer); // 读取数据
buffer.clear(); // 切换为写模式
log.debug("after read..., {}", channel);
}
}
}
}
}
ssc.configureBlocking(false); // 变为非阻塞模式
非阻塞模式相当于轮询的在检查是否有新的数据、是否有新的连接,这样很消耗系统资源,一般情况下也不会使用非阻塞模式,而是使用selector
Selector(多路复用)
单线程可以配合Selector完成对多个Channel可读写事件的监控
常见的四种事件
- accept:在有连接请求时触发
- connect:客户端连接建立后触发
- read:可读事件
- write:可写事件
@Slf4j
public class Server {
private static void split(ByteBuffer source) {
source.flip();// 切换为读模式
for(int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if(source.get(i) == '\n') { // get(i)不会改变position的位置
int len = i - source.position() + 1;
// 把这条消息存入新的ByteBuffer中
ByteBuffer target = ByteBuffer.allocate(len);
// 从source读,向target中写
for(int j = 0; j < len; j++) {
target.put(source.get());// get()会改变position的位置
}
debugAll(target);
}
}
source.compact();
}
public static void main(String[] args) throws IOException {
// 1. 创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立selector和channel的联系(注册)
SelectionKey sscKey = ssc.register(selector, 0, null);// 事件发生后,通过它可以知道事件和哪个channel的事件(管理ServerSocketChannel)
sscKey.interestOps(SelectionKey.OP_ACCEPT); // 对哪个事件感兴趣(有四种事件)
log.debug("register key:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while(true) {
// 3. select 方法
/*
selector():
没有事件发生 - 线程阻塞
有(感兴趣的)事件发生 - 线程会恢复运行
*/
selector.select();
// 4. 处理事件,selectedKey内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept,read
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 处理key的时候,一定要从SelectionKeys集合中删除,否则下次处理就会有问题
log.debug("key:{}", key);
if (key.isAcceptable()) { // 如果是accept事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);// 附件(attachment)
SelectionKey scKey = sc.register(selector, 0, buffer); // 把buffer当成scKey的附属品注册倒scKey上(和channel对应)
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
}else if(key.isReadable()) { // 如果是读事件
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
ByteBuffer buffer = (ByteBuffer) key.attachment();// 获取scKey上关联的附件
int read = channel.read(buffer);
if(read == -1) {
key.cancel(); // 如果正常断开,read返回值是-1
}else {
split(buffer);
if(buffer.position() == buffer.limit()) { // 扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); // 扩容
buffer.flip(); // 切换读模式
newBuffer.put(buffer); // 把旧的buffer放入新的buffer中
key.attach(newBuffer); // 重新关联新的buffer到key
}
}
}catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,所以需要将key取消(从selector集合中删除)
}
}
}
}
}
}
如果selector已经处理过事件,那么下次再来事件时,selector就会认为上一次的事件已经处理过,就会处理新的事件。
但是如果selector没有处理该事件,selector会一直认为上一次的事件还没处理,就还会处理上一次的事件(表现:一直轮询处理上一次的事件)
如果selector不想处理这次事件,可以使用key.cancel()
取消事件
事件发生之后,要么处理,要么取消
selector会在发生事件后,向selectionKeys中加入key,但是不会删除
事件如果被处理,我们应该手动移除
处理客户端断开
- 如果客户端异常断开,此时会抛出异常,需要catch去捕获异常。
- 如果客户端正常断开,read返回值是-1,此时不会抛出异常。
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);
if(read == -1) {
key.cancel(); // 如果正常断开,read返回值是-1
}else {
buffer.flip();
debugRead(buffer);
}
}catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,所以需要将key取消(从selector集合中删除)
}
处理消息边界
ByteBuffer的大小分配:
- 每个channel都要记录可能被切割的消息,因为ByteBuffer不能被多个channel使用,需要每个channel都维护一个独立的ByteBuffer(附件attachment的形式)
- ByteBuffer不能太大,因此需要设计大小可变的ByteBuffer:分配一个比较小的ByteBuffer,如果数据不够,再扩容。
while(true) {
// 3. select 方法
/*
selector():
没有事件发生 - 线程阻塞
有(感兴趣的)事件发生 - 线程会恢复运行
*/
selector.select();
// 4. 处理事件,selectedKey内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept,read
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 处理key的时候,一定要从SelectionKeys集合中删除,否则下次处理就会有问题
log.debug("key:{}", key);
if (key.isAcceptable()) { // 如果是accept事件
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);// 附件(attachment)
SelectionKey scKey = sc.register(selector, 0, buffer); // 把buffer当成scKey的附属品注册倒scKey上(和channel对应)
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
}else if(key.isReadable()) { // 如果是读事件
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
ByteBuffer buffer = (ByteBuffer) key.attachment();// 获取scKey上关联的附件
int read = channel.read(buffer);
if(read == -1) {
key.cancel(); // 如果正常断开,read返回值是-1
}else {
split(buffer);
if(buffer.position() == buffer.limit()) { // 扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2); // 扩容
buffer.flip(); // 切换读模式
newBuffer.put(buffer); // 把旧的buffer放入新的buffer中
key.attach(newBuffer); // 重新关联新的buffer到key
}
}
}catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,所以需要将key取消(从selector集合中删除)
}
}
}
}
服务器写入过多内容处理
基础代码
服务器:
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 1. 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 30000000; ++i) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
while(buffer.hasRemaining()) {
int write = sc.write(buffer);// 返回实际写入次数
System.out.println(write);
}
}
}
}
}
}
客户端:
public class WriteClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("127.0.0.1", 8080));
// 2. 接收数据
int count = 0;
while(true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
}
}
}
这样虽然可以写大量的数据,但是效率并不高,因为发送端只要内容没发满,就会一直循环,相当于卡在当前的SocetChannel上。
【改进
】:发送缓冲区还没满的话,可以进行读操作;缓冲区满再写。
改进
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if(key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, 0, null);
sckey.interestOps(SelectionKey.OP_READ);
// 1. 向客户端发送大量数据
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 3000000; ++i) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);// 返回实际写入次数
System.out.println(write);
if(buffer.hasRemaining()) {
// 2. 关注可写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 3. 把未写完的数据挂到sckey上
sckey.attach(buffer);
}
}else if(key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);// 返回实际写入次数
System.out.println(write);
// 4. 清理操作
if(!buffer.hasRemaining()) { // buffer为空
key.attach(null); // 清除buffer
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE); // 不去关注可写事件
}
}
}
}
}
}
利用多线程优化
@Slf4j
public class MultiThreadServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 1. 创建固定数量的worker并初始化
Worker[] workers = new Worker[2];
for(int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger idx = new AtomicInteger(0);
while(true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
// 2. 关联selector
log.debug("before register...{}", sc.getRemoteAddress());
// 负载均衡算法
workers[idx.getAndIncrement() % workers.length].register(sc); // 初始化selector(boss调用)
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
static class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false; // 还未初始化
private ConcurrentLinkedDeque<Runnable> queue = new ConcurrentLinkedDeque<>(); // 线程间传递队列
public Worker(String name) {
this.name = name;
}
// 初始化线程和selector
public void register(SocketChannel sc) throws IOException {
if(!start) {
thread = new Thread(this, name);
selector = Selector.open();
thread.start();
}
// 向队列中添加任务,但是任务并没有执行
queue.add(()->{
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
selector.wakeup(); // 唤醒selector
}
@Override
public void run() {
while(true) {
try {
selector.select();
Runnable task = queue.poll();
if(task != null) {
task.run(); // 执行任务里的代码
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel channel = (SocketChannel) key.channel();
log.debug("read...{}", channel.getRemoteAddress());
channel.read(buffer);
buffer.flip();
debugAll(buffer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
NIO vs BIO
stream vs channel
- stream不会自动缓冲数据,channel会利用系统提供的发送缓冲区、接收缓冲区
- stream只支持阻塞API,channel同时支持阻塞、非阻塞API,channel可以配合selector实现多路复用
- 二者均为全双工,读写可以同时进行
IO模型
同步阻塞、同步非阻塞、多路复用(本质也是同步的)、异步阻塞、异步非阻塞
- 同步:线程自己去获取结果(一个线程)
- 异步:线程自己不去获取结果,由其他线程送结果(至少两个线程)
异步阻塞是错误的
AIO
AIO用来解决数据复制阶段的阻塞问题
@Slf4j
public class AioFileChannel {
public static void main(String[] args) {
try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ)) {
/*
参数1:ByteBuffer
参数2:读取的起始位置
参数3:附件
参数4:回调对象
*/
ByteBuffer buffer = ByteBuffer.allocate(16);
log.debug("read begin...");
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override // read成功
public void completed(Integer result, ByteBuffer attachment) {
log.debug("read completed...");
attachment.flip();
debugAll(attachment);
}
@Override // read失败
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
log.debug("read end...");
} catch (IOException e) {
e.printStackTrace();
}
}
}