背景
因为安装了正向隔离网闸,数据传输时仅支持TCP协议和UDP协议,因此需要开发TCP Client和Server服务来将数据透传。当前场景是:Server端获取到数据后,将数据转发到kafka
1.引入依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.84.Final</version>
</dependency>
2.编写TCP Server端
TCP Server代码
本代码已经解决TCP的粘包和半包问题:需要通过固定的$符号进行数据分割,使得数据不会出现粘包和半包问题;最大帧长度可以根据数据大小指定为一个发送消息长度不会超过的值(示例中为10MB)
package com.huanyu.forward.tcp.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
@Slf4j
@Service("tcpServer")
@ConditionalOnExpression("#{'${spring.tcp-server.port:}'.length()>0}")
public class TcpNettyServer {
@Value("${spring.tcp-server.port:22222}")
private Integer port;
public static void main(String[] args) throws Exception {
new TcpNettyServer().server(22222);
}
@PostConstruct()
public void initTcpServer() {
try {
log.info("start tcp server......");
server(port);
} catch (Exception e) {
log.error("tcp server start failed");
}
}
public void server(int port) throws Exception {
//bossGroup就是parentGroup,是负责处理TCP/IP连接的
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件
EventLoopGroup workerGroup = new NioEventLoopGroup();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1, 1);
buffer.writeByte('$');
ServerBootstrap sb = new ServerBootstrap();
//初始化服务端可连接队列,指定了队列的大小500
sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
//保持长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 绑定客户端连接时候触发操作
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sh) throws Exception {
//handler是按顺序执行的
ChannelPipeline pipeline = sh.pipeline();
//业务编码 -解决 数据粘包和半包问题-
pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024 * 10, buffer));
// pipeline.addLast(new LoggingHandler(LogLevel.WARN));
pipeline.addLast(new TcpBizFlagHandler());
//业务编码
//使用DataHandler类来处理接收到的消息
pipeline.addLast(new TcpDataHandler());
}
});
//绑定监听端口,调用sync同步阻塞方法等待绑定操作完
ChannelFuture future = sb.bind(port).sync();
if (future.isSuccess()) {
log.info("tcp server is listening on :{}", port);
} else {
log.error("tcp server is failed ", future.cause());
//关闭线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
//成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
// future.channel().closeFuture().await();
}
}
数据标志位接收代码
package com.huanyu.forward.tcp.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
 * Pulls the business flag header off the front of a frame and forwards the rest untouched.
 *
 * <p>A frame is expected to look like {@code @@{"k":"v"}##payload}: the header starts with
 * {@code @@{} and ends with {@code }##}. The JSON object between {@code @@} and {@code ##}
 * (braces included) is stored in the channel attribute named {@link #BIZ_FLAG}; the bytes after
 * the header are emitted to the next handler. Frames that are too short, do not start with the
 * prefix, or contain no suffix are logged and consumed without emitting anything.
 */
@Slf4j
public class TcpBizFlagHandler extends ByteToMessageDecoder {

    /** Name of the channel attribute that carries the extracted flag. */
    public static final String BIZ_FLAG = "bizFlag";

    private static final String FLAG_PRE = "@@{";
    private static final String FLAG_SUF = "}##";
    private static final byte[] FLAG_PREFIX = FLAG_PRE.getBytes(StandardCharsets.UTF_8);
    private static final byte[] FLAG_SUFFIX = FLAG_SUF.getBytes(StandardCharsets.UTF_8);

    /**
     * Parses one frame: validates the header, stores the flag on the channel and passes the
     * remaining bytes on as a retained slice.
     *
     * @param ctx handler context whose channel receives the flag attribute
     * @param in  readable bytes of the current frame
     * @param out receives the payload slice when the header is valid
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        final int minHeaderLength = FLAG_PREFIX.length + FLAG_SUFFIX.length;
        if (in.readableBytes() < minHeaderLength) {
            log.warn("数据长度不够");
            dumpAndConsume(in);
            return;
        }

        final int headerStart = in.readerIndex();
        if (!hasFlagPrefix(in)) {
            dumpAndConsume(in);
            // Drop whatever is still readable in a frame that lacks the flag prefix.
            in.skipBytes(in.readableBytes());
            log.warn("数据不包含指定的前缀");
            return;
        }

        final int suffixStart = findSuffix(in);
        if (suffixStart == -1) {
            log.warn("数据不包含指定的某字符");
            dumpAndConsume(in);
            return;
        }

        // Header spans from the prefix up to and including the suffix.
        final int headerLength = suffixStart - headerStart + FLAG_SUFFIX.length;
        final byte[] header = new byte[headerLength];
        in.readBytes(header);

        // Keep the JSON object including its braces: skip the leading "@@" and stop before the
        // trailing "##".
        final int jsonOffset = FLAG_PRE.length() - 1;
        final int jsonLength = header.length - FLAG_PREFIX.length - FLAG_SUFFIX.length + 2;
        final String flag = new String(header, jsonOffset, jsonLength, StandardCharsets.UTF_8);

        // Publish the flag on the channel for downstream handlers.
        final AttributeKey<Object> flagKey = AttributeKey.valueOf(BIZ_FLAG);
        ctx.channel().attr(flagKey).set(flag);

        // Everything after the header is passed through unchanged.
        final ByteBuf payload = in.readRetainedSlice(in.readableBytes());
        out.add(payload);
    }

    /** Reads all readable bytes out of {@code in} and logs them as UTF-8 text. */
    private static void dumpAndConsume(ByteBuf in) {
        final byte[] content = new byte[in.readableBytes()];
        in.readBytes(content);
        log.warn("数据:{}", new String(content, StandardCharsets.UTF_8));
    }

    /** Returns whether the bytes at the reader index equal the flag prefix; does not move the reader index. */
    private boolean hasFlagPrefix(ByteBuf buf) {
        final int base = buf.readerIndex();
        int matched = 0;
        while (matched < FLAG_PREFIX.length && buf.getByte(base + matched) == FLAG_PREFIX[matched]) {
            matched++;
        }
        return matched == FLAG_PREFIX.length;
    }

    /**
     * Finds the first occurrence of the flag suffix in the readable region.
     *
     * @return absolute buffer index of the suffix start, or -1 when it is not present
     */
    private int findSuffix(ByteBuf buf) {
        final int start = buf.readerIndex();
        final int lastCandidate = start + buf.readableBytes() - FLAG_SUFFIX.length;
        scan:
        for (int pos = start; pos <= lastCandidate; pos++) {
            for (int k = 0; k < FLAG_SUFFIX.length; k++) {
                if (buf.getByte(pos + k) != FLAG_SUFFIX[k]) {
                    continue scan;
                }
            }
            return pos;
        }
        return -1;
    }
}
业务转发/解析代码
package com.huanyu.forward.tcp.server;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.nio.charset.StandardCharsets;
import static com.huanyu.forward.tcp.server.TcpBizFlagHandler.BIZ_FLAG;
@Slf4j
@Service
public class TcpDataHandler extends ChannelInboundHandlerAdapter {
// @Resource
private KafkaTemplate<String, Object> template;
//接受client发送的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel channel = ctx.channel();
// 获取标志位
String flag = (String) channel.attr(AttributeKey.valueOf(BIZ_FLAG)).get();
if (ObjectUtils.isEmpty(flag)) {
log.warn("没有业务标识");
return;
}
ByteBuf buf = (ByteBuf) msg;
byte[] msgByte = new byte[buf.readableBytes()];
buf.readBytes(msgByte);
// template.send("haha.haha.ha", gbk.getBytes());
log.info("bizFag:{},data: {}", flag, new String(msgByte));
}
//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//读操作时捕获到异常时调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
//客户端去和服务端连接成功时触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(Unpooled.copiedBuffer("hello client [你好,客户端]".getBytes()));
log.info("client 连接成功: {}", ctx.channel());
}
}
3.编写客户端代码
TCP Client 代码
package com.huanyu.forward.tcp.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.stream.IntStream;
@Getter
@Slf4j
public class TcpNettyClient {
public static void main(String[] args) {
extracted();
}
private static void extracted() {
try {
TcpNettyClient client = new TcpNettyClient("localhost", 4444);
Channel channel = client.getChannel();
IntStream.range(0, 1000).parallel().forEach(i -> {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
buf.writeBytes(("@@{\"cell-topic" + (i + 1) + "\":true}##{01#.01#\":\"data1\"}").getBytes());
buf.writeByte('$');
channel.writeAndFlush(buf);
});
} catch (Exception e) {
log.error("出现异常:", e);
}
}
private Channel channel;
//连接服务端的端口号地址和端口号
public TcpNettyClient(String host, int port) {
tcpClient(host, port);
}
public void tcpClient(String host, int port) {
try {
final EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class) // 使用NioSocketChannel来作为连接用的channel类
.handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器
@Override
public void initChannel(SocketChannel ch) throws Exception {
System.out.println("正在连接中...");
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new TcpClientHandler()); //客户端处理类
}
});
//发起异步连接请求,绑定连接端口和host信息
final ChannelFuture future = b.connect(host, port).sync();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture arg0) throws Exception {
if (future.isSuccess()) {
log.info("连接服务器成功:");
} else {
log.warn("连接服务器失败:");
System.out.println("连接服务器失败");
group.shutdownGracefully(); //关闭线程组
}
}
});
this.channel = future.channel();
} catch (InterruptedException e) {
log.error("TCP服务端启动异常:", e);
}
}
}
客户端数据解析代码
package com.huanyu.forward.tcp.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Map;
/**
 * Client-side inbound handler that prints the "topic" entry of a server response.
 *
 * <p>NOTE(review): this handler only accepts messages of type {@code Map}; the client pipeline
 * shown alongside it installs no decoder that produces one, so {@code channelRead0} is
 * presumably never reached with that set-up — confirm whether a decoder is missing or the
 * message type should be {@code ByteBuf}.
 */
public class TcpClientHandler extends SimpleChannelInboundHandler<Map<String, ByteBuf>> {

    /**
     * Prints the bytes stored under the "topic" key as UTF-8 text.
     *
     * @param ctx  handler context
     * @param data response map; the payload is looked up under the key "topic"
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Map<String, ByteBuf> data) throws Exception {
        ByteBuf msg = data.get("topic");
        if (msg == null) {
            // Avoid a NullPointerException when the response carries no "topic" entry.
            System.out.println("server response has no \"topic\" entry");
            return;
        }
        byte[] msgByte = new byte[msg.readableBytes()];
        msg.readBytes(msgByte);
        // Decode explicitly as UTF-8 rather than with the platform default charset.
        System.out.println("接受到server响应数据: " + new String(msgByte, java.nio.charset.StandardCharsets.UTF_8));
    }

    /** Delegates to the default behaviour when the connection becomes active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /** Reports the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause instead of closing silently.
        System.err.println("tcp client channel error, closing: " + cause);
        ctx.close();
    }
}
备注
1. 为了尽可能地降低性能消耗,数据以字节数组的形式发送
2. 业务字段通过@@{"key":"value"}##作为消息的头部,用数据标志位处理器进行处理
3. 真实要传送的数据,并不解析出来,并以$结尾,解决粘包和半包问题
记录备查