当前位置: 首页 > news >正文

[Data Pipeline] Kafka消息 | Redis缓存 | Docker部署(Lambda架构)

在这里插入图片描述

第七章:Kafka消息系统(实时流处理)

欢迎回到数据探索之旅!

  • 在前六章中,我们构建了强大的**批量处理流水线**。

  • 通过Airflow DAG(批量任务编排)协调Spark作业(数据处理),数据从MySQL数据库(源系统)经数据层(青铜、白银、黄金)存入MinIO存储(数据湖),并通过数据质量检查确保数据可靠性。这种批量处理非常适合生成日报和分析。

  • 但若需要即时响应数据变化呢?例如客户完成订单后立即推荐相关商品,此时无法等待每日批量作业。

这就需要**实时数据流处理——这正是Kafka消息系统**的核心价值

实时流处理解决的问题

批量处理存在固有延迟(数小时至数天),无法满足以下场景:

  • 欺诈检测(即时拦截可疑交易)
  • 个性化推荐(购物后实时推荐)
  • 实时仪表盘(销售数据动态更新
  • 事件驱动告警

Kafka:高速数据传送带

Apache Kafka是构建实时数据管道的分布式流平台

其核心设计犹如高速传送带,具备:

  • 高吞吐(每秒百万级消息处理)
  • 低延迟(毫秒级响应)
  • 持久化存储(消息可回溯)
  • 水平扩展能力

核心概念解析

  • 消息(事件):数据变更的最小单元,例如MySQL表的新增记录
  • 主题(Topic):数据分类通道(如mysql.coffee_shop.order_details存储订单明细变更)
  • 生产者(Producer):数据写入端(如监控MySQL的Debezium连接器)
  • 消费者(Consumer):数据读取端(如推荐服务)
  • 代理(Broker):Kafka服务节点,组成高可用集群

实时推荐系统工作流

订单数据实时处理流程:

在这里插入图片描述

整个过程可在500ms内完成,实现秒级响应

数据摄取:Kafka Connect与Debezium

**变更数据捕获(CDC)**是实现实时数据摄取的关键技术

# docker-compose.yaml配置片段
connect:image: confluentinc/cp-kafka-connectcommand:- bash- -c- |# 安装Debezium MySQL连接器confluent-hub install debezium/debezium-connector-mysql/etc/confluent/docker/runenvironment:CONNECT_BOOTSTRAP_SERVERS: "kafka-1:9092,kafka-2:9092"  # Kafka集群地址

Debezium通过解析MySQL二进制日志(binlog),将数据变更转化为标准事件格式。示例事件消息:

{"payload": {"after": {"order_id": "ORD_20230619_001","product_id": "COFFEE_BEAN_ESPRESSO","quantity": 2},"op": "c",  // 操作类型:c=新增,u=更新,d=删除"ts_ms": 1687189200000  // 事件时间戳}
}

数据消费:Python消费者实现

实时推荐服务的消费者核心代码:

# kafka_client.py消费者工作线程
def consumer_worker(worker_id: int):# 初始化Kafka连接handler = KafkaHandler(["kafka-1:9092", "kafka-2:9092"])consumer = handler.get_consumer(topic="mysql.coffee_shop.order_details",group_id="realtime-recs")producer = handler.get_producer()while True:# 批量拉取消息(每秒轮询)messages = consumer.poll(timeout_ms=1000)for msg in messages.items():for record in msg.value:# 处理事件(调用推荐逻辑)process_recommendation(record, producer)def process_recommendation(record, producer):# 从Redis获取用户画像(详见第八章)user_profile = redis.get(f"user:{record['user_id']}")if user_profile["tier"] == "DIAMOND":# 生成推荐并发送至下游主题suggestion = {"order_id": record["order_id"],"suggested_product": "COFFEE_GRINDER"}producer.send("order_suggestions", suggestion)

该实现包含以下关键技术点

  1. 消费者组(group_id)实现负载均衡
  2. 自动提交偏移量(enable_auto_commit=True)
  3. 批量消息处理提升吞吐量

基础设施部署

Docker Compose定义的核心服务:

services:kafka-1:image: bitnami/kafka:3.5.1ports:- 29092:29092  # 外部访问端口environment:KAFKA_CFG_NODE_ID: 1  # 节点标识KAFKA_CFG_LISTENERS: PLAINTEXT://:9092kafka-ui:image: provectuslabs/kafka-uiports:- 8000:8080  # 监控界面端口environment:KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka-1:9092

关键组件说明:

  • 双节点Kafka集群(kafka-1/kafka-2)保障高可用
  • Kafka UI提供可视化监控(http://localhost:8000)
  • 初始化服务(init-kafka)自动创建主题和分区

价值总结

Kafka实时流处理系统与批量处理管道形成互补:

批量处理实时流处理
延迟小时级毫秒级
吞吐极高
用例历史分析即时响应
存储数据湖持久化短期事件保留

这种混合架构同时满足企业对历史数据分析和实时决策的需求$CITE_14 $CITE_17。

下一章:Redis缓存/存储


第八章:Redis缓存/存储

详细专栏:Redis文档学习

在第七章:Kafka消息系统(实时流处理)中,我们了解到Kafka如何实现实时数据流动以支持商品推荐服务。

实时推荐需要极速访问用户等级、支付方式和商品信息,这正是Redis缓存/存储的核心价值

Redis核心特性

Redis是开源的内存数据结构存储系统,具备以下关键能力:

  • 亚毫秒级响应:数据存储在内存而非磁盘,访问速度比传统数据库快100倍
  • 丰富数据结构:支持字符串、哈希、集合、有序集合等数据结构
  • 数据持久化:支持RDB快照AOF日志两种持久化方式
  • 高可用架构:支持主从复制集群部署

项目中的Redis应用

在我们的咖啡销售数据管道中,Redis承担两大核心角色:

1. 查找数据缓存(静态数据加速)

通过lookup_data_cache.py脚本定时从MySQL加载三类核心数据到Redis

# 来源: scripts/database/lookup_data_cache.py
# 钻石客户ID存储为集合
r.sadd("diamond_customers", customer_id)# 支付方式ID存储为集合
r.sadd("bank_acb_payment", payment_method_id)# 商品详情存储为哈希
r.hset(f"product:{product_id}", mapping={"name": "浓缩咖啡", "unit_price": 25})

实时服务通过redis_static连接访问这些数据

# 检查钻石客户(时间复杂度O(1))
is_diamond = redis_static.sismember("diamond_customers", "CUST_202306001")# 获取商品详情(哈希全量读取)
product_info = redis_static.hgetall("product:COFFEE_BEAN_001")

2. 订单状态管理(动态数据暂存)

使用redis_dynamic连接处理实时订单流

# 订单计数器递增(原子操作保证线程安全)
current_count = redis_dynamic.incr(f"message_count:ORDER_20230619_001")# 存储已购商品集合(自动去重)
redis_dynamic.sadd(f"ordered_products:ORDER_20230619_001", "COFFEE_GRINDER")# 设置订单状态(带90秒过期时间)
redis_dynamic.setex(f"order_status:ORDER_20230619_001", 90, "completed")

Redis架构优势

维度传统数据库Redis
响应时间10-100ms0.1-1ms
QPS~1k~1M
数据结构固定表结构多种灵活结构
持久化强持久化可配置持久化

Docker部署配置

Redis服务在docker-compose.yaml中的定义

services:redis:image: redis:7.0.12ports:- "6379:6379"volumes:- ./redis_data:/data  # 数据持久化目录command: ["redis-server", "--save 60 1000", "--appendonly yes"]

关键配置说明:

  • --save 60 100060秒内有1000次写入则触发RDB快照
  • --appendonly yes:启用AOF日志记录所有写操作

数据流可视化

Redis在实时推荐中的交互流程:

在这里插入图片描述

总结

Redis通过内存存储和高效数据结构,在实时推荐中实现:

  1. 查询加速:将钻石客户检查从10ms级优化至0.1ms级
  2. 状态同步:可靠跟踪分布式环境下的订单处理进度
  3. 资源解耦:降低MySQL负载峰值压力达80%

这种缓存+暂存的双重模式,使实时推荐服务能在500ms内完成从事件接收到推荐生成的完整流程

下一章:Docker Compose环境


第九章:Docker Compose环境

详细专栏:Docker 云原生

欢迎回到咖啡销售数据管道核心概念系列的最终章!我们已经深入探讨了各个组件

  • 数据来源(第一章:MySQL数据库(源系统))
  • 数据处理引擎(第二章:Spark作业(数据处理))
  • 分层存储(第三章:MinIO存储(数据湖)和第四章:数据层(青铜、白银、黄金))
  • 工作流调度(第五章:Airflow DAG(批量编排))
  • 数据质量保障(第六章:数据质量检查)
  • 实时事件处理(第七章:Kafka消息系统(实时流))
  • 以及高速缓存(第八章:Redis缓存/存储)

想象组装复杂机械或指挥大型乐团——每个零件或乐手都有特定角色。

如何确保它们协同运作

这正是Docker Compose要解决的核心挑战:将Spark、Airflow、Kafka、数据库等异构服务整合为有机整体。

Docker Compose核心价值

Docker Compose是通过YAML文件定义和管理多容器应用的工具,具备三大核心能力

在这里插入图片描述

蓝图文件解析

项目包含两个核心配置文件:

  • docker-compose.yaml:定义实时服务组件
  • docker-compose-batch.yaml:定义批处理服务组件

服务定义范式

# 摘自 docker-compose-batch.yaml
services:minio:image: minio/minio:latestcontainer_name: minioports:- "9000:9000"  # S3 API端口- "9001:9001"  # 控制台端口environment:MINIO_ROOT_USER: minioadminMINIO_ROOT_PASSWORD: minioadminvolumes:- ./volumes/minio:/data  # 数据持久化路径networks:- myNetwork

关键配置说明:

  • image:指定Docker镜像版本,确保环境一致性
  • ports:端口映射遵循主机端口:容器端口格式
  • volumes:数据卷实现主机与容器的路径映射
  • networks:自定义网络实现服务发现

网络拓扑架构

networks:myNetwork:  # 自定义覆盖网络driver: bridgeattachable: true

网络特性:

  • 服务间通过服务名互访(如spark-master:7077)
  • 隔离外部网络干扰,提升安全性
  • 支持跨compose文件网络共享

数据卷设计

volumes:- ./airflow/dags:/opt/airflow/dags  # DAG文件同步- ./volumes/postgres:/var/lib/postgresql/data  # 元数据持久化

数据管理策略:

  • 批处理数据:MinIO卷映射实现数据湖持久化
  • 元数据存储:PostgreSQL卷保障任务状态不丢失
  • 日志文件:主机目录映射方便问题排查

💡完整服务矩阵

服务类型包含组件通信协议
批处理服务Spark Master/Worker, AirflowHTTP/8080, JDBC/7077
实时服务Kafka集群, Redis, Kafka ConnectTCP/9092, TCP/6379
存储服务MinIO, PostgreSQLS3/9000, JDBC/5432
监控服务Prometheus, Grafana, Kafka UIHTTP/3000, Web/8000

环境管理指令集

全栈启动命令

docker compose -f docker-compose.yaml -f docker-compose-batch.yaml up -d

参数解析:

  • -f:指定多个compose文件实现模块化配置
  • -d后台守护模式运行
  • 启动顺序通过depends_on字段控制

运维监控命令

# 查看容器运行状态
docker compose -f docker-compose.yaml ps# 查看实时日志
docker compose logs -f spark-master# 弹性扩容Spark Worker
docker compose up -d --scale spark-worker=3

系统协同原理

在这里插入图片描述

协同要点:

  1. 批量处理流:Airflow通过SparkSubmitOperator提交作业到Spark集群,实现ETL流水线
  2. 实时处理流Kafka Connect监控MySQL binlog生成CDC事件,触发实时推荐计算
  3. 监控告警流:各组件暴露Metrics端点,Prometheus采集后通过Grafana展示

核心优势

环境一致性保障

  • 开发、测试、生产环境使用相同镜像版本(如bitnami/kafka:3.5.1)
  • 避免"在我机器上能跑"的问题,实现跨平台兼容

资源隔离控制

# 限制容器资源配额
deploy:resources:limits:cpus: '0.50'memory: 1024Mreservations:cpus: '0.25'memory: 512M

资源管理策略:

  • Spark Worker按计算需求分配CPU/MEM
  • Kafka Broker根据吞吐量配置资源上限
  • 关键服务(如PostgreSQL)预留基础资源

快速扩缩容能力

# 扩展Kafka Broker节点
docker compose up -d --scale kafka=3# 缩减Spark Worker节点
docker compose up -d --scale spark-worker=2

动态调整策略:

  • 批量任务高峰期扩展Spark计算节点
  • 大促期间增加Kafka分区和消费者组实例

总结

Docker Compose通过声明式配置将复杂的多服务系统抽象为可版本控制的蓝图文件,实现:

  1. 一键部署:15+组件通过单个命令启动
  2. 服务发现自定义网络实现容器间域名解析
  3. 数据治理卷映射策略保障数据生命周期
  4. 资源管控:CPU/内存配额限制防止资源争抢

该方案使我们的数据管道具备企业级可维护性,支持从开发环境到生产环境的平滑过渡。

通过组合批量处理与实时处理组件,构建出完整的Lambda架构(详见 架构专栏)实现。

在这里插入图片描述

相关文章:

  • jquery 赋值时不触发change事件解决——仙盟创梦IDE
  • 将多个Excel合并到一个Excel中的方法
  • 【嵌入式硬件实例】-555定时器控制舵机/伺服电机
  • MySQL 三大日志:Redo、Undo 与 Binlog 详解
  • Spring Boot 集成 Elasticsearch(含 ElasticsearchRestTemplate 示例)
  • 102页满分PPT | 汽车设备制造业企业信息化业务解决方案智能制造汽车黑灯工厂解决方案
  • [安卓/IOS按键精灵辅助工具]关于脚本中的统计记录功能
  • 黑盒测试(一)(包含源码)
  • WEB安全--WAF的绕过思路
  • React 轻量级状态管理器Zustand
  • YOLOv8改进:Neck篇——2024.1全新MFDS-DETR的HS-FPN特征融合层解析
  • 【Gin框架】中间件
  • 墨记APP:水墨风记事,书写生活诗意
  • 【AI Study】第四天,Pandas(10)- 实用技巧
  • 软件范式正在经历第三次革命
  • 关于嵌入式编译工具链与游戏移植的学习
  • vue3 电商类网站实现规格的选择
  • leetcode:461. 汉明距离(python3解法,数学相关算法题)
  • simuilink和ROS2数据联通,Run后一直卡在Initializting
  • 基于微信小程序和云开发的企业绿色融资平台的设计与实现
  • 网站建设的公司有发展吗/百度营销官网
  • 企业网站建设的基本原则/最佳的资源搜索引擎
  • 网站服务器返回状态码404/软文范例100字以内
  • 代理注册公司服务/济南seo怎么优化
  • 叫别人做网站安全吗/广州百度推广开户
  • 做返利网站如何操作流程/网站手机版排名seo