1. 引言
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅消息协议,广泛应用于物联网(IoT)和实时数据传输场景。在工业自动化、智能家居等领域,我们经常需要将大量设备数据上传至云端或本地服务器。
本文将分享一个基于C#的MQTT消息发布系统,该系统具备以下特点:
异步编程:使用
async/await
提高吞吐量多线程优化:并行发送消息,提高效率
并发控制:使用
SemaphoreSlim
防止服务器过载错误处理:自动记录日志并邮件通知异常
2. 开发环境与依赖
开发语言:C# (.NET 4.8)
MQTT库:
MQTTnet
(通过NuGet安装)数据库:Oracle 11g(存储待发送数据)
日志系统:自定义日志记录
3. 核心代码实现
3.1 发布和订阅
在MQTT消息系统中,仅依赖PublishAsync
的返回结果并不能100%保证消息已被Broker正确处理。我们曾遇到以下问题场景:
Broker返回
Success
,MQTTX订阅相关的主题,但是没有收到我们发布的消息。(发布消息和订阅消息都使用了Qos 1)
可能的原因:
1)Qos 1的确认范围
PUBACK仅表示Broker接收成功,不保证:
消息已持久化到磁盘(若Broker崩溃)
消息已传递给订阅者
订阅者已成功处理消息
2)网络连接不稳定等原因
解决方案:在发布消息后,主动订阅自身发布的消息,通过双重确认机制确保消息可靠投递。
private async Task<string> SendMQData(string testData, string testId, string topic_Data, IMqttClient mqttClient)
{
//防御性编程:发布消息后,自己订阅topic,确保broker有收到消息再更新数据库。
//原因:就算用Qos1去发布消息后,即使publishResult.ReasonCode返回Success
// MQTTX有时候仍然无法收到消息(尤其并发量高的时候),如果此时更新数据库,将无法得知是哪笔数据没传输成功。
#region 异步方式执行
if (!mqttClient.IsConnected)
{
await mqttClient.ReconnectAsync();
}
// 1. 创建一个“任务完成源”(用于等待异步事件)
var receivedSignal = new TaskCompletionSource<bool>();
// 2. 订阅主题
await mqttClient.SubscribeAsync(topic_Data, MqttQualityOfServiceLevel.AtLeastOnce);
mqttClient.ApplicationMessageReceivedAsync += e =>
{
if (e.ApplicationMessage.Topic == topic_Data)
{
if (Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment.ToArray()) == testData)
{
receivedSignal.TrySetResult(true); // 通知“已收到正确消息”
}
}
return Task.CompletedTask;
};
// 3. 发布消息
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic_Data)
.WithPayload(testData)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
var publishResult = await mqttClient.PublishAsync(message);
if (publishResult.ReasonCode != MqttClientPublishReasonCode.Success)
{
UpdateLog("发布失败(Broker拒绝), Id:" + testId + "\r\n");
log.SaveMsg("SendDataLog", $"发布失败(Broker拒绝), Id: {testId}", DateTime.Now, false);
return "NG";
}
// 4. 等待最多 2 秒,看是否能收到消息
var timeoutTask = Task.Delay(2000);
var completedTask = await Task.WhenAny(receivedSignal.Task, timeoutTask);
// 5. 判断结果
if (completedTask == receivedSignal.Task && await receivedSignal.Task)
{
updateData(testId);
UpdateLog("上传成功, Id:" + testId + "\r\n");
log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);
return "OK";
}
else
{
UpdateLog("上传失败(未收到确认), Id:" + testId + "\r\n");
log.SaveMsg("SendDataLog", $"上传失败(未收到确认), Id: {testId}", DateTime.Now, true);
return "NG";
}
#endregion
}
3.2 MQTT连接配置和异步并发
特点:
1)一次连接,多次发布消息(减少连接开销)
2)使用Task
并行发送消息,并通过SemaphoreSlim
控制最大并发数。多次测试表明,当并发数大于18时,会显著出现3.1描述的情况(即:PublishAsync
的返回结果是Success,但是MQTTX作为订阅者并不能收到消息,发布方和接收方都是用Qos 1 的情况下)
public async Task<string> AutoTaskUploadAsync(
DataTable allData,
(string Address, string Account, string Pwd, string Port) Config)
{
StringBuilder resId = new StringBuilder();
List<Task> tasks = new List<Task>();
object lockObj = new object();
string broker = Config.Address;
int port = int.Parse(Config.Port);
string clientId = Guid.NewGuid().ToString();
//string topic = topic_Data;
string username = Config.Account;
string password = Config.Pwd;
//多次测试证明12个并发数比较安全(不同服务器性能不一致,这里大家可以按需修改)
SemaphoreSlim _semaphore = new SemaphoreSlim(12);
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer(broker, port)
.WithCredentials(username, password)
.WithClientId(clientId)
.WithCleanSession()
.Build();
var connectResult = await mqttClient.ConnectAsync(options);
if (connectResult.ResultCode == MqttClientConnectResultCode.Success)
{
for (int i = 0; i < allData.Rows.Count; i++)
{
int currentRow = i;
tasks.Add(Task.Run(async () =>
{
//等待信号量许可
await _semaphore.WaitAsync().ConfigureAwait(false);
try
{
string res = await SendMQDataTo(
Config.Address,
Config.Account,
Config.Pwd,
Config.Port,
allData.Rows[currentRow]["test_data"].ToString(),
allData.Rows[currentRow]["tpi_id"].ToString(),
allData.Rows[currentRow]["topic"].ToString(),
mqttClient
).ConfigureAwait(false);
//lock (lockObj)
//{
// resId.Append(allData.Rows[currentRow]["tpi_id"].ToString() + ",");
//}
}
catch (Exception ex)
{
string errorMsg = $"发送数据异常--{ex}";
log.SaveMsg("SendDataLog", errorMsg, DateTime.Now, false);
common.SendEmail("exampleMES@example.COM",
$"MQTT上传失败,ID:{allData.Rows[currentRow][0]}",
ex.ToString());
}
finally
{
// 释放信号量
_semaphore.Release();
}
}
));
}
}
else
{
string strres = common.SendEmail("exampleMES@example.COM", string.Format(@"连接MQ Broker失败,原因:" + connectResult.ResultCode.ToString()), connectResult.ResultCode.ToString());
}
// Unsubscribe and disconnect
await Task.WhenAll(tasks);
//mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceived;
await mqttClient.DisconnectAsync();
// 移除末尾逗号
if (resId.Length > 0)
{
resId.Length--;
resId.Append(")");
}
return resId.ToString();
}
3.3 记录日志、定时执行和异步显示
日志部分:
特点:操作前显示做到哪一步,方便后续维护时排查问题和分析性能瓶颈。
如本次项目中,本人一直以为多线程的速度有瓶颈,加了日志后,发现是读取数据库表时存在瓶颈。
private async Task SendData(string getDate)
{
string Addres = string.Empty; // MQ服务器地址
string Account = string.Empty; // MQ连接账号
string Pwd = string.Empty; // MQ连接密码
string Port = string.Empty; // 端口
StringBuilder result = new StringBuilder();
StringBuilder resId = new StringBuilder();
string ngMsg = string.Empty;
Addres = txtAddress.Text;
Account = txtAccount.Text;
Pwd = txtPassWord.Text;
Port = txtPort.Text;
var Config = (Addres, Account, Pwd, Port);
if (string.IsNullOrEmpty(Addres))
{
MessageBox.Show("请输入MQ服务器地址");
return;
}
if (string.IsNullOrEmpty(Account))
{
MessageBox.Show("请输入MQ账号");
return;
}
if (string.IsNullOrEmpty(Pwd))
{
MessageBox.Show("请输入MQ密码");
return;
}
if (string.IsNullOrEmpty(Port))
{
MessageBox.Show("请输入MQ端口号");
return;
}
string strDate = getDate;
//获取全部没上传的记录
UpdateLog("开始读取T_TABLE信息"+"\r\n");
DataTable allData = GetDataToSend();
UpdateLog("读取完成,准备上传" + "\r\n");
///如果有输入日期,则只获取日期内没上传的记录
if (!string.IsNullOrEmpty(strDate))
{
allData = GetDataToSend(strDate);
}
if (allData.Rows.Count == 0 || allData is null)
{
UpdateLog(string.Format(@"----------暂无数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");
log.SaveMsg("SendDataLog", string.Format(@"----------暂无数据,时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true); //插入日志
return;
}
result.Append(string.Format(@"本次上传共{0}条数据,ID(", allData.Rows.Count));
UpdateLog(string.Format(@"----------开始上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");
log.SaveMsg("SendDataLog", string.Format(@"----------开始上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true); //插入日志
#region 异步调用(更稳定)
string asyncResult = await AutoTaskUploadAsync(allData, Config);
result.Append(asyncResult);
#endregion
UpdateLog(result.ToString() + "\r\n");
log.SaveMsg("SendDataLog", result.ToString(), DateTime.Now, true);
UpdateLog(string.Format(@"----------完成本次上传数据,日期【{0}】,上传时间【{1}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString(), DateTime.Now.ToString()) + "\r\n");
log.SaveMsg("SendDataLog", string.Format(@"----------完成本次上传数据时间【{0}】--------------", !string.IsNullOrEmpty(strDate) ? strDate : DateTime.Now.ToString()), DateTime.Now, true);
}
定时执行部分
特点:
利用定时器,根据前端设定的时间间隔,定时执行
private void xcButton3_Click(object sender, EventArgs e)
{
int seconds = int.Parse(this.txt_Seconds.Text.ToString());
this.timer1.Interval = seconds * 1000;
this.btnStart.Enabled = false;
this.btnStop.Enabled = true;
点击启动时,立刻运行一次
this.BeginInvoke(new Action(() => timer1_Tick(null, EventArgs.Empty)));
this.timer1.Start();
}
private void btnStop_Click(object sender, EventArgs e)
{
this.btnStart.Enabled = true;
this.btnStop.Enabled = false;
this.timer1.Stop();
}
private async void timer1_Tick(object sender, EventArgs e)
{
LogClear();
UpdateLog("\r\n" + "本次上传时间:" + DateTime.Now.ToString() + "\r\n");
int seconds = int.Parse(this.txt_Seconds.Text.ToString());
await SendData("");
UpdateLog("下次上传时间:" + DateTime.Now.AddSeconds(seconds).ToString() + "\r\n");
}
异步显示部分
特点:
1)如果当前线程不在主线程,利用委托,将日志显示在控件上
private void UpdateLog(string message)
{
if (txt_SendData_Log.InvokeRequired)
{
txt_SendData_Log.Invoke((MethodInvoker)delegate
{
txt_SendData_Log.AppendText(message + "\r\n");
});
}
else
{
txt_SendData_Log.AppendText(message + "\r\n");
}
}
2)当txtbox超过100行日志时,清楚控件上的记录,释放内存,防止内存溢出。
private void LogClear()
{
int loglines = this.txt_SendData_Log.Lines.Length;
if (loglines >= 100)
{
this.txt_SendData_Log.Clear();
}
}
3)将日志写入到txt文档中,做完整的日志记录。
Loger类
public class Loger
{
private object LockObj = new object();
private string ExePath;
public Loger()
{
ExePath = Path.GetDirectoryName(System.Diagnostics.Process.GetCurrentProcess().MainModule.FileName);
}
private StreamWriter Create(string fullPath)
{
StreamWriter sr;
if (File.Exists(fullPath))
{
sr = File.AppendText(fullPath);
}
else
{
sr = File.CreateText(fullPath);
}
return sr;
}
public void SaveMsg(string cataLog, string message, DateTime dt, bool isLoger)
{
string fileName = string.Format("{0}.log", dt.ToString("yyyyMMdd"));
Save(cataLog, fileName, message, isLoger);
}
public void Save(string cataLog, string fileName, string msg, bool isLoger)
{
lock (LockObj)
{
if (isLoger)
{
try
{
string fullPath = Path.Combine(ExePath + "\\" + cataLog, fileName);
EnsureDirectory(fullPath);
using (StreamWriter sw = Create(fullPath))
{
sw.WriteLine(string.Format("{0}-->:{1}\r\n", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg));
sw.Close();
}
}
catch { }
}
}
}
private void EnsureDirectory(string fullPath)
{
string path = Path.GetDirectoryName(fullPath);
if (!Directory.Exists(path))
Directory.CreateDirectory(path);
}
public void Remove(string fileName)
{
lock (LockObj)
{
if (File.Exists(fileName))
{
File.Delete(fileName);
}
}
}
调用方式
private Loger log = new Loger();
log.SaveMsg("SendDataLog", $"上传成功, Id: {testId}", DateTime.Now, true);