当前位置: 首页 > news >正文

内嵌式mqtt server

添加moquette依赖

 <!-- Embedded Moquette MQTT broker. The slf4j-reload4j artifact is excluded;
      presumably so it does not clash with the application's own SLF4J binding (confirm). -->
 <dependency>
     <groupId>io.moquette</groupId>
     <artifactId>moquette-broker</artifactId>
     <version>0.17</version>
     <exclusions>
         <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-reload4j</artifactId>
         </exclusion>
     </exclusions>
 </dependency>

配置文件类MoquetteProperties

package com.mqtt.config;import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class MoquetteProperties {

    /** Broker listen port, read from {@code mqtt.port}; falls back to {@code 1883}. */
    @Value("${mqtt.port:1883}")
    private String mqttPort;

    /** Broker bind address, read from {@code mqtt.host}; falls back to {@code 0.0.0.0}. */
    @Value("${mqtt.host:0.0.0.0}")
    private String mqttHost;

    /** Anonymous-access flag as a string, read from {@code mqtt.allow_anonymous}; falls back to {@code false}. */
    @Value("${mqtt.allow_anonymous:false}")
    private String allowAnonymous;

    /** Expected client user name, read from {@code mqtt.username}; falls back to {@code admin}. */
    @Value("${mqtt.username:admin}")
    private String username;

    /**
     * Expected client password, read from {@code mqtt.password}.
     * NOTE(review): the fallback value is a credential committed to source; override it per environment.
     */
    @Value("${mqtt.password:moque123432}")
    private String password;
}

权限认证类CustomAuthenticator

package com.mqtt.interceptor;import com.mqtt.config.MoquetteProperties;
import io.moquette.broker.security.IAuthenticator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class CustomAuthenticator implements IAuthenticator {@Autowiredprivate MoquetteProperties moquetteProperties;@Overridepublic boolean checkValid(String clientId, String username, byte[] password) {String passwordStr = new String(password);if (moquetteProperties.getUsername().equals(username)&& moquetteProperties.getPassword().equals(passwordStr)) {return true;}log.error("CustomAuthenticator checkValid: 用户名或密码错误");return false;}
}

消息拦截类MqttMessageInterceptor

package com.mqtt.interceptor;import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;@Slf4j
@Component
public class MqttMessageInterceptor extends AbstractInterceptHandler {@Overridepublic String getID() {return MqttMessageInterceptor.class.getName();}@Overridepublic void onPublish(InterceptPublishMessage msg) {String clientId = msg.getClientID();String topic = msg.getTopicName();// 获取消息的有效载荷ByteBuf payload = msg.getPayload();String content = safeReadByteBuf(payload);log.info("MqttMessageInterceptor Received message - Client: {}, Topic: {}, Payload: {}",clientId, topic, content);}@Overridepublic void onSessionLoopError(Throwable error) {log.error("MqttMessageInterceptor onSessionLoopError", error);}/*** 安全读取 ByteBuf 数据*/private String safeReadByteBuf(ByteBuf byteBuf) {try {if (byteBuf == null || !byteBuf.isReadable()) {return "";}if (byteBuf.hasArray()) {// 堆内缓冲区byte[] array = byteBuf.array();int offset = byteBuf.arrayOffset() + byteBuf.readerIndex();int length = byteBuf.readableBytes();return new String(array, offset, length, StandardCharsets.UTF_8);} else {// 堆外缓冲区byte[] array = new byte[byteBuf.readableBytes()];byteBuf.getBytes(byteBuf.readerIndex(), array);return new String(array, StandardCharsets.UTF_8);}} finally {// 确保释放 ByteBuf 资源if (byteBuf != null && byteBuf.refCnt() > 0) {byteBuf.release();}}}
}

MQTT Server类MoquetteBrokerConfig

参考
https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22

package com.mqtt.config;import com.mqtt.interceptor.CustomAuthenticator;
import com.mqtt.interceptor.MqttMessageInterceptor;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.interception.InterceptHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.io.IOException;
import java.util.List;
import java.util.Properties;@Slf4j
@Configuration
public class MoquetteBrokerConfig {@Autowiredprivate MoquetteProperties moquetteProperties;@Bean(destroyMethod = "stopServer")public Server mqttBroker(MqttMessageInterceptor interceptor, CustomAuthenticator customAuthenticator) {// 创建 Moquette 的配置Properties properties = new Properties();// 设置监听端口为 1883properties.setProperty(IConfig.PORT_PROPERTY_NAME, moquetteProperties.getMqttPort());// 监听所有网络接口properties.setProperty(IConfig.HOST_PROPERTY_NAME, moquetteProperties.getMqttHost());// 允许匿名连接properties.setProperty(IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME, moquetteProperties.getAllowAnonymous());IConfig config = new MemoryConfig(properties);// 初始化 Moquette 服务器Server mqttServer = new Server();List<InterceptHandler> handlers = List.of(interceptor);try {mqttServer.startServer(config, handlers, null, customAuthenticator, null);} catch (IOException e) {log.error("启动内嵌式MQTT服务器失败", e);throw new RuntimeException(e);}log.info("内嵌式MQTT服务器已启动,监听端口: 1883");return mqttServer;}
}

Go 内嵌 MQTT Server（基于 mochi-mqtt）

配置

package constantsconst (MqttUser     = "admin"MqttPassword = "bydmqtt123432"MqttPort     = "1883"
)

MQTT Server

package mqttimport ("bytes""fmt""iaas-server-manager/constants""github.com/google/uuid""github.com/labstack/gommon/log"mqtt "github.com/mochi-mqtt/server/v2""github.com/mochi-mqtt/server/v2/hooks/auth""github.com/mochi-mqtt/server/v2/listeners""github.com/mochi-mqtt/server/v2/packets"
)// CustomAuthHook 实现自定义认证
type CustomAuthHook struct {auth.Hook
}func (h *CustomAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {username := pk.Connect.Usernamepassword := pk.Connect.Passwordif len(username) == 0 || len(password) == 0 {return false}// 自定义认证逻辑if string(username) == constants.MqttUser && string(password) == constants.MqttPassword {return true}// 或者检查数据库等外部系统// user := db.GetUser(pk.Username)// return user != nil && user.CheckPassword(pk.Password)return false
}// 自定义 Hook 实现
// https://github.com/mochi-mqtt/server/blob/main/examples/hooks/main.go#L22
type MqttCustomHook struct {mqtt.HookBase
}func (h *MqttCustomHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {log.Info("client connected", "client", cl.ID)return nil
}func (h *MqttCustomHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {if err != nil {log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)} else {log.Info("client disconnected", "client", cl.ID, "expire", expire)}
}// OnPacketRead 在读取数据包时触发
func (h *MqttCustomHook) OnPacketRead(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {switch pk.FixedHeader.Type {case packets.Publish:log.Printf("收到PUBLISH消息 clientId = %s, TopicName = %s, Payload = %s", cl.ID, pk.TopicName, string(pk.Payload))case packets.Connect:log.Printf("客户端连接clientId = %s", cl.ID)case packets.Subscribe:log.Printf("客户端订阅clientId = %s", cl.ID)}return pk, nil // 继续处理数据包
}func (h *MqttCustomHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {log.Printf("subscribed clientId=%s, qos=%v", cl.ID, "filters", reasonCodes)
}func (h *MqttCustomHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {log.Printf("unsubscribed clientId=%s", cl.ID)
}func (h *MqttCustomHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {log.Printf("received from clientId=%s, payload=%s", cl.ID, string(pk.Payload))return pk, nil
}func (h *MqttCustomHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {log.Printf("published to clientId=%s, payload=%s", cl.ID, string(pk.Payload))
}func (h *MqttCustomHook) Provides(b byte) bool {return bytes.Contains([]byte{mqtt.OnConnect,mqtt.OnDisconnect,mqtt.OnSubscribed,mqtt.OnUnsubscribed,mqtt.OnPublished,mqtt.OnPublish,mqtt.OnPacketRead,}, []byte{b})
}type MqttServer struct {
}// https://github.com/mochi-mqtt/server/blob/main/README-CN.md
// 启动MqttServer
func (ms *MqttServer) StartServer() {// Create the new MQTT Server.server := mqtt.New(&mqtt.Options{InlineClient: true,})// Allow all connections.errAddHook := server.AddHook(new(CustomAuthHook), nil)if errAddHook != nil {log.Fatal("CustomAuthHook AddHook fail", errAddHook)panic(errAddHook)}errAddHook = server.AddHook(new(MqttCustomHook), nil)if errAddHook != nil {log.Fatal("MqttCustomHook AddHook fail", errAddHook)panic(errAddHook)}serverId := uuid.New()// Create a TCP listener on a standard port.tcp := listeners.NewTCP(listeners.Config{ID: fmt.Sprintf("mqtt_%s", serverId), Address: fmt.Sprintf(":%s", constants.MqttPort)})err := server.AddListener(tcp)if err != nil {log.Fatal("AddListener fail", err)panic(err)}go func() {err := server.Serve()if err != nil {log.Fatal("AddListener Serve", err)panic(err)}}()log.Info("StartServer start finish, Listener On Port: ", constants.MqttPort)
}

启动

package mainfunc main() {//启动mqtt服务mqttServer := mqtt.MqttServer{}mqttServer.StartServer()
}

mqtt客户端

添加依赖

<!-- Eclipse Paho MQTT v3 client, used by the subscriber and publisher samples. -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

订阅消息

package com.olive;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttReceiver2 {public static void main(String[] args) {String broker = "tcp://localhost:1883";String clientId = "ReceiverClient2";String topic = "exclusive/test/topic";String username = "admin";String password = "moque123432";try {MqttClient client = new MqttClient(broker, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.setCallback(new MqttCallback() {@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("收到消息: " + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 不需要处理}@Overridepublic void connectionLost(Throwable cause) {System.out.println("连接丢失");}});client.connect(options);System.out.println("接收方客户端已连接");client.subscribe(topic);System.out.println("已订阅主题: " + topic);} catch (MqttException e) {e.printStackTrace();}}
}

发布消息

package com.olive;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MqttSender {public static void main(String[] args) {String broker = "tcp://localhost:1883";String clientId = "SenderClient";String topic = "patrol/publish/result";String content = "Hello from SenderClient";String username = "admin";String password = "bydmqtt123432";try {MqttClient client = new MqttClient(broker, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);System.out.println("发送方客户端已连接");MqttMessage message = new MqttMessage(content.getBytes());message.setQos(1); // 设置 QoS 级别为 1client.publish(topic, message);System.out.println("消息已发送到主题: " + topic);client.disconnect();System.out.println("发送方客户端已断开连接");} catch (MqttException e) {e.printStackTrace();}}
}

Go 语言客户端：订阅并发布消息

package main

import (
	"fmt"
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// messagePubHandler prints the payload and topic of every message delivered to
// the subscription.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("收到消息: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

// connectHandler is installed as the client's OnConnect handler. It subscribes
// to the test topic at QoS 0 and logs the outcome.
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	log.Println("已连接到 MQTT Broker")

	const subscribeTopic = "exclusive/test/topic"
	subToken := client.Subscribe(subscribeTopic, byte(0), messagePubHandler)
	subToken.Wait()
	if err := subToken.Error(); err != nil {
		log.Printf("订阅失败: %v\n", err)
		return
	}
	log.Printf("已订阅主题: %s\n", subscribeTopic)
}

// connectLostHandler reports the error that caused the connection to drop.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("连接丢失: %v", err)
}

// main connects to the local broker, publishes one message and then idles
// forever so that incoming messages keep being handled.
func main() {
	// Client options: broker address, credentials, client id and reconnect policy.
	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://localhost:1883")
	opts.SetClientID("go_mqtt_client")
	opts.SetUsername("admin")
	opts.SetPassword("bydmqtt123432")
	opts.SetAutoReconnect(true)                    // reconnect automatically
	opts.SetResumeSubs(true)                       // resume subscriptions after reconnecting
	opts.SetMaxReconnectInterval(30 * time.Second) // upper bound between reconnect attempts

	// Callbacks; the subscription itself is made inside connectHandler.
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)

	// Connect and stop if the broker cannot be reached.
	connToken := client.Connect()
	connToken.Wait()
	if err := connToken.Error(); err != nil {
		fmt.Println("连接失败:", err)
		return
	}
	fmt.Println("成功连接到 MQTT Broker")

	// Publish a single non-retained message at QoS 0.
	pubToken := client.Publish("exclusive/test/topic/1", byte(0), false, "back Hello, MQTT from Go!")
	pubToken.Wait()
	if err := pubToken.Error(); err != nil {
		fmt.Println("发布失败:", err)
		return
	}

	// Keep the program running to receive messages.
	for {
		time.Sleep(time.Second)
	}
}

相关文章:

  • 成功案例丨基于OptiStruct的三轮车车架结构刚强度仿真计算与优化
  • leetcode1609. 奇偶树-meidum
  • win10/win11禁止系统更新
  • 力扣面试150题--克隆图
  • Python删除大量文件
  • Day46 Python打卡训练营
  • 阿里140 补环境日志
  • C++.OpenGL (3/64)着色器(Shader)深入
  • 【技术】跨设备链路聚合的技术——M-LAG
  • C++.OpenGL (10/64)基础光照(Basic Lighting)
  • Python 3.11.9 安装教程
  • 两阶段提交
  • QPS、TPS、RT、IOQS、并发数等性能名词介绍
  • 大模型时代的“思考“与“行动“:人工智能的认知革命
  • Vue3 + threeJs 定义六种banner轮播图切换动画效果:百叶窗、手风琴、拼图、渐变、菱形波次、圆形扩展
  • 【Dv3Admin】系统视图菜单按钮管理API文件解析
  • SSIM、PSNR、LPIPS、MUSIQ、NRQM、NIQE 六个图像质量评估指标
  • vxe-table 如何设置单元格垂直对齐
  • MS31912TEA 多通道半桥驱动器 氛围灯 照明灯 示宽灯 转向灯驱动 后视镜方向调节 可替代DRV8912
  • 设置应用程序图标
  • wordpress做下载型网站/做网页设计一个月能挣多少
  • WordPress如何调用/鼓楼网站seo搜索引擎优化
  • 为何用wdcp建立网站连不上ftp/网络营销策略优化
  • 马格南摄影网站/网络营销推广方案策划书
  • 网站建设推广公司排名/电话营销系统
  • 长春建站价格/提高销售的10种方法