Necore项目生成器 - 在线创建Necore模板项目 | 一键下载
KafkaController.cs
using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
using UnT.Template.Application.Responses;
using UnT.Template.Domain;
namespace UnT.Template.Controllers
{
[Route("api/kafkas")]
[ApiController]
public class KafkaController : ControllerBase
{
private readonly IConfiguration _configuration;
public KafkaController(IConfiguration configuration)
{
_configuration = configuration;
}
[HttpPost("publish")]
[Produces("application/json")]
[ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]
public async Task<IActionResult> Insert()
{
try
{
var producerConfig = new ProducerConfig
{
BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),
ClientId = "UnT.Template",
Acks = Acks.All,
MessageSendMaxRetries = 3,
RetryBackoffMs = 1000,
LingerMs = 5
};
// 创建生产者
using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
{
var message = Newtonsoft.Json.JsonConvert.SerializeObject(new Pro_Product { Name = DateTime.Now.ToFileTime().ToString() });
producer.Produce("unt_queue", new Message<Null, string> { Value = message },
(deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError)
{
Console.WriteLine($"消息发送失败: {deliveryReport.Error.Reason}");
}
else
{
Console.WriteLine($"消息发送到: {deliveryReport.TopicPartitionOffset}");
}
});
producer.Flush(TimeSpan.FromSeconds(10));
}
return Ok(new ApiResponse<bool> { Success = true, Data = true });
}
catch (Exception ex)
{
return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });
}
}
[HttpPost("consume")]
[Produces("application/json")]
[ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]
public async Task<IActionResult> Consume()
{
try
{
Task.Run(() =>
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),
GroupId = "UnT.Template.Consumer.Group",
EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetReset.Latest,
EnablePartitionEof = true,
StatisticsIntervalMs = 5000
};
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
//订阅主题
consumer.Subscribe("unt_queue");
//取消令牌,用于优雅退出
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (true)
{
try
{
//消费消息
var cr = consumer.Consume(cts.Token);
if (cr.IsPartitionEOF)
{
Console.WriteLine($"分区 {cr.Partition} 已到达末尾,偏移量: {cr.Offset}");
continue;
}
//检查空消息
if (cr.Message == null)
{
Console.WriteLine("收到空消息");
continue;
}
//处理有效消息
Console.WriteLine($"收到消息: {cr.Message.Value} [分区: {cr.Partition}, 偏移量: {cr.Offset}]");
//手动提交偏移量(如果EnableAutoCommit=false)
consumer.Commit(cr);
}
catch (ConsumeException e)
{
Console.WriteLine($"消费错误: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// 确保消费者正确关闭
consumer.Close();
}
}
});
await Task.Delay(TimeSpan.FromSeconds(5));
return Ok(new ApiResponse<bool> { Success = true, Data = true });
}
catch (Exception ex)
{
return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });
}
}
}
}