当前位置: 首页 > news >正文

JavaWebsocket-demo

Websocket客户端

pom依赖

		<dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.4.0</version></dependency>

客户端代码片段


@Component
@Slf4j
public class PositionAlarmListener {@Autowiredprivate BigScreenService bigScreenService;@Autowiredprivate ConfigurationSystemService configurationSystemService;@Beanpublic WebSocketClient webSocketClient() {WebSocketClient wsc = null;ThirdPartConfDto thirdPartConfDto = configurationSystemService.getConfig();Map<String, String> httpHeaders = new HashMap<>();try {
//            String reqUrl = String.format("ws://%s%s?apikey=%s", servicePort, SOCKET_URL, apikey);String reqUrl = thirdPartConfDto.getAlarmWebsocketUrl();log.info("websocketclient.position.reqUrl:{}",reqUrl);wsc = new WebSocketClient(new URI(reqUrl), httpHeaders) {@Overridepublic void onOpen(ServerHandshake serverHandshake) {log.info("UnmannedPlane==connect==build");}@Overridepublic void onMessage(String message) {log.info("websocketclient.position.receive.message:{}", message);CompletableFuture.runAsync(() -> {try {if (StringUtils.isEmpty(message)) {return;}
//                            JSONObject parse = JSONObject.parseObject(message);ThirdPositionAlarmDto thirdPositionAlarmDto = JSONObject.parseObject(message,ThirdPositionAlarmDto.class);String type = thirdPositionAlarmDto.getType();log.info("websocketclient.position.receive.message-type:{}", type);if (StringUtils.isEmpty(type)) {log.error("websocket.type.is null");return;}if(!type.equals(ThirdPositionAlarmEnum.TYPE_TAG.getCode())){log.error("websocket.type.is not tag");return;}boolean bigScreenPush = bigScreenService.pusdata(thirdPositionAlarmDto,thirdPartConfDto);} catch (Exception e) {log.error("websocketclient.position.error:", e);}});}@Overridepublic void onClose(int i, String s, boolean b) {log.info("websocketclient.position.close code:{} reason:{} {}", i, s, b);}@Overridepublic void onError(Exception e) {log.info("websocketclient.position.connect==error:", e);}};wsc.connect();return wsc;} catch (Exception e) {log.error("websocketclient.position==webSocketClient:", e);}return wsc;}}

Websocket 服务端

服务端pom依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

服务端代码片段,websocket-配置

websocket-服务配置
@Configuration
public class WebSocketConfig {/*** ServerEndpointExporter注入* 该Bean会自动注册使用@ServerEndpoint注解申明的WebSocket endpoint** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

服务端代码片段,websocket-服务端广播消息

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;/*** @author 谪居马道* @describe:websocket,消息广播* @date 2025/5/21*/
@Component
@ServerEndpoint("/websocket")
public class WebSocket {private final Logger log = LoggerFactory.getLogger(WebSocket.class);//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。private static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>();/**         * 连接建立成功调用的方法         */@OnOpenpublic void onOpen(Session session){this.session=session;webSocketSet.add(this);//加入set中log.info("【WebSocket消息】有新的连接,总数:{}",webSocketSet.size());}/**         * 连接关闭调用的方法         */@OnClosepublic void onClose(){webSocketSet.remove(this);//从set中删除log.info("【WebSocket消息】连接断开,总数:{}",webSocketSet.size());        }/**         * 收到客户端消息后调用的方法         * @param message 客户端发送过来的消息         */@OnMessagepublic void onMessage(String message ){log.info("【WebSocket消息】收到客户端发来的消息:{}",message);sendMessage(message);}public void sendMessage(String message){for (WebSocket webSocket:webSocketSet) {log.info("【webSocket消息】广播消息,message={}",message);try {webSocket.session.getBasicRemote ().sendText(message);} catch (Exception e) {e.printStackTrace ();}            }}}

服务端代码片段,websocket-服务端一对一消息

@Component
@ServerEndpoint("/websocket/{terminalId}")
public class WebSocketService {private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);/*** 保存连接信息*/private static final Map<String, Session> CLIENTS = new ConcurrentHashMap<>();private static final Map<String, AtomicInteger> TERMINAL_IDS = new HashMap<>();/*** 需要注入的Service声明为静态,让其属于类*/private static TerminalService terminalService;/*** 注入的时候,给类的Service注入*/@Autowiredpublic void setMchDeviceInfoService(TerminalService terminalService) {WebSocketService.terminalService = terminalService;}@OnOpenpublic void onOpen(@PathParam("terminalId") String terminalId, Session session) throws Exception {logger.info(session.getRequestURI().getPath() + ",打开连接开始:" + session.getId());//当前连接已存在,关闭if (CLIENTS.containsKey(terminalId)) {onClose(CLIENTS.get(terminalId));}TERMINAL_IDS.put(terminalId, new AtomicInteger(0));CLIENTS.put(terminalId, session);logger.info(session.getRequestURI().getPath() + ",打开连接完成:" + session.getId());terminalService.terminal();}@OnClosepublic void onClose(@PathParam("terminalId") String terminalId, Session session) throws Exception {logger.info(session.getRequestURI().getPath() + ",关闭连接开始:" + session.getId());CLIENTS.remove(terminalId);TERMINAL_IDS.remove(terminalId);logger.info(session.getRequestURI().getPath() + ",关闭连接完成:" + session.getId());}@OnMessagepublic void onMessage(String message, Session session) {logger.info("前台发送消息:" + message);if ("心跳".equals(message)) {//重置当前终端心跳次数TERMINAL_IDS.get(message).set(0);return;}sendMessage("收到消息:" + message, session);}@OnErrorpublic void onError(Session session, Throwable error) {logger.error(error.toString());}public void onClose(Session session) {//判断当前连接是否在线
//        if (!session.isOpen()) {
//            return;
//        }try {session.close();} catch (IOException e) {logger.error("金斗云关闭连接异常:" + e);}}public void heartbeat() {//检查所有终端心跳次数for (String key : TERMINAL_IDS.keySet()) {//心跳3次及以上的主动断开if ((TERMINAL_IDS.get(key).intValue() >= 3)) {logger.info("心跳超时,关闭连接:" + key);onClose(CLIENTS.get(key));}}for (String key : CLIENTS.keySet()) {//记录当前终端心跳次数TERMINAL_IDS.get(key).incrementAndGet();sendMessage("心跳", CLIENTS.get(key));}}public void sendMessage(String message, Session session) {try {session.getAsyncRemote().sendText(message);logger.info("推送成功:" + message);} catch (Exception e) {logger.error("推送异常:" + e);}}public boolean sendMessage(String terminalId, String message) {try {Session session = CLIENTS.get(terminalId);session.getAsyncRemote().sendText(message);logger.info("推送成功:" + message);return true;} catch (Exception e) {logger.error("推送异常:" + e);return false;}}
}

Websocket测试工具

postman-测试

参考:
site1: https://maimai.cn/article/detail?fid=1747304025&efid=p7JdUMG2Gi0PrMX7xSXpXw
site2: https://blog.csdn.net/weixin_46768610/article/details/128711019

相关文章:

  • Spark Core 源码关键环节的深度解析
  • mapbox进阶,手写放大镜功能
  • Windows安装MongoDb.并使用.NET 9连接
  • 前后端的双精度浮点数精度不一致问题解决方案,自定义Spring的消息转换器处理JSON转换
  • [杂学笔记]浏览器多进程与多线程架构、wstring类型、哈希表、红黑树与哈希表的对比、C++标准库Random类
  • 每日算法 -【Swift 算法】寻找字符串中最长回文子串(三种经典解法全解析)
  • 工业物联网网关在变电站远程监控中的安全传输解决方案
  • vscode离线安装组件工具vsix
  • Java安全-Servlet内存马
  • 计算机网络--第一章(下)
  • 贪心算法 Part04
  • Python数据分析实战:Pandas高效处理Excel数据指南
  • DataOutputStream DataInputStream转换流
  • Mysql-数据闪回工具MyFlash
  • IOMMU打开 关闭
  • 【76. 最小覆盖子串】
  • Java使用mybatis-plus做查询时LocalDateTime报错处理方案
  • Oracle Enqueue Names
  • Nginx配置同一端口不同域名或同一IP不同端口
  • DSP定时器的计算
  • 专业外贸公司网站/宁波优化网页基本流程