当前位置: 首页 > news >正文

Java获取被nginx代理的emqx客户端真实ip

Java获取被nginx代理的emqx客户端真实ip

契机

⚙ 使用nginx作为负载均衡(Load Balancing)的时候,发现真实ip无法获取。几经折腾终于拿到真实ip,又发现被代理的端口又无法使用非代理模式连接,由于之前暴露的docker端口有限,从中部分取巧终于拨开云雾见光明,再结合Java客户端获取真实ip一气呵成。

EMQX配置

#docker部署
#18083为管理页面端口,1883为默认mqtt端口,8883为默认mqtts端口
#如果还没有新建emqx容器,建议多暴露端口,具体原因见下文
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.7.2
  • 进入emqx管理页面,新建proxy_tcp监听器,类型为tcp,端口为8883
  • 然后关闭之前ssl-8883监听
  • 借用端口是因为docker暴露到宿主机的端口不够了,一旦设置比如proxy_tcp监听器的8883代理监听后,客户端就只能通过nginx代理连接这个8883端口了,直接连接8883就不行了

请添加图片描述

#获取emqx的容器id,假如为1111
docker ps | grep emqx #将配置文件拷贝出来
docker cp 1111:/opt/emqx/etc/emqx.conf .#修改,添加下面项目
vim emqx.conf #打开proxy_tcp的代理监听
listeners.tcp.proxy_tcp {proxy_protocol = true
}#关闭default的代理,默认是关闭的
#但是一旦打开过,就要手动设置为false
listeners.tcp.default {proxy_protocol = false
}#放置回去
docker cp ./emqx.conf 1111:/opt/emqx/etc/#重启容器
docker restart 1111

此时

  • 直接使用客户端设备连接8883端口失败,无论mqtt/mqtts协议
  • 1883端口可以使用mqtt协议可以正常连接
  • emqx不存放证书,ssl认证是一个耗时的操作,丢到nginx去搞

nginx配置

#nginx安装 - 略#nginx安装模块
sudo yum install nginx-mod-stream#修改nginx配置文件
vim /etc/nginx/nginx.conf#配置文件如下
#192.168.0.1为emqx所在服务器,与nginx不在一个服务器
#监听1883为mqtt
#监听8883为mqtts
stream {upstream stream_backend {zone tcp_servers 64k;hash $remote_addr;server 192.168.0.1:1883 max_fails=2 fail_timeout=30s;}server {listen 1883;proxy_pass stream_backend;proxy_buffer_size 4k;}upstream stream_backend_ssl {zone tcp_servers 64k;hash $remote_addr;server 192.168.0.1:8883 max_fails=2 fail_timeout=30s;}server {listen 8883 ssl;proxy_protocol on;        proxy_pass stream_backend_ssl;proxy_buffer_size 4k;ssl_handshake_timeout 15s;ssl_certificate     /etc/nginx/cert/_.xx.pem;ssl_certificate_key /etc/nginx/cert/_.xx.key;}}#重启nginx
sudo systemctl restart nginx

此时

  • 8883为mqtts端口,链接mqtts,emqx控制台可以看到真实ip
  • 1883为mqtt端口,链接mqtt,emqx控制台看不到真实ip
  • 理论上业务只暴露mqtts端口到外网,mqtt为内部调试方便抓包用的

请添加图片描述

脚本获取

设置api密钥,保存下username和pasword

请添加图片描述

#!/bin/bash# Check if both host and password arguments are provided
if [ $# -lt 2 ]; thenecho "Usage: $0 <host> <password>"exit 1
fi# Define host, username, and password
HOST="$1"
USERNAME="username"
PASSWORD="$2"# Encode username and password in Base64
AUTH=$(echo -n "$USERNAME:$PASSWORD" | base64)# Create the curl command
curl -H "Authorization: Basic $AUTH" "http://$HOST:18083/api/v5/clients?like_username=server"
#运行
chmod +x ./emqx_curl.sh
./emqx_curl.sh localhost your_password_here

Java获取

service

package com.bothsavage.common.mqtt.service;import com.alibaba.fastjson.JSONObject;
import com.bothsavage.common.mqtt.config.interceptor.EmqxAuthInterceptor;
import com.bothsavage.common.mqtt.entity.dto.ClientInfoDTO;
import com.bothsavage.common.mqtt.entity.dto.EmqxRespDTO;import feign.Headers;import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;/*** Emqx请求** @author bothSavage*/
@FeignClient(name = "emqxRmiService", url = "${emqx.rmi.url:http://127.0.0.1:18083/}",configuration = EmqxAuthInterceptor.class)
public interface EmqxRmiService {/*** 获取客户端信息** @param clientId 客户端id* @return -*/@GetMapping(value = "/api/v5/clients/{id}")@Headers(value = {"Content-Type=application/json;charset=UTF-8", "Accept=application/json"})ClientInfoDTO getClientById(@PathVariable("id") String clientId);/*** 查询客户端信息** @param likeUserName 客户端名称* @return -*/@GetMapping(value = "/api/v5/clients")@Headers(value = {"Content-Type=application/json;charset=UTF-8", "Accept=application/json"})EmqxRespDTO<ClientInfoDTO> getClientsByUserName(@RequestParam("like_username") String likeUserName);}

auth

package com.bothsavage.common.mqtt.config.interceptor;import feign.RequestInterceptor;
import feign.RequestTemplate;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;import java.util.Base64;/*** Emqx请求权限拦截** @author bothSavage*/
@Configuration
public class EmqxAuthInterceptor implements RequestInterceptor {@Value("${emqx.rmi.username:x}")private String username;@Value("${emqx.rmi.password:x}")private String password;@Overridepublic void apply(RequestTemplate template) {String auth = username + ":" + password;String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes());template.header("Authorization", "Basic " + encodedAuth);}}

dto

package com.bothsavage.common.mqtt.entity.dto;import com.alibaba.fastjson.annotation.JSONField;import lombok.Data;/*** dto** @author bothSavage*/
@Data
public class ClientInfoDTO {@JSONField(name = "clientid")private String clientid;@JSONField(name = "mqueue_len")private Integer mqueueLen;@JSONField(name = "reductions")private Integer reductions;@JSONField(name = "keepalive")private Integer keepalive;@JSONField(name = "listener")private String listener;@JSONField(name = "proto_ver")private Integer protoVer;@JSONField(name = "recv_msg.dropped.await_pubrel_timeout")private Integer recvMsgDroppedAwaitPubrelTimeout;@JSONField(name = "send_msg.dropped.expired")private Integer sendMsgDroppedExpired;@JSONField(name = "mountpoint")private Object mountpoint;@JSONField(name = "mailbox_len")private Integer mailboxLen;@JSONField(name = "send_msg")private Integer sendMsg;@JSONField(name = "zone")private String zone;@JSONField(name = "subscriptions_cnt")private Integer subscriptionsCnt;@JSONField(name = "heap_size")private Integer heapSize;@JSONField(name = "recv_msg")private Integer recvMsg;@JSONField(name = "recv_cnt")private Integer recvCnt;@JSONField(name = "send_msg.dropped.too_large")private Integer sendMsgDroppedTooLarge;@JSONField(name = "awaiting_rel_cnt")private Integer awaitingRelCnt;@JSONField(name = "subscriptions_max")private String subscriptionsMax;@JSONField(name = "recv_msg.qos0")private Integer recvMsgQos0;@JSONField(name = "recv_msg.qos1")private Integer recvMsgQos1;@JSONField(name = "recv_msg.qos2")private Integer recvMsgQos2;@JSONField(name = "node")private String node;@JSONField(name = "inflight_max")private Integer inflightMax;@JSONField(name = "port")private Integer port;@JSONField(name = "recv_pkt")private Integer recvPkt;@JSONField(name = "send_oct")private Integer sendOct;@JSONField(name = "inflight_cnt")private Integer inflightCnt;@JSONField(name = "awaiting_rel_max")private String awaitingRelMax;@JSONField(name = "is_persistent")private Boolean isPersistent;@JSONField(name = "send_msg.dropped")private Integer sendMsgDropped;@JSONField(name = "recv_msg.dropped")private Integer recvMsgDropped;@JSONField(name = "clean_start")private Boolean cleanStart;@JSONField(name = "send_msg.qos0")private Integer sendMsgQos0;@JSONField(name = "created_at")private String createdAt;@JSONField(name = "connected_at")private String connectedAt;@JSONField(name = "enable_authn")private Boolean enableAuthn;@JSONField(name = "mqueue_dropped")private Integer mqueueDropped;@JSONField(name = "is_bridge")private Boolean isBridge;@JSONField(name = "send_msg.dropped.queue_full")private Integer sendMsgDroppedQueueFull;@JSONField(name = "proto_name")private String protoName;@JSONField(name = "ip_address")private String ipAddress;@JSONField(name = "send_cnt")private Integer sendCnt;@JSONField(name = "connected")private Boolean connected;@JSONField(name = "recv_oct")private Integer recvOct;@JSONField(name = "mqueue_max")private Integer mqueueMax;@JSONField(name = "send_msg.qos2")private Integer sendMsgQos2;@JSONField(name = "send_msg.qos1")private Integer sendMsgQos1;@JSONField(name = "expiry_interval")private Integer expiryInterval;@JSONField(name = "send_pkt")private Integer sendPkt;@JSONField(name = "username")private String username;}

test

clientIp = emqxRmiService.getClientById("clientId").getIpAddress();

总结

  • 路线清晰,只是操作起来稍微麻烦点
  • 注意:emqx打开了代理的访问的监听器,只能通过nginx来中转,无法直接链接了
  • 当前只用了一个emqx,没有做集群
  • docker端口暴露需要前期规划好,要不然特别麻烦

写到最后

请添加图片描述

http://www.dtcms.com/a/343410.html

相关文章:

  • STM32F030/070芯片解密及应用
  • DAY 23|动态规划1
  • LeetCode234~258题解
  • 深入解析JUC线程间通信:使用ReentrantLock与Condition实现精准线程调度
  • 32、智能仓库管理与优化系统 (模拟) - /物流与仓储组件/warehouse-optimization-system
  • IPSec 与 IKE 核心知识点总结
  • 使用Python 创建虚拟环境的两种方式
  • 订单簿数据深度学习方法在大单发现应用
  • 让医学数据更直观——MedCalc 23.1.7 最新版使用体验
  • sageattention低比特量化注意力机制,比FlashAttention快5 倍
  • DeepSeek-V3.1 Claude Code: 革命性的AI编码助手详解与应用指南
  • 论文图片在要求dpi下,压缩尺寸
  • ES_预处理
  • java18学习笔记-Simple Web Server
  • 美国联邦调查局警告俄罗斯针对思科设备的网络间谍活动
  • 残差神经网络(ResNet)
  • 矫平机与纵剪:一条钢卷“变身”的全过程
  • 【UE5-Airsim】Windows10下安装UE5-Airsim的仿真环境
  • leetcode 1658 将x减到0的最小操作数
  • 同题异构解决leetcode第3646题下一个特殊回文数
  • Linux网络socket套接字(上)
  • linux 之virtio 的驱动框架
  • Motocycle 智能仪表盘
  • 白光干涉测量系统的复合相移三维重建和多视场形貌拼接的复现
  • 【自然语言处理与大模型】微调与RAG的区别
  • JavaScript基础语法five
  • 【Protues仿真】基于AT89C52单片机的数码管驱动事例
  • 力扣905:按奇偶排序数组
  • 2025-08-21 Python进阶4——错误和异常
  • 开发者中使用——控制台打印数据