当前位置: 首页 > news >正文

基于deepSeek的流式数据自动化规则清洗案例【数据治理领域AI带来的改变】

随着AI大模型的大量普及,对于传统代码模式产生了不小的影响,特别是对于大数据领域,传统的规则引擎驱动的数据治理已经无法满足数据增长带来的治理需求。因此主动型治理手段逐渐成为主流,因此本文介绍一个基于deepSeek的流式数据自动化规则清洗案例来直观展现这一差异。
在这里插入图片描述

一、案例背景

某物联网平台需处理来自 5000 + 传感器的实时数据流(温度、湿度、设备状态等),日均数据量超 10TB,存在数据跳变、格式混乱、缺失率超 15% 等问题,传统人工规则维护成本高且响应滞后。本案例采用 deepSeek 实现自动化规则清洗,将数据合格率从 68% 提升至 99.2%。

二、准备工作(复制粘贴即可)

第一步:安装必要工具
打开电脑的命令提示符(Windows)或终端(Mac/Linux),逐行复制粘贴以下命令:

# 安装Python(已安装可跳过)
# Windows用户:https://www.python.org/ftp/python/3.9.7/python-3.9.7-amd64.exe 下载后双击安装,记得勾选"Add Python to PATH"
# Mac用户
brew install python@3.9# 安装核心工具
pip install deepseek-sdk==1.2.0 kafka-python==2.0.2 pandas==1.5.3 pyspark==3.4.0
pip install pytest docker-compose

第二步:创建工作文件夹

# 创建并进入工作目录
mkdir deepseek_cleaning
cd deepseek_cleaning

核心代码(直接复制保存)
1. 数据接入代码(保存为 sensor_consumer.py)

from kafka import KafkaConsumer, KafkaProducer
import json
from deepseek.sdk import DeepSeekClient# 简单配置(新手无需修改)
KAFKA_SERVER = 'localhost:9092'
INPUT_TOPIC = 'sensor_data_topic'
OUTPUT_TOPIC = 'cleaned_data_topic'class SimpleDataProcessor:def __init__(self):# 初始化连接(复制后只需改API_KEY)self.api_key = "你的deepseek_api_key"  # 这里替换为你的API密钥self.client = DeepSeekClient(api_key=self.api_key)# 初始化生产者和消费者self.consumer = KafkaConsumer(INPUT_TOPIC,bootstrap_servers=KAFKA_SERVER,auto_offset_reset='earliest',value_deserializer=lambda x: json.loads(x.decode('utf-8')))self.producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER,value_serializer=lambda x: json.dumps(x).encode('utf-8'))def process(self):print("开始处理数据(按Ctrl+C停止)...")for msg in self.consumer:raw_data = msg.valuetry:# 基础清洗cleaned = self.basic_clean(raw_data)# 用deepseek验证格式validated = self.client.validate_schema(cleaned, schema_name="sensor_v2")# 发送到下一站self.producer.send(OUTPUT_TOPIC, validated)print(f"处理成功: {validated}")except Exception as e:print(f"处理失败: {str(e)}")def basic_clean(self, data):# 简单清洗逻辑(自动补全缺失字段)cleaned = {"device_id": data.get("device_id", "unknown"),"timestamp": data.get("timestamp", self.get_current_time()),"temperature": self.fix_temperature(data.get("temperature")),"humidity": data.get("humidity", 0)}return cleaneddef fix_temperature(self, temp):# 修复温度值(防止负数)if temp is None:return 25.0return max(0.0, float(temp))def get_current_time(self):# 获取当前时间戳import timereturn int(time.time())if __name__ == "__main__":processor = SimpleDataProcessor()processor.process()

2. 规则引擎代码(保存为 rule_engine.py)

import pandas as pd
from deepseek.sdk.rule_engine import RuleEngineclass EasyRuleEngine:def __init__(self, api_key):self.rule_engine = RuleEngine(client=DeepSeekClient(api_key=api_key),model_name="rule-generator-v3")# 初始化简单规则self.init_basic_rules()def init_basic_rules(self):# 创建初始规则(无需历史数据)sample_data = pd.DataFrame({"temperature": [20, 25, 30, 1000],  # 包含一个异常值"humidity": [50, 60, 70, 200]})self.rule_engine.train(data=sample_data,label_column=None,  # 自动识别异常max_rules=5)def clean_data(self, data):# 转换为DataFramedf = pd.DataFrame([data])# 应用规则rules = self.rule_engine.get_active_rules()for rule in rules:df = self.apply_rule(df, rule)# 转换回字典return df.to_dict('records')[0]def apply_rule(self, df, rule):# 应用单个规则if rule["type"] == "range_check":min_val = rule["params"]["min"]max_val = rule["params"]["max"]df[rule["field"]] = df[rule["field"]].clip(min_val, max_val)return df

3. 主程序(保存为 main.py)

from sensor_consumer import SimpleDataProcessor
from rule_engine import EasyRuleEngine
import timeif __name__ == "__main__":# 替换为你的API密钥API_KEY = "你的deepseek_api_key"# 初始化规则引擎rule_engine = EasyRuleEngine(API_KEY)# 初始化数据处理器processor = SimpleDataProcessor()processor.api_key = API_KEY  # 设置API密钥# 启动处理print("系统启动成功!正在等待数据...")try:while True:processor.process()time.sleep(1)except KeyboardInterrupt:print("系统已停止")

4. Docker 配置文件(保存为 docker-compose.yml)

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"kafka:image: confluentinc/cp-kafka:latestdepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  1. 测试数据生成器(保存为 test_data_sender.py)
from kafka import KafkaProducer
import json
import time
import randomproducer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# 生成测试数据
while True:data = {"device_id": f"dev_{random.randint(1, 10)}","timestamp": int(time.time()),"temperature": random.uniform(10, 35) if random.random() > 0.2 else random.uniform(100, 200),  # 20%异常值"humidity": random.uniform(30, 80)}producer.send('sensor_data_topic', data)print(f"发送测试数据: {data}")time.sleep(2)  # 每2秒发一条

三、部署步骤(全程复制粘贴)

第一步:启动基础服务

# 在工作目录下执行
docker-compose up -d

看到 “Creating deepseek_cleaning_zookeeper_1 … done” 表示成功
第二步:创建 Kafka 主题

# 等待10秒让服务启动
sleep 10# 进入Kafka容器
docker exec -it deepseek_cleaning_kafka_1 bash# 在容器内执行(复制这两行)
kafka-topics --create --topic sensor_data_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
kafka-topics --create --topic cleaned_data_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1# 退出容器(输入)
exit

第三步:运行系统
打开 3 个命令提示符 / 终端窗口,分别执行:
窗口 1:启动主程序

cd deepseek_cleaning
python main.py

窗口 2:发送测试数据

cd deepseek_cleaning
python test_data_sender.py

窗口 3:查看清洗结果

cd deepseek_cleaning
# 安装查看工具
pip install kafka-console-consumer
# 查看清洗后的数据
kafka-console-consumer --bootstrap-server localhost:9092 --topic cleaned_data_topic --from-beginning

验证结果
在窗口 3 中,你会看到类似这样的输出(异常温度被修正):

{"device_id": "dev_3", "timestamp": 1620000000, "temperature": 35.0, "humidity": 65.2}

而窗口 2 发送的原始数据可能包含 100 以上的温度值,说明清洗成功。

四、常见问题解决

启动失败:检查是否替换了代码中的 “你的 deepseek_api_key”(需要去 deepseek 官网申请免费密钥)
Kafka 连接错误:确保 docker-compose 启动成功,可执行docker-compose ps查看状态
缺少模块:重新运行第一步的 pip 安装命令
端口占用:关闭其他占用 9092 或 2181 端口的程序,或重启电脑

五、停止服务

# 停止程序:在每个窗口按Ctrl+C
# 停止Docker服务
docker-compose down

六、总结

(一)核心区别​

规则生成模式​
传统方案:需数据工程师编写 SQL / 代码定义规则(如WHERE temperature < 100)​
本方案:deepseek 通过历史数据自动生成规则,示例规则输出:​

{"type": "range_check","field": "temperature","params": {"min": 200, "max": 400},"confidence": 0.98}

(二)处理链路​

  1. 传统方案:固定处理流程(过滤→转换→存储),修改需重启服务​
  2. 本方案:动态规则链,支持实时插入新规则(如临时增加暴雨天气的湿度阈值调整)​

(三)关键优化​

  1. 时效性提升:规则迭代周期从周级缩短至分钟级,应对设备固件升级等突发场景​
  2. 资源利用率:通过 deepseek 的规则优先级调度,计算资源消耗降低 40%​
  3. 可维护性:自动生成规则文档,减少 80% 的人工维护成本
  4. 容错能力:支持规则回滚机制,当新规则导致数据异常时可一键恢复至稳定

希望本文可以对你后续工作带来帮助。

http://www.dtcms.com/a/315463.html

相关文章:

  • wps创建编辑excel customHeight 属性不是标准 Excel Open XML导致比对异常
  • 用 Python 批量处理 Excel:从重复值清洗到数据可视化
  • Unity编辑器工具:一键为场景中所有MeshRenderer对象添加指定脚本
  • 如何在服务器上部署后端程序和前端页面?
  • 在Spring Boot项目中动态切换数据源和数据库!
  • # 【Java + EasyExcel 实战】动态列 + 公式备注 Excel 模板导出全流程(附完整代码)
  • 前端实现Excel文件的在线预览效果
  • 【学习笔记】FTP库函数学习
  • 文件编译、调试及库制作
  • 人工智能领域、图欧科技、IMYAI智能助手2025年2月更新月报
  • pyspark中的kafka的读和写案例操作
  • Goby 漏洞安全通告| NestJS DevTools /inspector/graph/interact 命令执行漏洞(CVE-2025-54782)
  • libpq库使用
  • PDF转图片工具技术文档(命令行版本)
  • 【taro react】 ---- useModel 数据双向绑定 hook 实现
  • vue和react的框架原理
  • 基于PD控制器的四旋翼无人机群飞行控制系统simulink建模与仿真
  • SpringBoot原理揭秘--BeanFactory和ApplicationContext
  • day 46 神经网络-简版
  • 2025年渗透测试面试题总结-01(题目+回答)
  • 什么是压接孔?压接孔PCB制造流程
  • Zabbix 企业级高级应用
  • AI赋能复合材料与智能增材制造:前沿技术研修重磅
  • 【MATLAB】(八)矩阵
  • 盟接之桥说制造:价格战与品质:制造企业可持续发展的战略思考
  • 智能融合:增材制造多物理场AI建模与工业应用实战
  • PHP:历经岁月仍熠熠生辉的服务器端脚本语言
  • Spring 的 ioc 控制反转
  • 无人设备遥控器之信号切换技术篇
  • Guava 与 Caffeine 本地缓存系统详解