当前位置: 首页 > news >正文

kafka发送消息,同时支持消息压缩和不压缩

1、生产者配置

nacos中增加配置,和公共spring-kafka配置字段有区分

需要发送压缩消息时,使用该配置类发送即可


import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;/*** @date 2025/3/31 13:50* @description:*/
@Configuration
public class KafkaTemplateConfig {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private KafkaTemplate<String, String> compressedKafkaTemplate;@Value("${spring.kafka.producer.compression-format:lz4}")private String compressionFormat;@Value("${spring.kafka.producer.max-request-size:10485760}")private String maxRequestSize;@PostConstructpublic void initProducerConfig() {DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaTemplate.getProducerFactory().getConfigurationProperties());Map<String, Object> configs = new HashMap<>(2);configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionFormat);configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,maxRequestSize );factory.updateConfigs(configs);compressedKafkaTemplate = new KafkaTemplate<>(factory);}public KafkaTemplate<String, String> getCompressedKafkaTemplate() {return compressedKafkaTemplate;}}

2、使用

关键代码:

kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody)
                        .addCallback(success -> log.info("send log to kafka success, requestId:{},traceId:{}", requestLog.getRequestNo(),requestLog.getTraceId()),
                                failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));


import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.chint.anneng.finance.common.log.http.HttpLogExtendParam;
import com.chint.anneng.finance.common.log.kafka.KafkaTemplateConfig;
import com.chint.anneng.finance.common.utils.thread.wrapper.ThreadPoolExecutorMdcWrapper;
import com.chint.anneng.finance.portal.log.api.model.param.ApiOperateLogParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @date 2025/2/10 17:25*/
@Slf4j
@Service
@ConditionalOnProperty(name = "finance.kafka-biz.log", havingValue = "true", matchIfMissing = true)
public class XXKafkaLogCollector implements LogCollector {private static final ThreadPoolExecutor executor = new ThreadPoolExecutorMdcWrapper(0, Runtime.getRuntime().availableProcessors() * 2,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new NameThreadFactory("finance.kafka-biz.log."), new ThreadPoolExecutor.AbortPolicy());@Autowiredprivate KafkaTemplateConfig kafkaTemplateConfig;@Value("${finance.kafka.log-topic:finance_http_biz_log}")private String logTopic;@Overridepublic void sendHttpLog(HttpLogExtendParam requestLog) {if (!Boolean.TRUE.equals(requestLog.getLogConfig().getSave2Db())) {return;}executor.submit(() -> {try {requestLog.setBizSucCode(null);requestLog.setBizSucKey(null);requestLog.setDecryptHandlerCode(null);requestLog.setLogConfig(null);String messageBody = JSONObject.toJSONString(requestLog);kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody).addCallback(success -> log.info("send log to kafka success, requestId:{},traceId:{}", requestLog.getRequestNo(),requestLog.getTraceId()),failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));} catch (Exception e) {log.warn("http请求日志[{}]发送到日志中心失败,请通过日志文件查看日志内容", requestLog.getRequestNo(), e);}});}@Overridepublic void sendApiLog(ApiOperateLogParam apiOperateLog) {}
}

相关文章:

  • 比较UNION ALL与WITH ROLLUP
  • 函数返回const引用,使用const修饰变量接收
  • java导出word含表格并且带图片
  • 一种改进的CFAR算法用于目标检测(解决多目标掩蔽)
  • 996引擎-实战笔记:Lua 的 NPC 面板获取 Input 内容
  • 从基础概念到前沿应用了解机器学习
  • 23种设计模式-创建型模式之单例模式(Java版本)
  • 用 Deepseek 写的html油耗计算器
  • AI 模型高效化:推理加速与训练优化的技术原理与理论解析
  • 基于Python的医疗质量管理指标智能提取系统【2025代码版】
  • 从入门到精通【MySQL】 JDBC
  • 05-DevOps-Jenkins自动拉取构建代码2
  • 「数据可视化 D3系列」入门第七章:坐标轴的使用
  • 数据结构——八大排序算法
  • 第十节:性能优化-如何排查组件不必要的重复渲染?
  • PH热榜 | 2025-04-17
  • requestAnimationFrame 深度理解
  • 第二十三天 - 性能优化技巧 - 内存分析与调优 - 练习:资源泄漏检测工具
  • GPT对话UI--通义千问API
  • 【LangChain4j快速入门】5分钟用Java玩转GPT-4o-mini,Spring Boot整合实战!| 附源码
  • 神舟二十号航天员乘组将于近日择机实施第一次出舱活动
  • 在“三只手”上跳舞:公共政策的科学与艺术——读《市场、国家和民众:公共政策经济学》
  • 研究显示:肺活量衰减始于20至25岁
  • 再囤三个月库存!美国客户抢付尾款,外贸企业发货订单排到7月
  • 达恩当选罗马尼亚总统
  • 首届巴塞尔艺术奖公布:大卫·哈蒙斯、曹斐等36人获奖