Connecting Python to Middleware [MySQL / Redis / MSSQL / TDengine]

Table of Contents

  • Connection parameters
  • Logger setup
  • Redis connection
  • MySQL connection
  • EMQX connection
  • MSSQL connection
  • TDengine connection
  • OPC UA tag-table configuration
    • Devices
    • Tag table
  • OPC UA data acquisition program
  • Data acquisition logic

Connection parameters

```python
import random

tdengine_connection_params = {
    'username': 'root',
    'password': 'taosdata',
    'host': '127.0.0.1',
    'port': 6030,
    'database': 'test'
}

mysql_connection_params = {
    'username': 'root',
    'password': 'root',
    'host': '127.0.0.1',
    'port': 3306,
    'database': 'th'
}

mssql_connection_params = {
    'username': 'sa',
    'password': 'Z',
    'host': '127.0.0.1',
    'port': 1433,
    'database': 'SistarData',
    'driver': 'ODBC+Driver+17+for+SQL+Server'
}

redis_connection_params = {
    'host': 'localhost',    # Redis server address
    'port': 6379,           # Redis server port
    'db': 0,                # Redis database number
    'max_connections': 10,  # maximum size of the connection pool
    'decode_responses': True
}

emqx_connection_params = {
    'broker': '127.0.0.1',
    'port': 1883,
    'client_id': f'python_mqtt_client_{random.randint(1, 10)}',
    'keep_alive_interval': 60,
    'password': None,
    'username': None,
    'topic_sub': None,
    'topic_pub': None
}

logger_params = {
    "logger_name": "opcua_adapter_logger",
    "total_level": 20,
    "file_handler_level": 10,
    "control_handler_level": 20,
    "file_name": r"E:\TH\core\basic_service\opcua_adapter\logs\opcua_adapter_logger.txt",
    "mode": "a",
    "max_bytes": 10485760,
    "backup_count": 10,
    "encoding": "UTF-8",
    "format": "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s"
}

opcua_adapter_params = {
    'alarm_consumer_queue_length': 100,
    'archive_consumer_queue_length': 100,
    'emqx_consumer_queue_length': 100,
    'acquisition_frequency': 1,    # acquisition interval (seconds)
    'monitor_frequency': 2,        # monitoring interval (seconds)
    'alarm_table_name': 'alarm',   # table alarms are written to
    'archive_table_name': 'test',  # database archived data is written to
    'emqx_worker': 1,
    'alarm_worker': 5,
    'archive_worker': 5
}
```

Logger setup

```python
import logging

from concurrent_log_handler import ConcurrentRotatingFileHandler

from config import logger_params


def get_logger(logger_name, total_level, file_name, mode, max_bytes, backup_count,
               encoding, file_handler_level, control_handler_level, format) -> logging.Logger:
    logger = logging.getLogger(logger_name)
    logger.setLevel(total_level)  # overall log level
    # rotating file handler, safe for concurrent writers
    file_handler = ConcurrentRotatingFileHandler(
        filename=file_name,
        mode=mode,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding=encoding
    )
    file_handler.setLevel(file_handler_level)
    control_handler = logging.StreamHandler()  # console handler
    control_handler.setLevel(control_handler_level)
    formatter = logging.Formatter(format)
    file_handler.setFormatter(formatter)
    control_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.addHandler(control_handler)
    return logger


logger = get_logger(**logger_params)
```

Redis connection

```python
import time

import redis

from basic_service.opcua_adapter.config import redis_connection_params

pool = redis.ConnectionPool(
    host=redis_connection_params['host'],
    port=redis_connection_params['port'],
    db=redis_connection_params['db'],
    max_connections=redis_connection_params['max_connections'],
    decode_responses=redis_connection_params['decode_responses'],
)


# get a connection object backed by the pool
def get_redis_connection():
    while True:
        try:
            redis_conn = redis.Redis(connection_pool=pool)
            if redis_conn.ping():
                return redis_conn
        except Exception as e:
            print(str(e))
        time.sleep(1)  # avoid a busy loop while the server is unreachable


conn = get_redis_connection()
```
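A quick usage sketch: hashes written with `hset`/`hmset` can be read back with `hgetall`. The `demo` key below is purely illustrative; the `performance` hash is the one the adapter's monitor thread (shown later) populates.

```python
# Usage sketch: write and read a hash through the pooled connection.
# The 'demo' key is illustrative; 'performance' is populated by the
# adapter's monitor thread further below, so it may be empty here.
conn = get_redis_connection()
conn.hset('demo', mapping={'hello': 'world'})
print(conn.hgetall('demo'))         # {'hello': 'world'} thanks to decode_responses=True
print(conn.hgetall('performance'))  # adapter metrics, once the adapter is running
```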

MySQL connection

"""
用于连接tdengine
"""
import time
from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import mysql_connection_paramsfrom sqlalchemy import create_engineengine = create_engine(f"mysql+pymysql://{mysql_connection_params['username']}:{mysql_connection_params['password']}@{mysql_connection_params['host']}:{mysql_connection_params['port']}/{mysql_connection_params['database']}")def get_mysql_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"MySQL连接建立失败,请检查网络是否畅通(ping {mysql_connection_params['host']}),服务器是否正常,是否开启远程连接")time.sleep(3)continueget_mysql_connection()
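A minimal usage sketch for the helper above; `SELECT VERSION()` is just a cheap health-check query, nothing in it is specific to this project:

```python
from sqlalchemy import text

conn = get_mysql_connection()
try:
    # any cheap query works as a health check
    print(conn.execute(text("SELECT VERSION();")).scalar())
finally:
    conn.close()
```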

EMQX connection

```python
import time

import paho.mqtt.client as mqtt

from basic_service.opcua_adapter.config import emqx_connection_params
from get_logger import logger


class MQTTClient:
    def __init__(self, broker, port, client_id, keep_alive_interval,
                 password=None, username=None, topic_sub=None, topic_pub=None):
        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.keep_alive_interval = keep_alive_interval
        self.topic_sub = topic_sub
        self.topic_pub = topic_pub
        self.password = password
        self.username = username
        # create the MQTT client
        self.client = mqtt.Client(client_id=self.client_id)
        if self.username and self.password:
            # set credentials if the broker requires them
            self.client.username_pw_set(username=self.username, password=self.password)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_publish = self.on_publish
        self.connect()
        self.client.loop_start()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            logger.info(f"Connected to MQTT Broker! Returned code={rc}")
            if self.topic_sub:
                client.subscribe(self.topic_sub)  # subscribe once connected
        else:
            logger.error(f"Failed to connect, return code {rc}")

    def on_disconnect(self, client, userdata, rc):
        logger.info("Disconnected from MQTT Broker with code: " + str(rc))
        # reconnect on unexpected disconnects
        self.reconnect()

    def on_message(self, client, userdata, msg):
        # logger.info(f"Received message: {msg.topic} -> {msg.payload.decode()}")
        pass

    def on_publish(self, client, userdata, mid):
        # print(f"Message {mid} has been published.")
        pass

    # reconnection logic
    def reconnect(self):
        try:
            logger.error("Attempting to reconnect...")
            self.client.reconnect()
        except Exception as e:
            logger.error(f"Reconnection failed: {e}")
            time.sleep(5)  # back off before the next attempt

    def connect(self):
        try:
            self.client.connect(self.broker, self.port, self.keep_alive_interval)
        except Exception as e:
            logger.error(f"Connection failed: {e}")
            self.reconnect()


get_emqx_connection = MQTTClient(**emqx_connection_params)

# main loop publishing messages
# try:
#     while True:
#         message = "Hello MQTT"
#         result = get_emqx_connection.client.publish('/test', message, qos=0)
#         status = result[0]
#         if status == 0:
#             print(f"Sent `{message}` to topic /test")
#         else:
#             print(f"Failed to send message to topic /test")
#         time.sleep(10)  # publish every 10 seconds
# except KeyboardInterrupt:
#     print("Interrupted by user, stopping...")
# finally:
#     get_emqx_connection.client.loop_stop()
#     get_emqx_connection.client.disconnect()
```

MSSQL connection

"""
用于连接tdengine
"""
import time
#from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import mysql_connection_params, mssql_connection_paramsfrom sqlalchemy import create_engine, textengine = create_engine(f'mssql+pyodbc://sa:Z#@127.0.0.1:1433/sistarData?driver=ODBC+Driver+17+for+SQL+Server')def get_mssql_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"MSSQL连接建立失败,请检查网络是否畅通(ping {mssql_connection_params['host']}),服务器是否正常,是否开启远程连接")time.sleep(3)continueconn = get_mssql_connection()result = conn.execute(text("SELECT TABLE_NAME,COLUMN_NAME,COLUMN_DEFAULT,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'sistar_eng_areas';"))print(result.fetchall())

TDengine connection

"""
用于连接tdengine
"""
import time
from basic_service.opcua_adapter.get_logger import logger
from basic_service.opcua_adapter.config import tdengine_connection_paramsfrom sqlalchemy import create_engineengine = create_engine(f"taos://{tdengine_connection_params['username']}:{tdengine_connection_params['password']}@{tdengine_connection_params['host']}:{tdengine_connection_params['port']}/{tdengine_connection_params['database']}")def get_tdengine_connection():while True:try:conn = engine.connect()return connexcept Exception:logger.error(f"TDEngine连接建立失败,请检查网络是否畅通(ping {tdengine_connection_params['host']}),服务器是否正常,是否开启远程连接,是否安装与服务器相同版本的客户端")time.sleep(3)continue
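A minimal usage sketch, assuming the `test.meters` super table and the `tag0001` child table created by the init script in a later section:

```python
from sqlalchemy import text

conn = get_tdengine_connection()
# tag0001 is one of the child tables created by the init script below
conn.execute(text("INSERT INTO test.tag0001 VALUES (NOW, 42.0);"))
conn.commit()
print(conn.execute(text("SELECT ts, val FROM test.tag0001 ORDER BY ts DESC LIMIT 5;")).fetchall())
```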

OPC UA tag-table configuration

Devices

[{"No.": 1,"device_name": "device1","url": "opc.tcp:\/\/192.168.10.132:4862"},{"No.": 2,"device_name": "device2","url": "opc.tcp:\/\/192.168.10.132:4863"}
]

Tag table

[{"No": 1,"tag_uuid": "tag0001","node_id": "ns=1;s=t|tag1","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "add","scale_factor": 1,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea61"},{"No": 2,"tag_uuid": "tag0002","node_id": "ns=1;s=t|tag2","interval": 1,"active": true,"active_alarm": true,"alarm_up": 100,"alarm_down": 10,"alarm_up_info": "\u4e0a\u9650\u62a5\u8b66","alarm_down_info": "\u4e0b\u9650\u62a5\u8b66","alarm_up_change": null,"alarm_down_change": null,"active_archive": true,"archive_onchange": true,"archive_interval": 1,"active_scale": true,"scale_sign": "sub","scale_factor": 2,"mqtt_topic_name": "topic_name:\/opcua\/device1\/group1,Qos:0;topic_name:\/opcua\/device1\/group2,Qos:1\n","unit": "m\u00b3\/H","comments": "\u704c\u88c5\u673a\u901f\u5ea62"}]


The configuration can also be imported from Excel.

Uploading data from Excel into the system

"""
用于初始化,创建数据库,创建数据库表
"""
import randomimport pandas as pd
from sqlalchemy import text
from connectors.tdengine import get_tdengine_connection
from get_logger import loggerxlsx = r'E:\TH\core\basic_service\opcua_adapter\static\opcua_template.xlsx'connection = get_tdengine_connection()db_name = 'test'
stable = 'meters'df = pd.read_excel(xlsx, sheet_name='device1')tag_uuid = df.loc[:, ['tag_uuid', 'unit']]create_table_sql = ''for item in tag_uuid.values:_sql = f"""CREATE TABLE IF NOT EXISTS {db_name}.{item[0]} USING {db_name}.{stable} TAGS ('{item[1]}', '{random.randint(1, 10)}');"""create_table_sql += _sqlcreate_db_stable_sql = f"""CREATE DATABASE IF NOT EXISTS {db_name} KEEP 3650;
use {db_name};
CREATE STABLE IF NOT EXISTS {db_name}.{stable} (ts TIMESTAMP, val FLOAT) TAGS ( unit BINARY(20),   s_type BINARY(20));
"""
total_sql = create_db_stable_sql + create_table_sql
print(total_sql)
try:connection.execute(text(total_sql))connection.commit()logger.info('init success')
except Exception as e:print(str(e))connection.rollback()logger.error('init failed')

OPC UA data acquisition program

```python
import datetime
import json
import os
import queue
import threading
import time
from threading import Thread
from typing import List, Any

import opcua.client.client
import psutil
from opcua.client.client import Client
from opcua.common.node import Node
from opcua.ua import UaStatusCodeError
from sqlalchemy import text

from config import opcua_adapter_params
from connectors._redis import get_redis_connection
from connectors.emqx import get_emqx_connection
from connectors.mysql import get_mysql_connection
from connectors.tdengine import get_tdengine_connection
from get_logger import logger

emqx_connection = get_emqx_connection.client


class OPCUAAdapter(object):
    """OPC UA data acquisition class."""

    def __init__(self, device_info, tag_id_to_node_id_map, tag_id_to_detail_map, use_subscribe=True):
        self.url: str = device_info.get('device_url')
        self._ua: opcua.client.client.Client = Client(url=self.url, timeout=5)
        self.connected: bool = False
        self.thread_list: List[Thread] = []
        self.raw_dict = {}  # latest snapshot: node_id -> value
        self.tag_id_to_node_id_map = tag_id_to_node_id_map
        self.tag_id_to_detail_map = tag_id_to_detail_map
        self.device_info = device_info
        self.alarm_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['alarm_consumer_queue_length'])
        self.archive_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['archive_consumer_queue_length'])
        self.emqx_consumer_queue = queue.Queue(maxsize=opcua_adapter_params['emqx_consumer_queue_length'])

    def connect(self):
        """Establish the initial connection."""
        try:
            if self.connected:
                return
            self._ua.connect()
            self.connected = True
            logger.info(f"Initial connection to {self.url} succeeded!")
        except Exception as e:
            self.disconnect()
            self.connected = False
            logger.error(f"Initial connection failed: {str(e)}")
            # fall back to the reconnect loop
            self.reconnect()

    def disconnect(self):
        """Disconnect from the server."""
        try:
            if self._ua and self.connected:
                self._ua.disconnect()
                self.connected = False
                logger.info("Disconnected cleanly")
        except Exception as e:
            logger.error(f"Clean disconnect failed: {str(e)}")

    def reconnect(self):
        """Reconnect with a linearly growing back-off."""
        index = 0
        while True:
            try:
                self._ua.connect()
                self.connected = True
                logger.info(f"Reconnected to {self.url}!")
                return
            except OSError as e:
                index += 1
                logger.error(f"Could not reach the OPC UA server: {str(e)}!")
                self.connected = False
                time.sleep(index)
            except Exception as e:
                index += 1
                logger.error(f"Reconnect attempt {index} failed: {str(e)}!")
                self.connected = False
                time.sleep(index)

    def interval_read(self, interval: int) -> None:
        """Poll every node at the configured acquisition interval."""
        connection = get_redis_connection()
        thread_name = threading.current_thread().name
        nodes = []
        while True:
            start_time = time.time()
            if not self.connected:
                # not connected: enter the reconnect loop
                self.reconnect()
            else:
                try:
                    nodes_str_list = self.tag_id_to_node_id_map.keys()
                    nodes = [self._ua.get_node(node) for node in nodes_str_list]
                    values = self._ua.get_values(nodes)
                    self.raw_dict = dict(zip(nodes_str_list, values))
                except AttributeError as e:
                    logger.error(f"Attribute read error: {str(e)}!")
                except TimeoutError:
                    logger.error("Timed out waiting for the server response")
                except (ConnectionRefusedError, ConnectionAbortedError, UaStatusCodeError,
                        OSError, RuntimeError) as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f"Read failed: {str(e)}!")
                except Exception as e:
                    self.disconnect()
                    self.reconnect()
                    logger.error(f"Unhandled exception: {str(e)}")
                finally:
                    # publish per-cycle statistics to Redis
                    end_time = time.time()
                    try:
                        connection.hmset('performance', mapping={
                            'nodes': len(nodes),
                            f'{thread_name}_use_time': f'{(end_time - start_time):.2f}'
                        })
                    except Exception:
                        if connection:
                            connection.close()
                        connection = get_redis_connection()
            time.sleep(interval)

    def node_write(self, nodes: List[Node], values: List[Any]):
        """Write values to nodes."""
        try:
            self._ua.set_values(nodes, values)
        except Exception as e:
            logger.error(f"Write failed: {str(e)}")

    def monitor_thread(self):
        """Monitoring thread: publish process health to Redis."""
        redis_connection = get_redis_connection()
        while True:
            try:
                current_process_id = os.getpid()
                process = psutil.Process(current_process_id)
                # basic process information
                process_name = process.name()
                cpu_usage = process.cpu_percent(interval=1)  # CPU usage sampled over 1 second
                memory_info = process.memory_info()          # memory usage
                io_counters = process.io_counters()          # IO counters
                disk_usage = psutil.disk_usage('/')          # disk usage of the root volume
                thread_list = []
                for thread in threading.enumerate():
                    thread_list.append((thread.ident, thread.name, thread.is_alive()))
                    if thread.name == 'continuous_thread' and not thread.is_alive():
                        logger.error('The read thread has died, contact the administrator!')
                redis_connection.hmset(name='performance', mapping={
                    'process_name': process_name,
                    'cpu_usage': cpu_usage,
                    'memory_info_RSS': f'{memory_info.rss / (1024 * 1024):.2f} MB',
                    'memory_info_VMS': f'{memory_info.vms / (1024 * 1024):.2f} MB',
                    'io_read': f"{io_counters.read_bytes / (1024 * 1024):.2f} MB",
                    'io_write': f"{io_counters.write_bytes / (1024 * 1024):.2f} MB",
                    'disk_usage': f'{disk_usage.percent}%',
                    'threads': json.dumps(thread_list),
                    'pid': current_process_id,
                    'archive_consumer_length': self.archive_consumer_queue.qsize(),
                    'alarm_consumer_length': self.alarm_consumer_queue.qsize(),
                    'emqx_consumer_length': self.emqx_consumer_queue.qsize(),
                })
            except Exception as e:
                logger.error(f'Monitor thread error: {str(e)}')
                if redis_connection:
                    redis_connection.close()
                redis_connection = get_redis_connection()
            finally:
                time.sleep(int(opcua_adapter_params['monitor_frequency']))

    def change_data_notifier(self, timestamp, node_id, new_data, old_data):
        """Fan a value change out to the alarm, EMQX, and archive queues."""
        try:
            tag_id = self.tag_id_to_node_id_map[node_id]
        except KeyError:
            pass
        else:
            content = {
                'timestamp': timestamp,
                'tag_id': tag_id,
                'new_data': new_data,
                'old_data': old_data
            }
            self.alarm_consumer_queue.put(content)
            self.emqx_consumer_queue.put(content)
            self.archive_consumer_queue.put(content)

    def consumer_alarm_info(self):
        """Consume value changes and write limit alarms to MySQL."""
        redis_connection = get_redis_connection()
        alarm_table_name = opcua_adapter_params['alarm_table_name']
        connection = get_mysql_connection()
        thread_name = threading.current_thread().name
        alarm_count = 0
        while True:
            try:
                start_time = time.time()
                content = self.alarm_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                active_alarm = tag_detail.get('active_alarm')
                if active_alarm:
                    up_limit = tag_detail.get('alarm_up')
                    down_limit = tag_detail.get('alarm_down')
                    if float(content['new_data']) > float(up_limit):
                        sql = f"""insert into {alarm_table_name} (device_name, tag_uuid, tag_name, alarm_message, alarm_limit, value) values ("{self.device_info['device_name']}", "{content["tag_id"]}", "{tag_detail.get("comments")}", "{tag_detail.get("alarm_up_info")}", "{tag_detail.get("alarm_up")}", "{content["new_data"]}")"""
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                            alarm_count += 1
                        except Exception as e:
                            logger.error(f'Insert failed: {str(e)}!')
                            connection.rollback()
                    elif float(content['new_data']) < float(down_limit):
                        sql = f"""insert into {alarm_table_name} (device_name, tag_uuid, tag_name, alarm_message, alarm_limit, value) values ("{self.device_info['device_name']}", "{content["tag_id"]}", "{tag_detail.get("comments")}", "{tag_detail.get("alarm_down_info")}", "{tag_detail.get("alarm_down")}", "{content["new_data"]}")"""
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                            alarm_count += 1
                        except Exception as e:
                            logger.error(f'Insert failed: {str(e)}!')
                            connection.rollback()
            except Exception as e:
                logger.error(str(e))
                if connection:
                    connection.close()
                connection = get_mysql_connection()
            finally:
                end_time = time.time()
                try:
                    redis_connection.hmset('performance', mapping={
                        f'{thread_name}_use_time': f'{(end_time - start_time):.2f}',
                        'alarm_count': alarm_count
                    })
                except Exception:
                    if redis_connection:
                        redis_connection.close()
                    redis_connection = get_redis_connection()
                time.sleep(2)

    def consumer_archive_info(self):
        """Consume value changes and batch-archive them into TDengine."""
        thread_name = threading.current_thread().name
        redis_connection = get_redis_connection()
        connection = get_tdengine_connection()
        exception_buffer = {}
        buffer = {}
        db_name = opcua_adapter_params['archive_table_name']
        exception_time = 0
        total_time = 0
        for k in self.tag_id_to_node_id_map.values():
            buffer.setdefault(k, [])
            # exception buffer: rows that failed to insert are kept here instead of
            # being dropped, and are replayed once the connection recovers
            exception_buffer.setdefault(k, [])
        while True:
            try:
                start_time = time.time()
                # replay any rows that previously failed to insert
                for k, v in exception_buffer.items():
                    if len(v) > 0:
                        sql = f"""INSERT INTO {db_name}.{k} VALUES {str(v).replace('[', '').replace(']', '')}"""
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                            exception_buffer[k] = []
                        except Exception:
                            exception_time += 1
                            connection.rollback()
                content = self.archive_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                if tag_detail.get('active_archive'):
                    timestamp = datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
                    try:
                        data = float(content['new_data'])
                    except Exception:  # non-numeric or missing values archive as 0.0
                        data = 0.0
                    if len(buffer[content['tag_id']]) < 100:
                        buffer[content['tag_id']].append((timestamp, data))
                    else:
                        # flush the 100-row batch for this tag
                        sql = f"""INSERT INTO {db_name}.{content['tag_id']} VALUES {str(buffer[content["tag_id"]]).replace('[', '').replace(']', '')};"""
                        try:
                            connection.execute(text(sql))
                            connection.commit()
                            buffer[content['tag_id']] = []
                        except Exception:
                            logger.error(f'insert error: {sql}')
                            connection.rollback()
                            exception_time += 1
                            if len(exception_buffer[content['tag_id']]) < 10000:
                                exception_buffer[content['tag_id']].extend(buffer[content['tag_id']])
                                buffer[content['tag_id']] = []
                            else:
                                # once past the cap, drop the oldest rows
                                exception_buffer[content['tag_id']] = exception_buffer[content['tag_id']][100:]
                    total_time += 1
            except Exception as e:
                logger.error(str(e))
                exception_time += 1
                if connection:
                    connection.close()
                connection = get_tdengine_connection()
            finally:
                end_time = time.time()
                try:
                    redis_connection.hmset('performance', mapping={
                        'buffer': len(buffer),
                        'exception_buffer': len(exception_buffer),
                        f'{thread_name}_use_time': f'{(end_time - start_time):.2f}',
                        'total_time': total_time,
                        'exception_time': exception_time
                    })
                except Exception:
                    if redis_connection:
                        redis_connection.close()
                    redis_connection = get_redis_connection()
                time.sleep(2)

    def consumer_emqx_info(self):
        """Consume value changes and publish them to EMQX."""
        while True:
            try:
                content = self.emqx_consumer_queue.get()
                tag_detail = self.tag_id_to_detail_map.get(content['tag_id'])
                mqtt_topic_str = tag_detail.get('mqtt_topic_name')
                topic_list = []
                for topic in mqtt_topic_str.split(';'):
                    topic_name, qos = topic.split(',')
                    topic_list.append({
                        'topic_name': topic_name.split(':')[1].strip().replace('\n', ''),
                        'qos': qos.split(':')[1].strip().replace('\n', '')
                    })
                payload = json.dumps({content['tag_id']: content['new_data']})
                for topic in topic_list:
                    emqx_connection.publish(topic=topic['topic_name'], payload=payload, qos=int(topic['qos']))
            except Exception as e:
                logger.error(str(e))

    def subscribe_data_change(self):
        """Diff the latest snapshot against the previous one and emit changes."""
        copy_raw_dict = self.raw_dict.copy()
        flag = False
        while True:
            d1_keys = self.raw_dict.keys()
            d2_keys = copy_raw_dict.keys()
            if added := d1_keys - d2_keys:
                flag = True
                for k in list(added):
                    self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                                              k, self.raw_dict[k], 0)
            if removed := d2_keys - d1_keys:
                flag = True
                for k in list(removed):
                    self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                                              k, 0, copy_raw_dict[k])
            common_keys = d1_keys & d2_keys
            for key in common_keys:
                if copy_raw_dict[key] != self.raw_dict[key]:
                    self.change_data_notifier(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                                              key, self.raw_dict[key], copy_raw_dict[key])
                    flag = True
            if flag:
                copy_raw_dict = self.raw_dict.copy()
                flag = False
            time.sleep(0.5)

    def run(self):
        """Start all worker threads."""
        try:
            self.connect()
            interval_acquisition_task = Thread(target=self.interval_read, name='continuous_thread', args=(1,))
            monitor_thread_task = Thread(target=self.monitor_thread, name='monitor_thread')
            subscribe_thread_task = Thread(target=self.subscribe_data_change, name='subscribe_thread')
            for i in range(opcua_adapter_params['alarm_worker']):
                consumer_alarm_info_task = Thread(target=self.consumer_alarm_info, name=f'consumer_alarm_info_{i + 1}')
                self.thread_list.append(consumer_alarm_info_task)
            for i in range(opcua_adapter_params['archive_worker']):
                consumer_archive_info_task = Thread(target=self.consumer_archive_info, name=f'consumer_archive_info_{i + 1}')
                self.thread_list.append(consumer_archive_info_task)
            for i in range(opcua_adapter_params['emqx_worker']):
                consumer_emqx_info_task = Thread(target=self.consumer_emqx_info, name=f'consumer_emqx_info_{i + 1}')
                self.thread_list.append(consumer_emqx_info_task)
            self.thread_list.append(interval_acquisition_task)
            self.thread_list.append(monitor_thread_task)
            self.thread_list.append(subscribe_thread_task)
            for th in self.thread_list:
                th.start()
            for th in self.thread_list:
                th.join()
        except Exception as e:
            logger.error(str(e))
        finally:
            get_emqx_connection.client.loop_stop()
            get_emqx_connection.client.disconnect()


def init():
    """Load the device and tag configuration from Redis."""
    conn = get_redis_connection()
    while True:
        try:
            device_info = json.loads(conn.get('device_info'))
            tag_id_to_node_id_map = json.loads(conn.get('tag_id_to_node_id_map'))
            tag_id_to_detail_map = json.loads(conn.get('tag_id_to_detail_map'))
            if device_info and tag_id_to_detail_map and tag_id_to_node_id_map:
                return device_info, tag_id_to_node_id_map, tag_id_to_detail_map
            logger.error('init error')
            time.sleep(3)
        except Exception:
            logger.error('Init Failed!')
            time.sleep(3)


def main():
    device_info, tag_id_to_node_id_map, tag_id_to_detail_map = init()
    opcua_adapter = OPCUAAdapter(device_info=device_info,
                                 tag_id_to_node_id_map=tag_id_to_node_id_map,
                                 tag_id_to_detail_map=tag_id_to_detail_map)
    opcua_adapter.run()


if __name__ == '__main__':
    main()
```

Data acquisition logic

  • Reconnection: every connector retries in a loop until its server comes back (see the `reconnect` methods above).
  • Multi-process model: one device per process; a minimal sketch follows this list.
  • Multi-thread model: one thread per acquisition frequency; the threads inside a process share a single connection.
  • The client can consume data either by subscription (change-driven) or by polling.
  • Field meanings: see the table below.
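The one-device-one-process model can be sketched with the standard library's `multiprocessing`. The device list below is illustrative (taken from the device JSON above), and the empty tag maps are placeholders for the per-device configuration that `init()` loads from Redis:

```python
# Minimal sketch of the one-process-per-device model, assuming OPCUAAdapter
# from the program above. The device list is illustrative; the empty maps
# stand in for the per-device configuration that init() loads from Redis.
from multiprocessing import Process


def run_device(device_info):
    adapter = OPCUAAdapter(device_info=device_info,
                           tag_id_to_node_id_map={},  # node_id -> tag_uuid for this device
                           tag_id_to_detail_map={})   # tag_uuid -> tag detail row
    adapter.run()


if __name__ == '__main__':
    devices = [
        {'device_name': 'device1', 'device_url': 'opc.tcp://192.168.10.132:4862'},
        {'device_name': 'device2', 'device_url': 'opc.tcp://192.168.10.132:4863'},
    ]
    processes = [Process(target=run_device, args=(d,), name=d['device_name']) for d in devices]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```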
Field name          Meaning
No                  auto-increment id
tag_uuid            unique ID of the variable
node_id             OPC UA node id
interval            acquisition interval, in seconds
active              whether the tag is active; TRUE / FALSE must be uppercase
active_alarm        whether alarming is enabled
alarm_up            upper alarm limit
alarm_down          lower alarm limit
alarm_up_info       message written when the upper limit is exceeded
alarm_down_info     message written when the lower limit is exceeded
alarm_up_change     value to write back when the upper limit is reached
alarm_down_change   value to write back when the lower limit is reached
active_archive      whether archiving is enabled
archive_onchange    archive on change; when FALSE, archive_interval takes effect
archive_interval    polling archive interval
active_scale        whether scaling is enabled
scale_sign          scaling operator (e.g. add, sub)
scale_factor        scaling factor
mqtt_topic_name     MQTT topics and QoS to publish to (format shown in the tag table above)
unit                engineering unit
comments            free-text description
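The scale fields describe a simple value transform. A hedged sketch of how they could be applied, assuming only the `add`/`sub` operators seen in the tag-table example (`apply_scale` is a hypothetical helper, not part of the adapter above):

```python
# Hypothetical helper: apply active_scale / scale_sign / scale_factor to a raw
# value. Only 'add' and 'sub' appear in the tag-table example; the rest is an
# assumption about how other operators would slot in.
def apply_scale(value: float, tag_detail: dict) -> float:
    if not tag_detail.get('active_scale'):
        return value
    factor = float(tag_detail.get('scale_factor', 0))
    sign = tag_detail.get('scale_sign')
    if sign == 'add':
        return value + factor
    if sign == 'sub':
        return value - factor
    return value  # unknown operator: pass the raw value through


print(apply_scale(50.0, {'active_scale': True, 'scale_sign': 'add', 'scale_factor': 1}))  # 51.0
```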