1. Mina简介
Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它是一个网络通信应用框架,为开发高性能和高可用性的网络应用程序提供了非常便利的框架。
也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等)。
Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。
Mina 主要有1.x 和2.x 两个分支,这里我们讲解最新版本2.0,如果你使用的是Mina 1.x,那么可能会有一些功能并不适用。学习Mina,需要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 线程及并发库(java.util.concurrent.*)的知识。
Mina 同时提供了网络通信的Server 端、Client 端的封装。无论是哪端,Mina 在整个网通通信结构中都处于如下的位置:可见Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。
同样的,无论是哪端,Mina 的执行流程如下所示:
2. Mina总体框架
基于MINA框架的应用程序架构应该是这样的:
2.1 Mina的核心部分架构
Mina的底层是基于JAVA的NIO 1.0实现的,其核心部分架构是这样的:
2.2 基于Mina的应用程序
基于Mina的应用程序分为3层:
- I/O Service - 执行实际的I / O,可以选择现成的Services如 (*Acceptor),也可以自己写。
- I/O Filter Chain - 这是一个由多个过滤器组成的过滤器链,在这个环节,将字节数据转换到特定的数据结构中(Filters/Transforms bytes into desired Data Structures and vice-versa)
- I/O Handler - 完成实际的业务逻辑部分
2.3 Mina工作流程图
IO Service:用于描述我们的客户端和服务端接口,其子类是IoConnector和IoAcceptor
IOprocessor:多线程环境来处理我们的连接请求流程
IOFilter:提供数据的过滤工作,包括编解码、日志等信息的过滤。
IOHandler:完成实际的业务对象,自定义handler需要实现IoHandlerAcceptor
IOSession:是客户端和服务端连接的描述,常常用于客户接受和发送数据。
3. 基于MINA框架的Server端应用
对socket通信来说,使用比较广泛的是基于Server端的应用,尤其是并发规模达到一定程度后,颇具挑战性。
那么我们如何创建一个基于Mina的应用程序呢?
- Create I/O service:从现有的Service(*Accept)中选择一个或者创建自己的
- Create I/O Filter Chain:从现有的Filter中选择或创建一个传输request/response的自定义Filter
- Create I/O Handler:编写业务逻辑,处理不同的报文
那么我们来看一下,基于MINA框架的Server端应用:
- IOAcceptor 监听指定的端口,处理新的网络连接;一旦一个新的连接到达后,IOAcceptor 就产生一个session,后续所有从这个IP和端口发送过来的请求就将通过这个Session被处理。
- Session创建后,后续所有的数据包都被人到过滤器链中,通过过滤器将原始的字节码转变成高层的对象,这个环节PacketEncoder/Decoder就十分有用。
- 最后数据包或对象被传送给Handler做业务逻辑处理;
3.1 前置条件
在开始进入 MINA 的应用程序开发的大门之前,我们需要具备下面的必要条件:
- MINA 2.x 的核心包
- JDK 1.5 或更高版本
- SLF4J 1.3.0 或更高版本
- Log4J 1.2 的用户:slf4j-api.jar, slf4j-log4j12.jar, and Log4J 1.2.x
- Log4J 1.3 的用户:slf4j-api.jar, slf4j-log4j13.jar, and Log4J 1.3.x
- java.util.logging 的用户:slf4j-api.jar and slf4j-jdk14.jar
注意:
请务必确认你所使用的 slf4j-*.jar 要与你的日志框架相匹配。
例如,slf4j-log4j12.jar 和log4j-1.3.x.jar 不能在一起使用,否则会引起混乱。
3.2 举个栗子
pom.xml中Mina的依赖配置:
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.2.1</version>
</dependency>
定义一个Mina的服务端:MinaServer.java
package com.hl.magic.mina.mina1;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* Mina入门级客户端程序
*
* 用例:
* 通过Mina创建一个服务端程序,用于监听客户端的连接,并接受客户端发来的数据。
* 当数据是exit时,那么服务端关闭,客户端通过在CMD中执行 “telnet 127.0.0.1 54321” 来模拟收发数据
*
*
* @Author HL
* @Date 2021/10/10 21:00
*/
public class MinaServer {
private static final Logger LOGGER = LoggerFactory.getLogger(MinaServer.class);
/**
* 自定义端口
*/
private static final int PORT = 54321;
public boolean receive(){
try {
//1-NioSocketAcceptor
IoAcceptor acceptor = new NioSocketAcceptor();
//设置日志属性,记录MINA 协议事件。
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
//2-设置编码过滤器(需要自定义过滤器)
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(
new TextLineCodecFactory(
StandardCharsets.UTF_8,
LineDelimiter.WINDOWS.getValue(),
LineDelimiter.WINDOWS.getValue())));
//3-设置session的一些属性
//设置 IO 处理器每次读取时分配的读取缓冲区的大小
acceptor.getSessionConfig().setReadBufferSize(1024);
//以秒为单位设置指定类型空闲的空闲时间
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
//设置自定义适配器,将处理此服务管理的所有连接的处理程序。
acceptor.setHandler(new MyServerHandler());
//4-绑定端口号
acceptor.bind(new InetSocketAddress(PORT));
LOGGER.debug("Server: [{}]", PORT);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
boolean receive = new MinaServer().receive();
LOGGER.debug("Server: [{}]", receive);
}
}
- 创建IoAcceptor;
- 加入日志记录和解码的过滤器,其中日志过滤器用SL4J库记录信息,而编码过滤器则解码所有收到的信息。使用 new TextLineCodecFactory() 发送的信息迕行编码,返是MINA自带的,功能有限,只能处理文本戒者String类型。
- 设置ServerHandler,这里是一个自定义的Handler:TimeServerHandler;
- 设置Session的对应的I/O processor 读缓存区大小2048;通常这个参数不需要设置;
- 设置空闲时间,这里的BOTH_IDLE指EADER_IDLE 和 WRITER_IDLE. 都为10秒;
- 绑定监听端口54321;
定义服务端处理器:MyServerHandler.java:
package com.hl.magic.mina.mina1;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
/**
* 自定义Mina服务端会话适配器
*
* @Author HL
* @Date 2021/10/10 21:06
*/
public class MyServerHandler extends IoHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(MyServerHandler.class);
/**
* 创建会话
*
* @param session 会话对象
*/
@Override
public void sessionCreated(IoSession session) throws Exception {
LOGGER.debug("sessionCreated");
}
/**
* 打开会话
*
* @param session 会话对象
*/
@Override
public void sessionOpened(IoSession session) throws Exception {
LOGGER.debug("sessionOpened");
}
/**
* 关闭会话
*
* @param session 会话对象
*/
@Override
public void sessionClosed(IoSession session) throws Exception {
LOGGER.debug("sessionClosed");
}
/**
* 会话空闲
*
* @param session 会话对象
* @param status 会话状态
*/
@Override
public void sessionIdle(IoSession session, IdleStatus status) {
LOGGER.debug("IDLE: [{}]", session.getIdleCount(status));
}
/**
* 连接异常时
*
* @param session 会话对象
*/
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
LOGGER.debug("exceptionCaught");
}
/**
* 接收消息
*
* @param session 会话对象
* @param message 消息体
*/
@Override
public void messageReceived(IoSession session, Object message) {
String msg = (String) message;
LOGGER.debug("服务端接收到数据: [{}]", msg);
if (msg.trim().equalsIgnoreCase("quit")) {
session.closeNow();
return;
}
Date date = new Date();
session.write(date);
}
/**
* 发生消息
*
* @param session 会话对象
* @param message 消息体
*/
@Override
public void messageSent(IoSession session, Object message) {
LOGGER.debug("messageSent...");
}
}
这里主要有一下几个主要的方法:
messageReceived(…),对接收到的消息(已经解码)迕行下一步处理,这里对收到的字符串进行判断,如果是”quit”则断开连接;否则输出当前时间的字符串格式;
exceptionCaught(…),自定义异常处理, 要不然异常会被“吃掉”;
sessionIdle,当Session处于IDLE状态的时候,输出空闲状态次数;
测试结果:
测试,运行CMD,在窗口输入:telnet 127.0.0.1 54321
然后在CMD窗口上随便输入一串字符串,显示当前的时间:
4. 自定义Mina客户端
自定义一个Mina的客户端,然后向上一节的Mina服务端发送数据。
Mina客户端类:MinaClient.java
package com.hl.magic.mina.mina1;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* Mina客户端
* <p>
* 用例:实现MinaServer的telnet程序
*
* @Author HL
* @Date 2021/10/10 21:00
*/
public class MinaClient {
private static final Logger LOGGER = LoggerFactory.getLogger(MinaClient.class);
private static final String HOST = "127.0.0.1";
private static final int PORT = 54321;
/**
* IO会话连接器
*/
private IoConnector connector;
/**
* IO连接会话
*/
private IoSession session;
/**
* 与Mina服务端建立连接
*
* @return 连接结果
*/
public boolean connect() {
try {
//1-NioSocketAcceptor( IO连接器)
connector = new NioSocketConnector();
connector.setDefaultRemoteAddress(new InetSocketAddress(HOST,PORT));
//2-设置超时时间
connector.setConnectTimeoutMillis(30000);
//3-设置日志属性,记录MINA 协议事件。
connector.getFilterChain().addLast("logger", new LoggingFilter());
//4-设置编/解码过滤器(需要自定义过滤器)
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(
new TextLineCodecFactory(StandardCharsets.UTF_8, LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));
//5-设置客户端处理器
connector.setHandler(new MyClientHandler());
//4-绑定端口号
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(HOST, PORT));
//等待连接
connectFuture.awaitUninterruptibly();
//session连接会话
session = connectFuture.getSession();
session.write("Hello, Mina Server");
//等待关闭连接
CloseFuture closeFuture = session.getCloseFuture().awaitUninterruptibly();
boolean closed = closeFuture.isClosed();
if (closed) {
session.closeNow();
connector.dispose();
}
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
boolean connect = new MinaClient().connect();
LOGGER.debug("Connect: [{}]", connect);
}
}
自定义Mina客户端的会话处理器:MyClientHandler.java
package com.hl.magic.mina.mina1;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定义Mina客户端会话适配器
*
* @Author HL
* @Date 2022/8/2 16:21
*/
public class MyClientHandler extends IoHandlerAdapter {
private static final Logger LOGGER = LoggerFactory.getLogger(MyClientHandler.class);
/**
* 连接异常时
*
* @param session 会话对象
*/
@Override
public void exceptionCaught(IoSession session, Throwable cause) {
LOGGER.debug("exceptionCaught");
}
/**
* 接收消息
*
* @param session 会话对象
* @param message 消息体
*/
@Override
public void messageReceived(IoSession session, Object message) {
String msg = (String) message;
LOGGER.debug("客户端接收到数据: [{}]", msg);
if (msg.trim().equalsIgnoreCase("quit")) {
session.closeNow();
}
}
}
然后,先启动MinaServer服务端程序
再启动MinaClient客户端程序。
完成了Mina客户端与服务端数据通信。