当前位置: 首页 > news >正文

如何保证 Kafka 数据实时同步到 Elasticsearch?

Kafka 数据实时同步到 Elasticsearch数据同步

  1. 核心配置文件 (需要创建新文件)
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3
topics=target_topic
key.ignore=true
connection.url=http://localhost:9200
type.name=_doc
value.converter=org.apache.confluent.connect.json.JsonSchemaConverter
value.converter.schema.registry.url=http://localhost:8081
schema.ignore=true
  1. 启动命令
.\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\es-sink.properties

关键优化参数说明

  • batch.size=2000:控制批量写入ES的文档数量
  • max.in.flight.requests=5:提升写入吞吐量
  • flush.timeout.ms=30000:设置刷新超时时间
  • retry.backoff.ms=5000:失败重试间隔
  • max.retries=10:最大重试次数

监控指标

// 在Kafka Connect配置中添加监控
config.put(ElasticsearchSinkConnectorConfig.READ_TIMEOUT_MS_CONFIG, 10000);
config.put(ElasticsearchSinkConnectorConfig.CONNECTION_TIMEOUT_MS_CONFIG, 10000);

异常处理方案

  1. 配置死信队列(DLQ)处理失败记录:
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-topic
errors.deadletterqueue.context.headers.enable=true
  1. 实现重试策略:
RetryUtil.executeWithRetry(() -> client.bulk(request), 3, Duration.ofSeconds(2),Arrays.asList(ElasticsearchTimeoutException.class)
);

kafka数据转换和清洗到Elasticsearch

1. 核心配置文件(新增)

# 连接器基础配置
name=es-sink-connector  # 连接器实例名称
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=3  # 并行任务数(根据ES集群规模调整)
topics=user_behavior  # 消费的Kafka主题名称# Elasticsearch配置
connection.url=http://localhost:9200  # ES集群地址
type.name=_doc  # 文档类型(ES7+固定值)
behavior.on.null.values=ignore  # 空值处理策略# 数据转换配置
value.converter=com.example.EsRecordConverter  # 自定义转换器(对应下方Java代码)
value.converter.schemas.enable=false  # 禁用schema验证# 清洗配置SMT链
transforms=FilterInvalid,FormatField,AddTimestamp
transforms.FilterInvalid.type=org.apache.kafka.connect.transforms.Filter$Value
transforms.FilterInvalid.predicate=HasUserId  # 过滤无用户ID的记录
transforms.FilterInvalid.negate=truetransforms.FormatField.type=org.apache.kafka.connect.transforms.ReplaceField$Value 
transforms.FormatField.renames=userId:user_id,ipAddr:client_ip  # 字段重命名transforms.AddTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.AddTimestamp.timestamp.field=event_time  # 添加处理时间戳# 异常处理配置
errors.tolerance=all  # 错误容忍模式
errors.deadletterqueue.topic.name=es_sink_dlq  # 死信队列名称
errors.deadletterqueue.context.headers.enable=true  # 保留错误上下文# 性能优化
batch.size=2000  # 批量写入条数
flush.timeout.ms=30000  # 刷新超时时间
max.in.flight.requests=5  # 并发请求数

2. 自定义转换器代码(新增)

/*** 自定义数据转换器(对应配置中的value.converter)*/
public class EsRecordConverter implements Converter {private static final Logger LOG = LoggerFactory.getLogger(EsRecordConverter.class);// 数据转换入口方法@Overridepublic SchemaAndValue toConnectData(String topic, byte[] value) {try {JSONObject json = new JSONObject(new String(value, "UTF-8"));// 数据清洗:IP地址标准化if(json.has("client_ip")) {String ip = json.getString("client_ip");json.put("client_ip", ip.replace(" ", ""));  // 去除空格}// 添加清洗标记json.put("clean_flag", calculateCleanFlag(json));return new SchemaAndValue(null, json.toString());} catch (Exception e) {LOG.error("Data conversion failed", e);throw new DataException("Conversion error", e);}}// 生成数据清洗哈希值private String calculateCleanFlag(JSONObject data) {String rawData = data.toString();return DigestUtils.sha256Hex(rawData);}
}

3. Elasticsearch预处理管道(命令行执行)

# 创建数据预处理管道
curl -X PUT "localhost:9200/_ingest/pipeline/kafka_pipeline" -H 'Content-Type: application/json' -d'
{"description": "Data final cleaning","processors": [{"remove": {  # 移除调试字段"field": ["debug_info", "temp_field"],"ignore_missing": true}},{"date": {  # 时间格式标准化"field": "event_time","target_field": "@timestamp","formats": ["UNIX_MS"]}}]
}'

4. 启动命令(Windows环境)

:: 启动独立模式连接器
.\bin\windows\connect-standalone.bat ^  # 主启动脚本
.\config\connect-standalone.properties ^  # 通用配置
.\config\es-sink.properties  # 当前连接器配置:: 参数说明:
:: 1. connect-standalone.properties - Kafka Connect基础配置
:: 2. es-sink.properties - 当前连接器专属配置
  1. 数据验证命令
# 查看ES中的清洗后数据
curl -X GET "localhost:9200/user_behavior/_search?pretty" -H 'Content-Type: application/json' -d'
{"query": {"term": {"clean_flag": "d4e5f6..."  # 替换实际哈希值}}
}'# 检查死信队列(需配置kafka-cli)
.\bin\windows\kafka-console-consumer.bat ^
--bootstrap-server localhost:9092 ^
--topic es_sink_dlq ^
--from-beginning

方案特点:

  1. 三层清洗架构

    • SMT层:基础格式处理
    • 转换器层:业务逻辑清洗
    • ES管道层:存储前终检
  2. 追踪机制

    • 通过clean_flag字段实现数据溯源
    • 死信队列保留原始错误数据
  3. 性能平衡

    • 批量大小与内存占用的最佳实践
    • 重试策略:10次指数退避重试(max.retries=10)

相关文章:

  • 【MySQL】第7节|Mysql锁机制与优化实践以及MVCC底层原理剖析
  • 预分配矩阵内存提升文件数据读取速度
  • Kotlin中let、run、with、apply及also的差别
  • 【Python/Pygame】事件监测
  • [C语言初阶]扫雷小游戏
  • Java 函数式接口(Functional Interface)
  • 符合Python风格的对象(使用 __slots__ 类属性节省空间)
  • DeepSeek 赋能数字农业:从智慧种植到产业升级的全链条革新
  • Windows 中动态库.dll 的 .lib 文件有什么作用?
  • SOC-ESP32S3部分:10-GPIO中断按键中断实现
  • 什么是模板字符串?比普通字符串的好处
  • mongodb语法$vlookup性能分析
  • YOLO11解决方案之使用 Streamlit 应用程序进行实时推理
  • github公开项目爬取
  • 【博客系统】博客系统第五弹:基于令牌技术实现用户登录接口
  • 【C++/控制台】迷宫游戏
  • SQL每日一练
  • CloudWeGo-Netpoll:高性能NIO网络库浅析
  • python web 开发-Flask-Login使用详解
  • AtCoder AT_abc407_c [ABC407C] Security 2
  • 网站开发培训学院/cms自助建站系统
  • 网站建设学生选课课程设计报告/直播回放老卡怎么回事
  • 网站开发架设/外贸平台推广
  • 建站之星如何建网站/自己怎么优化网站排名
  • 哪些网站做企业招聘不要花钱/方象科技专注于什么领域
  • 广州制作网站哪家专业/六年级上册数学优化设计答案