工业自动化通信控制
工业自动化通信控制
目录
- 串口通信控制
- 网口通信控制
- 物联网控制
- 综合应用案例
- 最佳实践与故障排查
串口通信控制
1.1 串口通信基础
串口通信是工业自动化中最常用的通信方式之一,主要包括RS232、RS485和RS422标准。
核心概念
- 波特率(Baud Rate): 数据传输速率,常用值:9600、19200、38400、115200
- 数据位(Data Bits): 通常为7位或8位
- 停止位(Stop Bits): 1位、1.5位或2位
- 校验位(Parity): 无校验(None)、奇校验(Odd)、偶校验(Even)
- 流控制(Flow Control): 硬件流控(RTS/CTS)或软件流控(XON/XOFF)
RS232 vs RS485
| 特性 | RS232 | RS485 |
|---|---|---|
| 传输距离 | ≤15米 | ≤1200米 |
| 传输方式 | 全双工 | 半双工(两线制)/全双工(四线制) |
| 设备数量 | 1对1 | 标准为32个单位负载;采用1/4单位负载收发器时最多128个设备 |
| 抗干扰性 | 较弱 | 强(差分信号) |
| 应用场景 | 近距离点对点 | 工业现场总线 |
1.2 C# 串口通信实现
基础串口类封装
using System;
using System.IO.Ports;
using System.Text;
using System.Threading;

namespace IndustrialAutomation
{
    /// <summary>
    /// Wraps a <see cref="SerialPort"/> and exposes string/byte sends, a blocking
    /// request/response helper, and events for unsolicited data and errors.
    /// </summary>
    public class SerialPortManager : IDisposable
    {
        private readonly SerialPort _serialPort;

        // Serializes SendAndReceive calls so two requests never interleave on the wire.
        private readonly object _transactionLock = new object();

        private bool _isConnected = false;

        // Set while SendAndReceive owns the input buffer; the DataReceived handler
        // must not drain the buffer during that window or it would steal the reply.
        private volatile bool _transactionActive;

        private bool _disposed;

        /// <summary>Raised with the text read from the port when unsolicited data arrives.</summary>
        public event EventHandler<string> DataReceived;

        /// <summary>Raised whenever an operation fails or the port reports an error.</summary>
        public event EventHandler<Exception> ErrorOccurred;

        /// <summary>
        /// Configures (but does not open) the serial port.
        /// </summary>
        /// <param name="portName">Port name passed to <see cref="SerialPort.PortName"/>.</param>
        /// <param name="baudRate">Baud rate.</param>
        /// <param name="parity">Parity mode.</param>
        /// <param name="dataBits">Number of data bits.</param>
        /// <param name="stopBits">Number of stop bits.</param>
        public SerialPortManager(string portName, int baudRate = 9600, Parity parity = Parity.None, int dataBits = 8, StopBits stopBits = StopBits.One)
        {
            _serialPort = new SerialPort
            {
                PortName = portName,
                BaudRate = baudRate,
                Parity = parity,
                DataBits = dataBits,
                StopBits = stopBits,
                Handshake = Handshake.None,
                ReadTimeout = 1000,
                WriteTimeout = 1000
            };
            _serialPort.DataReceived += SerialPort_DataReceived;
            _serialPort.ErrorReceived += SerialPort_ErrorReceived;
        }

        /// <summary>
        /// Opens the port.
        /// </summary>
        /// <returns>
        /// <c>true</c> when the port was opened by this call; <c>false</c> when it was
        /// already open or opening failed (the failure is reported via <see cref="ErrorOccurred"/>).
        /// </returns>
        public bool Open()
        {
            try
            {
                if (!_serialPort.IsOpen)
                {
                    _serialPort.Open();
                    _isConnected = true;
                    Console.WriteLine($"串口 {_serialPort.PortName} 已打开");
                    return true;
                }
                return false;
            }
            catch (Exception ex)
            {
                ErrorOccurred?.Invoke(this, ex);
                return false;
            }
        }

        /// <summary>Closes the port if it is open.</summary>
        public void Close()
        {
            if (_serialPort != null && _serialPort.IsOpen)
            {
                _serialPort.Close();
                _isConnected = false;
                Console.WriteLine("串口已关闭");
            }
        }

        /// <summary>
        /// Writes <paramref name="data"/> followed by the port's newline sequence.
        /// </summary>
        /// <returns><c>true</c> on success; <c>false</c> if the port is closed or the write failed.</returns>
        public bool SendString(string data)
        {
            try
            {
                if (_serialPort.IsOpen)
                {
                    _serialPort.WriteLine(data);
                    return true;
                }
                return false;
            }
            catch (Exception ex)
            {
                ErrorOccurred?.Invoke(this, ex);
                return false;
            }
        }

        /// <summary>
        /// Writes a raw byte buffer (typical for binary protocols such as Modbus).
        /// </summary>
        /// <returns><c>true</c> on success; <c>false</c> if the port is closed or the write failed.</returns>
        public bool SendBytes(byte[] data)
        {
            try
            {
                if (_serialPort.IsOpen)
                {
                    _serialPort.Write(data, 0, data.Length);
                    return true;
                }
                return false;
            }
            catch (Exception ex)
            {
                ErrorOccurred?.Invoke(this, ex);
                return false;
            }
        }

        /// <summary>
        /// Sends a request and blocks until exactly <paramref name="expectedLength"/>
        /// bytes have been received or <paramref name="timeout"/> elapses.
        /// </summary>
        /// <param name="sendData">Request bytes.</param>
        /// <param name="expectedLength">Exact number of reply bytes to collect.</param>
        /// <param name="timeout">Overall reply deadline in milliseconds.</param>
        /// <returns>
        /// The reply buffer, or <c>null</c> when the port is closed, the deadline passes
        /// or any error occurs (errors are reported via <see cref="ErrorOccurred"/>).
        /// </returns>
        public byte[] SendAndReceive(byte[] sendData, int expectedLength, int timeout = 1000)
        {
            try
            {
                if (!_serialPort.IsOpen)
                {
                    return null;
                }

                lock (_transactionLock)
                {
                    int previousReadTimeout = _serialPort.ReadTimeout;
                    _transactionActive = true;
                    try
                    {
                        _serialPort.DiscardInBuffer();
                        _serialPort.Write(sendData, 0, sendData.Length);

                        byte[] buffer = new byte[expectedLength];
                        int received = 0;
                        var elapsed = System.Diagnostics.Stopwatch.StartNew();

                        // SerialPort.Read returns as soon as *some* bytes are available,
                        // so keep reading until the whole frame has arrived.
                        while (received < expectedLength)
                        {
                            int remaining = timeout - (int)elapsed.ElapsedMilliseconds;
                            if (remaining <= 0)
                            {
                                throw new TimeoutException();
                            }

                            _serialPort.ReadTimeout = remaining;
                            received += _serialPort.Read(buffer, received, expectedLength - received);
                        }

                        return buffer;
                    }
                    finally
                    {
                        _serialPort.ReadTimeout = previousReadTimeout;
                        _transactionActive = false;
                    }
                }
            }
            catch (Exception ex)
            {
                ErrorOccurred?.Invoke(this, ex);
                return null;
            }
        }

        private void SerialPort_DataReceived(object sender, SerialDataReceivedEventArgs e)
        {
            // While a request/response exchange is running the bytes belong to it.
            if (_transactionActive)
            {
                return;
            }

            try
            {
                string data = _serialPort.ReadExisting();
                DataReceived?.Invoke(this, data);
            }
            catch (Exception ex)
            {
                ErrorOccurred?.Invoke(this, ex);
            }
        }

        private void SerialPort_ErrorReceived(object sender, SerialErrorReceivedEventArgs e)
        {
            ErrorOccurred?.Invoke(this, new Exception($"串口错误: {e.EventType}"));
        }

        /// <summary>Closes the port, detaches the event handlers and releases it. Safe to call repeatedly.</summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            Close();
            if (_serialPort != null)
            {
                _serialPort.DataReceived -= SerialPort_DataReceived;
                _serialPort.ErrorReceived -= SerialPort_ErrorReceived;
                _serialPort.Dispose();
            }
        }
    }
}
Modbus RTU 协议实现
/// <summary>
/// Minimal Modbus RTU master on top of <see cref="SerialPortManager"/>:
/// read holding registers (0x03) and write single register (0x06).
/// </summary>
public class ModbusRTU : IDisposable
{
    private const byte ReadHoldingRegistersCode = 0x03;
    private const byte WriteSingleRegisterCode = 0x06;

    // A 0x03 reply carries a one-byte byte count, which limits a read to 125 registers.
    private const int MaxReadQuantity = 125;

    private readonly SerialPortManager _serialPort;

    /// <summary>Creates the master; call <see cref="Open"/> before issuing requests.</summary>
    public ModbusRTU(string portName, int baudRate = 9600)
    {
        _serialPort = new SerialPortManager(portName, baudRate);
    }

    /// <summary>Opens the underlying serial port.</summary>
    /// <returns>The result of <see cref="SerialPortManager.Open"/>.</returns>
    public bool Open()
    {
        return _serialPort.Open();
    }

    /// <summary>Closes the underlying serial port.</summary>
    public void Close()
    {
        _serialPort.Close();
    }

    /// <summary>Releases the underlying serial port.</summary>
    public void Dispose()
    {
        _serialPort.Dispose();
    }

    /// <summary>
    /// Reads holding registers (function code 0x03).
    /// </summary>
    /// <param name="slaveId">Slave address.</param>
    /// <param name="startAddress">Address of the first register.</param>
    /// <param name="quantity">Number of registers to read (1-125).</param>
    /// <returns>The register values in request order.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="quantity"/> is outside 1-125.</exception>
    /// <exception cref="System.IO.IOException">
    /// No reply arrived, the CRC check failed, or the reply does not match the request.
    /// </exception>
    public ushort[] ReadHoldingRegisters(byte slaveId, ushort startAddress, ushort quantity)
    {
        if (quantity < 1 || quantity > MaxReadQuantity)
        {
            throw new ArgumentOutOfRangeException(nameof(quantity), quantity, "寄存器数量必须在1到125之间");
        }

        byte[] request = BuildRequest(slaveId, ReadHoldingRegistersCode, startAddress, quantity);

        // Reply layout: address, function, byte count, data (2 bytes per register), CRC (2 bytes).
        int expectedLength = 5 + (quantity * 2);
        byte[] response = _serialPort.SendAndReceive(request, expectedLength, 1000);

        if (response == null || !ValidateCRC(response))
        {
            throw new System.IO.IOException("Modbus通信错误或CRC校验失败");
        }

        // A CRC-valid frame from another slave or for another function is still the wrong answer.
        if (response[0] != slaveId || response[1] != ReadHoldingRegistersCode || response[2] != quantity * 2)
        {
            throw new System.IO.IOException("Modbus响应与请求不匹配");
        }

        ushort[] values = new ushort[quantity];
        for (int i = 0; i < quantity; i++)
        {
            int offset = 3 + (i * 2);
            values[i] = (ushort)((response[offset] << 8) | response[offset + 1]); // big-endian register
        }
        return values;
    }

    /// <summary>
    /// Writes one holding register (function code 0x06).
    /// </summary>
    /// <returns>
    /// <c>true</c> only when the slave echoed the request back unchanged, which is the
    /// normal acknowledgement for this function code.
    /// </returns>
    public bool WriteSingleRegister(byte slaveId, ushort address, ushort value)
    {
        byte[] request = BuildRequest(slaveId, WriteSingleRegisterCode, address, value);
        byte[] response = _serialPort.SendAndReceive(request, request.Length, 1000);

        if (response == null || response.Length != request.Length || !ValidateCRC(response))
        {
            return false;
        }

        for (int i = 0; i < request.Length; i++)
        {
            if (response[i] != request[i])
            {
                return false;
            }
        }
        return true;
    }

    /// <summary>Builds the common 8-byte frame: address, function, two big-endian words, CRC (low byte first).</summary>
    private byte[] BuildRequest(byte slaveId, byte functionCode, ushort first, ushort second)
    {
        byte[] request = new byte[8];
        request[0] = slaveId;
        request[1] = functionCode;
        request[2] = (byte)(first >> 8);
        request[3] = (byte)(first & 0xFF);
        request[4] = (byte)(second >> 8);
        request[5] = (byte)(second & 0xFF);

        ushort crc = CalculateCRC(request, 6);
        request[6] = (byte)(crc & 0xFF);
        request[7] = (byte)(crc >> 8);
        return request;
    }

    /// <summary>
    /// Computes the CRC-16 used by Modbus RTU (initial value 0xFFFF, reflected polynomial 0xA001)
    /// over the first <paramref name="length"/> bytes of <paramref name="data"/>.
    /// </summary>
    private ushort CalculateCRC(byte[] data, int length)
    {
        ushort crc = 0xFFFF;
        for (int i = 0; i < length; i++)
        {
            crc ^= data[i];
            for (int bit = 0; bit < 8; bit++)
            {
                if ((crc & 0x0001) != 0)
                {
                    crc = (ushort)((crc >> 1) ^ 0xA001);
                }
                else
                {
                    crc >>= 1;
                }
            }
        }
        return crc;
    }

    /// <summary>Checks that the trailing two bytes (low byte first) equal the CRC of the preceding bytes.</summary>
    private bool ValidateCRC(byte[] data)
    {
        if (data.Length < 2)
        {
            return false;
        }

        ushort receivedCRC = (ushort)(data[data.Length - 2] | (data[data.Length - 1] << 8));
        ushort calculatedCRC = CalculateCRC(data, data.Length - 2);
        return receivedCRC == calculatedCRC;
    }
}
1.3 Python 串口通信实现
基础串口操作
import serial
import serial.tools.list_ports
import time
import struct
from typing import Optional, List


class SerialPortManager:
    """Convenience wrapper around ``serial.Serial`` with bool/None style error reporting."""

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0, **kwargs):
        """Store the port settings; the port is not opened until ``open()``.

        Args:
            port: Port name (for example 'COM1' or '/dev/ttyUSB0').
            baudrate: Baud rate.
            timeout: Read timeout in seconds.
            **kwargs: Extra ``serial.Serial`` keyword arguments. They override the
                8-N-1 defaults applied by ``open()``.
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.serial = None
        self.kwargs = kwargs

    def open(self) -> bool:
        """Open the port; return True on success, False on ``SerialException``."""
        # Defaults first, caller overrides second: passing e.g. parity=... via kwargs
        # must replace the default instead of colliding with it as a duplicate keyword.
        settings = {
            'bytesize': serial.EIGHTBITS,
            'parity': serial.PARITY_NONE,
            'stopbits': serial.STOPBITS_ONE,
        }
        settings.update(self.kwargs)
        try:
            self.serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=self.timeout,
                **settings
            )
            print(f"串口 {self.port} 已打开")
            return True
        except serial.SerialException as e:
            print(f"打开串口失败: {e}")
            return False

    def close(self):
        """Close the port if it is open."""
        if self.serial and self.serial.is_open:
            self.serial.close()
            print("串口已关闭")

    def send_string(self, data: str, encoding: str = 'utf-8') -> bool:
        """Encode and write a string; return False if the port is closed or the write fails."""
        try:
            if self.serial and self.serial.is_open:
                self.serial.write(data.encode(encoding))
                return True
            return False
        except Exception as e:
            print(f"发送数据失败: {e}")
            return False

    def send_bytes(self, data: bytes) -> bool:
        """Write raw bytes; return False if the port is closed or the write fails."""
        try:
            if self.serial and self.serial.is_open:
                self.serial.write(data)
                return True
            return False
        except Exception as e:
            print(f"发送数据失败: {e}")
            return False

    def read_bytes(self, size: int = 1) -> Optional[bytes]:
        """Read up to ``size`` bytes; return None if the port is closed or the read fails."""
        try:
            if self.serial and self.serial.is_open:
                return self.serial.read(size)
            return None
        except Exception as e:
            print(f"读取数据失败: {e}")
            return None

    def read_until(self, terminator: bytes = b'\n') -> Optional[bytes]:
        """Read until ``terminator``; return None if the port is closed or the read fails."""
        try:
            if self.serial and self.serial.is_open:
                return self.serial.read_until(terminator)
            return None
        except Exception as e:
            print(f"读取数据失败: {e}")
            return None

    def send_and_receive(self, send_data: bytes, expected_length: int,
                         delay: float = 0.05) -> Optional[bytes]:
        """Send a request and return the reply only if it has exactly ``expected_length`` bytes.

        The input buffer is flushed before sending so stale bytes cannot be mistaken
        for the reply. Returns None when the port is closed, the reply is short, or
        any error occurs.
        """
        try:
            if not self.serial or not self.serial.is_open:
                return None
            self.serial.reset_input_buffer()
            self.serial.write(send_data)
            time.sleep(delay)
            response = self.serial.read(expected_length)
            return response if len(response) == expected_length else None
        except Exception as e:
            print(f"通信失败: {e}")
            return None

    @staticmethod
    def list_ports() -> List[str]:
        """Return the device names of all serial ports reported by pyserial."""
        return [port.device for port in serial.tools.list_ports.comports()]

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class ModbusRTU:
    """Minimal Modbus RTU master (function codes 0x03, 0x06 and 0x10)."""

    # Limits implied by the one-byte byte-count field of the respective frames.
    _MAX_READ_QUANTITY = 125
    _MAX_WRITE_QUANTITY = 123

    def __init__(self, port: str, baudrate: int = 9600):
        self.serial_manager = SerialPortManager(port, baudrate)

    def open(self):
        """Open the underlying serial port; returns the manager's bool result."""
        return self.serial_manager.open()

    def close(self):
        """Close the underlying serial port."""
        self.serial_manager.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def read_holding_registers(self, slave_id: int, start_address: int,
                               quantity: int) -> Optional[List[int]]:
        """Read holding registers (function code 0x03).

        Args:
            slave_id: Slave address.
            start_address: Address of the first register.
            quantity: Number of registers (1-125).

        Returns:
            The register values, or None when the quantity is out of range, no valid
            reply arrived, or the reply does not belong to this request.
        """
        if not 1 <= quantity <= self._MAX_READ_QUANTITY:
            print("Modbus通信错误或CRC校验失败")
            return None

        body = struct.pack('>BBHH', slave_id, 0x03, start_address, quantity)
        request = body + struct.pack('<H', self._calculate_crc(body))

        # Reply: address, function, byte count, 2 bytes per register, CRC.
        expected_length = 5 + (quantity * 2)
        response = self.serial_manager.send_and_receive(request, expected_length)

        if not response or not self._validate_crc(response):
            print("Modbus通信错误或CRC校验失败")
            return None

        # A CRC-valid frame from another slave / function is still not our answer.
        if response[0] != slave_id or response[1] != 0x03 or response[2] != quantity * 2:
            print("Modbus通信错误或CRC校验失败")
            return None

        return list(struct.unpack(f'>{quantity}H', response[3:3 + quantity * 2]))

    def write_single_register(self, slave_id: int, address: int, value: int) -> bool:
        """Write one register (0x06); True only if the slave echoes the request unchanged."""
        body = struct.pack('>BBHH', slave_id, 0x06, address, value)
        request = body + struct.pack('<H', self._calculate_crc(body))

        response = self.serial_manager.send_and_receive(request, 8)
        return (response is not None
                and self._validate_crc(response)
                and bytes(response) == request)

    def write_multiple_registers(self, slave_id: int,
                                 start_address: int,
                                 values: List[int]) -> bool:
        """Write consecutive registers (0x10).

        True only if the reply repeats the slave id, function code, start address
        and quantity of the request.
        """
        quantity = len(values)
        if not 1 <= quantity <= self._MAX_WRITE_QUANTITY:
            return False

        body = struct.pack('>BBHHB', slave_id, 0x10, start_address, quantity, quantity * 2)
        body += struct.pack(f'>{quantity}H', *values)
        request = body + struct.pack('<H', self._calculate_crc(body))

        response = self.serial_manager.send_and_receive(request, 8)
        return (response is not None
                and self._validate_crc(response)
                and bytes(response[:6]) == request[:6])

    @staticmethod
    def _calculate_crc(data: bytes) -> int:
        """Return the Modbus CRC-16 (init 0xFFFF, reflected polynomial 0xA001) of ``data``."""
        crc = 0xFFFF
        for byte in data:
            crc ^= byte
            for _ in range(8):
                if crc & 0x0001:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
        return crc

    @staticmethod
    def _validate_crc(data: bytes) -> bool:
        """Return True when the trailing little-endian CRC matches the preceding bytes."""
        if len(data) < 2:
            return False
        received_crc = struct.unpack('<H', data[-2:])[0]
        return received_crc == ModbusRTU._calculate_crc(data[:-2])
使用示例
# Example 1: basic serial communication
with SerialPortManager('/dev/ttyUSB0', 9600) as serial_mgr:
    # Send a line of text to the device
    serial_mgr.send_string("Hello Device\n")

    # read_until returns None when the port could not be opened or the read failed,
    # and empty bytes on timeout, so only decode a real reply.
    response = serial_mgr.read_until(b'\n')
    if response:
        print(f"收到: {response.decode(errors='replace')}")
    else:
        print("未收到响应")

# Example 2: Modbus RTU communication
modbus = ModbusRTU('/dev/ttyUSB0', 9600)
if modbus.open():
    try:
        # Read ten holding registers from slave 1
        values = modbus.read_holding_registers(slave_id=1, start_address=0, quantity=10)
        if values:
            print(f"读取到的寄存器值: {values}")

        # Write a single register
        success = modbus.write_single_register(slave_id=1, address=0, value=1234)
        print(f"写入{'成功' if success else '失败'}")
    finally:
        modbus.close()
网口通信控制
2.1 TCP/IP 通信
C# TCP 客户端实现
using System;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace IndustrialAutomation.Network
{
    /// <summary>
    /// TCP client with a background receive loop, events for unsolicited data and
    /// a request/response helper that is fed by that same loop.
    /// </summary>
    public class TcpClientManager : IDisposable
    {
        // Guards _pendingResponse and the connected -> disconnected transition.
        private readonly object _responseGate = new object();

        private TcpClient _client;
        private NetworkStream _stream;
        private volatile bool _isConnected;
        private CancellationTokenSource _cancellationTokenSource;

        // Non-null while SendAndReceiveAsync is waiting; the receive loop fills it.
        private PendingResponse _pendingResponse;

        /// <summary>Raised with bytes that were not claimed by a pending <see cref="SendAndReceiveAsync"/>.</summary>
        public event EventHandler<byte[]> DataReceived;

        /// <summary>Raised when connecting, sending or receiving fails.</summary>
        public event EventHandler<Exception> ErrorOccurred;

        /// <summary>Raised after a connection has been established.</summary>
        public event EventHandler Connected;

        /// <summary>Raised once per connection when it is torn down.</summary>
        public event EventHandler Disconnected;

        public string ServerIP { get; private set; }

        public int ServerPort { get; private set; }

        public bool IsConnected => _isConnected && _client?.Connected == true;

        public TcpClientManager(string serverIP, int serverPort)
        {
            ServerIP = serverIP;
            ServerPort = serverPort;
        }

        /// <summary>
        /// Connects to the server and starts the background receive loop.
        /// </summary>
        /// <param name="timeout">Connect timeout in milliseconds.</param>
        /// <returns><c>true</c> on success; <c>false</c> on failure (reported via <see cref="ErrorOccurred"/>).</returns>
        public async Task<bool> ConnectAsync(int timeout = 5000)
        {
            TcpClient client = null;
            bool established = false;
            try
            {
                client = new TcpClient();
                Task connectTask = client.ConnectAsync(ServerIP, ServerPort);

                if (await Task.WhenAny(connectTask, Task.Delay(timeout)) != connectTask)
                {
                    // Observe the abandoned task's eventual fault so it is not left unobserved.
                    _ = connectTask.ContinueWith(t => { _ = t.Exception; }, TaskContinuationOptions.OnlyOnFaulted);
                    throw new TimeoutException("连接超时");
                }

                // Surface the real connect error (e.g. connection refused) instead of a later GetStream failure.
                await connectTask;

                NetworkStream stream = client.GetStream();
                var cancellation = new CancellationTokenSource();

                _client = client;
                _stream = stream;
                _cancellationTokenSource = cancellation;
                _isConnected = true;
                established = true;

                CancellationToken token = cancellation.Token;
                _ = Task.Run(() => ReceiveDataAsync(stream, token));

                Connected?.Invoke(this, EventArgs.Empty);
                Console.WriteLine($"已连接到 {ServerIP}:{ServerPort}");
                return true;
            }
            catch (Exception ex)
            {
                if (!established)
                {
                    // The socket never became the active connection; release it here.
                    client?.Dispose();
                }

                ErrorOccurred?.Invoke(this, ex);
                return false;
            }
        }

        /// <summary>Writes and flushes <paramref name="data"/>.</summary>
        /// <returns><c>true</c> on success; <c>false</c> when not writable or on error.</returns>
        public async Task<bool> SendAsync(byte[] data)
        {
            try
            {
                if (_stream != null && _stream.CanWrite)
                {
                    await _stream.WriteAsync(data, 0, data.Length);
                    await _stream.FlushAsync();
                    return true;
                }
                return false;
            }
            catch (Exception ex)
            {
                ErrorOccurred?.Invoke(this, ex);
                return false;
            }
        }

        /// <summary>Encodes <paramref name="message"/> (UTF-8 by default) and sends it.</summary>
        public Task<bool> SendStringAsync(string message, Encoding encoding = null)
        {
            encoding = encoding ?? Encoding.UTF8;
            byte[] data = encoding.GetBytes(message);
            return SendAsync(data);
        }

        /// <summary>
        /// Sends a request and waits until exactly <paramref name="expectedLength"/> reply
        /// bytes have arrived or <paramref name="timeout"/> milliseconds have passed.
        /// </summary>
        /// <remarks>
        /// The reply is collected by the background receive loop, so this method never
        /// issues a competing read on the stream. Only one exchange may be pending at a time.
        /// </remarks>
        /// <returns>The reply, or <c>null</c> on timeout, disconnect or error.</returns>
        public async Task<byte[]> SendAndReceiveAsync(byte[] data, int expectedLength, int timeout = 3000)
        {
            try
            {
                if (!IsConnected)
                {
                    return null;
                }

                if (expectedLength == 0)
                {
                    return await SendAsync(data) ? Array.Empty<byte>() : null;
                }

                var pending = new PendingResponse(expectedLength);
                lock (_responseGate)
                {
                    if (_pendingResponse != null)
                    {
                        throw new InvalidOperationException("已有请求正在等待响应");
                    }

                    // Register before sending so a fast reply cannot slip past us.
                    _pendingResponse = pending;
                }

                try
                {
                    if (!await SendAsync(data))
                    {
                        return null;
                    }

                    Task<byte[]> replyTask = pending.Completion.Task;
                    if (await Task.WhenAny(replyTask, Task.Delay(timeout)) == replyTask)
                    {
                        return await replyTask;
                    }

                    return null;
                }
                finally
                {
                    lock (_responseGate)
                    {
                        if (ReferenceEquals(_pendingResponse, pending))
                        {
                            _pendingResponse = null;
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                ErrorOccurred?.Invoke(this, ex);
                return null;
            }
        }

        /// <summary>
        /// Reads from <paramref name="stream"/> until it closes or <paramref name="token"/> is
        /// cancelled, routing bytes to a pending exchange first and to <see cref="DataReceived"/> otherwise.
        /// </summary>
        private async Task ReceiveDataAsync(NetworkStream stream, CancellationToken token)
        {
            byte[] buffer = new byte[4096];
            try
            {
                while (!token.IsCancellationRequested)
                {
                    int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, token);
                    if (bytesRead <= 0)
                    {
                        // Zero bytes means the peer closed the connection.
                        break;
                    }

                    int consumed = OfferToPendingResponse(buffer, bytesRead);
                    if (consumed < bytesRead)
                    {
                        byte[] data = new byte[bytesRead - consumed];
                        Array.Copy(buffer, consumed, data, 0, data.Length);
                        DataReceived?.Invoke(this, data);
                    }
                }
            }
            catch (Exception ex)
            {
                if (!token.IsCancellationRequested)
                {
                    ErrorOccurred?.Invoke(this, ex);
                }
            }
            finally
            {
                // Do not tear down a newer connection created by a reconnect.
                if (ReferenceEquals(stream, _stream))
                {
                    Disconnect();
                }
            }
        }

        /// <summary>Copies received bytes into the pending reply buffer; returns how many were taken.</summary>
        private int OfferToPendingResponse(byte[] source, int count)
        {
            PendingResponse completed = null;
            int consumed;

            lock (_responseGate)
            {
                PendingResponse pending = _pendingResponse;
                if (pending == null)
                {
                    return 0;
                }

                consumed = Math.Min(count, pending.Buffer.Length - pending.Filled);
                Array.Copy(source, 0, pending.Buffer, pending.Filled, consumed);
                pending.Filled += consumed;

                if (pending.Filled == pending.Buffer.Length)
                {
                    _pendingResponse = null;
                    completed = pending;
                }
            }

            completed?.Completion.TrySetResult(completed.Buffer);
            return consumed;
        }

        /// <summary>
        /// Closes the connection. Safe to call repeatedly; <see cref="Disconnected"/> is
        /// raised only when an established connection is actually torn down.
        /// </summary>
        public void Disconnect()
        {
            bool wasConnected;
            PendingResponse abandoned;

            lock (_responseGate)
            {
                wasConnected = _isConnected;
                _isConnected = false;
                abandoned = _pendingResponse;
                _pendingResponse = null;
            }

            // Release a caller blocked in SendAndReceiveAsync.
            abandoned?.Completion.TrySetResult(null);

            try
            {
                _cancellationTokenSource?.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // Already disposed by Dispose(); nothing left to cancel.
            }

            _stream?.Close();
            _client?.Close();

            if (wasConnected)
            {
                Disconnected?.Invoke(this, EventArgs.Empty);
                Console.WriteLine("连接已断开");
            }
        }

        public void Dispose()
        {
            Disconnect();
            _cancellationTokenSource?.Dispose();
            _stream?.Dispose();
            _client?.Dispose();
        }

        /// <summary>State of one in-flight request/response exchange.</summary>
        private sealed class PendingResponse
        {
            public PendingResponse(int length)
            {
                Buffer = new byte[length];
                Completion = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
            }

            public byte[] Buffer { get; }

            public int Filled { get; set; }

            public TaskCompletionSource<byte[]> Completion { get; }
        }
    }
}
C# TCP 服务器实现
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace IndustrialAutomation.Network
{
    /// <summary>
    /// Multi-client TCP server: accepts connections in the background, raises events
    /// for connect/disconnect/data, and can send to one client or all of them.
    /// </summary>
    public class TcpServerManager : IDisposable
    {
        private readonly ConcurrentDictionary<string, TcpClient> _clients;
        private TcpListener _listener;
        private CancellationTokenSource _cancellationTokenSource;

        public event EventHandler<ClientConnectedEventArgs> ClientConnected;
        public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnected;
        public event EventHandler<DataReceivedEventArgs> DataReceived;

        public int Port { get; private set; }

        public bool IsRunning { get; private set; }

        public TcpServerManager(int port)
        {
            Port = port;
            _clients = new ConcurrentDictionary<string, TcpClient>();
        }

        /// <summary>
        /// Starts listening on all interfaces. Does nothing when already running;
        /// a start failure is logged and leaves <see cref="IsRunning"/> false.
        /// </summary>
        public void Start()
        {
            if (IsRunning)
            {
                return;
            }

            try
            {
                _listener = new TcpListener(IPAddress.Any, Port);
                _listener.Start();
                _cancellationTokenSource = new CancellationTokenSource();
                IsRunning = true;

                CancellationToken token = _cancellationTokenSource.Token;
                _ = Task.Run(() => AcceptClientsAsync(token));
                Console.WriteLine($"TCP服务器已启动,监听端口: {Port}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"启动服务器失败: {ex.Message}");
            }
        }

        /// <summary>Accepts clients until cancelled or the listener is disposed.</summary>
        private async Task AcceptClientsAsync(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                try
                {
                    TcpClient client = await _listener.AcceptTcpClientAsync();

                    // RemoteEndPoint can be null if the peer vanished right after the accept.
                    string clientId = client.Client.RemoteEndPoint?.ToString() ?? Guid.NewGuid().ToString("N");
                    _clients.TryAdd(clientId, client);
                    ClientConnected?.Invoke(this, new ClientConnectedEventArgs(clientId));
                    _ = Task.Run(() => HandleClientAsync(client, clientId, token));
                }
                catch (ObjectDisposedException)
                {
                    // Listener was stopped; nothing more to accept.
                    break;
                }
                catch (Exception ex)
                {
                    if (token.IsCancellationRequested)
                    {
                        break;
                    }

                    Console.WriteLine($"接受客户端连接失败: {ex.Message}");
                }
            }
        }

        /// <summary>Reads from one client until it disconnects, then cleans up and raises <see cref="ClientDisconnected"/>.</summary>
        private async Task HandleClientAsync(TcpClient client, string clientId, CancellationToken token)
        {
            NetworkStream stream = client.GetStream();
            byte[] buffer = new byte[4096];
            try
            {
                while (!token.IsCancellationRequested && client.Connected)
                {
                    int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, token);
                    if (bytesRead <= 0)
                    {
                        break;
                    }

                    byte[] data = new byte[bytesRead];
                    Array.Copy(buffer, data, bytesRead);
                    DataReceived?.Invoke(this, new DataReceivedEventArgs(clientId, data));
                }
            }
            catch (Exception ex)
            {
                // Failures caused by Stop() are expected and not worth reporting.
                if (!token.IsCancellationRequested)
                {
                    Console.WriteLine($"处理客户端数据失败: {ex.Message}");
                }
            }
            finally
            {
                _clients.TryRemove(clientId, out _);
                stream.Close();
                client.Close();
                ClientDisconnected?.Invoke(this, new ClientDisconnectedEventArgs(clientId));
            }
        }

        /// <summary>Sends <paramref name="data"/> to one client.</summary>
        /// <returns><c>true</c> on success; <c>false</c> if the client is unknown or the write failed.</returns>
        public async Task<bool> SendToClientAsync(string clientId, byte[] data)
        {
            if (_clients.TryGetValue(clientId, out TcpClient client))
            {
                try
                {
                    NetworkStream stream = client.GetStream();
                    await stream.WriteAsync(data, 0, data.Length);
                    return true;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"发送数据失败: {ex.Message}");
                }
            }
            return false;
        }

        /// <summary>
        /// Sends <paramref name="data"/> to every connected client. A failure for one
        /// client is logged and does not stop delivery to the others.
        /// </summary>
        public async Task BroadcastAsync(byte[] data)
        {
            foreach (var client in _clients.Values)
            {
                try
                {
                    NetworkStream stream = client.GetStream();
                    await stream.WriteAsync(data, 0, data.Length);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"发送数据失败: {ex.Message}");
                }
            }
        }

        /// <summary>Stops accepting, closes all client connections and stops the listener. Safe to call repeatedly.</summary>
        public void Stop()
        {
            bool wasRunning = IsRunning;
            IsRunning = false;

            try
            {
                _cancellationTokenSource?.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // Already disposed by an earlier Dispose(); nothing left to cancel.
            }

            foreach (var client in _clients.Values)
            {
                client.Close();
            }
            _clients.Clear();
            _listener?.Stop();

            if (wasRunning)
            {
                Console.WriteLine("TCP服务器已停止");
            }
        }

        public void Dispose()
        {
            Stop();
            _cancellationTokenSource?.Dispose();
        }
    }

    /// <summary>Event data carrying the id of a client that connected.</summary>
    public class ClientConnectedEventArgs : EventArgs
    {
        public string ClientId { get; }

        public ClientConnectedEventArgs(string clientId) => ClientId = clientId;
    }

    /// <summary>Event data carrying the id of a client that disconnected.</summary>
    public class ClientDisconnectedEventArgs : EventArgs
    {
        public string ClientId { get; }

        public ClientDisconnectedEventArgs(string clientId) => ClientId = clientId;
    }

    /// <summary>Event data carrying the bytes received from one client.</summary>
    public class DataReceivedEventArgs : EventArgs
    {
        public string ClientId { get; }

        public byte[] Data { get; }

        public DataReceivedEventArgs(string clientId, byte[] data)
        {
            ClientId = clientId;
            Data = data;
        }
    }
}
Python TCP 通信实现
import socket
import threading
import time
from typing import Optional, Callable, Dict
from queue import Empty, Queue


class TcpClient:
    """TCP client with a background receive thread and callback hooks."""

    def __init__(self, host: str, port: int, buffer_size: int = 4096):
        self.host = host
        self.port = port
        self.buffer_size = buffer_size
        self.socket: Optional[socket.socket] = None
        self.is_connected = False
        self.receive_thread: Optional[threading.Thread] = None
        self.running = False

        # Callbacks
        self.on_data_received: Optional[Callable[[bytes], None]] = None
        self.on_connected: Optional[Callable[[], None]] = None
        self.on_disconnected: Optional[Callable[[], None]] = None
        self.on_error: Optional[Callable[[Exception], None]] = None

        # Guards the connected -> disconnected transition so the teardown
        # notification fires exactly once per connection.
        self._state_lock = threading.Lock()
        # Serializes send_and_receive calls.
        self._exchange_lock = threading.Lock()
        # The receive thread hands replies to send_and_receive through this queue.
        self._responses: Queue = Queue()
        self._awaiting_response = False

    def connect(self, timeout: float = 5.0) -> bool:
        """Connect to the server and start the receive thread; return True on success."""
        sock = None
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            sock.connect((self.host, self.port))
            sock.settimeout(None)

            self.socket = sock
            self.is_connected = True
            self.running = True

            self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
            self.receive_thread.start()

            print(f"已连接到 {self.host}:{self.port}")
            if self.on_connected:
                self.on_connected()
            return True
        except Exception as e:
            # Do not leak the socket of a failed attempt.
            if sock is not None and not self.is_connected:
                try:
                    sock.close()
                except OSError:
                    pass
            print(f"连接失败: {e}")
            if self.on_error:
                self.on_error(e)
            return False

    def disconnect(self):
        """Close the connection.

        Safe to call repeatedly and from the receive thread itself;
        ``on_disconnected`` fires only when a live connection is torn down.
        """
        with self._state_lock:
            was_connected = self.is_connected
            self.running = False
            self.is_connected = False

        sock = self.socket
        if sock:
            # shutdown() wakes a recv() blocked in the receive thread; close() alone may not.
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            try:
                sock.close()
            except OSError:
                pass

        thread = self.receive_thread
        # A thread cannot join itself; this happens when the receive loop ends the connection.
        if thread and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=2)

        if was_connected:
            print("连接已断开")
            if self.on_disconnected:
                self.on_disconnected()

    def send(self, data: bytes) -> bool:
        """Send all of ``data``; on failure report the error, disconnect and return False."""
        try:
            if self.socket and self.is_connected:
                self.socket.sendall(data)
                return True
            return False
        except Exception as e:
            print(f"发送数据失败: {e}")
            if self.on_error:
                self.on_error(e)
            self.disconnect()
            return False

    def send_string(self, message: str, encoding: str = 'utf-8') -> bool:
        """Encode ``message`` and send it."""
        return self.send(message.encode(encoding))

    def send_and_receive(self, data: bytes, timeout: float = 3.0) -> Optional[bytes]:
        """Send ``data`` and return the next chunk received, or None on timeout/error.

        While the receive thread is running, the reply is taken from it through a
        queue so that only one thread ever calls ``recv`` on the socket.
        """
        try:
            if not self.is_connected:
                return None

            thread = self.receive_thread
            if not (thread and thread.is_alive()):
                return self._direct_exchange(data, timeout)

            with self._exchange_lock:
                # Drop leftovers from an earlier exchange that timed out.
                while not self._responses.empty():
                    self._responses.get_nowait()
                self._awaiting_response = True
                try:
                    self.socket.sendall(data)
                    response = self._responses.get(timeout=timeout)
                finally:
                    self._awaiting_response = False
            return response if response else None
        except Empty:
            print("接收超时")
            return None
        except Exception as e:
            print(f"通信失败: {e}")
            if self.on_error:
                self.on_error(e)
            return None

    def _direct_exchange(self, data: bytes, timeout: float) -> Optional[bytes]:
        """Send and read on the calling thread; used only when no receive thread is running."""
        self.socket.settimeout(timeout)
        try:
            self.socket.sendall(data)
            response = self.socket.recv(self.buffer_size)
            return response if response else None
        except socket.timeout:
            print("接收超时")
            return None
        finally:
            self.socket.settimeout(None)

    def _receive_loop(self):
        """Receive until the peer closes or an error occurs, then disconnect."""
        while self.running:
            try:
                data = self.socket.recv(self.buffer_size)
                if not data:
                    # Empty read: the connection was closed.
                    break
                if self._awaiting_response:
                    self._responses.put(data)
                elif self.on_data_received:
                    self.on_data_received(data)
            except Exception as e:
                if self.running:
                    print(f"接收数据错误: {e}")
                    if self.on_error:
                        self.on_error(e)
                break

        # Wake a caller still waiting in send_and_receive; empty bytes maps to None there.
        if self._awaiting_response:
            self._responses.put(b'')
        self.disconnect()

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()


class TcpServer:
    """Threaded TCP server: one accept thread plus one handler thread per client."""

    def __init__(self, host: str = '0.0.0.0', port: int = 8888, buffer_size: int = 4096):
        self.host = host
        self.port = port
        self.buffer_size = buffer_size
        self.server_socket: Optional[socket.socket] = None
        self.clients: Dict[str, socket.socket] = {}
        self.running = False

        # Callbacks
        self.on_client_connected: Optional[Callable[[str], None]] = None
        self.on_client_disconnected: Optional[Callable[[str], None]] = None
        self.on_data_received: Optional[Callable[[str, bytes], None]] = None

        # self.clients is touched by the accept thread, every handler thread and callers.
        self._clients_lock = threading.Lock()

    @staticmethod
    def _close_quietly(sock: socket.socket):
        """Shut down and close ``sock``, ignoring errors from already-dead sockets."""
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass
        try:
            sock.close()
        except OSError:
            pass

    def start(self):
        """Bind, listen and start the accept thread; failures are printed."""
        try:
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server_socket.bind((self.host, self.port))
            self.server_socket.listen(5)
            # A short accept timeout lets the accept loop notice stop() promptly.
            self.server_socket.settimeout(1.0)
            self.running = True

            print(f"TCP服务器已启动,监听 {self.host}:{self.port}")

            accept_thread = threading.Thread(target=self._accept_loop, daemon=True)
            accept_thread.start()
        except Exception as e:
            print(f"启动服务器失败: {e}")

    def stop(self):
        """Stop accepting, drop all clients and close the listening socket."""
        self.running = False

        with self._clients_lock:
            sockets = list(self.clients.values())
            self.clients.clear()
        for client_socket in sockets:
            self._close_quietly(client_socket)

        if self.server_socket:
            self._close_quietly(self.server_socket)

        print("TCP服务器已停止")

    def _accept_loop(self):
        """Accept clients until the server is stopped."""
        while self.running:
            try:
                client_socket, client_address = self.server_socket.accept()
            except socket.timeout:
                continue
            except OSError as e:
                if not self.running:
                    break
                print(f"接受客户端连接失败: {e}")
                continue

            try:
                client_socket.settimeout(None)  # handler threads use blocking reads
                client_id = f"{client_address[0]}:{client_address[1]}"
                with self._clients_lock:
                    self.clients[client_id] = client_socket

                print(f"客户端已连接: {client_id}")
                if self.on_client_connected:
                    self.on_client_connected(client_id)

                client_thread = threading.Thread(
                    target=self._handle_client,
                    args=(client_socket, client_id),
                    daemon=True
                )
                client_thread.start()
            except Exception as e:
                if self.running:
                    print(f"接受客户端连接失败: {e}")

    def _handle_client(self, client_socket: socket.socket, client_id: str):
        """Forward data from one client to ``on_data_received`` until it disconnects."""
        try:
            while self.running:
                data = client_socket.recv(self.buffer_size)
                if not data:
                    break
                if self.on_data_received:
                    self.on_data_received(client_id, data)
        except Exception as e:
            if self.running:
                print(f"处理客户端数据失败: {e}")
        finally:
            with self._clients_lock:
                self.clients.pop(client_id, None)
            self._close_quietly(client_socket)

            print(f"客户端已断开: {client_id}")
            if self.on_client_disconnected:
                self.on_client_disconnected(client_id)

    def send_to_client(self, client_id: str, data: bytes) -> bool:
        """Send ``data`` to one client; return False if it is unknown or the send fails."""
        with self._clients_lock:
            client_socket = self.clients.get(client_id)
        if client_socket is None:
            return False
        try:
            client_socket.sendall(data)
            return True
        except OSError as e:
            print(f"发送数据失败: {e}")
            return False

    def broadcast(self, data: bytes):
        """Send ``data`` to every client; a failing client is reported and skipped."""
        with self._clients_lock:
            targets = list(self.clients.items())
        for client_id, client_socket in targets:
            try:
                client_socket.sendall(data)
            except OSError as e:
                print(f"发送数据失败: {e}")
2.2 UDP 通信实现
Python UDP 实现
import socket
import threading
from typing import Optional, Callable, Tuple


class UdpCommunicator:
    """UDP endpoint usable as client or server, with a background receive thread."""

    # Receive timeout in seconds; lets the receive loop re-check ``running`` so that
    # stop() does not depend on close() interrupting a blocked recvfrom().
    _POLL_INTERVAL = 0.5

    def __init__(self, local_port: int = 0, buffer_size: int = 4096):
        """Store settings; no socket is created until ``start()``.

        Args:
            local_port: Local port to bind; 0 lets the OS choose one.
            buffer_size: Maximum datagram size passed to ``recvfrom``.
        """
        self.local_port = local_port
        self.buffer_size = buffer_size
        self.socket: Optional[socket.socket] = None
        self.running = False
        self.receive_thread: Optional[threading.Thread] = None

        # Callback invoked as on_data_received(data, (host, port))
        self.on_data_received: Optional[Callable[[bytes, Tuple[str, int]], None]] = None

    def start(self):
        """Bind the socket and start the receive thread; failures are printed."""
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(('0.0.0.0', self.local_port))
            self.socket.settimeout(self._POLL_INTERVAL)

            # Record the port actually bound (relevant when 0 was requested).
            self.local_port = self.socket.getsockname()[1]

            self.running = True
            self.receive_thread = threading.Thread(target=self._receive_loop, daemon=True)
            self.receive_thread.start()

            print(f"UDP通信已启动,监听端口: {self.local_port}")
        except Exception as e:
            print(f"启动UDP通信失败: {e}")

    def stop(self):
        """Stop receiving and close the socket. May be called from the receive callback."""
        self.running = False

        if self.socket:
            try:
                self.socket.close()
            except OSError:
                pass

        thread = self.receive_thread
        # A thread cannot join itself (stop() invoked from on_data_received).
        if thread and thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=2)

        print("UDP通信已停止")

    def send(self, data: bytes, address: Tuple[str, int]) -> bool:
        """Send one datagram.

        Args:
            data: Payload.
            address: Destination as (host, port).

        Returns:
            True on success, False if not started or the send failed.
        """
        try:
            if self.socket:
                self.socket.sendto(data, address)
                return True
            return False
        except Exception as e:
            print(f"发送UDP数据失败: {e}")
            return False

    def send_string(self, message: str, address: Tuple[str, int], encoding: str = 'utf-8') -> bool:
        """Encode ``message`` and send it as one datagram."""
        return self.send(message.encode(encoding), address)

    def send_broadcast(self, data: bytes, port: int) -> bool:
        """Send ``data`` to the broadcast address on ``port``; return True on success."""
        if not self.socket:
            print("发送广播失败: UDP通信未启动")
            return False
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            self.socket.sendto(data, ('<broadcast>', port))
            return True
        except Exception as e:
            print(f"发送广播失败: {e}")
            return False

    def _receive_loop(self):
        """Deliver incoming datagrams to ``on_data_received`` until stopped."""
        while self.running:
            try:
                data, address = self.socket.recvfrom(self.buffer_size)
                if self.on_data_received:
                    self.on_data_received(data, address)
            except socket.timeout:
                # Poll interval elapsed; loop to re-check self.running.
                continue
            except Exception as e:
                if self.running:
                    print(f"接收UDP数据错误: {e}")

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
2.3 Modbus TCP 实现
import struct
from typing import Optional, List


class ModbusTCP:
    """Minimal Modbus TCP client (function codes 0x03, 0x06 and 0x10)."""

    def __init__(self, host: str, port: int = 502):
        self.tcp_client = TcpClient(host, port)
        self.transaction_id = 0

    def connect(self) -> bool:
        """Connect the underlying TCP client."""
        return self.tcp_client.connect()

    def disconnect(self):
        """Disconnect the underlying TCP client."""
        self.tcp_client.disconnect()

    def _next_transaction_id(self) -> int:
        """Advance and return the transaction id, wrapping to stay within 16 bits."""
        # The MBAP transaction identifier is packed as an unsigned 16-bit field.
        self.transaction_id = (self.transaction_id + 1) & 0xFFFF
        return self.transaction_id

    def _reply_matches(self, response: Optional[bytes], slave_id: int,
                       function_code: int, min_length: int) -> bool:
        """Check length, transaction id, protocol id, unit id and function code of a reply."""
        if not response or len(response) < min_length:
            return False
        trans_id, proto_id, _length, unit_id, func_code = struct.unpack('>HHHBB', response[:8])
        return (trans_id == self.transaction_id
                and proto_id == 0
                and unit_id == slave_id
                and func_code == function_code)

    def read_holding_registers(self, slave_id: int, start_address: int,
                               quantity: int) -> Optional[List[int]]:
        """Read holding registers (0x03).

        Returns:
            Exactly ``quantity`` register values, or None when no reply arrived, the
            reply does not match the request, or it carries fewer registers than asked.
        """
        request = struct.pack(
            '>HHHBBHH',
            self._next_transaction_id(),  # transaction identifier
            0,                            # protocol identifier
            6,                            # number of bytes that follow
            slave_id,                     # unit identifier
            0x03,                         # function code
            start_address,
            quantity
        )

        response = self.tcp_client.send_and_receive(request, timeout=3.0)

        # 9 = 7-byte MBAP header + function code + byte count.
        if not self._reply_matches(response, slave_id, 0x03, 9):
            return None

        byte_count = response[8]
        if byte_count != quantity * 2 or len(response) < 9 + byte_count:
            # Truncated or mismatched payload: refuse to return a partial register list.
            return None

        return list(struct.unpack(f'>{quantity}H', response[9:9 + byte_count]))

    def write_single_register(self, slave_id: int, address: int, value: int) -> bool:
        """Write one register (0x06); True when a matching 12-byte acknowledgement arrives."""
        request = struct.pack(
            '>HHHBBHH',
            self._next_transaction_id(),
            0,
            6,
            slave_id,
            0x06,
            address,
            value
        )

        response = self.tcp_client.send_and_receive(request, timeout=3.0)
        return self._reply_matches(response, slave_id, 0x06, 12)

    def write_multiple_registers(self, slave_id: int, start_address: int,
                                 values: List[int]) -> bool:
        """Write consecutive registers (0x10); True when a matching acknowledgement arrives."""
        quantity = len(values)
        byte_count = quantity * 2

        request = struct.pack(
            '>HHHBBHHB',
            self._next_transaction_id(),
            0,
            7 + byte_count,  # unit id + function + address + quantity + byte count + data
            slave_id,
            0x10,
            start_address,
            quantity,
            byte_count
        )
        for value in values:
            request += struct.pack('>H', value)

        response = self.tcp_client.send_and_receive(request, timeout=3.0)
        return self._reply_matches(response, slave_id, 0x10, 12)
物联网控制
3.1 MQTT 协议实现
C# MQTT 客户端(使用MQTTnet库)
using MQTTnet;
using MQTTnet.Client;
using System;
using System.Text;
using System.Threading.Tasks;

namespace IndustrialAutomation.IoT
{
    /// <summary>
    /// Wrapper around an MQTTnet client offering connect, subscribe,
    /// publish and event notifications.
    /// </summary>
    public class MqttClientManager : IDisposable
    {
        private readonly IMqttClient _mqttClient;

        // Kept so the options can be rebuilt with credentials at connect time.
        private readonly string _broker;
        private readonly int _port;
        private readonly string _clientId;

        private MqttClientOptions _options;
        private bool _disposed;

        /// <summary>Raised with the UTF-8 decoded payload of every received message.</summary>
        public event EventHandler<string> MessageReceived;

        public event EventHandler Connected;

        public event EventHandler Disconnected;

        public bool IsConnected => _mqttClient?.IsConnected ?? false;

        /// <summary>
        /// Creates the client and prepares connection options.
        /// </summary>
        /// <param name="broker">Broker host name or address.</param>
        /// <param name="port">Broker TCP port.</param>
        /// <param name="clientId">Client id; a random one is generated when null.</param>
        public MqttClientManager(string broker, int port = 1883, string clientId = null)
        {
            var factory = new MqttFactory();
            _mqttClient = factory.CreateMqttClient();

            _broker = broker;
            _port = port;
            _clientId = clientId ?? $"Client_{Guid.NewGuid():N}";
            _options = BuildOptions(null, null);

            _mqttClient.ConnectedAsync += OnConnectedAsync;
            _mqttClient.DisconnectedAsync += OnDisconnectedAsync;
            _mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
        }

        /// <summary>Builds options from the stored broker settings, adding credentials when a user name is given.</summary>
        private MqttClientOptions BuildOptions(string username, string password)
        {
            MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
                .WithTcpServer(_broker, _port)
                .WithClientId(_clientId)
                .WithCleanSession();

            if (!string.IsNullOrEmpty(username))
            {
                builder = builder.WithCredentials(username, password);
            }

            return builder.Build();
        }

        /// <summary>Connects to the broker, optionally with credentials.</summary>
        /// <returns><c>true</c> on success; <c>false</c> when connecting failed (the error is logged).</returns>
        public async Task<bool> ConnectAsync(string username = null, string password = null)
        {
            try
            {
                if (!string.IsNullOrEmpty(username))
                {
                    // Rebuild from the stored settings; the builder is always started fresh.
                    _options = BuildOptions(username, password);
                }

                await _mqttClient.ConnectAsync(_options);
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT连接失败: {ex.Message}");
                return false;
            }
        }

        /// <summary>Disconnects if currently connected.</summary>
        public async Task DisconnectAsync()
        {
            if (_mqttClient.IsConnected)
            {
                await _mqttClient.DisconnectAsync();
            }
        }

        /// <summary>Subscribes to <paramref name="topic"/> with the given QoS (0, 1 or 2).</summary>
        /// <returns><c>true</c> on success; <c>false</c> on failure (logged).</returns>
        public async Task<bool> SubscribeAsync(string topic, int qos = 1)
        {
            try
            {
                var level = ToQosLevel(qos);
                var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
                    .WithTopicFilter(f => f.WithTopic(topic).WithQualityOfServiceLevel(level))
                    .Build();

                await _mqttClient.SubscribeAsync(subscribeOptions);
                Console.WriteLine($"已订阅主题: {topic}");
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"订阅失败: {ex.Message}");
                return false;
            }
        }

        /// <summary>Unsubscribes from <paramref name="topic"/>.</summary>
        /// <returns><c>true</c> on success; <c>false</c> on failure (logged).</returns>
        public async Task<bool> UnsubscribeAsync(string topic)
        {
            try
            {
                var unsubscribeOptions = new MqttClientUnsubscribeOptionsBuilder()
                    .WithTopicFilter(topic)
                    .Build();

                await _mqttClient.UnsubscribeAsync(unsubscribeOptions);
                Console.WriteLine($"已取消订阅: {topic}");
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"取消订阅失败: {ex.Message}");
                return false;
            }
        }

        /// <summary>Publishes a string payload.</summary>
        /// <returns><c>true</c> on success; <c>false</c> on failure (logged).</returns>
        public async Task<bool> PublishAsync(string topic, string payload, int qos = 1, bool retain = false)
        {
            try
            {
                var message = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(payload)
                    .WithQualityOfServiceLevel(ToQosLevel(qos))
                    .WithRetainFlag(retain)
                    .Build();

                await _mqttClient.PublishAsync(message);
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"发布消息失败: {ex.Message}");
                return false;
            }
        }

        /// <summary>Serializes <paramref name="data"/> with System.Text.Json and publishes it.</summary>
        public async Task<bool> PublishJsonAsync<T>(string topic, T data, int qos = 1)
        {
            string json = System.Text.Json.JsonSerializer.Serialize(data);
            return await PublishAsync(topic, json, qos);
        }

        /// <summary>Maps 0/1/2 to the MQTTnet QoS enum and rejects anything else.</summary>
        private static MQTTnet.Protocol.MqttQualityOfServiceLevel ToQosLevel(int qos)
        {
            if (qos < 0 || qos > 2)
            {
                throw new ArgumentOutOfRangeException(nameof(qos), qos, "QoS必须为0、1或2");
            }

            return (MQTTnet.Protocol.MqttQualityOfServiceLevel)qos;
        }

        private Task OnConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            Console.WriteLine("MQTT已连接");
            Connected?.Invoke(this, EventArgs.Empty);
            return Task.CompletedTask;
        }

        private Task OnDisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            Console.WriteLine("MQTT已断开");
            Disconnected?.Invoke(this, EventArgs.Empty);
            return Task.CompletedTask;
        }

        private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            string payload = Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment);
            string topic = arg.ApplicationMessage.Topic;

            Console.WriteLine($"收到消息 - 主题: {topic}, 内容: {payload}");
            MessageReceived?.Invoke(this, payload);
            return Task.CompletedTask;
        }

        /// <summary>Detaches the handlers and disposes the client. Safe to call repeatedly.</summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            if (_mqttClient != null)
            {
                _mqttClient.ConnectedAsync -= OnConnectedAsync;
                _mqttClient.DisconnectedAsync -= OnDisconnectedAsync;
                _mqttClient.ApplicationMessageReceivedAsync -= OnMessageReceivedAsync;
                _mqttClient.Dispose();
            }
        }
    }
}
Python MQTT 客户端(使用paho-mqtt库)
import paho.mqtt.client as mqtt
import json
from typing import Optional, Callable, Any
import time
import uuid


class MqttClient:
    """Wrapper around a paho-mqtt client with a background network loop.

    User code can set ``on_message_callback(topic, payload)``,
    ``on_connected_callback()`` and ``on_disconnected_callback()`` to be
    notified of events. Topics passed to :meth:`subscribe` are remembered and
    re-subscribed whenever a connection is (re-)established.
    """

    def __init__(self, broker: str, port: int = 1883, client_id: Optional[str] = None):
        """Create the client without connecting.

        Args:
            broker: MQTT broker address.
            port: Broker port.
            client_id: Client identifier; a random one is generated when None.
        """
        self.broker = broker
        self.port = port
        # A random suffix avoids two clients started in the same second sharing
        # an id; 7 + 16 characters stays within the 23-character limit of MQTT 3.1.
        self.client_id = client_id or f"client_{uuid.uuid4().hex[:16]}"

        # paho-mqtt 2.x requires the callback API version as first argument;
        # VERSION1 keeps the callback signatures used below. 1.x has no such enum.
        api_version = getattr(mqtt, "CallbackAPIVersion", None)
        if api_version is not None:
            self.client = mqtt.Client(api_version.VERSION1, client_id=self.client_id)
        else:
            self.client = mqtt.Client(client_id=self.client_id)
        self.is_connected = False

        # topic -> qos, replayed in _on_connect.
        self._subscriptions = {}

        # paho callbacks
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_message = self._on_message
        self.client.on_subscribe = self._on_subscribe
        self.client.on_publish = self._on_publish

        # user callbacks
        self.on_message_callback: Optional[Callable[[str, str], None]] = None
        self.on_connected_callback: Optional[Callable[[], None]] = None
        self.on_disconnected_callback: Optional[Callable[[], None]] = None

    def connect(self, username: Optional[str] = None, password: Optional[str] = None,
                keepalive: int = 60) -> bool:
        """Connect to the broker and wait up to 10 seconds for the connect callback.

        Credentials are applied whenever ``username`` is given, even if the
        password is empty or None.

        Returns:
            True once the connect callback reported success, otherwise False.
        """
        try:
            if username:
                self.client.username_pw_set(username, password)

            self.client.connect(self.broker, self.port, keepalive)
            self.client.loop_start()  # start the network loop

            deadline = time.monotonic() + 10
            while not self.is_connected and time.monotonic() < deadline:
                time.sleep(0.1)

            if not self.is_connected:
                # Do not leave the background loop running after reporting failure.
                self.client.loop_stop()
            return self.is_connected
        except Exception as e:
            print(f"MQTT连接失败: {e}")
            return False

    def disconnect(self):
        """Disconnect from the broker and stop the network loop."""
        # Disconnect first so the still-running loop can deliver the
        # disconnect callback; only then stop the loop.
        self.client.disconnect()
        self.client.loop_stop()
        self.is_connected = False

    def subscribe(self, topic: str, qos: int = 1):
        """Subscribe to ``topic`` and remember it for re-subscription."""
        self._subscriptions[topic] = qos
        self.client.subscribe(topic, qos)
        print(f"已订阅主题: {topic}")

    def unsubscribe(self, topic: str):
        """Unsubscribe from ``topic`` and forget it."""
        self._subscriptions.pop(topic, None)
        self.client.unsubscribe(topic)
        print(f"已取消订阅: {topic}")

    def publish(self, topic: str, payload: str, qos: int = 1, retain: bool = False) -> bool:
        """Publish ``payload``; return True when paho reports MQTT_ERR_SUCCESS."""
        try:
            result = self.client.publish(topic, payload, qos, retain)
            return result.rc == mqtt.MQTT_ERR_SUCCESS
        except Exception as e:
            print(f"发布消息失败: {e}")
            return False

    def publish_json(self, topic: str, data: Any, qos: int = 1, retain: bool = False) -> bool:
        """Serialize ``data`` to JSON (non-ASCII kept as-is) and publish it."""
        try:
            payload = json.dumps(data, ensure_ascii=False)
            return self.publish(topic, payload, qos, retain)
        except Exception as e:
            print(f"发布JSON失败: {e}")
            return False

    def _on_connect(self, client, userdata, flags, rc):
        """Connect callback: mark connected and restore subscriptions on rc == 0."""
        if rc == 0:
            self.is_connected = True
            print("MQTT已连接")
            # Copy the items: subscribe()/unsubscribe() may run on another thread.
            for topic, qos in list(self._subscriptions.items()):
                self.client.subscribe(topic, qos)
            if self.on_connected_callback:
                self.on_connected_callback()
        else:
            print(f"MQTT连接失败,错误码: {rc}")

    def _on_disconnect(self, client, userdata, rc):
        """Disconnect callback: clear the connected flag and notify the user."""
        self.is_connected = False
        print("MQTT已断开")
        if self.on_disconnected_callback:
            self.on_disconnected_callback()

    def _on_message(self, client, userdata, msg):
        """Message callback: decode the payload and forward it to the user callback."""
        topic = msg.topic
        # Undecodable bytes become U+FFFD instead of raising inside the callback.
        payload = msg.payload.decode('utf-8', errors='replace')
        print(f"收到消息 - 主题: {topic}, 内容: {payload}")
        if self.on_message_callback:
            self.on_message_callback(topic, payload)

    def _on_subscribe(self, client, userdata, mid, granted_qos):
        """Subscribe acknowledgement callback."""
        print(f"订阅确认,QoS: {granted_qos}")

    def _on_publish(self, client, userdata, mid):
        """Publish acknowledgement callback (no-op)."""
        pass

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
3.2 HTTP RESTful API 实现
Python HTTP客户端(用于物联网平台API)
import requests
import json
from typing import Optional, Dict, Any
from requests.adapters import HTTPAdapter
from urllib.parse import quote
from urllib3.util.retry import Retry


class IoTApiClient:
    """HTTP client for an IoT platform REST API.

    All request helpers catch request errors, print a message and return
    ``None`` (or ``False`` for deletes) instead of raising.
    """

    def __init__(self, base_url: str, api_key: Optional[str] = None, timeout: int = 30,
                 retry_post: bool = False):
        """Create the session, retry policy and default headers.

        Args:
            base_url: API base URL; a trailing slash is removed.
            api_key: Optional key sent as a Bearer token.
            timeout: Per-request timeout in seconds.
            retry_post: Also retry POST automatically. Off by default because
                POST is not idempotent: a retried command or telemetry upload
                may be applied twice by the server.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout

        self.session = requests.Session()

        retry_methods = ["HEAD", "GET", "OPTIONS", "PUT", "DELETE"]
        if retry_post:
            retry_methods.append("POST")
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=retry_methods,
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.session.headers.update({
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        })
        if api_key:
            self.session.headers.update({
                'Authorization': f'Bearer {api_key}'
            })

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def _url(self, endpoint: str) -> str:
        """Join ``endpoint`` onto the base URL with exactly one slash."""
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    def _send_json(self, method: str, endpoint: str, **kwargs) -> Optional[Dict]:
        """Send a request and return the decoded JSON body.

        Returns ``{}`` for a successful response with an empty body and
        ``None`` when the request fails or the body is not valid JSON.
        """
        try:
            response = self.session.request(method, self._url(endpoint),
                                            timeout=self.timeout, **kwargs)
            response.raise_for_status()
            if not response.content:
                # e.g. 204 No Content: success, but nothing to decode.
                return {}
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors on requests versions whose
            # decode error is not a RequestException.
            print(f"{method}请求失败: {e}")
            return None

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """GET ``endpoint`` with optional query parameters."""
        return self._send_json("GET", endpoint, params=params)

    def post(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict]:
        """POST ``data`` as JSON to ``endpoint``."""
        return self._send_json("POST", endpoint, json=data)

    def put(self, endpoint: str, data: Optional[Dict] = None) -> Optional[Dict]:
        """PUT ``data`` as JSON to ``endpoint``."""
        return self._send_json("PUT", endpoint, json=data)

    def delete(self, endpoint: str) -> bool:
        """DELETE ``endpoint``; return True when the server reports success."""
        try:
            response = self.session.delete(self._url(endpoint), timeout=self.timeout)
            response.raise_for_status()
            return True
        except requests.exceptions.RequestException as e:
            print(f"DELETE请求失败: {e}")
            return False

    # Device management API examples

    @staticmethod
    def _device_path(device_id: str, suffix: str = "") -> str:
        """Build a /devices/<id> path with the id percent-encoded as one segment."""
        # safe='' also encodes '/', so an id cannot address a different resource.
        return f"/devices/{quote(str(device_id), safe='')}{suffix}"

    def get_device(self, device_id: str) -> Optional[Dict]:
        """Fetch one device."""
        return self.get(self._device_path(device_id))

    def get_devices(self, page: int = 1, page_size: int = 20) -> Optional[Dict]:
        """Fetch a page of the device list."""
        return self.get("/devices", params={"page": page, "pageSize": page_size})

    def create_device(self, device_data: Dict) -> Optional[Dict]:
        """Create a device."""
        return self.post("/devices", data=device_data)

    def update_device(self, device_id: str, device_data: Dict) -> Optional[Dict]:
        """Update a device."""
        return self.put(self._device_path(device_id), data=device_data)

    def delete_device(self, device_id: str) -> bool:
        """Delete a device."""
        return self.delete(self._device_path(device_id))

    def send_command(self, device_id: str, command: str,
                     params: Optional[Dict] = None) -> Optional[Dict]:
        """Send a command with optional parameters to a device."""
        data = {
            "command": command,
            "params": params or {}
        }
        return self.post(self._device_path(device_id, "/commands"), data=data)

    def get_telemetry(self, device_id: str, start_time: Optional[str] = None,
                      end_time: Optional[str] = None) -> Optional[Dict]:
        """Fetch telemetry for a device, optionally bounded by start/end time."""
        params = {}
        if start_time:
            params['startTime'] = start_time
        if end_time:
            params['endTime'] = end_time
        return self.get(self._device_path(device_id, "/telemetry"), params=params)
综合应用案例
4.1 智能工厂数据采集系统
import threading
import time
import queue
from datetime import datetime
from typing import Dict, List


class IndustrialDataCollector:
    """Collects data from Modbus RTU, Modbus TCP and MQTT into one queue.

    Collector threads put records on ``data_queue``; a processing thread takes
    them off and stores, uploads, publishes and checks them for alarms.
    """

    # Temperature above which _analyze_and_alert prints an alarm.
    TEMPERATURE_ALERT_THRESHOLD = 80

    def __init__(self):
        # Records waiting to be processed; None is used as a wake-up sentinel.
        self.data_queue = queue.Queue()

        # Communication managers, created by initialize().
        self.modbus_rtu = None
        self.modbus_tcp = None
        self.mqtt_client = None
        self.api_client = None

        # Run state
        self.running = False
        self.threads: List[threading.Thread] = []
        # Set by stop() so that waits inside worker loops end immediately.
        self._stop_event = threading.Event()

    def initialize(self, config: Dict):
        """Create and open every interface that has a section in ``config``."""
        # Modbus RTU (serial)
        if 'modbus_rtu' in config:
            cfg = config['modbus_rtu']
            self.modbus_rtu = ModbusRTU(cfg['port'], cfg['baudrate'])
            self.modbus_rtu.open()
            print("Modbus RTU已初始化")

        # Modbus TCP (network)
        if 'modbus_tcp' in config:
            cfg = config['modbus_tcp']
            self.modbus_tcp = ModbusTCP(cfg['host'], cfg['port'])
            self.modbus_tcp.connect()
            print("Modbus TCP已初始化")

        # MQTT (IoT)
        if 'mqtt' in config:
            cfg = config['mqtt']
            self.mqtt_client = MqttClient(cfg['broker'], cfg['port'])
            # Install the handler before subscribing so messages delivered
            # right after the subscription are not dropped.
            self.mqtt_client.on_message_callback = self._on_mqtt_message
            self.mqtt_client.connect(cfg.get('username'), cfg.get('password'))
            self.mqtt_client.subscribe(cfg['subscribe_topic'])
            print("MQTT已初始化")

        # HTTP API (cloud platform)
        if 'api' in config:
            cfg = config['api']
            self.api_client = IoTApiClient(cfg['base_url'], cfg['api_key'])
            print("API客户端已初始化")

    def start(self):
        """Start one collector thread per Modbus interface plus the processor.

        Calling start() while already running does nothing.
        """
        if self.running:
            return
        self.running = True
        self._stop_event.clear()

        targets = []
        if self.modbus_rtu:
            targets.append(self._collect_modbus_rtu_data)
        if self.modbus_tcp:
            targets.append(self._collect_modbus_tcp_data)
        targets.append(self._process_data)

        for target in targets:
            thread = threading.Thread(target=target, daemon=True)
            thread.start()
            self.threads.append(thread)

        print("数据采集已启动")

    def stop(self):
        """Stop all threads, then close every open connection."""
        self.running = False
        self._stop_event.set()
        # Wake the processing thread if it is blocked waiting for data.
        self.data_queue.put(None)

        for thread in self.threads:
            thread.join(timeout=2)
        self.threads.clear()

        if self.modbus_rtu:
            self.modbus_rtu.close()
        if self.modbus_tcp:
            self.modbus_tcp.disconnect()
        if self.mqtt_client:
            self.mqtt_client.disconnect()

        print("数据采集已停止")

    def _collect_modbus_rtu_data(self):
        """Poll the serial sensor once per second and queue its readings."""
        while self.running:
            try:
                values = self.modbus_rtu.read_holding_registers(
                    slave_id=1,
                    start_address=0,
                    quantity=10
                )

                if values:
                    data = {
                        'source': 'modbus_rtu',
                        'timestamp': datetime.now().isoformat(),
                        'device_id': 'sensor_001',
                        'data': {
                            'temperature': values[0] / 10.0,
                            'humidity': values[1] / 10.0,
                            'pressure': values[2] / 100.0,
                            # more sensor values...
                        }
                    }
                    self.data_queue.put(data)

                self._stop_event.wait(1)  # poll once per second
            except Exception as e:
                print(f"Modbus RTU采集错误: {e}")
                self._stop_event.wait(5)  # back off after an error

    def _collect_modbus_tcp_data(self):
        """Poll the PLC every 500 ms and queue its readings."""
        while self.running:
            try:
                values = self.modbus_tcp.read_holding_registers(
                    slave_id=1,
                    start_address=0,
                    quantity=20
                )

                if values:
                    data = {
                        'source': 'modbus_tcp',
                        'timestamp': datetime.now().isoformat(),
                        'device_id': 'plc_001',
                        'data': {
                            'motor_speed': values[0],
                            'motor_current': values[1] / 100.0,
                            'production_count': values[2],
                            'alarm_status': values[3],
                            # more PLC values...
                        }
                    }
                    self.data_queue.put(data)

                self._stop_event.wait(0.5)  # poll every 500 ms
            except Exception as e:
                print(f"Modbus TCP采集错误: {e}")
                self._stop_event.wait(5)  # back off after an error

    def _on_mqtt_message(self, topic: str, payload: str):
        """Parse a JSON MQTT payload and queue it as a record."""
        try:
            import json
            message_data = json.loads(payload)

            data = {
                'source': 'mqtt',
                'timestamp': datetime.now().isoformat(),
                'topic': topic,
                'data': message_data
            }
            self.data_queue.put(data)
        except Exception as e:
            print(f"MQTT消息处理错误: {e}")

    def _process_data(self):
        """Take records off the queue and run the processing steps on each."""
        while self.running:
            try:
                # Blocks for at most one second so the run flag is re-checked.
                data = self.data_queue.get(timeout=1)
            except queue.Empty:
                continue

            if data is None:
                # Wake-up sentinel from stop(); nothing to process.
                continue

            try:
                origin = data['device_id'] if 'device_id' in data else data.get('topic', 'N/A')
                print(f"处理数据: {data['source']} - {origin}")

                # 1. Local storage (optional)
                self._store_locally(data)

                # 2. Upload to the cloud platform
                if self.api_client:
                    self._upload_to_cloud(data)

                # 3. Publish over MQTT (optional). Records that arrived over
                # MQTT are not sent back, which would loop forever if the
                # subscription overlaps the publish topic.
                if self.mqtt_client and data.get('source') != 'mqtt':
                    self._publish_to_mqtt(data)

                # 4. Analysis and alarms
                self._analyze_and_alert(data)
            except Exception as e:
                print(f"数据处理错误: {e}")

    def _store_locally(self, data: Dict):
        """Store a record locally (placeholder, e.g. SQLite or CSV)."""
        pass

    def _upload_to_cloud(self, data: Dict):
        """POST a record to the platform's /telemetry endpoint."""
        try:
            result = self.api_client.post('/telemetry', data=data)
            if result:
                print("数据已上传到云平台")
        except Exception as e:
            print(f"上传云平台失败: {e}")

    def _publish_to_mqtt(self, data: Dict):
        """Publish a record as JSON under factory/data/<device_id>."""
        try:
            topic = f"factory/data/{data.get('device_id', 'unknown')}"
            self.mqtt_client.publish_json(topic, data)
        except Exception as e:
            print(f"MQTT发布失败: {e}")

    def _analyze_and_alert(self, data: Dict):
        """Print an alarm when a record carries an over-threshold temperature.

        Records whose 'data' field is not a dict, or whose temperature is not
        a number, are ignored rather than raising.
        """
        values = data.get('data')
        if not isinstance(values, dict):
            return

        # Temperature alarm
        temperature = values.get('temperature')
        if isinstance(temperature, (int, float)) and temperature > self.TEMPERATURE_ALERT_THRESHOLD:
            print(f"⚠️ 温度告警: {temperature}°C")
            # send the alarm...

        # other alarm rules...


# Usage example
if __name__ == "__main__":
    # One settings dict per interface, combined into the collector configuration.
    rtu_settings = {
        'port': '/dev/ttyUSB0',
        'baudrate': 9600
    }
    tcp_settings = {
        'host': '192.168.1.100',
        'port': 502
    }
    mqtt_settings = {
        'broker': 'mqtt.example.com',
        'port': 1883,
        'username': 'user',
        'password': 'pass',
        'subscribe_topic': 'factory/commands/#'
    }
    api_settings = {
        'base_url': 'https://api.iot-platform.com',
        'api_key': 'your_api_key_here'
    }
    config = {
        'modbus_rtu': rtu_settings,
        'modbus_tcp': tcp_settings,
        'mqtt': mqtt_settings,
        'api': api_settings
    }

    # Build the collector, open every interface and start the worker threads.
    collector = IndustrialDataCollector()
    collector.initialize(config)
    collector.start()

    # Keep the main thread alive until Ctrl+C, then shut the collector down.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("\n正在停止...")
        collector.stop()
最佳实践与故障排查
5.1 通信最佳实践
串口通信
- 始终添加超时机制,避免程序挂起
- 使用CRC或校验位确保数据完整性
- 实现重试机制,处理通信失败
- 发送请求前清空接收缓冲区,避免旧数据干扰
- 使用线程或异步,避免阻塞主程序
网口通信
- 使用心跳包保持连接活跃
- 实现断线重连机制
- 限制并发连接数
- 使用连接池提高效率
- 设置合理的超时时间
物联网通信
- 使用QoS机制保证消息可靠性
- 实现消息去重(QoS 1 可能重复投递同一消息)
- 压缩大数据,减少带宽占用
- 使用TLS加密保护数据安全
- 实现离线缓存,网络恢复后上传
5.2 常见故障排查
串口问题
# List the serial ports reported by SerialPortManager.
ports = SerialPortManager.list_ports()
print("可用串口:", ports)

# Check that the port can be opened.
# NOTE(review): `port` and `baudrate` are not defined in this snippet; they
# must be set by the surrounding code before it runs.
try:
    with SerialPortManager(port, baudrate) as serial:
        print("串口正常")
except Exception as open_error:
    print(f"串口错误: {open_error}")
网络连接问题
import socket


def check_tcp_connection(host: str, port: int, timeout: float = 3.0) -> bool:
    """Check whether a TCP connection to ``host:port`` can be established.

    Args:
        host: IPv4 address or host name of the device.
        port: TCP port to probe.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        True if the connection succeeded; False on refusal, timeout, name
        resolution failure or invalid arguments.
    """
    try:
        # The context manager closes the socket even when connect_ex raises.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            return sock.connect_ex((host, port)) == 0
    except Exception:
        # Any failure means "not reachable"; KeyboardInterrupt and SystemExit
        # are not Exception subclasses and still propagate.
        return False


# Usage
# Probe the device's Modbus TCP port and report the result.
device_online = check_tcp_connection('192.168.1.100', 502)
print("设备在线" if device_online else "设备离线或端口不可达")
5.3 性能优化建议
- 批量操作: 合并多个读写请求,减少通信次数
- 数据缓存: 对不常变化的数据进行缓存
- 异步处理: 使用异步I/O提高吞吐量
- 连接复用: 保持长连接,避免频繁建立连接
- 数据压缩: 对大量数据进行压缩传输
5.4 安全性建议
- 加密通信: 使用TLS/SSL加密敏感数据
- 身份认证: 实现设备认证和用户认证
- 访问控制: 限制设备访问权限
- 输入验证: 验证所有输入数据,防止注入攻击
- 日志记录: 记录所有关键操作,便于审计
总结
本文档详细介绍了工业自动化领域的三大核心通信技术:
- 串口控制: RS232/RS485通信、Modbus RTU协议
- 网口控制: TCP/IP、UDP、Modbus TCP协议
- 物联网控制: MQTT、HTTP RESTful API
通过C#和Python的完整实现代码,您可以快速构建工业自动化系统。记住:
- 选择合适的通信方式
- 实现可靠的错误处理
- 注重系统的可维护性和扩展性
- 重视数据安全和通信稳定性