关于多agent多consumer架构设想

发布于:2025-04-15 ⋅ 阅读:(36) ⋅ 点赞:(0)

多个agent接入设备

  1. 每个agent对接同一个消费队列,非竞争设置,通过判断consumer中的参数如果是发给自己的,则下发,如果不是,则快速跳过。
  2. 每个消费者接收消息时通过Header中值判断是来着哪个agent服务器的,发送返回消息时带入这个Header中的服务器消息。
    发送时:
  // 创建一个发送消息的配置
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host("rabbitmq://localhost");

            // 定义发送逻辑
            cfg.Send<IYourMessage>(x =>
            {
                x.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
                x.AddHeaders(headers =>
                {
                    headers.Set("machineName", Environment.MachineName);
                    headers.Set("processName", Process.GetCurrentProcess().ProcessName);
                    headers.Set("processId", Process.GetCurrentProcess().Id.ToString());
                    headers.Set("assembly", Assembly.GetExecutingAssembly().GetName().Name);
                    headers.Set("assemblyVersion", Assembly.GetExecutingAssembly().GetName().Version.ToString());
                    headers.Set("massTransitVersion", typeof(IBusControl).Assembly.GetName().Version.ToString());
                    headers.Set("operatingSystemVersion", RuntimeInformation.OSDescription);
                });
            });

接收时:

public class YourConsumer : IConsumer<IYourMessage>
{
    public async Task Consume(ConsumeContext<IYourMessage> context)
    {
        var machineName = context.Headers.Get<string>("machineName");
        var processName = context.Headers.Get<string>("processName");
        var processId = context.Headers.Get<string>("processId");
        var assembly = context.Headers.Get<string>("assembly");
        var assemblyVersion = context.Headers.Get<string>("assemblyVersion");
        var massTransitVersion = context.Headers.Get<string>("massTransitVersion");
        var osVersion = context.Headers.Get<string>("operatingSystemVersion");

        // 打印或使用这些信息
        Console.WriteLine($"Machine Name: {machineName}");
        Console.WriteLine($"Process Name: {processName}");
        Console.WriteLine($"Process ID: {processId}");
        Console.WriteLine($"Assembly: {assembly}");
        Console.WriteLine($"Assembly Version: {assemblyVersion}");
        Console.WriteLine($"MassTransit Version: {massTransitVersion}");
        Console.WriteLine($"OS Version: {osVersion}");
    }
}

整体架构如下:

architecture-beta
    service mq(server)[MQ]
    service agent1(internet)[Gateway1]
    service agent2(internet)[Gateway2]
	service consumer1(disk)[Consumer1]    
    service consumer2(disk)[Consumer2]
    service consumerx(disk)[Consumerx]
    junction junctionCenter
    junction junctionRight
    junction junctionRightX

    mq:R -- L:junctionCenter
    agent1:B -- T:junctionCenter
    agent2:B -- T:junctionRight
	junctionCenter:R -- L:junctionRight
    junctionRight:R -- L:junctionRightX
	consumer1:T -- B:junctionCenter
    consumer2:T -- B:junctionRight
    consumerx:T -- B:junctionRightX