当前位置: 首页 > news >正文

net8.0一键创建支持(Kafka)

Necore项目生成器 - 在线创建Necore模板项目 | 一键下载

 KafkaController.cs

using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
using UnT.Template.Application.Responses;
using UnT.Template.Domain;namespace UnT.Template.Controllers
{[Route("api/kafkas")][ApiController]public class KafkaController : ControllerBase{private readonly IConfiguration _configuration;public KafkaController(IConfiguration configuration){_configuration = configuration;}[HttpPost("publish")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Insert(){try{var producerConfig = new ProducerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),ClientId = "UnT.Template",Acks = Acks.All, MessageSendMaxRetries = 3,RetryBackoffMs = 1000,LingerMs = 5 };// 创建生产者using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build()){var message = Newtonsoft.Json.JsonConvert.SerializeObject(new Pro_Product { Name = DateTime.Now.ToFileTime().ToString() });producer.Produce("unt_queue", new Message<Null, string> { Value = message },(deliveryReport) =>{if (deliveryReport.Error.Code != ErrorCode.NoError){Console.WriteLine($"消息发送失败: {deliveryReport.Error.Reason}");}else{Console.WriteLine($"消息发送到: {deliveryReport.TopicPartitionOffset}");}});producer.Flush(TimeSpan.FromSeconds(10));}return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}[HttpPost("consume")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Consume(){try{Task.Run(() =>{var consumerConfig = new ConsumerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),GroupId = "UnT.Template.Consumer.Group",EnableAutoCommit = false, AutoOffsetReset = AutoOffsetReset.Latest,EnablePartitionEof = true,StatisticsIntervalMs = 5000};using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build()){//订阅主题consumer.Subscribe("unt_queue");//取消令牌,用于优雅退出var cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) => {e.Cancel = true;cts.Cancel();};try{while (true){try{//消费消息var cr = consumer.Consume(cts.Token);if (cr.IsPartitionEOF){Console.WriteLine($"分区 {cr.Partition} 已到达末尾,偏移量: {cr.Offset}");continue;}//检查空消息if (cr.Message == null){Console.WriteLine("收到空消息");continue;}//处理有效消息Console.WriteLine($"收到消息: {cr.Message.Value} [分区: {cr.Partition}, 偏移量: {cr.Offset}]");//手动提交偏移量(如果EnableAutoCommit=false)consumer.Commit(cr);}catch (ConsumeException e){Console.WriteLine($"消费错误: {e.Error.Reason}");}}}catch (OperationCanceledException){// 确保消费者正确关闭consumer.Close();}}});await Task.Delay(TimeSpan.FromSeconds(5));return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}}
}

http://www.dtcms.com/a/301624.html

相关文章:

  • 基于Prometheus+Grafana的分布式爬虫监控体系:构建企业级可观测性平台
  • 【旧文】Adobe Express使用教程
  • net8.0一键创建辅助开发的个人小工具
  • c++加载qml文件
  • G1回收器
  • 企业IT管理——信息安全策略纲要【模板】
  • TIM 编码器接口
  • listen() 函数详解
  • 表单重复提交、以及重复消费的幂等性问题解决方案
  • 企业如何便捷地使用宝塔面板管理系统服务和网站:一键全能部署与高效运维
  • 062_Arrays类与数组操作
  • 在飞牛OS上部署MoonTV:一站式影视聚合播放器安装教程
  • [spring6: @EnableWebMvc]-源码分析
  • MySQL 事务和锁
  • Webpack 和 Vite 的关键区别
  • 在Luckfox Lyra(Zero W)上将TF卡格式化为ext4文件系统
  • 人工智能——图像梯度处理、边缘检测、绘制图像轮廓、凸包特征检测
  • 递归查询美国加速-技术演进与行业应用深度解析
  • 2025 环法对决,VELO Angel Glide 坐垫轻装上阵
  • 【AI论文】GR-3技术报告
  • 《频率之光:危机降临》
  • 详细解释一个ros的CMakeLists.txt文件
  • tpms传感器的设计---硬件电路
  • python中的容器和对象
  • 深入理解SmolVLA中的Flow Matching Action Expert:从理论到实现
  • 从0到1学Pandas(九):Pandas 高级数据结构与操作
  • Adobe Animate中文版 v2024.24.0.10.14
  • 洛谷 装箱问题 动态规划-变形背包问题
  • OpenCL study - code03 rgb2gray
  • 进度条制作--Linux知识的小应用