Kafka 发送消息:同时支持压缩和不压缩两种发送方式
1、生产者配置
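在 Nacos 中增加压缩相关配置(spring.kafka.producer.compression-format,默认 lz4;spring.kafka.producer.max-request-size,默认 10485760),配置字段与公共 spring-kafka 配置有所区分。
需要发送压缩消息时,通过下面的配置类获取压缩版 KafkaTemplate 发送即可;不需要压缩时仍使用原有的 KafkaTemplate。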
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;/*** @date 2025/3/31 13:50* @description:*/
@Configuration
public class KafkaTemplateConfig {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;private KafkaTemplate<String, String> compressedKafkaTemplate;@Value("${spring.kafka.producer.compression-format:lz4}")private String compressionFormat;@Value("${spring.kafka.producer.max-request-size:10485760}")private String maxRequestSize;@PostConstructpublic void initProducerConfig() {DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaTemplate.getProducerFactory().getConfigurationProperties());Map<String, Object> configs = new HashMap<>(2);configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionFormat);configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,maxRequestSize );factory.updateConfigs(configs);compressedKafkaTemplate = new KafkaTemplate<>(factory);}public KafkaTemplate<String, String> getCompressedKafkaTemplate() {return compressedKafkaTemplate;}}
2、使用
关键代码:
// Send through the compression-enabled template; the two callbacks log success or failure of the send.
kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody)
.addCallback(success -> log.info("send log to kafka success, requestId:{},traceId:{}", requestLog.getRequestNo(),requestLog.getTraceId()),
failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.chint.anneng.finance.common.log.http.HttpLogExtendParam;
import com.chint.anneng.finance.common.log.kafka.KafkaTemplateConfig;
import com.chint.anneng.finance.common.utils.thread.wrapper.ThreadPoolExecutorMdcWrapper;
import com.chint.anneng.finance.portal.log.api.model.param.ApiOperateLogParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @date 2025/2/10 17:25*/
@Slf4j
@Service
@ConditionalOnProperty(name = "finance.kafka-biz.log", havingValue = "true", matchIfMissing = true)
public class XXKafkaLogCollector implements LogCollector {private static final ThreadPoolExecutor executor = new ThreadPoolExecutorMdcWrapper(0, Runtime.getRuntime().availableProcessors() * 2,60L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000),new NameThreadFactory("finance.kafka-biz.log."), new ThreadPoolExecutor.AbortPolicy());@Autowiredprivate KafkaTemplateConfig kafkaTemplateConfig;@Value("${finance.kafka.log-topic:finance_http_biz_log}")private String logTopic;@Overridepublic void sendHttpLog(HttpLogExtendParam requestLog) {if (!Boolean.TRUE.equals(requestLog.getLogConfig().getSave2Db())) {return;}executor.submit(() -> {try {requestLog.setBizSucCode(null);requestLog.setBizSucKey(null);requestLog.setDecryptHandlerCode(null);requestLog.setLogConfig(null);String messageBody = JSONObject.toJSONString(requestLog);kafkaTemplateConfig.getCompressedKafkaTemplate().send(logTopic, messageBody).addCallback(success -> log.info("send log to kafka success, requestId:{},traceId:{}", requestLog.getRequestNo(),requestLog.getTraceId()),failure -> log.error("send log to kafka failure, param:{}", requestLog, failure));} catch (Exception e) {log.warn("http请求日志[{}]发送到日志中心失败,请通过日志文件查看日志内容", requestLog.getRequestNo(), e);}});}@Overridepublic void sendApiLog(ApiOperateLogParam apiOperateLog) {}
}