C# 关于实现保存数据以及数据溯源推送

发布于:2024-12-18 ⋅ 阅读:(113) ⋅ 点赞:(0)

前言

实现了一个数据接收、存储和推送的功能
首先定义我们数据存储的格式(可根据自己的需求定义格式):
数据切割符号:**$是区分数据其他数据的划分
数据内容切割号:
|**是区分时间戳内容数据的划分
以下是我存储的文本格式Data.log或者Data.txt

$2024-12-07 16:26:53.799|数据1
$2024-12-07 16:26:54.920|数据2
$2024-12-07 16:26:55.640|数据3
...
...

采集与推送:

以下是具体的代码内容:推送数据是一次性加载文本数据,然后再逐条数据推送

using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Core;
using Sunny.UI;
using Sunny.UI.Win32;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DataAcquisitionModule.Server
{
    public class DataReceiver
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(DataReceiver));
        private static readonly object lockObject = new object();
        private readonly string _filePath;
        private readonly ConcurrentQueue<string> _dataQueue;
        private readonly CancellationTokenSource _cancellationTokenSource;
        private bool _isRunning;

        public DataReceiver(string filePath)
        {
            _filePath = filePath;
            _dataQueue = new ConcurrentQueue<string>();
            _cancellationTokenSource = new CancellationTokenSource();
            _message = new MessageLog();
            // 启动数据存储任务
            _ = StartSavingDataAsync(_cancellationTokenSource.Token);
        }
		//数据入列,多肽1
        public void HandleMsg(byte[] msg)
        {
            var data = Encoding.UTF8.GetString(msg);
            _dataQueue.Enqueue(data);
        }
		//数据入列,多肽2
        public void HandleMsg(string topic, string msg)
        {
            string strMsg = $"{topic}#{msg}";
            _dataQueue.Enqueue(strMsg);
        }

        private async Task StartSavingDataAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                if (_dataQueue.TryDequeue(out var data))
                {
                    await SaveDataToFileAsync(data);
                }
                else
                {
                    // 如果队列为空,短暂等待
                    await Task.Delay(100, cancellationToken);
                }
            }
        }

        private async Task SaveDataToFileAsync(string data)
        {
            try
            {
               
				logger.Info($"保存数据: {data}");
                await File.AppendAllTextAsync(_filePath, $"${DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}|{data}\n");
            }
            catch (Exception ex)
            {
                logger.Error("保存数据到文件时发生错误", ex);
            }
        }

        public async Task LoadAndProcessDataAsync(Action<string> callAction)
        {
            _isRunning = true;
            while (!_cancellationTokenSource.Token.IsCancellationRequested && _isRunning)
            {
                List<string> lines = new List<string>();

                lock (lockObject)
                {
                    if (File.Exists(_filePath))
                    {
                        // 一次性读取整个文件内容
                        string fileContent = await File.ReadAllTextAsync(_filePath);
                        lines.AddRange(fileContent.Split('$', StringSplitOptions.RemoveEmptyEntries).Select(part => part.Trim()));
                        //RemoveEmptyEntries:返回数组元素移除空字符串元素(不包含空字符串元素);
                    }
                }

                if (lines.Count > 0)
                {
                    DateTime startTime = DateTime.Parse(lines.First().Split('|')[0]);
                    DateTime timestamp = new DateTime();
                    TimeSpan delay = new TimeSpan(0);
                    string[] strData;

                    foreach (var line in lines)
                    {
                        if (!_isRunning)
                        {
                            break;
                        }

                        strData = line.Split('|');
                        if (strData.Length < 2 || string.IsNullOrEmpty(strData[1]))
                        {
                            continue;
                        }

                        timestamp = DateTime.Parse(strData[0]);
                        delay = timestamp - startTime;

                        if (delay.TotalMilliseconds > 0)
                        {
                            await Task.Delay(delay);
                        }

                        startTime = timestamp;
                        callAction?.Invoke(strData[1]);
                        logger.Info($"数据推送: {line}");
                    }

                    callAction?.Invoke("推送完毕!!!");
                    break;
                }

                await Task.Delay(10000); // 每10秒检查一次
            }
        }

        public void StopPushData()
        {
            _isRunning = false;
        }

        public void StopReceiving()
        {
            _cancellationTokenSource.Cancel();
        }
    }
}

异步加载数据文件并推送

实现:数据文件太大时候,采取一边加载一边推送数据方式(而不是一次性加载完成后再推送)

 public async Task LoadAndProcessDataAsync(Action<string> callAction)
 {
     _isRunning = true;
     DateTime? startTime = null;
     StringBuilder currentData = new StringBuilder();
     string line=String.Empty;
     while (!_cancellationTokenSource.Token.IsCancellationRequested && _isRunning)
     {
         if (File.Exists(_filePath))
         {
             using (var reader = new StreamReader(_filePath))
             {
                 while ((line = await reader.ReadLineAsync()) != null && _isRunning)
                 {
                     if (string.IsNullOrEmpty(line))
                     {
                         continue;
                     }

                     currentData.AppendLine(line);
                     try
                     {
                         // 检查当前数据是否包含完整记录
                         if (currentData.ToString().Contains("$"))
                         {
                             var records = currentData.ToString().Split(new[] { '$' }, StringSplitOptions.RemoveEmptyEntries);
                             if (records.Length > 0)
                             {
                                 var remainingData = records[0].ToString().Trim();
                                 if (!string.IsNullOrEmpty(remainingData))
                                 {
                                     var strData = remainingData.Split('|');
                                     if (strData.Length >= 2)
                                     {
                                         var timestamp = DateTime.Parse(strData[0]);
                                         if (!startTime.HasValue)
                                         {//初始化时间
                                             startTime = timestamp;
                                         }
                                         var delay = timestamp - startTime.Value;
                                         if (delay.TotalMilliseconds > 0)
                                         {
                                             await Task.Delay(delay);
                                         }
                                         startTime = timestamp;//更新时间
                                         callAction?.Invoke(strData[1]);
                                         Console.WriteLine($"数据推送: {remainingData}");
                                     }
                                 }
                                 currentData.Clear();
                                 currentData.AppendLine(records[1]);
                             }
                         }
                     }
                     catch (Exception ex)
                     {
                         Console.WriteLine(ex);
                         throw;
                     }
                 }
             }
             callAction?.Invoke("推送完毕!!!");
             // 清空文件
             // File.WriteAllText(_filePath, string.Empty);
         }
         await Task.Delay(10000); // 每10秒检查一次
     }
 }
  1. 逐行读取文件:使用 StreamReader 逐行读取文件内容。
  2. 拼接数据:使用 StringBuilder 拼接每一行数据,直到遇到分隔符 $
  3. 处理完整记录:当拼接的数据包含分隔符 $ 时,将其拆分为完整的记录进行处理。
  4. 处理剩余数据:在读取完文件后,处理 StringBuilder 中剩余的数据,确保没有遗漏的数据被处理。

注意

  1. 优化文件读取和处理
    • 减少文件读取次数:避免频繁读取文件,特别是在文件较大时,可以考虑使用内存缓存。
    • 优化文件写入:使用 File.AppendAllText 方法代替 StreamWriter,减少文件打开和关闭的开销。
  2. 异步处理
    • 异步文件操作:使用 File.ReadAllLinesAsync 和 File.WriteAllLinesAsync 方法进行异步文件操作,减少阻塞。
  3. 错误处理
    • 增加异常处理:在文件操作和网络通信中增加异常处理,确保程序的稳定性。
  4. 日志记录
    • 日志记录:使用 log4net 或其他日志框架记录关键操作的日志,方便调试和维护。
  5. 代码结构优化
    • 分离关注点:将数据接收、存储和推送逻辑分离到不同的类或方法中,提高代码的可读性和可维护性。

数据采集和调用

假设我们有一个简单的控制台应用程序,用于启动 DataReceiver 并模拟数据的接收和处理。

1. 创建 DataReceiver 实例

首先,我们需要创建一个 DataReceiver 实例,并指定数据存储的文件路径。

2. 模拟数据接收

我们可以模拟从外部源(如 MQTT 消息队列)接收到的数据,并调用 HandleMsg 方法将其添加到队列中。

3. 处理和推送数据

启动一个任务来处理和推送数据,调用 LoadAndProcessDataAsync 方法。

using DataAcquisitionModule.Helper;
using HslCommunication.MQTT;
using log4net;
using log4net.Config;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace DataAcquisitionModule
{
    class Program
    {
        private static readonly ILog logger = LogManager.GetLogger(typeof(Program));

        static async Task Main(string[] args)
        {
            // 配置 log4net
            XmlConfigurator.Configure(new FileInfo("log4net.config"));

            // 指定数据存储文件路径
            string filePath = "data.log";

            // 创建 DataReceiver 实例
            DataReceiver dataReceiver = new DataReceiver(filePath);

            // 模拟数据接收
            SimulateDataReception(dataReceiver);

            // 启动数据处理和推送任务
            await dataReceiver.LoadAndProcessDataAsync(data => 
            string[] msg = data.Split('#');
			if (msg.Length == 2)
			{
			    mqttServer.PublishAllClientTopicPayload(msg[0], Encoding.UTF8.GetBytes(msg[1]));
			    Console.WriteLine($"推送的topic: {msg[0]}");
			    Console.WriteLine($"处理具体的数据: {Encoding.UTF8.GetBytes(msg[1])}");
			}
            );

            // 模拟停止数据接收和推送
            Console.WriteLine("按任意键停止数据接收和推送...");
            Console.ReadKey();

            dataReceiver.StopPushData();
            dataReceiver.StopReceiving();
        }

        private static void SimulateDataReception(DataReceiver dataReceiver)
        {
            // 模拟从外部源接收到的数据
            Task.Run(async () =>
            {
                for (int i = 0; i < 10; i++)
                {
                    string topic = "Topic" + i;
                    string message = "Message" + i;
                    dataReceiver.HandleMsg(topic, message);
                    await Task.Delay(1000); // 模拟每秒接收一条数据
                }
            });
        }
    }
}

解释说明

  1. 配置 log4net:
    • 使用 XmlConfigurator.Configure 方法加载 log4net 配置文件,确保日志记录正常工作。
  2. 创建 DataReceiver 实例:
    • 指定数据存储文件路径 data.log,并创建 DataReceiver 实例。
  3. 模拟数据接收:
    • 使用 SimulateDataReception 方法模拟从外部源接收到的数据。这里使用一个 Task 来模拟每秒接收一条数据,并调用 HandleMsg 方法将数据添加到队列中。
  4. 启动数据处理和推送任务:
    • 调用 LoadAndProcessDataAsync 方法启动数据处理和推送任务。这里使用 Console.WriteLine 方法来模拟数据处理操作。
  5. 停止数据接收和推送:
    • 按任意键停止数据接收和推送任务,调用 StopPushDataStopReceiving 方法。

运行结果

运行上述代码后,程序会模拟接收数据并将其存储到 data.log 文件中。然后,程序会读取文件中的数据并按时间顺序推送,每条数据的推送时间间隔与实际接收时间间隔一致。
通过这种方式,验证 DataReceiver 类的功能,其实大家可以根据实际需求进行调整和扩展。我这边只是简单演示


网站公告

今日签到

点亮在社区的每一天
去签到