当前位置: 首页 > news >正文

工业自动化通信控制

工业自动化通信控制

目录

  1. 串口通信控制
  2. 网口通信控制
  3. 物联网控制
  4. 综合应用案例
  5. 最佳实践与故障排查

串口通信控制

1.1 串口通信基础

串口通信是工业自动化中最常用的通信方式之一,主要包括RS232、RS485和RS422标准。

核心概念
  • 波特率(Baud Rate): 数据传输速率,常用值:9600、19200、38400、115200
  • 数据位(Data Bits): 通常为7位或8位
  • 停止位(Stop Bits): 1位、1.5位或2位
  • 校验位(Parity): 无校验(None)、奇校验(Odd)、偶校验(Even)
  • 流控制(Flow Control): 硬件流控(RTS/CTS)或软件流控(XON/XOFF)
RS232 vs RS485
特性RS232RS485
传输距离≤15米≤1200米
传输方式全双工半双工/全双工
设备数量1对1最多128个设备
抗干扰性较弱强(差分信号)
应用场景近距离点对点工业现场总线

1.2 C# 串口通信实现

基础串口类封装
using System;
using System.IO.Ports;
using System.Text;
using System.Threading;namespace IndustrialAutomation
{public class SerialPortManager : IDisposable{private SerialPort _serialPort;private bool _isConnected = false;// 数据接收事件public event EventHandler<string> DataReceived;public event EventHandler<Exception> ErrorOccurred;/// <summary>/// 初始化串口/// </summary>public SerialPortManager(string portName, int baudRate = 9600, Parity parity = Parity.None, int dataBits = 8, StopBits stopBits = StopBits.One){_serialPort = new SerialPort{PortName = portName,BaudRate = baudRate,Parity = parity,DataBits = dataBits,StopBits = stopBits,Handshake = Handshake.None,ReadTimeout = 1000,WriteTimeout = 1000};_serialPort.DataReceived += SerialPort_DataReceived;_serialPort.ErrorReceived += SerialPort_ErrorReceived;}/// <summary>/// 打开串口/// </summary>public bool Open(){try{if (!_serialPort.IsOpen){_serialPort.Open();_isConnected = true;Console.WriteLine($"串口 {_serialPort.PortName} 已打开");return true;}return false;}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);return false;}}/// <summary>/// 关闭串口/// </summary>public void Close(){if (_serialPort != null && _serialPort.IsOpen){_serialPort.Close();_isConnected = false;Console.WriteLine("串口已关闭");}}/// <summary>/// 发送字符串数据/// </summary>public bool SendString(string data){try{if (_serialPort.IsOpen){_serialPort.WriteLine(data);return true;}return false;}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);return false;}}/// <summary>/// 发送字节数组(常用于Modbus等协议)/// </summary>public bool SendBytes(byte[] data){try{if (_serialPort.IsOpen){_serialPort.Write(data, 0, data.Length);return true;}return false;}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);return false;}}/// <summary>/// 发送并等待响应/// </summary>public byte[] SendAndReceive(byte[] sendData, int expectedLength, int timeout = 1000){try{if (!_serialPort.IsOpen) return null;_serialPort.DiscardInBuffer();_serialPort.Write(sendData, 0, sendData.Length);Thread.Sleep(50); // 等待设备响应byte[] buffer = new byte[expectedLength];int bytesRead = _serialPort.Read(buffer, 0, expectedLength);if (bytesRead == expectedLength)return buffer;return null;}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);return null;}}private void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e){try{string data = _serialPort.ReadExisting();DataReceived?.Invoke(this, data);}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);}}private void SerialPort_ErrorReceived(object sender, SerialErrorReceivedEventArgs e){ErrorOccurred?.Invoke(this, new Exception($"串口错误: {e.EventType}"));}public void Dispose(){Close();_serialPort?.Dispose();}}
}
Modbus RTU 协议实现
public class ModbusRTU
{private SerialPortManager _serialPort;public ModbusRTU(string portName, int baudRate = 9600){_serialPort = new SerialPortManager(portName, baudRate);}/// <summary>/// 读取保持寄存器 (功能码 0x03)/// </summary>public ushort[] ReadHoldingRegisters(byte slaveId, ushort startAddress, ushort quantity){// 构建请求报文byte[] request = new byte[8];request[0] = slaveId;           // 从站地址request[1] = 0x03;              // 功能码request[2] = (byte)(startAddress >> 8);   // 起始地址高字节request[3] = (byte)(startAddress & 0xFF); // 起始地址低字节request[4] = (byte)(quantity >> 8);       // 寄存器数量高字节request[5] = (byte)(quantity & 0xFF);     // 寄存器数量低字节// 计算CRC校验ushort crc = CalculateCRC(request, 6);request[6] = (byte)(crc & 0xFF);request[7] = (byte)(crc >> 8);// 发送并接收响应int expectedLength = 5 + (quantity * 2);byte[] response = _serialPort.SendAndReceive(request, expectedLength, 1000);if (response == null || !ValidateCRC(response))throw new Exception("Modbus通信错误或CRC校验失败");// 解析数据ushort[] values = new ushort[quantity];for (int i = 0; i < quantity; i++){values[i] = (ushort)((response[3 + i * 2] << 8) | response[4 + i * 2]);}return values;}/// <summary>/// 写单个寄存器 (功能码 0x06)/// </summary>public bool WriteSingleRegister(byte slaveId, ushort address, ushort value){byte[] request = new byte[8];request[0] = slaveId;request[1] = 0x06;request[2] = (byte)(address >> 8);request[3] = (byte)(address & 0xFF);request[4] = (byte)(value >> 8);request[5] = (byte)(value & 0xFF);ushort crc = CalculateCRC(request, 6);request[6] = (byte)(crc & 0xFF);request[7] = (byte)(crc >> 8);byte[] response = _serialPort.SendAndReceive(request, 8, 1000);return response != null && ValidateCRC(response);}/// <summary>/// CRC16校验计算/// </summary>private ushort CalculateCRC(byte[] data, int length){ushort crc = 0xFFFF;for (int i = 0; i < length; i++){crc ^= data[i];for (int j = 0; j < 8; j++){if ((crc & 0x0001) != 0)crc = (ushort)((crc >> 1) ^ 0xA001);elsecrc >>= 1;}}return crc;}private bool ValidateCRC(byte[] data){if (data.Length < 2) return false;ushort receivedCRC = (ushort)(data[data.Length - 2] | (data[data.Length - 1] << 8));ushort calculatedCRC = CalculateCRC(data, data.Length - 2);return receivedCRC == calculatedCRC;}
}

1.3 Python 串口通信实现

基础串口操作
import serial
import serial.tools.list_ports
import time
import struct
from typing import Optional, Listclass SerialPortManager:"""串口管理器类"""def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0, **kwargs):"""初始化串口Args:port: 串口名称 (如 'COM1' 或 '/dev/ttyUSB0')baudrate: 波特率timeout: 超时时间(秒)**kwargs: 其他serial.Serial参数"""self.port = portself.baudrate = baudrateself.timeout = timeoutself.serial = Noneself.kwargs = kwargsdef open(self) -> bool:"""打开串口"""try:self.serial = serial.Serial(port=self.port,baudrate=self.baudrate,timeout=self.timeout,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,**self.kwargs)print(f"串口 {self.port} 已打开")return Trueexcept serial.SerialException as e:print(f"打开串口失败: {e}")return Falsedef close(self):"""关闭串口"""if self.serial and self.serial.is_open:self.serial.close()print("串口已关闭")def send_string(self, data: str, encoding: str = 'utf-8') -> bool:"""发送字符串"""try:if self.serial and self.serial.is_open:self.serial.write(data.encode(encoding))return Truereturn Falseexcept Exception as e:print(f"发送数据失败: {e}")return Falsedef send_bytes(self, data: bytes) -> bool:"""发送字节数据"""try:if self.serial and self.serial.is_open:self.serial.write(data)return Truereturn Falseexcept Exception as e:print(f"发送数据失败: {e}")return Falsedef read_bytes(self, size: int = 1) -> Optional[bytes]:"""读取指定字节数"""try:if self.serial and self.serial.is_open:return self.serial.read(size)return Noneexcept Exception as e:print(f"读取数据失败: {e}")return Nonedef read_until(self, terminator: bytes = b'\n') -> Optional[bytes]:"""读取直到遇到终止符"""try:if self.serial and self.serial.is_open:return self.serial.read_until(terminator)return Noneexcept Exception as e:print(f"读取数据失败: {e}")return Nonedef send_and_receive(self, send_data: bytes, expected_length: int, delay: float = 0.05) -> Optional[bytes]:"""发送数据并等待响应"""try:if not self.serial or not self.serial.is_open:return Noneself.serial.reset_input_buffer()self.serial.write(send_data)time.sleep(delay)response = self.serial.read(expected_length)return response if len(response) == expected_length else Noneexcept Exception as e:print(f"通信失败: {e}")return None@staticmethoddef list_ports() -> List[str]:"""列出所有可用串口"""ports = serial.tools.list_ports.comports()return [port.device for port in ports]def __enter__(self):self.open()return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.close()class ModbusRTU:"""Modbus RTU协议实现"""def __init__(self, port: str, baudrate: int = 9600):self.serial_manager = SerialPortManager(port, baudrate)def open(self):return self.serial_manager.open()def close(self):self.serial_manager.close()def read_holding_registers(self, slave_id: int, start_address: int, quantity: int) -> Optional[List[int]]:"""读取保持寄存器 (功能码 0x03)Args:slave_id: 从站地址start_address: 起始地址quantity: 寄存器数量Returns:寄存器值列表"""# 构建请求报文request = struct.pack('>BBHH',slave_id,        # 从站地址0x03,            # 功能码start_address,   # 起始地址quantity         # 寄存器数量)# 添加CRC校验crc = self._calculate_crc(request)request += struct.pack('<H', crc)# 发送并接收expected_length = 5 + (quantity * 2)response = self.serial_manager.send_and_receive(request, expected_length)if not response or not self._validate_crc(response):print("Modbus通信错误或CRC校验失败")return None# 解析数据values = []for i in range(quantity):offset = 3 + (i * 2)value = struct.unpack('>H', response[offset:offset+2])[0]values.append(value)return valuesdef write_single_register(self, slave_id: int, address: int, value: int) -> bool:"""写单个寄存器 (功能码 0x06)"""request = struct.pack('>BBHH',slave_id,0x06,address,value)crc = self._calculate_crc(request)request += struct.pack('<H', crc)response = self.serial_manager.send_and_receive(request, 8)return response is not None and self._validate_crc(response)def write_multiple_registers(self, slave_id: int,start_address: int,values: List[int]) -> bool:"""写多个寄存器 (功能码 0x10)"""quantity = len(values)byte_count = quantity * 2# 构建请求头request = struct.pack('>BBHHB',slave_id,0x10,start_address,quantity,byte_count)# 添加数据for value in values:request += struct.pack('>H', value)# 添加CRCcrc = self._calculate_crc(request)request += struct.pack('<H', crc)response = self.serial_manager.send_and_receive(request, 8)return response is not None and self._validate_crc(response)@staticmethoddef _calculate_crc(data: bytes) -> int:"""计算CRC16校验"""crc = 0xFFFFfor byte in data:crc ^= bytefor _ in range(8):if crc & 0x0001:crc = (crc >> 1) ^ 0xA001else:crc >>= 1return crc@staticmethoddef _validate_crc(data: bytes) -> bool:"""验证CRC校验"""if len(data) < 2:return Falsereceived_crc = struct.unpack('<H', data[-2:])[0]calculated_crc = ModbusRTU._calculate_crc(data[:-2])return received_crc == calculated_crc
使用示例
# 示例1: 基础串口通信
with SerialPortManager('/dev/ttyUSB0', 9600) as serial_mgr:# 发送字符串serial_mgr.send_string("Hello Device\n")# 读取响应response = serial_mgr.read_until(b'\n')print(f"收到: {response.decode()}")# 示例2: Modbus RTU通信
modbus = ModbusRTU('/dev/ttyUSB0', 9600)
modbus.open()try:# 读取从站1的保持寄存器values = modbus.read_holding_registers(slave_id=1, start_address=0, quantity=10)if values:print(f"读取到的寄存器值: {values}")# 写入单个寄存器success = modbus.write_single_register(slave_id=1, address=0, value=1234)print(f"写入{'成功' if success else '失败'}")finally:modbus.close()

网口通信控制

2.1 TCP/IP 通信

C# TCP 客户端实现
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;namespace IndustrialAutomation.Network
{public class TcpClientManager : IDisposable{private TcpClient _client;private NetworkStream _stream;private bool _isConnected;private CancellationTokenSource _cancellationTokenSource;public event EventHandler<byte[]> DataReceived;public event EventHandler<Exception> ErrorOccurred;public event EventHandler Connected;public event EventHandler Disconnected;public string ServerIP { get; private set; }public int ServerPort { get; private set; }public bool IsConnected => _isConnected && _client?.Connected == true;public TcpClientManager(string serverIP, int serverPort){ServerIP = serverIP;ServerPort = serverPort;}/// <summary>/// 异步连接到服务器/// </summary>public async Task<bool> ConnectAsync(int timeout = 5000){try{_client = new TcpClient();var connectTask = _client.ConnectAsync(ServerIP, ServerPort);if (await Task.WhenAny(connectTask, Task.Delay(timeout)) == connectTask){_stream = _client.GetStream();_isConnected = true;_cancellationTokenSource = new CancellationTokenSource();// 启动接收线程_ = Task.Run(() => ReceiveDataAsync(_cancellationTokenSource.Token));Connected?.Invoke(this, EventArgs.Empty);Console.WriteLine($"已连接到 {ServerIP}:{ServerPort}");return true;}else{throw new TimeoutException("连接超时");}}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);return false;}}/// <summary>/// 发送数据/// </summary>public async Task<bool> SendAsync(byte[] data){try{if (_stream != null && _stream.CanWrite){await _stream.WriteAsync(data, 0, data.Length);await _stream.FlushAsync();return true;}return false;}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);return false;}}/// <summary>/// 发送字符串/// </summary>public Task<bool> SendStringAsync(string message, Encoding encoding = null){encoding = encoding ?? Encoding.UTF8;byte[] data = encoding.GetBytes(message);return SendAsync(data);}/// <summary>/// 发送并等待响应/// </summary>public async Task<byte[]> SendAndReceiveAsync(byte[] data, int expectedLength, int timeout = 3000){try{if (!IsConnected) return null;await SendAsync(data);byte[] buffer = new byte[expectedLength];var readTask = _stream.ReadAsync(buffer, 0, expectedLength);if (await Task.WhenAny(readTask, Task.Delay(timeout)) == readTask){int bytesRead = await readTask;if (bytesRead == expectedLength)return buffer;}return null;}catch (Exception ex){ErrorOccurred?.Invoke(this, ex);return null;}}private async Task ReceiveDataAsync(CancellationToken token){byte[] buffer = new byte[4096];try{while (!token.IsCancellationRequested && _stream != null){int bytesRead = await _stream.ReadAsync(buffer, 0, buffer.Length, token);if (bytesRead > 0){byte[] data = new byte[bytesRead];Array.Copy(buffer, data, bytesRead);DataReceived?.Invoke(this, data);}else{// 连接已断开break;}}}catch (Exception ex){if (!token.IsCancellationRequested)ErrorOccurred?.Invoke(this, ex);}finally{Disconnect();}}public void Disconnect(){_isConnected = false;_cancellationTokenSource?.Cancel();_stream?.Close();_client?.Close();Disconnected?.Invoke(this, EventArgs.Empty);Console.WriteLine("连接已断开");}public void Dispose(){Disconnect();_cancellationTokenSource?.Dispose();_stream?.Dispose();_client?.Dispose();}}
}
C# TCP 服务器实现
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;namespace IndustrialAutomation.Network
{public class TcpServerManager : IDisposable{private TcpListener _listener;private CancellationTokenSource _cancellationTokenSource;private ConcurrentDictionary<string, TcpClient> _clients;public event EventHandler<ClientConnectedEventArgs> ClientConnected;public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnected;public event EventHandler<DataReceivedEventArgs> DataReceived;public int Port { get; private set; }public bool IsRunning { get; private set; }public TcpServerManager(int port){Port = port;_clients = new ConcurrentDictionary<string, TcpClient>();}public void Start(){try{_listener = new TcpListener(IPAddress.Any, Port);_listener.Start();_cancellationTokenSource = new CancellationTokenSource();IsRunning = true;_ = Task.Run(() => AcceptClientsAsync(_cancellationTokenSource.Token));Console.WriteLine($"TCP服务器已启动,监听端口: {Port}");}catch (Exception ex){Console.WriteLine($"启动服务器失败: {ex.Message}");}}private async Task AcceptClientsAsync(CancellationToken token){while (!token.IsCancellationRequested){try{TcpClient client = await _listener.AcceptTcpClientAsync();string clientId = client.Client.RemoteEndPoint.ToString();_clients.TryAdd(clientId, client);ClientConnected?.Invoke(this, new ClientConnectedEventArgs(clientId));_ = Task.Run(() => HandleClientAsync(client, clientId, token));}catch (Exception ex){if (!token.IsCancellationRequested)Console.WriteLine($"接受客户端连接失败: {ex.Message}");}}}private async Task HandleClientAsync(TcpClient client, string clientId, CancellationToken token){NetworkStream stream = client.GetStream();byte[] buffer = new byte[4096];try{while (!token.IsCancellationRequested && client.Connected){int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, token);if (bytesRead > 0){byte[] data = new byte[bytesRead];Array.Copy(buffer, data, bytesRead);DataReceived?.Invoke(this, new DataReceivedEventArgs(clientId, data));}else{break;}}}catch (Exception ex){Console.WriteLine($"处理客户端数据失败: {ex.Message}");}finally{_clients.TryRemove(clientId, out _);stream.Close();client.Close();ClientDisconnected?.Invoke(this, new ClientDisconnectedEventArgs(clientId));}}public async Task<bool> SendToClientAsync(string clientId, byte[] data){if (_clients.TryGetValue(clientId, out TcpClient client)){try{NetworkStream stream = client.GetStream();await stream.WriteAsync(data, 0, data.Length);return true;}catch (Exception ex){Console.WriteLine($"发送数据失败: {ex.Message}");}}return false;}public async Task BroadcastAsync(byte[] data){foreach (var client in _clients.Values){try{NetworkStream stream = client.GetStream();await stream.WriteAsync(data, 0, data.Length);}catch { }}}public void Stop(){IsRunning = false;_cancellationTokenSource?.Cancel();foreach (var client in _clients.Values){client.Close();}_clients.Clear();_listener?.Stop();Console.WriteLine("TCP服务器已停止");}public void Dispose(){Stop();_cancellationTokenSource?.Dispose();}}public class ClientConnectedEventArgs : EventArgs{public string ClientId { get; }public ClientConnectedEventArgs(string clientId) => ClientId = clientId;}public class ClientDisconnectedEventArgs : EventArgs{public string ClientId { get; }public ClientDisconnectedEventArgs(string clientId) => ClientId = clientId;}public class DataReceivedEventArgs : EventArgs{public string ClientId { get; }public byte[] Data { get; }public DataReceivedEventArgs(string clientId, byte[] data){ClientId = clientId;Data = data;}}
}
Python TCP 通信实现
import socket
import threading
import time
from typing import Optional, Callable, Dict
from queue import Queueclass TcpClient:"""TCP客户端"""def __init__(self, host: str, port: int, buffer_size: int = 4096):self.host = hostself.port = portself.buffer_size = buffer_sizeself.socket: Optional[socket.socket] = Noneself.is_connected = Falseself.receive_thread: Optional[threading.Thread] = Noneself.running = False# 回调函数self.on_data_received: Optional[Callable[[bytes], None]] = Noneself.on_connected: Optional[Callable[[], None]] = Noneself.on_disconnected: Optional[Callable[[], None]] = Noneself.on_error: Optional[Callable[[Exception], None]] = Nonedef connect(self, timeout: float = 5.0) -> bool:"""连接到服务器"""try:self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.socket.settimeout(timeout)self.socket.connect((self.host, self.port))self.socket.settimeout(None)self.is_connected = Trueself.running = True# 启动接收线程self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)self.receive_thread.start()print(f"已连接到 {self.host}:{self.port}")if self.on_connected:self.on_connected()return Trueexcept Exception as e:print(f"连接失败: {e}")if self.on_error:self.on_error(e)return Falsedef disconnect(self):"""断开连接"""self.running = Falseself.is_connected = Falseif self.socket:try:self.socket.close()except:passif self.receive_thread and self.receive_thread.is_alive():self.receive_thread.join(timeout=2)print("连接已断开")if self.on_disconnected:self.on_disconnected()def send(self, data: bytes) -> bool:"""发送数据"""try:if self.socket and self.is_connected:self.socket.sendall(data)return Truereturn Falseexcept Exception as e:print(f"发送数据失败: {e}")if self.on_error:self.on_error(e)self.disconnect()return Falsedef send_string(self, message: str, encoding: str = 'utf-8') -> bool:"""发送字符串"""return self.send(message.encode(encoding))def send_and_receive(self, data: bytes, timeout: float = 3.0) -> Optional[bytes]:"""发送数据并等待响应"""try:if not self.is_connected:return Noneself.socket.settimeout(timeout)self.socket.sendall(data)response = self.socket.recv(self.buffer_size)self.socket.settimeout(None)return response if response else Noneexcept socket.timeout:print("接收超时")self.socket.settimeout(None)return Noneexcept Exception as e:print(f"通信失败: {e}")if self.on_error:self.on_error(e)return Nonedef _receive_loop(self):"""接收数据循环"""while self.running:try:data = self.socket.recv(self.buffer_size)if data:if self.on_data_received:self.on_data_received(data)else:# 连接已关闭breakexcept Exception as e:if self.running:print(f"接收数据错误: {e}")if self.on_error:self.on_error(e)breakself.disconnect()def __enter__(self):self.connect()return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.disconnect()class TcpServer:"""TCP服务器"""def __init__(self, host: str = '0.0.0.0', port: int = 8888, buffer_size: int = 4096):self.host = hostself.port = portself.buffer_size = buffer_sizeself.server_socket: Optional[socket.socket] = Noneself.clients: Dict[str, socket.socket] = {}self.running = False# 回调函数self.on_client_connected: Optional[Callable[[str], None]] = Noneself.on_client_disconnected: Optional[Callable[[str], None]] = Noneself.on_data_received: Optional[Callable[[str, bytes], None]] = Nonedef start(self):"""启动服务器"""try:self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.server_socket.bind((self.host, self.port))self.server_socket.listen(5)self.running = Trueprint(f"TCP服务器已启动,监听 {self.host}:{self.port}")# 启动接受连接的线程accept_thread = threading.Thread(target=self._accept_loop, daemon=True)accept_thread.start()except Exception as e:print(f"启动服务器失败: {e}")def stop(self):"""停止服务器"""self.running = False# 关闭所有客户端连接for client_id, client_socket in list(self.clients.items()):try:client_socket.close()except:passself.clients.clear()# 关闭服务器套接字if self.server_socket:try:self.server_socket.close()except:passprint("TCP服务器已停止")def _accept_loop(self):"""接受客户端连接循环"""while self.running:try:client_socket, client_address = self.server_socket.accept()client_id = f"{client_address[0]}:{client_address[1]}"self.clients[client_id] = client_socketprint(f"客户端已连接: {client_id}")if self.on_client_connected:self.on_client_connected(client_id)# 为每个客户端启动处理线程client_thread = threading.Thread(target=self._handle_client,args=(client_socket, client_id),daemon=True)client_thread.start()except Exception as e:if self.running:print(f"接受客户端连接失败: {e}")def _handle_client(self, client_socket: socket.socket, client_id: str):"""处理客户端数据"""try:while self.running:data = client_socket.recv(self.buffer_size)if data:if self.on_data_received:self.on_data_received(client_id, data)else:breakexcept Exception as e:print(f"处理客户端数据失败: {e}")finally:# 清理客户端if client_id in self.clients:del self.clients[client_id]try:client_socket.close()except:passprint(f"客户端已断开: {client_id}")if self.on_client_disconnected:self.on_client_disconnected(client_id)def send_to_client(self, client_id: str, data: bytes) -> bool:"""发送数据到指定客户端"""if client_id in self.clients:try:self.clients[client_id].sendall(data)return Trueexcept Exception as e:print(f"发送数据失败: {e}")return Falsereturn Falsedef broadcast(self, data: bytes):"""广播数据到所有客户端"""for client_id, client_socket in list(self.clients.items()):try:client_socket.sendall(data)except:pass

2.2 UDP 通信实现

Python UDP 实现
import socket
import threading
from typing import Optional, Callable, Tupleclass UdpCommunicator:"""UDP通信器(可用作客户端或服务器)"""def __init__(self, local_port: int = 0, buffer_size: int = 4096):"""初始化UDP通信器Args:local_port: 本地绑定端口,0表示自动分配buffer_size: 接收缓冲区大小"""self.local_port = local_portself.buffer_size = buffer_sizeself.socket: Optional[socket.socket] = Noneself.running = Falseself.receive_thread: Optional[threading.Thread] = None# 回调函数self.on_data_received: Optional[Callable[[bytes, Tuple[str, int]], None]] = Nonedef start(self):"""启动UDP通信"""try:self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.socket.bind(('0.0.0.0', self.local_port))# 获取实际绑定的端口self.local_port = self.socket.getsockname()[1]self.running = Trueself.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)self.receive_thread.start()print(f"UDP通信已启动,监听端口: {self.local_port}")except Exception as e:print(f"启动UDP通信失败: {e}")def stop(self):"""停止UDP通信"""self.running = Falseif self.socket:try:self.socket.close()except:passif self.receive_thread and self.receive_thread.is_alive():self.receive_thread.join(timeout=2)print("UDP通信已停止")def send(self, data: bytes, address: Tuple[str, int]) -> bool:"""发送数据Args:data: 要发送的数据address: 目标地址 (host, port)"""try:if self.socket:self.socket.sendto(data, address)return Truereturn Falseexcept Exception as e:print(f"发送UDP数据失败: {e}")return Falsedef send_string(self, message: str, address: Tuple[str, int], encoding: str = 'utf-8') -> bool:"""发送字符串"""return self.send(message.encode(encoding), address)def send_broadcast(self, data: bytes, port: int):"""发送广播消息"""try:self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)self.socket.sendto(data, ('<broadcast>', port))return Trueexcept Exception as e:print(f"发送广播失败: {e}")return Falsedef _receive_loop(self):"""接收数据循环"""while self.running:try:data, address = self.socket.recvfrom(self.buffer_size)if self.on_data_received:self.on_data_received(data, address)except Exception as e:if self.running:print(f"接收UDP数据错误: {e}")def __enter__(self):self.start()return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.stop()

2.3 Modbus TCP 实现

import struct
from typing import Optional, Listclass ModbusTCP:"""Modbus TCP协议实现"""def __init__(self, host: str, port: int = 502):self.tcp_client = TcpClient(host, port)self.transaction_id = 0def connect(self) -> bool:return self.tcp_client.connect()def disconnect(self):self.tcp_client.disconnect()def read_holding_registers(self, slave_id: int, start_address: int, quantity: int) -> Optional[List[int]]:"""读取保持寄存器"""self.transaction_id += 1# 构建Modbus TCP请求request = struct.pack('>HHHBBHH',self.transaction_id,  # 事务标识符0,                    # 协议标识符6,                    # 长度slave_id,             # 单元标识符0x03,                 # 功能码start_address,        # 起始地址quantity              # 数量)# 发送并接收response = self.tcp_client.send_and_receive(request, timeout=3.0)if not response or len(response) < 9:return None# 解析响应trans_id, proto_id, length, unit_id, func_code, byte_count = struct.unpack('>HHHBBB', response[:9])if func_code != 0x03 or trans_id != self.transaction_id:return None# 提取寄存器值values = []for i in range(quantity):offset = 9 + (i * 2)if offset + 2 <= len(response):value = struct.unpack('>H', response[offset:offset+2])[0]values.append(value)return valuesdef write_single_register(self, slave_id: int, address: int, value: int) -> bool:"""写单个寄存器"""self.transaction_id += 1request = struct.pack('>HHHBBHH',self.transaction_id,0,6,slave_id,0x06,address,value)response = self.tcp_client.send_and_receive(request, timeout=3.0)return response is not None and len(response) >= 12def write_multiple_registers(self, slave_id: int, start_address: int, values: List[int]) -> bool:"""写多个寄存器"""self.transaction_id += 1quantity = len(values)byte_count = quantity * 2# 构建请求头request = struct.pack('>HHHBBHHB',self.transaction_id,0,7 + byte_count,slave_id,0x10,start_address,quantity,byte_count)# 添加数据for value in values:request += struct.pack('>H', value)response = self.tcp_client.send_and_receive(request, timeout=3.0)return response is not None and len(response) >= 12

物联网控制

3.1 MQTT 协议实现

C# MQTT 客户端(使用MQTTnet库)
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading.Tasks;namespace IndustrialAutomation.IoT
{public class MqttClientManager : IDisposable{private IMqttClient _mqttClient;private MqttClientOptions _options;public event EventHandler<string> MessageReceived;public event EventHandler Connected;public event EventHandler Disconnected;public bool IsConnected => _mqttClient?.IsConnected ?? false;public MqttClientManager(string broker, int port = 1883, string clientId = null){var factory = new MqttFactory();_mqttClient = factory.CreateMqttClient();clientId = clientId ?? $"Client_{Guid.NewGuid():N}";_options = new MqttClientOptionsBuilder().WithTcpServer(broker, port).WithClientId(clientId).WithCleanSession().Build();_mqttClient.ConnectedAsync += OnConnectedAsync;_mqttClient.DisconnectedAsync += OnDisconnectedAsync;_mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;}public async Task<bool> ConnectAsync(string username = null, string password = null){try{if (!string.IsNullOrEmpty(username)){_options = new MqttClientOptionsBuilder(_options).WithCredentials(username, password).Build();}await _mqttClient.ConnectAsync(_options);return true;}catch (Exception ex){Console.WriteLine($"MQTT连接失败: {ex.Message}");return false;}}public async Task DisconnectAsync(){if (_mqttClient.IsConnected){await _mqttClient.DisconnectAsync();}}public async Task<bool> SubscribeAsync(string topic, int qos = 1){try{var subscribeOptions = new MqttClientSubscribeOptionsBuilder().WithTopicFilter(f => f.WithTopic(topic).WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos)).Build();await _mqttClient.SubscribeAsync(subscribeOptions);Console.WriteLine($"已订阅主题: {topic}");return true;}catch (Exception ex){Console.WriteLine($"订阅失败: {ex.Message}");return false;}}public async Task<bool> UnsubscribeAsync(string topic){try{var unsubscribeOptions = new MqttClientUnsubscribeOptionsBuilder().WithTopicFilter(topic).Build();await _mqttClient.UnsubscribeAsync(unsubscribeOptions);Console.WriteLine($"已取消订阅: {topic}");return true;}catch (Exception ex){Console.WriteLine($"取消订阅失败: {ex.Message}");return false;}}public async Task<bool> PublishAsync(string topic, string payload, int qos = 1, bool retain = false){try{var message = new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).WithQualityOfServiceLevel((MQTTnet.Protocol.MqttQualityOfServiceLevel)qos).WithRetainFlag(retain).Build();await _mqttClient.PublishAsync(message);return true;}catch (Exception ex){Console.WriteLine($"发布消息失败: {ex.Message}");return false;}}public async Task<bool> PublishJsonAsync<T>(string topic, T data, int qos = 1){string json = System.Text.Json.JsonSerializer.Serialize(data);return await PublishAsync(topic, json, qos);}private Task OnConnectedAsync(MqttClientConnectedEventArgs arg){Console.WriteLine("MQTT已连接");Connected?.Invoke(this, EventArgs.Empty);return Task.CompletedTask;}private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs arg){Console.WriteLine("MQTT已断开");Disconnected?.Invoke(this, EventArgs.Empty);return Task.CompletedTask;}private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg){string payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);string topic = arg.ApplicationMessage.Topic;Console.WriteLine($"收到消息 - 主题: {topic}, 内容: {payload}");MessageReceived?.Invoke(this, payload);return Task.CompletedTask;}public void Dispose(){_mqttClient?.Dispose();}}
}
Python MQTT 客户端(使用paho-mqtt库)
import paho.mqtt.client as mqtt
import json
from typing import Optional, Callable, Any
import timeclass MqttClient:"""MQTT客户端封装"""def __init__(self, broker: str, port: int = 1883, client_id: Optional[str] = None):"""初始化MQTT客户端Args:broker: MQTT代理服务器地址port: 端口号client_id: 客户端ID,None则自动生成"""self.broker = brokerself.port = portself.client_id = client_id or f"client_{int(time.time())}"self.client = mqtt.Client(client_id=self.client_id)self.is_connected = False# 设置回调self.client.on_connect = self._on_connectself.client.on_disconnect = self._on_disconnectself.client.on_message = self._on_messageself.client.on_subscribe = self._on_subscribeself.client.on_publish = self._on_publish# 用户回调函数self.on_message_callback: Optional[Callable[[str, str], None]] = Noneself.on_connected_callback: Optional[Callable[[], None]] = Noneself.on_disconnected_callback: Optional[Callable[[], None]] = Nonedef connect(self, username: Optional[str] = None, password: Optional[str] = None, keepalive: int = 60) -> bool:"""连接到MQTT代理"""try:if username and password:self.client.username_pw_set(username, password)self.client.connect(self.broker, self.port, keepalive)self.client.loop_start()  # 启动网络循环# 等待连接建立timeout = 10start_time = time.time()while not self.is_connected and (time.time() - start_time) < timeout:time.sleep(0.1)return self.is_connectedexcept Exception as e:print(f"MQTT连接失败: {e}")return Falsedef disconnect(self):"""断开连接"""self.client.loop_stop()self.client.disconnect()def subscribe(self, topic: str, qos: int = 1):"""订阅主题"""self.client.subscribe(topic, qos)print(f"已订阅主题: {topic}")def unsubscribe(self, topic: str):"""取消订阅"""self.client.unsubscribe(topic)print(f"已取消订阅: {topic}")def publish(self, topic: str, payload: str, qos: int = 1, retain: bool = False) -> bool:"""发布消息"""try:result = self.client.publish(topic, payload, qos, retain)return result.rc == mqtt.MQTT_ERR_SUCCESSexcept Exception as e:print(f"发布消息失败: {e}")return Falsedef publish_json(self, topic: str, data: Any, qos: int = 1, retain: bool = False) -> bool:"""发布JSON数据"""try:payload = json.dumps(data, ensure_ascii=False)return self.publish(topic, payload, qos, retain)except Exception as e:print(f"发布JSON失败: {e}")return Falsedef _on_connect(self, client, userdata, flags, rc):"""连接回调"""if rc == 0:self.is_connected = Trueprint("MQTT已连接")if self.on_connected_callback:self.on_connected_callback()else:print(f"MQTT连接失败,错误码: {rc}")def _on_disconnect(self, client, userdata, rc):"""断开连接回调"""self.is_connected = Falseprint("MQTT已断开")if self.on_disconnected_callback:self.on_disconnected_callback()def _on_message(self, client, userdata, msg):"""消息接收回调"""topic = msg.topicpayload = msg.payload.decode('utf-8')print(f"收到消息 - 主题: {topic}, 内容: {payload}")if self.on_message_callback:self.on_message_callback(topic, payload)def _on_subscribe(self, client, userdata, mid, granted_qos):"""订阅确认回调"""print(f"订阅确认,QoS: {granted_qos}")def _on_publish(self, client, userdata, mid):"""发布确认回调"""passdef __enter__(self):self.connect()return selfdef __exit__(self, exc_type, exc_val, exc_tb):self.disconnect()

3.2 HTTP RESTful API 实现

Python HTTP客户端(用于物联网平台API)
import requests
import json
from typing import Optional, Dict, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retryclass IoTApiClient:"""物联网平台HTTP API客户端"""def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 30):"""初始化API客户端Args:base_url: API基础URLapi_key: API密钥timeout: 请求超时时间"""self.base_url = base_url.rstrip('/')self.api_key = api_keyself.timeout = timeout# 创建会话self.session = requests.Session()# 设置重试策略retry_strategy = Retry(total=3,backoff_factor=1,status_forcelist=[429, 500, 502, 503, 504],allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"])adapter = HTTPAdapter(max_retries=retry_strategy)self.session.mount("http://", adapter)self.session.mount("https://", adapter)# 设置默认请求头self.session.headers.update({'Content-Type': 'application/json','Accept': 'application/json'})if api_key:self.session.headers.update({'Authorization': f'Bearer {api_key}'})def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:"""GET请求"""try:url = f"{self.base_url}/{endpoint.lstrip('/')}"response = self.session.get(url, params=params, timeout=self.timeout)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:print(f"GET请求失败: {e}")return Nonedef post(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict]:"""POST请求"""try:url = f"{self.base_url}/{endpoint.lstrip('/')}"response = self.session.post(url, json=data, timeout=self.timeout)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:print(f"POST请求失败: {e}")return Nonedef put(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict]:"""PUT请求"""try:url = f"{self.base_url}/{endpoint.lstrip('/')}"response = self.session.put(url, json=data, timeout=self.timeout)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:print(f"PUT请求失败: {e}")return Nonedef delete(self, endpoint: str) -> bool:"""DELETE请求"""try:url = f"{self.base_url}/{endpoint.lstrip('/')}"response = self.session.delete(url, timeout=self.timeout)response.raise_for_status()return Trueexcept requests.exceptions.RequestException as e:print(f"DELETE请求失败: {e}")return False# 设备管理API示例def get_device(self, device_id: str) -> Optional[Dict]:"""获取设备信息"""return self.get(f"/devices/{device_id}")def get_devices(self, page: int = 1, page_size: int = 20) -> Optional[Dict]:"""获取设备列表"""return self.get("/devices", params={"page": page, "pageSize": page_size})def create_device(self, device_data: Dict) -> Optional[Dict]:"""创建设备"""return self.post("/devices", data=device_data)def update_device(self, device_id: str, device_data: Dict) -> Optional[Dict]:"""更新设备"""return self.put(f"/devices/{device_id}", data=device_data)def delete_device(self, device_id: str) -> bool:"""删除设备"""return self.delete(f"/devices/{device_id}")def send_command(self, device_id: str, command: str, params: Optional[Dict] = None) -> Optional[Dict]:"""向设备发送命令"""data = {"command": command,"params": params or {}}return self.post(f"/devices/{device_id}/commands", data=data)def get_telemetry(self, device_id: str, start_time: Optional[str] = None, end_time: Optional[str] = None) -> Optional[Dict]:"""获取设备遥测数据"""params = {}if start_time:params['startTime'] = start_timeif end_time:params['endTime'] = end_timereturn self.get(f"/devices/{device_id}/telemetry", params=params)

综合应用案例

4.1 智能工厂数据采集系统

import threading
import time
import queue
from datetime import datetime
from typing import Dict, Listclass IndustrialDataCollector:"""工业数据采集器 - 综合串口、网口、物联网"""def __init__(self):# 数据队列self.data_queue = queue.Queue()# 通信管理器self.modbus_rtu = Noneself.modbus_tcp = Noneself.mqtt_client = Noneself.api_client = None# 运行状态self.running = Falseself.threads: List[threading.Thread] = []def initialize(self, config: Dict):"""初始化所有通信接口"""# 初始化Modbus RTU(串口)if 'modbus_rtu' in config:cfg = config['modbus_rtu']self.modbus_rtu = ModbusRTU(cfg['port'], cfg['baudrate'])self.modbus_rtu.open()print("Modbus RTU已初始化")# 初始化Modbus TCP(网口)if 'modbus_tcp' in config:cfg = config['modbus_tcp']self.modbus_tcp = ModbusTCP(cfg['host'], cfg['port'])self.modbus_tcp.connect()print("Modbus TCP已初始化")# 初始化MQTT(物联网)if 'mqtt' in config:cfg = config['mqtt']self.mqtt_client = MqttClient(cfg['broker'], cfg['port'])self.mqtt_client.connect(cfg.get('username'), cfg.get('password'))self.mqtt_client.subscribe(cfg['subscribe_topic'])# 设置MQTT消息回调self.mqtt_client.on_message_callback = self._on_mqtt_messageprint("MQTT已初始化")# 初始化HTTP API(云平台)if 'api' in config:cfg = config['api']self.api_client = IoTApiClient(cfg['base_url'], cfg['api_key'])print("API客户端已初始化")def start(self):"""启动数据采集"""self.running = True# 启动各个数据采集线程if self.modbus_rtu:thread = threading.Thread(target=self._collect_modbus_rtu_data, daemon=True)thread.start()self.threads.append(thread)if self.modbus_tcp:thread = threading.Thread(target=self._collect_modbus_tcp_data, daemon=True)thread.start()self.threads.append(thread)# 启动数据处理线程thread = threading.Thread(target=self._process_data, daemon=True)thread.start()self.threads.append(thread)print("数据采集已启动")def stop(self):"""停止数据采集"""self.running = False# 等待线程结束for thread in self.threads:thread.join(timeout=2)# 关闭所有连接if self.modbus_rtu:self.modbus_rtu.close()if self.modbus_tcp:self.modbus_tcp.disconnect()if self.mqtt_client:self.mqtt_client.disconnect()print("数据采集已停止")def _collect_modbus_rtu_data(self):"""采集Modbus RTU数据(串口设备)"""while self.running:try:# 读取传感器数据values = self.modbus_rtu.read_holding_registers(slave_id=1,start_address=0,quantity=10)if values:data = {'source': 'modbus_rtu','timestamp': datetime.now().isoformat(),'device_id': 'sensor_001','data': {'temperature': values[0] / 10.0,'humidity': values[1] / 10.0,'pressure': values[2] / 100.0,# 更多传感器数据...}}self.data_queue.put(data)time.sleep(1)  # 1秒采集一次except Exception as e:print(f"Modbus RTU采集错误: {e}")time.sleep(5)def _collect_modbus_tcp_data(self):"""采集Modbus TCP数据(网口PLC)"""while self.running:try:# 读取PLC数据values = self.modbus_tcp.read_holding_registers(slave_id=1,start_address=0,quantity=20)if values:data = {'source': 'modbus_tcp','timestamp': datetime.now().isoformat(),'device_id': 'plc_001','data': {'motor_speed': values[0],'motor_current': values[1] / 100.0,'production_count': values[2],'alarm_status': values[3],# 更多PLC数据...}}self.data_queue.put(data)time.sleep(0.5)  # 500ms采集一次except Exception as e:print(f"Modbus TCP采集错误: {e}")time.sleep(5)def _on_mqtt_message(self, topic: str, payload: str):"""处理MQTT消息"""try:import jsonmessage_data = json.loads(payload)data = {'source': 'mqtt','timestamp': datetime.now().isoformat(),'topic': topic,'data': message_data}self.data_queue.put(data)except Exception as e:print(f"MQTT消息处理错误: {e}")def _process_data(self):"""处理采集的数据"""while self.running:try:# 从队列获取数据(阻塞,超时1秒)data = self.data_queue.get(timeout=1)# 数据处理逻辑print(f"处理数据: {data['source']} - {data['device_id'] if 'device_id' in data else data.get('topic', 'N/A')}")# 1. 本地存储(可选)self._store_locally(data)# 2. 上传到云平台if self.api_client:self._upload_to_cloud(data)# 3. 通过MQTT发布(可选)if self.mqtt_client:self._publish_to_mqtt(data)# 4. 实时分析和告警self._analyze_and_alert(data)except queue.Empty:continueexcept Exception as e:print(f"数据处理错误: {e}")def _store_locally(self, data: Dict):"""本地存储数据"""# 这里可以存储到SQLite、CSV等passdef _upload_to_cloud(self, data: Dict):"""上传数据到云平台"""try:result = self.api_client.post('/telemetry', data=data)if result:print(f"数据已上传到云平台")except Exception as e:print(f"上传云平台失败: {e}")def _publish_to_mqtt(self, data: Dict):"""通过MQTT发布数据"""try:topic = f"factory/data/{data.get('device_id', 'unknown')}"self.mqtt_client.publish_json(topic, data)except Exception as e:print(f"MQTT发布失败: {e}")def _analyze_and_alert(self, data: Dict):"""数据分析和告警"""# 实现告警逻辑if 'data' in data:values = data['data']# 温度告警if 'temperature' in values and values['temperature'] > 80:print(f"⚠️ 温度告警: {values['temperature']}°C")# 发送告警...# 其他告警逻辑...# 使用示例
if __name__ == "__main__":# 配置config = {'modbus_rtu': {'port': '/dev/ttyUSB0','baudrate': 9600},'modbus_tcp': {'host': '192.168.1.100','port': 502},'mqtt': {'broker': 'mqtt.example.com','port': 1883,'username': 'user','password': 'pass','subscribe_topic': 'factory/commands/#'},'api': {'base_url': 'https://api.iot-platform.com','api_key': 'your_api_key_here'}}# 创建采集器collector = IndustrialDataCollector()collector.initialize(config)collector.start()try:# 运行采集while True:time.sleep(1)except KeyboardInterrupt:print("\n正在停止...")collector.stop()

最佳实践与故障排查

5.1 通信最佳实践

串口通信
  1. 始终添加超时机制,避免程序挂起
  2. 使用CRC或校验位确保数据完整性
  3. 实现重试机制,处理通信失败
  4. 清空缓冲区,避免旧数据干扰
  5. 使用线程或异步,避免阻塞主程序
网口通信
  1. 使用心跳包保持连接活跃
  2. 实现断线重连机制
  3. 限制并发连接数
  4. 使用连接池提高效率
  5. 设置合理的超时时间
物联网通信
  1. 使用QoS机制保证消息可靠性
  2. 实现消息去重
  3. 压缩大数据,减少带宽
  4. 使用TLS加密保护数据安全
  5. 实现离线缓存,网络恢复后上传

5.2 常见故障排查

串口问题
# 检查可用串口
ports = SerialPortManager.list_ports()
print("可用串口:", ports)# 测试串口是否可打开
try:with SerialPortManager(port, baudrate) as serial:print("串口正常")
except Exception as e:print(f"串口错误: {e}")
网络连接问题
import socketdef check_tcp_connection(host: str, port: int) -> bool:"""检查TCP连接是否可达"""try:sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.settimeout(3)result = sock.connect_ex((host, port))sock.close()return result == 0except:return False# 使用
if check_tcp_connection('192.168.1.100', 502):print("设备在线")
else:print("设备离线或端口不可达")

5.3 性能优化建议

  1. 批量操作: 合并多个读写请求,减少通信次数
  2. 数据缓存: 对不常变化的数据进行缓存
  3. 异步处理: 使用异步I/O提高吞吐量
  4. 连接复用: 保持长连接,避免频繁建立连接
  5. 数据压缩: 对大量数据进行压缩传输

5.4 安全性建议

  1. 加密通信: 使用TLS/SSL加密敏感数据
  2. 身份认证: 实现设备认证和用户认证
  3. 访问控制: 限制设备访问权限
  4. 输入验证: 验证所有输入数据,防止注入攻击
  5. 日志记录: 记录所有关键操作,便于审计

总结

本文档详细介绍了工业自动化领域的三大核心通信技术:

  • 串口控制: RS232/RS485通信、Modbus RTU协议
  • 网口控制: TCP/IP、UDP、Modbus TCP协议
  • 物联网控制: MQTT、HTTP RESTful API

通过C#和Python的完整实现代码,您可以快速构建工业自动化系统。记住:

  • 选择合适的通信方式
  • 实现可靠的错误处理
  • 注重系统的可维护性和扩展性
  • 重视数据安全和通信稳定性
http://www.dtcms.com/a/471414.html

相关文章:

  • NetworkPolicy详解
  • 郑州网站建设行情wordpress网站第一次打开慢
  • Python多进程编程核心组件详解:Event、Queue与进程生命周期管理
  • 真空共晶贴装技术
  • 添加SystemProperties的4种方法
  • 汕头建站平台免费推广网站入口2023燕
  • 深圳做棋牌网站建设有哪些公司海阳建设局网站
  • 网站优化大赛做电子商务网站需要什么软件
  • 重庆网站建设外贸加盟建筑公司办分公司
  • 用 “按位统计” 找唯一出现少于 3 次的数
  • 【解决】FAILED TO lOAD IDLINUX.c32
  • 去重表格的几种思路
  • 网站美工做的是什么合肥外贸网站建设公司排名
  • 用mitmproxy替代selenium-wire
  • 响应式网站怎么改成都住建局官网住建蓉e办
  • 参数传递:从字符串拼接到 qs 标准化时代
  • 清浦网站建设清河做网站
  • 中山企业网站建设公司网站内容seo
  • 如何快速建立网站装修无忧网
  • 网站建设尾款收取公司网站界面如何设计
  • 网站前端设计图投诉网站制作
  • linux 启动脚本rcS 及分区挂载分析
  • 快递公司网站怎么做贵州网站开发哪家便宜
  • 10大免费开源HR系统软件整理(含国内外对比)
  • 分布式架构 vs 微服务架构:从理念到落地的全面解析
  • 【Android】Android系统体系结构
  • 你使用的Nano Banana安全吗?
  • 移动微网站建设深圳做网站推广排名
  • 云岭建设集团的网站要修改wordpress目录下的文件权限
  • TCP/IP 协议族—理论与实践(二)