1.新建MessageBean消息类型类,也可以不用,看自己需要
public enum MsgType { STATUS_CONNECT_ERROR, STATUS_CONNECT_SUCCESS, STATUS_CONNECT_CLOSED, STATUC_CONNECT_RECONNECT }
public class MessageBean
{
/**
* 消息类型
*/
private MsgType type;
/**
* 消息数据
*/
private Object msg;
public MessageBean(MsgType type,Object msg)
{
this.type = type;
this.msg = msg;
}
public Object getMsg()
{
return msg;
}
public void setMsg(Object msg)
{
this.msg = msg;
}
public void setType(MsgType type)
{
this.type = type;
}
public MsgType getType()
{
return type;
}
public override string ToString()
{
return "MsgBean{" +
"type=" + type +
", msg=" + msg +
'}';
}
}
2.新建DotNettyClientHandler类,用于接收服务端消息
public class DotNettyClientHandler : FlowControlHandler
{
/// <param name="ctx"></param>
/// <param name="msg"></param>
private DotNettyListener listener;
private static String strMsg;
public DotNettyClientHandler(DotNettyListener listener)
{
this.listener = listener;
}
public override void ChannelActive(IChannelHandlerContext context)
{
Console.WriteLine("==================ChannelActive======================");
if (null != listener)
{
listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUS_CONNECT_SUCCESS, null));
}
}
public override void ChannelInactive(IChannelHandlerContext context)
{
base.ChannelInactive(context);
//掉线了
Console.WriteLine("===============ChannelInactive==============");
if (null != listener)
{
listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUS_CONNECT_CLOSED, null));
}
DotNettyClient.getInstance(listener).start();
}
public override void ChannelRead(IChannelHandlerContext context, object msg)
{
Console.WriteLine("===============ChannelRead==============");
IByteBuffer byteBuffer = msg as IByteBuffer;
strMsg += byteBuffer.ToString(Encoding.UTF8);
if (byteBuffer != null)
{
Console.WriteLine("byteBuffer:" + strMsg);
if (strMsg.StartsWith("{") && strMsg.EndsWith("}"))
{
listener.onMessageResponse(strMsg);
strMsg = null;
}
}
else {
Console.WriteLine("---------null---------- ");
}
//context.WriteAsync(msg);
}
public override void ChannelReadComplete(IChannelHandlerContext context)
{
Console.WriteLine("===============ChannelReadComplete==============");
context.Flush();
}
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
Console.WriteLine("----------------UserEventTriggered----------------");
base.UserEventTriggered(context, evt);
if (evt is IdleStateEvent)
{
var e = evt as IdleStateEvent;
switch (e.State)
{
//长期没收到服务器推送数据
/**case IdleState.ReaderIdle:
{
//可以重新连接
if (!context.Channel.Active)
context.ConnectAsync(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9997));
}
break;
//长期未向服务器发送数据
case IdleState.WriterIdle:
{
//发送心跳包
byte[] messageBytes = Encoding.UTF8.GetBytes("heartbeat");
context.WriteAndFlushAsync(messageBytes);
}
break;**/
//All
case IdleState.AllIdle:
{
//发送心跳包
}
break;
}
}
}
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("===============ExceptionCaught==============");
Console.WriteLine(exception);
context.CloseAsync();
if (null != listener)
{
listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUS_CONNECT_ERROR, exception.Message));
}
}
}
3.新建DotNettyClient类,管理DotNetty连接,消息发送
public interface DotNettyListener
{
/**
* 当接收到系统消息
*/
void onMessageResponse(Object msg);
/**
* 当连接状态发生变化时调用
*/
void onServiceStatusConnectChanged(MessageBean msg);
}
public class DotNettyClient
{
#region Instance
public IChannel clientChannel;
public String ip = "127.0.0.1";//ip
public int port = 9997;//端口
private DotNettyListener listener;//写的接口用来接收服务端返回的值
private static DotNettyClient instance;
public static DotNettyClient getInstance(DotNettyListener listener) {
if (instance == null) {
instance = new DotNettyClient();
}
instance.setListener(listener);
return instance;
}
private DotNettyClient(){ }
#endregion
private ManualResetEvent ClosingArrivedEvent = new ManualResetEvent(false);
public void start()
{
try
{
Task.Run(() => runClientAsync(ip,port));
}
catch (Exception exception)
{
Console.WriteLine(exception.StackTrace);
}
}
public async Task runClientAsync(String ip,int host)
{
var group = new MultithreadEventLoopGroup();
X509Certificate2 cert = null;
string targetHost = null;
try
{
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
//.Option(ChannelOption.ConnectTimeout, new TimeSpan(0, 0, 0, 0, args.ConnectTimeout))
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new StringEncoder(Encoding.UTF8));
//pipeline.AddLast(new StringDecoder(Encoding.UTF8));
pipeline.AddLast(new IdleStateHandler(0, 0, 100));
pipeline.AddLast(new DotNettyClientHandler(listener));//定义Handler类及名称
}));
clientChannel = await bootstrap.ConnectAsync(ip,port);
ClosingArrivedEvent.Reset();
ClosingArrivedEvent.WaitOne();
await clientChannel.CloseAsync();
}
catch (Exception exp)
{
//MainWindow.SetText("Client connection failed");
Console.WriteLine(exp.ToString());
Console.WriteLine(exp.StackTrace);
}
finally
{
//await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
reConnectServer();
}
}
//断线重连
private void reConnectServer()
{
Console.WriteLine("=================reConnectServer===================");
if (null != listener) {
listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUC_CONNECT_RECONNECT, null));
}
try
{
Thread.Sleep(5000);
start();
}
catch (Exception e)
{
}
}
public bool SendMessage(string message)
{
if (clientChannel != null && clientChannel.Open)
{
clientChannel.WriteAndFlushAsync(message);
Console.WriteLine("Sent message to server: " + message);
return true;
}
return false;
}
public void disconnect() {
Console.WriteLine("============disconnect==============");
//await channel.CloseAsync();
clientChannel.CloseAsync();
}
public void setListener(DotNettyListener listener)
{
this.listener = listener;
}
}
4.使用DotNetty
public partial class Form1 : Form, DotNettyListener
{
public Form1()
{
InitializeComponent();
}
public void onMessageResponse(object msg)
{
Console.WriteLine(msg);
}
public void onServiceStatusConnectChanged(MessageBean msg)
{
Console.WriteLine("type = "+msg.getType()+",msg = "+msg.getMsg());
}
private void simpleButton1_Click(object sender, EventArgs e)
{
DotNettyClient client = DotNettyClient.getInstance(this);
client.start();
Thread.Sleep(5000);
client.SendMessage("12345");
}
}