当前位置: 首页 > news >正文

高性能采集服务上线回顾

最近基于lua脚本做了一套数据采集服务,日采集数据量 :1.1亿条/天,CPU和内存都可以做到极低的水平。当然上线以来也经历了一波全链路的崩溃到修复过程。其中涉及到nginx优化、lua脚本写入kafka性能优化、kafka性能优化;

1、数据压缩策略

     进行对应的数据压缩是大数据量传输的前提条件,业界用得比较多的压缩算法有gzip、lz4、snappy、zstd。已经实现了snappy、gzip,后续计划扩展zstd和lz4;假如考虑速度更快可以考虑snappy、lz4;考虑压缩比更高可以考虑zstd;(ps:微信桌面端通信采用snappy进行数据压缩)

2、nginx网关层

  worker_processes  auto;        #  根据CPU自动选择worker进程数据

   client_body_buffer_size 64k; #  增大可以防止生成大量小文件,全在内存进行处理,提升效率

3、lua采集脚本

      lua-resty-kafka配置优化(生产者优化)

 kafka_producer = producer:new(broker_list, {
            producer_type = "async",  -- 异步生产者类型
            required_acks = 1,
            request_timeout = 20000,
            max_retry = 15,
            retry_backoff = 1000,
            max_buffering = 100000,     -- 生产者缓冲消息队列最大值,默认5w条(queue.buffering.max.messages)
            --batch_size = 400,
            batch_size = 2097152,       -- 生产者缓冲区大小,每次发送的数据大于这个值,有可能发生堆积(send.buffer.bytes)应小于kafka的socket.request.max.bytes / 2 - 10k
            batch_num = 600,            -- 生产者每次批量发送的数量(batch.num.messages)
            linger_ms = 800,            -- 
            compression_codec = "gzip"  -- 当前插件是不支持压缩,可惜了
        })

     (1)生产者内存队列缓存区配置

     (2)每个批次发送大小

     (2)网络缓冲区,批次大小不能超过网络缓冲区,网络缓冲区最大依赖于操作系统的tcp和kafka broker配置。

      (3)发送前的数据压缩

      (4)多分区topic提升写入并行度

      (5)异步写入

      (6)ack=1,单副本成功即可

异步批量写盘优化

local function async_flush_logs()
    if #log_buffer == 0 then return end

    ngx.thread.spawn(function()
        local retries = 3  -- 最大重试次数
        local delay = 0.1  -- 每次重试的延迟时间(秒)

        while retries > 0 do
            -- 确保日志文件已初始化
            init_log_file()

            -- 检查日志文件是否已成功打开
            if current_log_file then
                local ok, err = pcall(function()
                    for _, log_entry in ipairs(log_buffer) do
                        current_log_file:write(log_entry .. "\n")
                    end
                    current_log_file:flush()  -- 确保数据立即写入磁盘
                end)

                if not ok then
                    ngx.log(ngx.ERR, "Failed to write log: ", err)
                else
                    -- 写入成功,清空缓冲区
                    log_buffer = {}
                    break  -- 退出重试循环
                end
            else
                ngx.log(ngx.WARN, "Log file is not initialized, retrying...")
                retries = retries - 1
                ngx.sleep(delay)  -- 等待一段时间后重试
            end
        end

        if retries == 0 then
            ngx.log(ngx.ERR, "Failed to write log after multiple retries")
        end
    end)
end

4、kafka端

     Kafka Broker 的 socket.request.max.bytes=204857600(200M),配合写入端进行性能优化

     堆内存配置大小 

                中型正式环境  4~8G

                大型生产环境 8~16G

5、安全性

    采用非对称加密+对称加密的混合方式进行数据加密传输;完全的非对称加密在数据量稍大的情况下会消耗大量的cpu和压缩解压时间;

   对于上报的数据压缩提需要校验上传文件的最大值,以及解压后数据大小,防止非法请求,循环解压浪费大量cpu和存储资源

相关文章:

  • Leetcode 209 长度最小的子数组
  • RX580双卡32GB显存跑DeepSeek-R17b、8b、14b、32b实测
  • vue3+vant4+js的移动端项目,每次部署完项目后,部分点击跳转至新页面的事件就失效了,不跳转,也不报错,如何解决?
  • Linux知识-第一天
  • SpringBoot集成Netty实现Ws和Tcp通信
  • 初探WebAssembly
  • 什么是组态软件?
  • Kotlin 类委托与属性委托
  • 图论-岛屿数量
  • 什么是分布式和微服务?
  • 第一章:6.差分+前缀和(一个区域整体添加一个数)
  • EVOAGENT: Towards Automatic Multi-Agent Generation via Evolutionary Algorithms
  • yolo初体验
  • 【Kubernets】K8S亲和性配置相关说明
  • (链表 删除链表的倒数第N个结点)leetcode 19
  • 【Elasticsearch】自定义内置的索引生命周期管理(ILM)策略。
  • 博客系统测试报告
  • 17. LangChain实战项目2——易速鲜花宣传文案批量生成并导出
  • 探秘基带算法:从原理到5G时代的通信变革【十】基带算法应用与对比
  • 【图像处理与OpenCV:技术栈、应用和实现】
  • 出生于1991年,石秀清拟提名为铜陵市辖县(区)政府副县(区)长人选
  • 住建部:我国超9.4亿人生活在城镇
  • 首付款12.5亿美元!三生制药与辉瑞就国产双抗达成合作协议
  • 中国戏剧梅花奖终评结果公示,蓝天和朱洁静等15名演员入选
  • 探月工程鹊桥二号中继星取得阶段性进展
  • 小米法务部:犯罪团伙操纵近万账号诋毁小米,该起黑公关案告破