当前位置: 首页 > news >正文

websocket深入-webflux+websocket

文章目录

  • 背景
  • 版本约定
  • 配置文件
  • 代码
    • 使用webflux
    • 使用websocket
      • 配置文件
      • handler基类
      • 实现类
      • 注册路由

背景

基于更复杂的情况和更高的开发要求,我们可能会遇到必须同时要使用webflux和websocket的情况。

版本约定

  • JDK21
  • Springboot 3.2.0
  • Fastjson2
  • lombok

配置文件

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.0</version>
</parent>
<properties>
    <maven.compiler.source>21</maven.compiler.source>
    <maven.compiler.target>21</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <!-- Spring Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.54</version>
    </dependency>
</dependencies>

代码

只要引入webflux,就同时引入了websocket,不需要再次引入websocket

使用webflux

@RestController
@RequestMapping("/user")
public class UserFlux {

    @Autowired
    private UserService userService;

    @GetMapping("/get")
    public Mono<Result<User>> get() {
        return Mono.just(Result.httpSuccess(userService.getUser()));
    }

    /**
     * 服务器推送
     *
     * @return 由服务器决定推送多少次多少数据,推送结束前不会断开连接
     *
     * @apiNote (SSE - > Server Send Event)
     */
    @GetMapping(value = "/flux", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<String> flux() {
        return Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException ignored) {
            }
            JSONObject obj = new JSONObject();
            obj.put("data", "hello,flux" + i);
            return obj.toJSONString();
        }));
    }
}

这里比较值得注意的是Flux返回值,这个返回值从性质上说有点像会自动close的websocket。我们看下这个/flux的返回值:


{
	"data": "hello,flux1"
}{
	"data": "hello,flux2"
}{
	"data": "hello,flux3"
}{
	"data": "hello,flux4"
}

注意这不是我拼接的,是调试结果就是这样。也就是说,/flux是分帧输出,具有流式的特性。

使用websocket

这里选择使用手动注册websocket而非Endpoint自动注解,主要是因为我想对handler做规范化

配置文件

@Configuration(proxyBeanMethods = false)
public class ReactiveWebSocketConfiguration {
 
    @Bean
    public WebSocketHandlerAdapter webSocketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

handler基类

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.springframework.lang.NonNull;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// 强制规定基类必须有泛型约束入参出参。强制规定必须进行参数校验
@Slf4j
public abstract class BaseSocketHandler<T, R> implements WebSocketHandler {
    @Override
    @NonNull
    public Mono<Void> handle(WebSocketSession session) {
        String sessionId = session.getId();
        log.info("与sessionId:【{}】 建立连接", sessionId);
        Flux<WebSocketMessage> receive = session.receive();
        Flux<R> fluxHandled = receive.flatMap(webSocketMessage -> {
            String payloadAsText = webSocketMessage.getPayloadAsText();
            if (!JSON.isValid(payloadAsText)) {
                log.error("收取参数不合法:{}", payloadAsText);
                session.close();
                throw new IllegalArgumentException("参数不合法");
            }
            TypeReference<T> reference = getTypeReference();
            if (!check(payloadAsText, reference)) {
                log.error("参数校验不通过:{}", payloadAsText);
                session.close();
                throw new IllegalArgumentException("参数校验不通过");
            }
            return handler(payloadAsText, reference);
        }).onErrorResume(throwable -> {
            log.error("连接异常,即将关闭", throwable);
            session.close();
            return Mono.error(throwable);
        });
        return session.send(
                Mono.from(fluxHandled).map(payload -> session.textMessage(JSON.toJSONString(payload)))
        );
    }

    public abstract boolean check(String payloadObject, TypeReference<T> typeReference);

    public abstract Mono<R> handler(String payload, TypeReference<T> typeReference);

    protected abstract TypeReference<T> getTypeReference();
}

实现类

// 这样继承基类的handler使用时非常简单不说,由于上层做了处置,还会更安全更好做日志
public class NoticeHandler extends BaseSocketHandler<User, UserInfo> {


    @Override
    public boolean check(String payloadObject, TypeReference<User> userTypeReference) {
        User user = JSON.parseObject(payloadObject, userTypeReference);
        return !Objects.isNull(user.getId()) && user.getId() > 0;
    }

    @Override
    public Mono<UserInfo> handler(String payload, TypeReference<User> typeReference) {
        User user = JSON.parseObject(payload, typeReference);
        UserInfo userInfo = new UserInfo();
        BeanUtils.copyProperties(user, userInfo);
        return Mono.just(userInfo);
    }

    @Override
    protected TypeReference<User> getTypeReference() {
        return new TypeReference<>() {
        };
    }
}


注册路由

import com.xu.socket.NoticeHandler;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;

import java.util.HashMap;
import java.util.Map;

@Component
public class ReactiveWebSocketServerHandlerMapping extends SimpleUrlHandlerMapping {

    public ReactiveWebSocketServerHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/ws/notice", new NoticeHandler());
        setUrlMap(map);
        setOrder(100);
    }
}

相关文章:

  • 大语言模型中的幻觉现象深度解析
  • 每日一题(小白)暴力娱乐篇24
  • 击球手怎么玩·棒球1号位
  • Springboot整合JAVAFX
  • 【JavaScript】面向对象与设计模式
  • 用Java写一个MVCC例子
  • 理解CSS3 的 max/min-content及fit-content等width值
  • 这是一个文章标题
  • 《网络管理》实践环节04:SNMP监控数据采集流程及SNMP协议详细分析
  • 边缘分布的定义与公式详解
  • 探索 OSPF 协议:构建高效网络的基石
  • [蓝桥杯 2024 省 B] 拔河
  • 心有猛虎,细嗅蔷薇
  • Netty之内存池的原理和实战
  • WebStorm中使用live-server插件
  • Ubuntu 安装 MySQL
  • 国产三维皇冠CAD在「工业自动控制系统装置制造」建模教程:千分表指示器
  • vue项目打包里面pubilc里的 tinymce里的js文件问题
  • Linux 驱动中的资源获取与自动管理机制:深入理解与实战
  • iphone各个机型尺寸
  • html企业网站源码/成都seo技术经理
  • 曰本真人做爰免费网站/外贸平台
  • 哪些网站的做的好看的图片/培训心得体会总结
  • 网站用什么系统/网站推广的具体方案
  • 苏州网站开发建设方法/百度账号注册申请
  • 网站制作和如何推广/seo网络培训学校