net8.0一键创建支持(Kafka)
Necore项目生成器 - 在线创建Necore模板项目 | 一键下载
KafkaController.cs
using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
using UnT.Template.Application.Responses;
using UnT.Template.Domain;namespace UnT.Template.Controllers
{[Route("api/kafkas")][ApiController]public class KafkaController : ControllerBase{private readonly IConfiguration _configuration;public KafkaController(IConfiguration configuration){_configuration = configuration;}[HttpPost("publish")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Insert(){try{var producerConfig = new ProducerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),ClientId = "UnT.Template",Acks = Acks.All, MessageSendMaxRetries = 3,RetryBackoffMs = 1000,LingerMs = 5 };// 创建生产者using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build()){var message = Newtonsoft.Json.JsonConvert.SerializeObject(new Pro_Product { Name = DateTime.Now.ToFileTime().ToString() });producer.Produce("unt_queue", new Message<Null, string> { Value = message },(deliveryReport) =>{if (deliveryReport.Error.Code != ErrorCode.NoError){Console.WriteLine($"消息发送失败: {deliveryReport.Error.Reason}");}else{Console.WriteLine($"消息发送到: {deliveryReport.TopicPartitionOffset}");}});producer.Flush(TimeSpan.FromSeconds(10));}return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}[HttpPost("consume")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Consume(){try{Task.Run(() =>{var consumerConfig = new ConsumerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),GroupId = "UnT.Template.Consumer.Group",EnableAutoCommit = false, AutoOffsetReset = AutoOffsetReset.Latest,EnablePartitionEof = true,StatisticsIntervalMs = 5000};using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build()){//订阅主题consumer.Subscribe("unt_queue");//取消令牌,用于优雅退出var cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) => {e.Cancel = true;cts.Cancel();};try{while (true){try{//消费消息var cr = consumer.Consume(cts.Token);if (cr.IsPartitionEOF){Console.WriteLine($"分区 {cr.Partition} 已到达末尾,偏移量: {cr.Offset}");continue;}//检查空消息if (cr.Message == null){Console.WriteLine("收到空消息");continue;}//处理有效消息Console.WriteLine($"收到消息: {cr.Message.Value} [分区: {cr.Partition}, 偏移量: {cr.Offset}]");//手动提交偏移量(如果EnableAutoCommit=false)consumer.Commit(cr);}catch (ConsumeException e){Console.WriteLine($"消费错误: {e.Error.Reason}");}}}catch (OperationCanceledException){// 确保消费者正确关闭consumer.Close();}}});await Task.Delay(TimeSpan.FromSeconds(5));return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}}
}