当前位置: 首页 > news >正文

netty-socketio + springboot 消息推送服务

netty-socketio + springboot 消息推送服务

  • 后端
    • 1. 目录结构:
    • 代码
      • pom文件:
      • application.yml:
      • SocketIOConfig:
      • PushMessage:
      • ISocketIOService
      • SocketIOServiceImpl:
      • pushMessageController:
      • 启动类:MessagePushDemo:
  • 前端
    • 安装客户端
    • client.js
  • 参考文档:

  • 背景:后端业务代码中调用这个消息推送服务,主动推送消息给客户端。前端(客户端)连接 消息推送服务,并可以实时的收到服务器发来的消息。

  • 注意点:客户端与服务器(netty-socketio)版本不兼容会导致:客户端链接服务端成功,但是收不到服务端发来的消息。官方推荐的版本匹配。

  • 文章参考文档放在最后

后端

1. 目录结构:

在这里插入图片描述

代码

pom文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>messagePushDemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
<!--    <maven.compiler.source>11</maven.compiler.source>-->
<!--    <maven.compiler.target>11</maven.compiler.target>-->
    <swagger.core.version>1.6.2</swagger.core.version>
    <java.version>1.8</java.version>
  </properties>
<!--  <packaging>jar</packaging>-->

<!--  <name>messagePushDemo</name>-->
<!--  <url>http://maven.apache.org</url>-->

  <parent>
    <artifactId>spring-boot-starter-parent</artifactId>
    <groupId>org.springframework.boot</groupId>
    <version>2.0.0.RELEASE</version>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.13.0</version>
    </dependency>

    <!-- Swagger 依赖配置 -->
    <dependency>
      <groupId>io.swagger</groupId>
      <artifactId>swagger-models</artifactId>
      <version>${swagger.core.version}</version>
    </dependency>
    <dependency>
      <groupId>io.swagger</groupId>
      <artifactId>swagger-annotations</artifactId>
      <version>${swagger.core.version}</version>
    </dependency>

    <!-- Spring MVC依赖 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- springBoot的Test依赖 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.49.Final</version>
    </dependency>
    <dependency>
      <groupId>com.corundumstudio.socketio</groupId>
      <artifactId>netty-socketio</artifactId>
      <version>1.7.19</version>
    </dependency>

    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.73</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <scope>provided</scope>
    </dependency>

  </dependencies>



  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.tomcat.maven</groupId>
        <artifactId>tomcat7-maven-plugin</artifactId>
        <version>2.1</version>
      </plugin>
    </plugins>
  </build>

</project>

application.yml:

# Tomcat
server:
  port: 8080

socketio:
  # host在本地测试可以设置为localhost或者本机IP
  host: localhost
  port: 9099
  # 在Linux服务器跑可换成服务器外网IP
  public:
    host: localhost
  allowCustomRequests: true
  # socket连接数大小(如只监听一个端口boss线程组为1即可)
  bossCount: 1
  # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
  maxFramePayloadLength: 1048576
  # 设置http交互最大内容长度
  maxHttpContentLength: 1048576
  # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
  pingInterval: 25000
  # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
  pingTimeout: 6000000
  # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间
  upgradeTimeout: 1000000
  workCount: 100

SocketIOConfig:


import com.corundumstudio.socketio.SocketConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.corundumstudio.socketio.SocketIOServer;

@Configuration
public class SocketIOConfig {

    @Value("${socketio.host}")
    private String host;

    @Value("${socketio.port}")
    private Integer port;

    @Value("${socketio.bossCount}")
    private int bossCount;

    @Value("${socketio.workCount}")
    private int workCount;

    @Value("${socketio.allowCustomRequests}")
    private boolean allowCustomRequests;

    @Value("${socketio.upgradeTimeout}")
    private int upgradeTimeout;

    @Value("${socketio.pingTimeout}")
    private int pingTimeout;

    @Value("${socketio.pingInterval}")
    private int pingInterval;

    /**
     * 以下配置在上面的application.properties中已经注明
     * @return
     */
    @Bean
    public SocketIOServer socketIOServer() {
        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setTcpNoDelay(true);
        socketConfig.setSoLinger(0);
        com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
        config.setSocketConfig(socketConfig);
        config.setHostname(host);
        config.setPort(port);
        config.setBossThreads(bossCount);
        config.setWorkerThreads(workCount);
        config.setAllowCustomRequests(allowCustomRequests);
        config.setUpgradeTimeout(upgradeTimeout);
        config.setPingTimeout(pingTimeout);
        config.setPingInterval(pingInterval);
        return new SocketIOServer(config);
    }
}

PushMessage:

package org.example.msg;

import io.swagger.annotations.ApiModelProperty;
public class PushMessage {
    @ApiModelProperty(value = "登录用户编号/唯一标识")
    private String clientId;
		@ApiModelProperty(value = "推送事件")
    private String event;
    @ApiModelProperty(value = "推送内容")
    private String content;

    public PushMessage() {
    }

    public PushMessage(String clientId, String event, String content) {
        this.clientId = clientId;
        this.event = event;
        this.content = content;
    }

    private PushMessage(Builder builder) {
        setClientId(builder.clientId);
        setEvent(builder.event);
        setContent(builder.content);
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getEvent() {
        return event;
    }

    public void setEvent(String event) {
        this.event = event;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public static final class Builder {
        private String clientId;
        private String event;
        private String content;

        private Builder() {
        }

        public Builder clientId(String val) {
            clientId = val;
            return this;
        }

        public Builder event(String val) {
            event = val;
            return this;
        }

        public Builder content(String val) {
            content = val;
            return this;
        }

        public PushMessage build() {
            return new PushMessage(this);
        }
    }
}

ISocketIOService

package org.example.service;


import org.example.msg.PushMessage;

public interface ISocketIOService {

    /**
     * 推送的事件
     */
    String PUSH_EVENT = "push_event";

    /**
     * 聊天的事件
     */
    String IM_EVENT = "im_event";

    /**
     * 登录的事件
     */
    String LOGIN_EVENT = "login_event";

    /**
     * 启动服务
     */
    void start() throws Exception;

    /**
     * 停止服务
     */
    void stop();

    /**
     * 推送信息
     */
    void pushMessageToUser(PushMessage pushMessage);
}

SocketIOServiceImpl:

package org.example.service.impl;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.example.msg.PushMessage;
import org.example.service.ISocketIOService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;

import static org.example.service.ISocketIOService.IM_EVENT;
import static org.example.service.ISocketIOService.PUSH_EVENT;

@Service(value = "socketIOService")
public class SocketIOServiceImpl implements ISocketIOService {
    private Logger logger = LoggerFactory.getLogger(SocketIOServiceImpl.class);
    /**
     * 用来存已连接的客户端
     */
    private static Map<String, SocketIOClient> clientMap = new ConcurrentHashMap<>();

    @Autowired
    private SocketIOServer server;

    /**
     * Spring IoC容器创建之后,在加载SocketIOServiceImpl Bean之后启动
     * @throws Exception
     */
    @PostConstruct
    private void autoStartup() throws Exception {
        start();
    }

    /**
     * Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题
     * @throws Exception
     */
    @PreDestroy
    private void autoStop() throws Exception  {
        stop();
    }
    
    @Override
    public void start() {
        // 监听客户端连接
        server.addConnectListener(client -> {
            String clientId = getParamsByClient(client);
            if (clientId != null) {
                if (clientMap.isEmpty()) {
                    clientMap.put(clientId, client);
                    logger.info("clientId: {} connected...", clientId);
                }else {
                    if (clientMap.containsKey(clientId)) {
                        SocketIOClient preClient = clientMap.get(clientId);
                        preClient.disconnect();
                        logger.info("旧链接已断开");
                    }
                    clientMap.put(clientId, client);
                    logger.info("clientId: {} connected...", clientId);
                }
            }
        });

        // 监听客户端断开连接
        server.addDisconnectListener(client -> {
            String clientId = getParamsByClient(client);
            if (clientId != null) {
                clientMap.remove(clientId);
                client.disconnect();
                logger.info("clientId: {} disconnected...", clientId);
            }
        });

        // 处理自定义的事件,与连接监听类似
        server.addEventListener(PUSH_EVENT, PushMessage.class, (client, data, ackSender) -> {
            logger.info("eventListener data: {}", data);
        });
        server.addEventListener(IM_EVENT, PushMessage.class, (client, data, ackSender) -> {
            logger.info("eventListener data: {}", data);
        });
        server.start();
    }

    @Override
    public void stop() {
        if (server != null) {
            server.stop();
            server = null;
            logger.info("server stop!");
        }
    }

    @Override
    public void pushMessageToUser(PushMessage pushMessage) {
        String clientIds = pushMessage.getClientId();
        if (StringUtils.isNotBlank(clientIds)) {
            for (String clientId : clientIds.split(",")) {
                SocketIOClient client = clientMap.get(clientId);
                if (client != null) {
                    client.sendEvent(pushMessage.getEvent(), pushMessage.getContent());
                    logger.info("push message: {}, toClientId: {}", pushMessage.getContent(), clientId);
                }else {
                    logger.info("当前客户端无连接");
                }
            }
        }
    }

    /**
     * 此方法为获取client连接中的参数,可根据需求更改
     * @param client
     * @return
     */
    private String getParamsByClient(SocketIOClient client) {
        // 从请求的连接中拿出参数(这里的clientId必须是唯一标识)
        return client.getHandshakeData().getSingleUrlParam("clientId");
    }
}

pushMessageController:

package org.example.controller;


import org.example.msg.PushMessage;
import org.example.service.ISocketIOService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class pushMessageController {


    @Autowired
    private ISocketIOService socketIOService;


    @GetMapping("/im/push")
    @ResponseBody
    public String pushMessage(@RequestParam String clientId,
                                       @RequestParam String content){
        if (clientId == null || clientId.isEmpty()){
            return "参数为空错误";
        }
        PushMessage pushMessage = new PushMessage(clientId, ISocketIOService.IM_EVENT, content);
        socketIOService.pushMessageToUser(pushMessage);
        return "success";
    }

}

启动类:MessagePushDemo:

package org.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Hello world!
 *
 */
@SpringBootApplication
public class MessagePushDemo
{
    public static void main( String[] args )
    {
        SpringApplication.run(MessagePushDemo.class, args);
//        System.out.println( "Hello World!" );
    }
}

前端

安装客户端

npm install socket.io-client@2
我安装的2开头的版本,注意和服务器的版本兼容

client.js

const io = require('socket.io-client');

// 配置连接选项
const options = {
    // 开启重连机制
    reconnection: true, 
    // 最大重连次数
    reconnectionAttempts: Infinity, 
    // 重连间隔时间,初始为 1000 毫秒
    reconnectionDelay: 1000, 
    // 重连间隔时间上限,最大为 5000 毫秒
    reconnectionDelayMax: 5000, 
    // 连接超时时间,设置为 10000 毫秒
    timeout: 10000 
};

// 使用配置选项连接到服务器
const socket = io('http://localhost:9099?clientId=122', options);

// 监听连接成功事件
socket.on('connect', () => {
    console.log('连接成功');
});

// 监听事件
socket.on('im_event', (msg) => {
    console.log('事件内容:', msg);
});

// 监听连接错误事件
//socket.on('connect_error', (error) => {
//    console.error('连接错误:', error);
//});

// 监听断开连接事件
socket.on('disconnect', () => {
    console.log('连接已断开');
});

参考文档:

socket.io官方文档
代码原博主文章

相关文章:

  • matlab实现的InSAR图像Goldstein滤波
  • 自动驾驶“无图化”开源框架争议:技术革新还是行业风险?
  • 3.1.3.4 Spring Boot使用使用Listener组件
  • 住建厅八大员建筑资料员证备考题库
  • 配置过编译选项,也有keystore文件,但是Android Studio签名apk失败。
  • 数据库管理工具实战:IDEA 与 DBeaver 连接 TDengine(二)
  • 人工智能100问☞第2问:机器学习的核心原理是什么?
  • 基于SSM的校园美食交流系统
  • RocketMQ深度百科全书式解析
  • CXL3.0 CDAT(Coherent Device Attributes Table)
  • VMware虚拟机Ubuntu磁盘扩容
  • 博途 TIA Portal之1200做从站与汇川EASY的TCP通讯
  • windows10系统下找不到microbit指南方案
  • XSS 防御转义规则笔记
  • Unity6下架中国区,团结引擎接棒:这是分裂,还是本地化的开始?
  • 关于深度学习局部视野与全局视野的一些思考
  • 网关与路由器知识点
  • Navicat分组、查询分享
  • 人工智能训练师-个人学习记录
  • OpenCV 图形API(30)图像滤波-----腐蚀操作函数erode()
  • 建设自己的网站步骤/整合网络营销公司
  • 网站排名要怎么做/百度网站首页提交入口
  • 免费的视频网站推广软件/什么叫做关键词
  • 微网站素材/百度搜索排名与点击有关吗
  • 手机网站seo教程/宁德seo推广
  • 网站有标题/百度网盘下载电脑版官方下载