当前位置: 首页 > news >正文

使用Python实现DLT645-2007智能电表协议

文章目录

      • 🌴通讯支持
      • 🌴 功能完成情况
      • 服务端架构设计
        • 一、核心模块划分
        • 二、数据层定义
        • 三、协议解析层
        • 四、通信业务层(以DLT645服务端为例)
        • 五、通信层(以TCP为例)
        • 使用例子

🌴通讯支持

功能状态
TCP客户端(方便通讯测试) 🐾
TCP服务端(方便通讯测试) 🐾
RTU主站 🐾
RTU从站 🐾

🌴 功能完成情况

功能状态
读、写通讯地址 🐾
广播校时 🐾
电能量 🐾
最大需量及发生时间 🐾
变量 🐾
事件记录 🐾
参变量 🐾
冻结量 🐾
负荷纪录 🐾

项目地址:https://gitee.com/chen-dongyu123/dlt645

服务端架构设计

一、核心模块划分
  • 数据层:模拟电表数据,存储数据标识与值的映射关系。
  • 协议解析层:负责帧的组装、拆卸、校验和转义。
  • 业务逻辑层:根据解析出的控制码和DI,执行相应操作并组织回复数据。
  • 通信层:负责底层的字节流收发(串口/TCP)。

代码结构

├── config      		# 测点json,用于初始化导入
├── src        			# 源文件夹
│   ├── common			# 存放通用函数
│   ├── model			# 数据模型
│   │   ├── data
│   │   │   └── define
│   │   └── types		# dlt645数据类型
│   ├── protocol		# 协议解析层
│   ├── service
│   │   ├── clientsvc	# dlt645客户端api及实现
│   │   └── serversvc	# dlt645服务端api及实现
│   └── transport
│       ├── client		# 客户端通讯接口,支持TCP客户端、RTU主站
│       └── server		# 服务端通讯接口,支持TCP服务端、RTU从站
└── test				# 测试文件

流程图:启动 -> 监听连接 -> 接收数据 -> 解析帧 -> 处理请求 -> 组织回复帧 -> 发送数据

二、数据层定义
  1. 通过读取config文件夹里的json导入测点,json示例如下

    [{"Di": "02010100","Name": "A相电压","Unit": "V","DataFormat": "XXX.X"},{"Di": "02010200","Name": "B相电压","Unit": "V","DataFormat": "XXX.X"},...
    ]
    
  2. 定义数据类型和映射map

    class DataItem:def __init__(self, di: int, name: str, data_format: str, value: float = 0, unit: str = '', timestamp: int = 0):self.di = diself.name = nameself.data_format = data_formatself.value = valueself.unit = unitself.timestamp = timestampdef __repr__(self):return (f"DataItem(name={self.name}, di={format(self.di, '#x')}, value={self.value}, "f"unit={self.unit},data_format={self.data_format}, timestamp={self.timestamp})")DIMap: Dict[int, DataItem] = {}
    
  3. 定义设置DataItem值接口和获取DataItem接口

    def get_data_item(di: int) -> Optional[DataItem]:"""根据 di 获取数据项"""item = DIMap.get(di)if item is None:log.info(f"未通过di {hex(di)} 找到映射")return Nonereturn itemdef set_data_item(di: int, data: Any) -> bool:"""设置指定 di 的数据项"""if data is None:log.info("data is nil")return Falseif di in DIMap:DIMap[di].value = datalog.info(f"设置数据项 {hex(di)} 成功, 值 {DIMap[di]}")return Truereturn False
    
  4. 初始化数据标识map

    def init_variable_def(VariableTypes: List[DataItem]):for date_type in VariableTypes:DIMap[date_type.di] = DataItem(di=date_type.di,name=date_type.name,data_format=date_type.data_format,unit=date_type.unit)
    
  5. 初始化DLT645相关测点数据

    EnergyTypes = []
    DemandTypes = []
    VariableTypes = []def init():global EnergyTypesEnergyTypes = initDataTypeFromJson(os.path.join(conf_path, 'energy_types.json'))DemandTypes = initDataTypeFromJson(os.path.join(conf_path, 'demand_types.json'))VariableTypes = initDataTypeFromJson(os.path.join(conf_path, 'variable_types.json'))init_energy_def(EnergyTypes)init_demand_def(DemandTypes)init_variable_def(VariableTypes)# 执行初始化
    init()
    
三、协议解析层
  1. 帧类型

    # 常量定义
    FRAME_START_BYTE = 0x68
    FRAME_END_BYTE = 0x16
    BROADCAST_ADDR = 0xAAclass Frame:def __init__(self, preamble: List[int] = None, start_flag: int = 0, addr: List[int] = None,ctrl_code: int = 0, data_len: int = 0, data: List[int] = None,check_sum: int = 0, end_flag: int = 0):self.preamble = preamble if preamble is not None else []  # 前导字节self.start_flag = start_flag  # 起始字节self.addr = addr if addr is not None else [0] * 6  # 地址字节self.ctrl_code = ctrl_code  # 控制字节self.data_len = data_len  # 数据长度字节self.data = data if data is not None else []  # 数据字节self.check_sum = check_sum  # 校验字节self.end_flag = end_flag  # 结束字节
    
  2. 协议解析

    • 数据域解码

          @classmethoddef decode_data(cls, data: bytes) -> bytes:"""数据域解码(±33H转换)"""return bytes([b - 0x33 for b in data])
      
    • 数据域编码

          @classmethoddef encode_data(cls, data: bytes) -> bytes:"""数据域编码"""return bytes([b + 0x33 for b in data])
      
    • 计算校验和

          @classmethoddef calculate_checksum(cls, data: bytes) -> int:"""校验和计算(模256求和)"""return sum(data) % 256
      
    • 构建帧

          @classmethoddef build_frame(cls, addr: bytes, ctrl_code: int, data: bytes) -> bytearray:"""帧构建(支持广播和单播)"""if len(addr) != 6:raise ValueError("地址长度必须为6字节")buf = []buf.append(FRAME_START_BYTE)buf.extend(addr)buf.append(FRAME_START_BYTE)buf.append(ctrl_code)# 数据域编码encoded_data = DLT645Protocol.encode_data(data)buf.append(len(encoded_data))buf.extend(encoded_data)# 计算校验和check_sum = DLT645Protocol.calculate_checksum(bytes(buf))buf.append(check_sum)buf.append(FRAME_END_BYTE)return bytearray(buf)
      
    • 字节数组反序列化Frame结构体

          @classmethoddef deserialize(cls, raw: bytes) -> Optional[Frame]:"""将字节切片反序列化为 Frame 结构体"""# 基础校验if len(raw) < 12:raise Exception(f"frame too short: {raw}")# 帧边界检查(需考虑前导FE)try:start_idx = raw.index(FRAME_START_BYTE)except ValueError:log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx == -1 or start_idx + 10 >= len(raw):log.error(f"invalid start flag: {raw}")raise Exception("invalid start flag")if start_idx + 7 >= len(raw) or raw[start_idx + 7] != FRAME_START_BYTE:log.error(f"missing second start flag: {raw}")raise Exception("missing second start flag")# 构建帧结构frame = Frame()frame.start_flag = raw[start_idx]frame.addr = raw[start_idx + 1:start_idx + 7]frame.ctrl_code = raw[start_idx + 8]frame.data_len = raw[start_idx + 9]# 数据域提取(严格按协议1.2.5节处理)data_start = start_idx + 10data_end = data_start + frame.data_lenif data_end > len(raw) - 2:log.error(f"invalid data length {frame.data_len}")raise Exception(f"invalid data length {frame.data_len}")# 数据域解码(需处理加33H/减33H)frame.data = DLT645Protocol.decode_data(raw[data_start:data_end])# 校验和验证(从第一个68H到校验码前)checksum_start = start_idxchecksum_end = data_endif checksum_end >= len(raw):log.error(f"frame truncated: {raw}")raise Exception(f"frame truncated: {raw}")calculated_sum = DLT645Protocol.calculate_checksum(raw[checksum_start:checksum_end])if calculated_sum != raw[checksum_end]:log.error(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")raise Exception(f"checksum error: calc=0x{calculated_sum:02X}, actual=0x{raw[checksum_end]:02X}")# 结束符验证if checksum_end + 1 >= len(raw) or raw[checksum_end + 1] != FRAME_END_BYTE:log.error(f"invalid end flag: {raw[checksum_end + 1]}")raise Exception(f"invalid end flag: {raw[checksum_end + 1]}")# 转换为带缩进的JSONlog.info(f"frame: {frame}")return frame
      
    • Frame 结构体序列化为字节数组

          @classmethoddef serialize(cls, frame: Frame) -> Optional[bytes]:"""将 Frame 结构体序列化为字节切片"""if frame.start_flag != FRAME_START_BYTE or frame.end_flag != FRAME_END_BYTE:log.error(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")raise Exception(f"invalid start or end flag: {frame.start_flag} {frame.end_flag}")buf = []# 写入前导字节buf.extend(frame.preamble)# 写入起始符buf.append(frame.start_flag)# 写入地址buf.extend(frame.addr)# 写入第二个起始符buf.append(frame.start_flag)# 写入控制码buf.append(frame.ctrl_code)# 数据域编码encoded_data = DLT645Protocol.encode_data(frame.data)# 写入数据长度buf.append(len(encoded_data))# 写入编码后的数据buf.extend(encoded_data)# 计算并写入校验和check_sum = DLT645Protocol.calculate_checksum(bytearray(buf))buf.append(check_sum)# 写入结束符buf.append(frame.end_flag)return bytearray(buf)
      
    四、通信业务层(以DLT645服务端为例)
    1. 定义电表Service

      class MeterServerService:def __init__(self,server: Union[TcpServer, RtuServer],address: bytearray = bytearray([0x00] * 6),password: bytearray = bytearray([0x00] * 4)):self.server = server	self.address = addressself.password = password
      
    2. 设置值接口,以电能为例

          # 写电能量def set_00(self, di: int, value: float) -> bool:ok = set_data_item(di, value)if not ok:log.error(f"写电能量失败")return ok
      
    3. 处理数据函数

          # 处理读数据请求(协议与业务分离)def handle_request(self, frame):# 1. 验证设备if not self.validate_device(frame.addr):log.info(f"验证设备地址: {bytes_to_spaced_hex(frame.addr)} 失败")raise Exception("unauthorized device")# 2. 根据控制码判断请求类型if frame.ctrl_code == CtrlCode.BroadcastTimeSync:  # 广播校时log.info(f"广播校时: {frame.Data.hex(' ')}")self.set_time(frame.Data)return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, frame.data)elif frame.ctrl_code == CtrlCode.CtrlReadData:# 解析数据标识di = frame.datadi3 = di[3]if di3 == 0x00:  # 读取电能# 构建响应帧res_data = bytearray(8)# 解析数据标识为 32 位无符号整数data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4]  # 仅复制前 4 字节数据标识value = data_item.value# 转换为 BCD 码bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x01:  # 读取最大需量及发生时间res_data = bytearray(12)data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")res_data[:4] = frame.data[:4]  # 返回数据标识value = data_item.value# 转换为 BCD 码bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:7] = bcd_value[:3]# 需量发生时间res_data[7:12] = time_to_bcd(time.time())log.info(f"读取最大需量及发生时间: {res_data}")return DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))elif di3 == 0x02:  # 读变量data_id = struct.unpack("<I", frame.data[:4])[0]data_item = data.get_data_item(data_id)if data_item is None:raise Exception("data item not found")# 变量数据长度data_len = 4data_len += (len(data_item.data_format) - 1) // 2  # (数据格式长度 - 1 位小数点)/2# 构建响应帧res_data = bytearray(data_len)res_data[:4] = frame.data[:4]  # 仅复制前 4 字节value = data_item.value# 转换为 BCD 码(小端序)bcd_value = float_to_bcd(value, data_item.data_format, 'little')res_data[4:data_len] = bcd_valuereturn DLT645Protocol.build_frame(frame.addr, frame.ctrl_code | 0x80, bytes(res_data))else:log.info(f"unknown: {hex(di3)}")return Exception("unknown di3")elif frame.ctrl_code == CtrlCode.ReadAddress:# 构建响应帧res_data = self.address[:6]return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)elif frame.ctrl_code == CtrlCode.WriteAddress:res_data = b''  # 写通讯地址不需要返回数据# 解析数据addr = frame.data[:6]self.set_address(addr)  # 设置通讯地址return DLT645Protocol.build_frame(self.address, frame.ctrl_code | 0x80, res_data)else:log.info(f"unknown control code: {hex(frame.ctrl_code)}")raise Exception("unknown control code")
      
    4. 注入通讯协议到Service

      def new_tcp_server(ip: str, port: int, timeout: int) -> MeterServerService:# 1. 先创建 TcpServer(不依赖 Service)tcp_server = TcpServer(ip, port, timeout, None)# 2. 创建 MeterServerService,注入 TcpServer(作为 Server 接口)meter_service = MeterServerService(tcp_server)# 3. 将 MeterServerService 注入回 TcpServertcp_server.service = meter_servicereturn meter_servicedef new_rtu_server(port: str, dataBits: int, stopBits: int, baudRate: int, parity: str,timeout: float) -> MeterServerService:rtu_server = RtuServer(port, dataBits, stopBits, baudRate, parity, timeout)# 2. 创建 MeterServerService,注入 RtuServer(作为 Server 接口)meter_service = MeterServerService(rtu_server)# 3. 将 MeterServerService 注入回 RtuServerrtu_server.service = meter_servicereturn meter_service
      
    五、通信层(以TCP为例)
    1. 定义TCP服务端

      class TcpServer:def __init__(self, ip: str, port: int, timeout: float, service):self.ip = ipself.port = portself.timeout = timeoutself.ln = Noneself.service = service	# Dlt645业务
      
    2. 处理数据

          def handle_connection(self, conn):try:while True:try:# 接收数据buf = conn.recv(256)if not buf:breaklog.info(f"Received data: {bytes_to_spaced_hex(buf)}")# 协议解析try:frame = DLT645Protocol.deserialize(buf)except Exception as e:log.error(f"Error parsing frame: {e}")continue# 业务处理try:resp = self.service.handle_request(frame)except Exception as e:log.error(f"Error handling request: {e}")continue# 响应if resp:try:conn.sendall(resp)log.info(f"Sent response: {bytes_to_spaced_hex(resp)}")except Exception as e:log.error(f"Error writing response: {e}")except socket.timeout:breakexcept Exception as e:log.error(f"Error handling connection: {e}")finally:try:conn.close()except Exception as e:log.error(f"Error closing connection: {e}")
      
    3. 启动服务端

          def start(self):try:# 创建 TCP 套接字self.ln = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 设置地址可重用self.ln.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 绑定地址和端口self.ln.bind((self.ip, self.port))# 开始监听self.ln.listen(5)log.info(f"TCP server started on port {self.port}")while True:try:# 接受连接conn, addr = self.ln.accept()log.info(f"Accepted connection from {addr}")# 设置超时时间conn.settimeout(self.timeout)# 启动新线程处理连接import threadingthreading.Thread(target=self.handle_connection, args=(conn,)).start()except socket.error as e:if isinstance(e, socket.timeout):continuelog.error(f"Failed to accept connection: {e}")if not e.errno == 10038:  # 非套接字关闭错误return eexcept Exception as e:log.error(f"Failed to start TCP server: {e}")return e
      
    4. 关闭服务端

          def stop(self):if self.ln:log.info("Shutting down TCP server...")try:self.ln.close()except Exception as e:log.error(f"Error closing server: {e}")return ereturn None
      
    使用例子
    1. 启动服务端

      if __name__ == '__main__':server_svc = new_tcp_server("127.0.0.1", 8021, 3000)# server_svc = new_rtu_server("COM4", 8, 1, 9600, "N", 1000)server_svc.set_00(0x00000000, 100.0)server_svc.set_02(0x02010100, 86.0)server_svc.server.start()
      

      在这里插入图片描述

    2. 使用模拟DLT645客户端软件测试

      在这里插入图片描述

http://www.dtcms.com/a/349038.html

相关文章:

  • 【Docker基础】Docker-compose常用命令实践(三):镜像与配置管理
  • 纯净Win11游戏系统|24H2专业工作站版,预装运行库,无捆绑,开机快,游戏兼容性超强!
  • 27.编程思想
  • 【JVM内存结构系列】四、不同垃圾回收器与堆内存的适配关系:从分代GC到Region GC
  • kylin10-x64 离线安装docker28.3.3
  • 第16届蓝桥杯C++中高级选拔赛(STEMA)2025年3月9日真题
  • 互联网大厂Java面试模拟:核心技术点深度解析
  • 深度剖析Spring AI源码(四):RAG的基石,解密VectorStore的统一抽象
  • 冯·诺依曼体系结构
  • 【机器学习】5 Bayesian statistics
  • AOSP构建指南:从零开始的Android源码之旅
  • 青少年软件编程(python六级)等级考试试卷-客观题(2024年6月)
  • 2.3 金融中介机构的业务
  • 《深入理解Java虚拟机》学习笔记
  • 【高级】系统架构师 | 系统工程
  • Java面试篇
  • 数字防线:现代企业网络安全运维实战指南
  • WinContig:高效磁盘碎片整理工具
  • [Mysql数据库] 知识点总结1
  • [身份验证脚手架] docs | breeze:install
  • 电梯间电动车误检率↓79%!陌讯多模态融合算法实战解析
  • ROS1中的Package
  • Ansible 自动化基石:变量定义、优先级控制与 Vault 敏感信息加密实战指南
  • 100个实用小工具1.3历年股价分析小工具新增股价批量下载
  • Linux shell脚本条件循环
  • MATLAB 在工程仿真中的实践:以机械振动分析为例的完整流程
  • 1.Spring Boot:超越配置地狱,重塑Java开发体验
  • LeetCode 101 刷题 - (2) 第二章 玩转双指针
  • Jupyter Lab 常用快捷键清单
  • C++标准库头文件使用指南