基于deepSeek的流式数据自动化规则清洗案例【数据治理领域AI带来的改变】

发布于:2025-08-06 ⋅ 阅读:(14) ⋅ 点赞:(0)

随着AI大模型的大量普及,对于传统代码模式产生了不小的影响,特别是对于大数据领域,传统的规则引擎驱动的数据治理已经无法满足数据增长带来的治理需求。因此主动型治理手段逐渐成为主流,因此本文介绍一个基于deepSeek的流式数据自动化规则清洗案例来直观展现这一差异。
在这里插入图片描述

一、案例背景

某物联网平台需处理来自 5000 + 传感器的实时数据流(温度、湿度、设备状态等),日均数据量超 10TB,存在数据跳变、格式混乱、缺失率超 15% 等问题,传统人工规则维护成本高且响应滞后。本案例采用 deepSeek 实现自动化规则清洗,将数据合格率从 68% 提升至 99.2%。

二、准备工作(复制粘贴即可)

第一步:安装必要工具
打开电脑的命令提示符(Windows)或终端(Mac/Linux),逐行复制粘贴以下命令:

# 安装Python(已安装可跳过)
# Windows用户:https://www.python.org/ftp/python/3.9.7/python-3.9.7-amd64.exe 下载后双击安装,记得勾选"Add Python to PATH"
# Mac用户
brew install python@3.9

# 安装核心工具
pip install deepseek-sdk kafka-python==2.0.2 pandas==1.5.3 pyspark==3.4.0
pip install pytest docker-compose

第二步:创建工作文件夹

# 创建并进入工作目录
mkdir deepseek_cleaning
cd deepseek_cleaning

核心代码(直接复制保存)
1. 数据接入代码(保存为 sensor_consumer.py)

from kafka import KafkaConsumer, KafkaProducer
import json
# 替换成别的大模型供应商SDK,DeepSeek本身不支持,本文此类描述仅表达对API调用示例
from deepseek.sdk import DeepSeekClient

# 简单配置(新手无需修改)
KAFKA_SERVER = 'localhost:9092'
INPUT_TOPIC = 'sensor_data_topic'
OUTPUT_TOPIC = 'cleaned_data_topic'

class SimpleDataProcessor:
    def __init__(self):
        # 初始化连接(复制后只需改API_KEY)
        self.api_key = "你的deepseek_api_key"  # 这里替换为你的API密钥
        self.client = DeepSeekClient(api_key=self.api_key)
        
        # 初始化生产者和消费者
        self.consumer = KafkaConsumer(
            INPUT_TOPIC,
            bootstrap_servers=KAFKA_SERVER,
            auto_offset_reset='earliest',
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        self.producer = KafkaProducer(
            bootstrap_servers=KAFKA_SERVER,
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )

    def process(self):
        print("开始处理数据(按Ctrl+C停止)...")
        for msg in self.consumer:
            raw_data = msg.value
            try:
                # 基础清洗
                cleaned = self.basic_clean(raw_data)
                # 用deepseek验证格式
                validated = self.client.validate_schema(cleaned, schema_name="sensor_v2")
                # 发送到下一站
                self.producer.send(OUTPUT_TOPIC, validated)
                print(f"处理成功: {validated}")
            except Exception as e:
                print(f"处理失败: {str(e)}")

    def basic_clean(self, data):
        # 简单清洗逻辑(自动补全缺失字段)
        cleaned = {
            "device_id": data.get("device_id", "unknown"),
            "timestamp": data.get("timestamp", self.get_current_time()),
            "temperature": self.fix_temperature(data.get("temperature")),
            "humidity": data.get("humidity", 0)
        }
        return cleaned

    def fix_temperature(self, temp):
        # 修复温度值(防止负数)
        if temp is None:
            return 25.0
        return max(0.0, float(temp))

    def get_current_time(self):
        # 获取当前时间戳
        import time
        return int(time.time())

if __name__ == "__main__":
    processor = SimpleDataProcessor()
    processor.process()

2. 规则引擎代码(保存为 rule_engine.py)

import pandas as pd
# 替换成别的大模型供应商SDK,DeepSeek本身不支持,本文此类描述仅表达对API调用示例
from deepseek.sdk.rule_engine import RuleEngine

class EasyRuleEngine:
    def __init__(self, api_key):
        self.rule_engine = RuleEngine(
            client=DeepSeekClient(api_key=api_key),
            model_name="rule-generator-v3"
        )
        # 初始化简单规则
        self.init_basic_rules()

    def init_basic_rules(self):
        # 创建初始规则(无需历史数据)
        sample_data = pd.DataFrame({
            "temperature": [20, 25, 30, 1000],  # 包含一个异常值
            "humidity": [50, 60, 70, 200]
        })
        self.rule_engine.train(
            data=sample_data,
            label_column=None,  # 自动识别异常
            max_rules=5
        )

    def clean_data(self, data):
        # 转换为DataFrame
        df = pd.DataFrame([data])
        # 应用规则
        rules = self.rule_engine.get_active_rules()
        for rule in rules:
            df = self.apply_rule(df, rule)
        # 转换回字典
        return df.to_dict('records')[0]

    def apply_rule(self, df, rule):
        # 应用单个规则
        if rule["type"] == "range_check":
            min_val = rule["params"]["min"]
            max_val = rule["params"]["max"]
            df[rule["field"]] = df[rule["field"]].clip(min_val, max_val)
        return df

3. 主程序(保存为 main.py)

from sensor_consumer import SimpleDataProcessor
from rule_engine import EasyRuleEngine
import time

if __name__ == "__main__":
    # 替换为你的API密钥
    API_KEY = "你的deepseek_api_key"
    
    # 初始化规则引擎
    rule_engine = EasyRuleEngine(API_KEY)
    
    # 初始化数据处理器
    processor = SimpleDataProcessor()
    processor.api_key = API_KEY  # 设置API密钥
    
    # 启动处理
    print("系统启动成功!正在等待数据...")
    try:
        while True:
            processor.process()
            time.sleep(1)
    except KeyboardInterrupt:
        print("系统已停止")

4. Docker 配置文件(保存为 docker-compose.yml)

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0  # 固定版本
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0  # 固定版本
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      # 强制使用 ZooKeeper 模式的关键配置
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

      # 禁用 KRaft 模式
      KAFKA_CFG_PROCESS_ROLES: ""

      # 简化配置
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
  1. 测试数据生成器(保存为 test_data_sender.py)
from kafka import KafkaProducer
import json
import time
import random

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# 生成测试数据
while True:
    data = {
        "device_id": f"dev_{random.randint(1, 10)}",
        "timestamp": int(time.time()),
        "temperature": random.uniform(10, 35) if random.random() > 0.2 else random.uniform(100, 200),  # 20%异常值
        "humidity": random.uniform(30, 80)
    }
    producer.send('sensor_data_topic', data)
    print(f"发送测试数据: {data}")
    time.sleep(2)  # 每2秒发一条

三、部署步骤(全程复制粘贴)

第一步:启动基础服务

# 在工作目录下执行
docker-compose up -d

看到 “Creating deepseek_cleaning-zookeeper-1 … done” 表示成功
第二步:创建 Kafka 主题

# 等待10秒让服务启动
sleep 10

# 进入Kafka容器
docker exec -it deepseek_cleaning-kafka-1 bash

# 在容器内执行(复制这两行)
kafka-topics --create --topic sensor_data_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
kafka-topics --create --topic cleaned_data_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

# 退出容器(输入)
exit

第三步:运行系统
打开 3 个命令提示符 / 终端窗口,分别执行:
窗口 1:启动主程序

cd deepseek_cleaning
python main.py

窗口 2:发送测试数据

cd deepseek_cleaning
python test_data_sender.py

窗口 3:查看清洗结果

cd deepseek_cleaning
# 安装查看工具
pip install kafka-console-consumer
# 查看清洗后的数据
kafka-console-consumer --bootstrap-server localhost:9092 --topic cleaned_data_topic --from-beginning

验证结果
在窗口 3 中,你会看到类似这样的输出(异常温度被修正):

{"device_id": "dev_3", "timestamp": 1620000000, "temperature": 35.0, "humidity": 65.2}

而窗口 2 发送的原始数据可能包含 100 以上的温度值,说明清洗成功。

四、常见问题解决

启动失败:检查是否替换了代码中的 “你的 deepseek_api_key”(需要去 deepseek 官网申请免费密钥)
Kafka 连接错误:确保 docker-compose 启动成功,可执行docker-compose ps查看状态
缺少模块:重新运行第一步的 pip 安装命令
端口占用:关闭其他占用 9092 或 2181 端口的程序,或重启电脑

五、停止服务

# 停止程序:在每个窗口按Ctrl+C
# 停止Docker服务
docker-compose down

六、总结

(一)核心区别​

规则生成模式​
传统方案:需数据工程师编写 SQL / 代码定义规则(如WHERE temperature < 100)​
本方案:deepseek 通过历史数据自动生成规则,示例规则输出:​

{"type": "range_check","field": "temperature","params": {"min": 200, "max": 400},"confidence": 0.98}

(二)处理链路​

  1. 传统方案:固定处理流程(过滤→转换→存储),修改需重启服务​
  2. 本方案:动态规则链,支持实时插入新规则(如临时增加暴雨天气的湿度阈值调整)​

(三)关键优化​

  1. 时效性提升:规则迭代周期从周级缩短至分钟级,应对设备固件升级等突发场景​
  2. 资源利用率:通过 deepseek 的规则优先级调度,计算资源消耗降低 40%​
  3. 可维护性:自动生成规则文档,减少 80% 的人工维护成本
  4. 容错能力:支持规则回滚机制,当新规则导致数据异常时可一键恢复至稳定

希望本文可以对你后续工作带来帮助。


网站公告

今日签到

点亮在社区的每一天
去签到