C# DotNetty客户端,包含心跳发送,断线重连机制

发布于:2024-10-18 ⋅ 阅读:(29) ⋅ 点赞:(0)

1.新建MessageBean消息类型类,也可以不用,看自己需要

public enum MsgType { STATUS_CONNECT_ERROR, STATUS_CONNECT_SUCCESS, STATUS_CONNECT_CLOSED, STATUC_CONNECT_RECONNECT }

public class MessageBean
{
    /**
 * 消息类型
 */
    private MsgType type;
    /**
     * 消息数据
     */
    private Object msg;

    public MessageBean(MsgType type,Object msg)
    {
        this.type = type;
        this.msg = msg;
    }

    public Object getMsg()
    {
        return msg;
    }

    public void setMsg(Object msg)
    {
        this.msg = msg;
    }

    public void setType(MsgType type)
    {
        this.type = type;
    }

   
    public MsgType getType()
    {
        return type;
    }

    public override string ToString()
    {
        return "MsgBean{" +
            "type=" + type +
            ", msg=" + msg +
            '}';
    }
}

2.新建DotNettyClientHandler类,用于接收服务端消息

public class DotNettyClientHandler : FlowControlHandler
{
    /// <param name="ctx"></param>
    /// <param name="msg"></param>
    private DotNettyListener listener;
    private static String strMsg; 

    public DotNettyClientHandler(DotNettyListener listener)
    {
        this.listener = listener;
    }

    public override void ChannelActive(IChannelHandlerContext context)
    {
        Console.WriteLine("==================ChannelActive======================");
        if (null != listener)
        {
            listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUS_CONNECT_SUCCESS, null));
        }
    }

    public override void ChannelInactive(IChannelHandlerContext context)
    {
        base.ChannelInactive(context);
        //掉线了
        Console.WriteLine("===============ChannelInactive==============");
        if (null != listener)
        {
            listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUS_CONNECT_CLOSED, null));
        }
        DotNettyClient.getInstance(listener).start();
    }

    public override void ChannelRead(IChannelHandlerContext context, object msg)
    {
        Console.WriteLine("===============ChannelRead==============");
        IByteBuffer byteBuffer = msg as IByteBuffer;
        strMsg += byteBuffer.ToString(Encoding.UTF8);
        if (byteBuffer != null)
        {
            Console.WriteLine("byteBuffer:" + strMsg);
            if (strMsg.StartsWith("{") && strMsg.EndsWith("}"))
            {
                listener.onMessageResponse(strMsg);
                strMsg = null;
            }
        }
        else {
            Console.WriteLine("---------null---------- ");
        }
        //context.WriteAsync(msg);
    }


    public override void ChannelReadComplete(IChannelHandlerContext context)
    {
        Console.WriteLine("===============ChannelReadComplete==============");
        context.Flush();
    }

    public override void UserEventTriggered(IChannelHandlerContext context, object evt)
    {
        Console.WriteLine("----------------UserEventTriggered----------------");
        base.UserEventTriggered(context, evt);
        if (evt is IdleStateEvent)
        {
            var e = evt as IdleStateEvent;
            switch (e.State)
            {
                //长期没收到服务器推送数据
                /**case IdleState.ReaderIdle:
                    {
                        //可以重新连接
                        if (!context.Channel.Active)
                            context.ConnectAsync(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9997));
                    }
                    break;
                //长期未向服务器发送数据
                case IdleState.WriterIdle:
                    {
                        //发送心跳包
                        byte[] messageBytes = Encoding.UTF8.GetBytes("heartbeat");
                        context.WriteAndFlushAsync(messageBytes);
                    }
                    break;**/
                //All
                case IdleState.AllIdle:
                    {
                        //发送心跳包
                    }
                    break;
            }
        }
    }

    public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
    {
        Console.WriteLine("===============ExceptionCaught==============");
        Console.WriteLine(exception);
        context.CloseAsync();
        if (null != listener)
        {
            listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUS_CONNECT_ERROR, exception.Message));
        }
    }
}

3.新建DotNettyClient类,管理DotNetty连接,消息发送

public interface DotNettyListener
{
    /**
     * 当接收到系统消息
     */
    void onMessageResponse(Object msg);

    /**
     * 当连接状态发生变化时调用
     */
    void onServiceStatusConnectChanged(MessageBean msg);
}
public class DotNettyClient
{
    #region Instance
    public IChannel clientChannel;
    public String ip = "127.0.0.1";//ip
    public int port = 9997;//端口
    private DotNettyListener listener;//写的接口用来接收服务端返回的值

    private static DotNettyClient instance;
    public static DotNettyClient getInstance(DotNettyListener listener) {
        if (instance == null) {
            instance = new DotNettyClient();
        }

        instance.setListener(listener);
        return instance;
    }

    private DotNettyClient(){ }
    #endregion

    private ManualResetEvent ClosingArrivedEvent = new ManualResetEvent(false);

    public void start()
    {
        try
        {
            Task.Run(() => runClientAsync(ip,port));
        }
        catch (Exception exception)
        {
            Console.WriteLine(exception.StackTrace);
        }
    }

    
    public async Task runClientAsync(String ip,int host)
    {
        var group = new MultithreadEventLoopGroup();
        X509Certificate2 cert = null;
        string targetHost = null;

        try
        {
            var bootstrap = new Bootstrap();
            bootstrap
                .Group(group)
                .Channel<TcpSocketChannel>()
                .Option(ChannelOption.TcpNodelay, true)
                //.Option(ChannelOption.ConnectTimeout, new TimeSpan(0, 0, 0, 0, args.ConnectTimeout))
                .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    pipeline.AddLast(new StringEncoder(Encoding.UTF8));
                    //pipeline.AddLast(new StringDecoder(Encoding.UTF8));
                    pipeline.AddLast(new IdleStateHandler(0, 0, 100));
                    pipeline.AddLast(new DotNettyClientHandler(listener));//定义Handler类及名称                
                }));

            clientChannel = await bootstrap.ConnectAsync(ip,port);

            ClosingArrivedEvent.Reset();

            ClosingArrivedEvent.WaitOne();

            await clientChannel.CloseAsync();
        }
        catch (Exception exp)
        {
            //MainWindow.SetText("Client connection failed");
            Console.WriteLine(exp.ToString());
            Console.WriteLine(exp.StackTrace);
        }
        finally
        {
            //await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
            reConnectServer();
        }

    }
    //断线重连
    private void reConnectServer()
    {
        Console.WriteLine("=================reConnectServer===================");
        if (null != listener) {
            listener.onServiceStatusConnectChanged(new MessageBean(MsgType.STATUC_CONNECT_RECONNECT, null));
        }
        
        try
        {
            Thread.Sleep(5000);
            start();
        }
        catch (Exception e)
        {

        }
    }

    public bool SendMessage(string message)
    {
        if (clientChannel != null && clientChannel.Open)
        {
            clientChannel.WriteAndFlushAsync(message);
            Console.WriteLine("Sent message to server: " + message);
            return true;
        }
        return false;
    }

    public void disconnect() {
        Console.WriteLine("============disconnect==============");
        //await channel.CloseAsync();
        clientChannel.CloseAsync();
    }

    public void setListener(DotNettyListener listener)
    {
        this.listener = listener;
    }
}

4.使用DotNetty

public partial class Form1 : Form, DotNettyListener
{
    public Form1()
    {
        InitializeComponent();
    }

    public void onMessageResponse(object msg)
    {
        Console.WriteLine(msg);
    }

    public void onServiceStatusConnectChanged(MessageBean msg)
    {
        Console.WriteLine("type = "+msg.getType()+",msg = "+msg.getMsg());
    }

    private void simpleButton1_Click(object sender, EventArgs e)
    {
        DotNettyClient client = DotNettyClient.getInstance(this);
        client.start();
        Thread.Sleep(5000);
        
        client.SendMessage("12345");
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到