多个agent接入设备
- 每个agent对接同一个消费队列,非竞争设置,通过判断consumer中的参数如果是发给自己的,则下发,如果不是,则快速跳过。
- 每个消费者接收消息时通过Header中值判断是来着哪个agent服务器的,发送返回消息时带入这个Header中的服务器消息。
发送时:
// 创建一个发送消息的配置
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("rabbitmq://localhost");
// 定义发送逻辑
cfg.Send<IYourMessage>(x =>
{
x.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
x.AddHeaders(headers =>
{
headers.Set("machineName", Environment.MachineName);
headers.Set("processName", Process.GetCurrentProcess().ProcessName);
headers.Set("processId", Process.GetCurrentProcess().Id.ToString());
headers.Set("assembly", Assembly.GetExecutingAssembly().GetName().Name);
headers.Set("assemblyVersion", Assembly.GetExecutingAssembly().GetName().Version.ToString());
headers.Set("massTransitVersion", typeof(IBusControl).Assembly.GetName().Version.ToString());
headers.Set("operatingSystemVersion", RuntimeInformation.OSDescription);
});
});
接收时:
public class YourConsumer : IConsumer<IYourMessage>
{
public async Task Consume(ConsumeContext<IYourMessage> context)
{
var machineName = context.Headers.Get<string>("machineName");
var processName = context.Headers.Get<string>("processName");
var processId = context.Headers.Get<string>("processId");
var assembly = context.Headers.Get<string>("assembly");
var assemblyVersion = context.Headers.Get<string>("assemblyVersion");
var massTransitVersion = context.Headers.Get<string>("massTransitVersion");
var osVersion = context.Headers.Get<string>("operatingSystemVersion");
// 打印或使用这些信息
Console.WriteLine($"Machine Name: {machineName}");
Console.WriteLine($"Process Name: {processName}");
Console.WriteLine($"Process ID: {processId}");
Console.WriteLine($"Assembly: {assembly}");
Console.WriteLine($"Assembly Version: {assemblyVersion}");
Console.WriteLine($"MassTransit Version: {massTransitVersion}");
Console.WriteLine($"OS Version: {osVersion}");
}
}
整体架构如下:
architecture-beta
service mq(server)[MQ]
service agent1(internet)[Gateway1]
service agent2(internet)[Gateway2]
service consumer1(disk)[Consumer1]
service consumer2(disk)[Consumer2]
service consumerx(disk)[Consumerx]
junction junctionCenter
junction junctionRight
junction junctionRightX
mq:R -- L:junctionCenter
agent1:B -- T:junctionCenter
agent2:B -- T:junctionRight
junctionCenter:R -- L:junctionRight
junctionRight:R -- L:junctionRightX
consumer1:T -- B:junctionCenter
consumer2:T -- B:junctionRight
consumerx:T -- B:junctionRightX