上海临港文章优化关键词排名
文章目录
- 方案一:短轮询
- 方案二:长轮询
- 方法三:SSE
- 方法四:WebSocket(基于SpringBoot)
- 方法五:WebSocket(基于Netty实现)
方案一:短轮询
实现原理:
由客户端以固定的时间间隔不断的向服务器发送请求,以咨询服务器是否存在新消息。
实现方式:
简单的提供接口服务。。。
优点:
- 实现方式简单,兼容所有浏览器和服务器
缺点:
- 资源消耗大 ,如果不存在新消息,大量无效请求,浪费资源,负载提高
- 实时性滞后,当存在新消息的时候,只能在再次请求时客户端才能感知,如果缩短请求间隔,又进一步提高资源消耗
适合范围: 消息推送的处理逻辑复杂度不高,处理请求的业务耗时越长则压力越高,适合极快反馈的请求
优化方向:
- 消息反馈迅速,建议加一层缓存层,以提高返回效率
- 注意大量缓存穿透现象,且注意数据库连接池配置的优化
方案二:长轮询
实现原理:
由客户端向服务器发送请求,服务器去询问是否存在新消息,不存在新消息则阻塞该请求,超过指定时长则直接返回和客户端约定好的状态码或者返回值。
实现方式:
- 消息拉取
@RestController
public class LongPollingController {private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();@GetMapping("/poll")public ResponseEntity<String> pollForMessages(@RequestParam(value = "timeout", defaultValue = "60") int timeoutSeconds) throws InterruptedException {// 尝试从队列中获取消息,如果队列为空,则等待指定的时间String message = messageQueue.poll(timeoutSeconds, TimeUnit.SECONDS);if (message == null) {// 如果超时了还没有消息,则返回空响应或其他提示信息return ResponseEntity.noContent().build();} else {// 返回最新消息return ResponseEntity.ok(message);}}public void addMessage(String message) {messageQueue.offer(message);}
}
- 发布消息
@RestController
public class MessagePublisherController {private final LongPollingController longPollingController;@Autowiredpublic MessagePublisherController(LongPollingController longPollingController) {this.longPollingController = longPollingController;}@PostMapping("/publish")public ResponseEntity<String> publishMessage(@RequestBody String message) {longPollingController.addMessage(message);return ResponseEntity.ok("Message published");}
}
优点:
- 实现方式较为简单,兼容所有浏览器和服务器, 减少无效请求,相比短轮询更高效,有消息时立即推送
缺点:
- 在并发很高的场景下大量连接会占用服务器资源,可能受超时限制,难以处理服务器主动推送的场景
适合范围: 消息推送的处理逻辑复杂度不高,相较于短轮询的方式,如果不是海量并发的情况下和http连接数充裕的情况下,有着更好的性能表现。针对此场景可以使用此方案,
优化方向:
- 针对于硬件资源充裕的情况下,可以进行多节点部署,发布通知节点消息更新,可使用redis的发布订阅模式来做通知效果(或者其他消息队列也可),此优化后效果表现能力更佳
方法三:SSE
实现原理:
服务器与客户端建立单向连接,服务器可以持续向客户端推送数据,而不需要客户端重复请求。
实现方式:
后台代码
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;@RestController
public class SseController {// 存储所有活跃的SseEmitter实例private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();/*** 处理SSE请求,为每个连接创建一个新的SseEmitter实例。*/@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter subscribe() {SseEmitter emitter = new SseEmitter();emitters.add(emitter);// 当连接关闭时移除emitteremitter.onCompletion(() -> emitters.remove(emitter));emitter.onError((Throwable t) -> emitters.remove(emitter));return emitter;}/*** 发送消息给所有已订阅的客户端。*/public void sendMessage(String message) {for (SseEmitter emitter : emitters) {try {emitter.send(SseEmitter.event().name("message").data(message));} catch (IOException e) {// 如果发生错误,则移除该emitteremitters.remove(emitter);}}}
}
优点:
- 这种方法非常适合需要从服务器单向推送数据到客户端的场景,如实时更新新闻、社交网络的新动态通知等。相比WebSocket,SSE更轻量级且易于实现,特别适合于不需要双向通信的应用。
缺点:
尽管Server-Sent Events (SSE) 在许多实时通信场景中提供了简单且有效的解决方案,但它也存在一些局限性和缺点。使用 SSE 进行消息推送时需要注意以下问题:
- 仅支持文本数据传输(简单消息推送可忽略)
限制:SSE 默认只支持 UTF-8 编码的文本数据传输。如果需要发送二进制数据(如图片、文件等),则需要额外的编码步骤。
影响:对于需要处理大量或复杂数据类型的应用程序来说,这可能会增加开发和维护的复杂性。 - 单向通信(简单消息推送可忽略)
限制:SSE 是一种从服务器到客户端的单向通信协议。它不允许客户端主动向服务器发送消息,这意味着任何客户端请求都需要通过传统的HTTP请求来实现。
影响:在需要双向通信的应用场景下(例如即时通讯应用),SSE 可能不够灵活,通常需要结合其他技术(如WebSocket)来补充其不足。 - 连接管理
限制:虽然现代浏览器对 SSE 的支持良好,但在某些情况下,如网络不稳定或者用户长时间不活动时,SSE 连接可能会断开,并且默认情况下 SSE 不提供自动重连机制。
影响:开发者需要自己实现重连逻辑,以确保在网络故障恢复后能够重新建立连接并继续接收消息。 - 负载均衡挑战
限制:由于 SSE 连接是基于 HTTP 协议的长连接,在使用负载均衡器时可能会遇到问题,因为大多数负载均衡器会话保持时间较短,可能导致连接被错误地终止。
影响:需要配置负载均衡器以支持长连接,或者采用专门的技术(如 sticky sessions)来确保同一客户端总是被路由到同一个后端服务器实例。 - 跨域资源共享 (CORS) 配置
限制:当 SSE 被用于跨域请求时,需要正确配置 CORS 头信息,否则浏览器将阻止请求。
影响:增加了额外的安全配置需求,如果不妥善处理,可能会导致请求失败。 - 资源消耗
限制:每个活跃的 SSE 连接都会占用一定的服务器资源(如内存、线程等)。随着并发用户的增加,服务器上的资源消耗也会相应增长。
影响:在高并发场景下,如果没有适当的优化措施(如使用异步I/O模型、分布式架构等),可能会导致性能瓶颈。 - 浏览器兼容性
限制:虽然大多数现代浏览器都支持 SSE,但仍有少数旧版本浏览器或特定平台上的浏览器不完全支持 SSE。
影响:需要考虑兼容性问题,可能需要为不支持 SSE 的用户提供备用方案。
适合范围: 基于SSE问题,SSE适合简单消息推送,实时性要求极高,且浏览器版本支持。不适用于海量并发场景,一直维持连接也是一种开销。
优化方向:
- 可以采用异步I/O模型(例如 Java 中的 NIO 或者 Node.js 等技术)来提高单台服务器处理大量并发连接的能力。此外,还可以通过负载均衡和水平扩展来分散流量。
方法四:WebSocket(基于SpringBoot)
实现原理:
和客户端建立双向socket连接通道,可进行实时消息互动,使用springboot即可实现该功能
实现方式:
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.stu.config.SpringUtils;
import com.stu.model.entity.RemoteUser;
import com.stu.model.UserVo;
import com.stu.model.vo.UserMessageVO;
import com.stu.service.UserService;
import com.stu.util.GZipUtils;
import org.apache.logging.log4j.util.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;/*** @ServerEndpoint 该注解可以将类定义成一个WebSocket服务器端,* @OnOpen 表示有浏览器链接过来的时候被调用* @OnClose 表示浏览器发出关闭请求的时候被调用* @OnMessage 表示浏览器发消息的时候被调用* @OnError 表示报错了*/
@Component("webSocketServer")
@ServerEndpoint(value = "/ws/client/{sid}", encoders = {WsServerEncoder.class})
public class WebSocketServer extends AbstractReceiver {private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);//静态变量,用来记录当前在线连接数private static volatile int onlineCount = 0;//concurrent包的线程安全ConcurrentHashMap,用来存放每个客户端对应的MyWebSocket对象。private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();private static final Map<String, Pair<Session, Date>> CLIENTS = new ConcurrentHashMap<>();//与某个客户端的连接会话,需要通过它来给客户端发送数据private Session session;//接收sidprivate String sid = "";public static final String WS_ONLINE_USER_KEY = "WS_ONLINE_USER_KEY";public static UserService userService@Autowiredpublic WebSocketServer(UserService userService ) {RemoteWebSocketServer.userService = userService ;}public RemoteWebSocketServer() {}/*** 连接建立成功调用的方法**/@OnOpenpublic void onOpen(Session session, @PathParam("sid") String sid) {LOGGER.info("连接建立成功调用的方法=====");this.session = session;///如果该号码已经加入过,更新Session对象if (webSocketMap.containsKey(sid)) {//将当前websocket加入set中webSocketMap.put(sid, this);LOGGER.info("旧窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());} else {//将当前websocket加入set中webSocketMap.put(sid, this);///在线数加一addOnlineCount();//可在此处增加业务逻辑代码处理LOGGER.info("新窗口开始监听:" + sid + ",当前在线人数为" + getOnlineCount());}this.sid = sid;}/*** 连接关闭调用的方法*/@OnClosepublic void onClose() {//记录连接断开时间
// Date data = new Date();
// String key = WS_ONLINE_USER_KEY + this.sid;
// redisTemplate.opsForValue().set(key,data);//从set中删除try {LOGGER.info("有一连接[" + this.sid + "]关闭!开始推送数据");this.dataUpdate();} catch (Exception e) {LOGGER.error("修改专家下线状态失败!!!{}", e.getMessage());}webSocketMap.remove(this.sid);//在线数减1subOnlineCount();LOGGER.info("有一连接[" + sid + "]关闭!当前在线人数为" + getOnlineCount());}/*** @param session* @param error*/@OnErrorpublic void onError(Session session, Throwable error) {LOGGER.error("客户端-服务器通信发生错误,发生错误.Message:" + error.getLocalizedMessage());this.dataUpdate();error.printStackTrace();}@Overridepublic void receiveMessage(Object message) {if (message instanceof String) {Object o = JSONObject.parse(message.toString());UserMessageVO vo = JSONObject.parseObject(o.toString(), UserMessageVO.class);GZip解压缩String rtfMessage = vo.getRtfMessage();String unGzipMessage = GZipUtils.uncompress(rtfMessage);vo.setRtfMessage(unGzipMessage);StringBuilder builder = new StringBuilder();builder.append("SendUser:").append(vo.getUserPhone()).append("\nToUser:").append(vo.getToUserPhone()).append("\nMessageType").append(vo.getToUserPhone()).append("\nCreateTime").append(vo.getCreateTime()).append("\nRtfMessage").append(StrUtil.sub(unGzipMessage, 0, 100));LOGGER.info("接收到订阅消息:{}", builder);boolean isRetry = true;while (isRetry) {try {RemoteWebSocketServer.sendMessageToUser(vo);///发送成功,不重新发送isRetry = false;LOGGER.info("websocket用户[" + vo.getUserPhone() + "]向指定用户[" + vo.getToUserPhone() + "]发送消息完成.");} catch (IOException | EncodeException e) {LOGGER.error("websocket向指定用户[" + vo + "]发送消息异常,尝试2秒后重新发送.Message:" + e.getLocalizedMessage());///发送失败,尝试一次重发if (vo.getFailRetryCount() == 0) {isRetry = true;vo.setFailRetryCount(vo.getFailRetryCount() + 1);}try {Thread.sleep(2000);} catch (InterruptedException ex) {LOGGER.error("websocket尝试重新发送睡眠被打断.Message:" + e.getLocalizedMessage());}e.printStackTrace();}}} else {LOGGER.error("websocket发送消息异常,Message:消息内容解析错误...");}}/*** 收到客户端消息后调用的方法** @param message 客户端发送过来的消息**/@OnMessagepublic void onMessage(String message, Session session) {LOGGER.info("收到来自窗口" + sid + "的信息:" + message);if (Strings.isNotBlank(message)) {RemoteWebSocketServer socketServer = webSocketMap.get(sid);try {socketServer.sendMessage(message);} catch (IOException e) {e.printStackTrace();LOGGER.error("websocket群发消息回复异常.Message:" + e.getLocalizedMessage());}}}/*** 对指定用户发消息**/public static void sendMessageToUser(UserMessageVO message) throws IOException, EncodeException {LOGGER.info("用户[" + message.getUserPhone() + "]推送消息到用户[" + message.getToUserPhone() + "],推送内容:" + message);Set<Map.Entry<String, WebSocketServer>> entrySet = webSocketMap.entrySet();是否查找到指定用户boolean isFind = false;for (Map.Entry<String, WebSocketServer> item : entrySet) {if (item.getKey().equalsIgnoreCase(message.getToUserPhone())) {item.getValue().sendMessage(message);isFind = true;}}///end forif (!isFind) {LOGGER.info("该Pod未找到指定Websocket连接对象.");} else {LOGGER.info("该Pod对应Websocket对象发送消息.");}}/*** 对指定用户组发消息,推送给非专家用户**/public static void sendMessageToUsers(UserMessageVO message) throws IOException, EncodeException {String area = message.getToUserArea();List<String> toUsersPhone = message.getToUsersPhone();LOGGER.info("用户[" + message.getUserPhone() + "]推送消息到" +"[" + area + "]" + "地州" +"的用户组[" + toUsersPhone + "]," +"推送内容:" + message);int sucCount = 0, failCount = 0;Set<Map.Entry<String, WebSocketServer>> entrySet = webSocketMap.entrySet();LOGGER.info("当前在线用户信息:{}", entrySet);for (Map.Entry<String, WebSocketServer> item : entrySet) {for (int i = 0; i < toUsersPhone.size(); i++) {try {if (toUsersPhone.get(i).equals(item.getKey())) {sucCount++;item.getValue().sendMessage(message);}} catch (IOException | EncodeException e) {failCount++;LOGGER.error("websocket向所有普通用户发送消息异常.Message:" + e.getLocalizedMessage());continue;}}}LOGGER.info("【推送消息到所有普通用户】转发成功:" + sucCount + "项,失败:" + failCount + "项.");}/*** 对所有用户发消息**/public static void sendMessageToAll(Object object) throws IOException {LOGGER.info("【推送消息到所有用户】");Set<Map.Entry<String, WebSocketServer>> entrySet = webSocketMap.entrySet();int sucCount = 0, failCount = 0;for (Map.Entry<String, WebSocketServer> item : entrySet) {try {sucCount++;item.getValue().sendMessage(object);} catch (IOException | EncodeException e) {failCount++;LOGGER.error("websocket向所有用户发送消息异常.Message:" + e.getLocalizedMessage());continue;}}LOGGER.info("【推送消息到所有用户】转发成功:" + sucCount + "项,失败:" + failCount + "项.");}/*** 实现服务器主动推送*/public void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);}public void sendMessage(Object message) throws EncodeException, IOException {this.session.getBasicRemote().sendObject(message);}public static synchronized int getOnlineCount() {return onlineCount;}public static synchronized void addOnlineCount() {RemoteWebSocketServer.onlineCount++;}public static synchronized void subOnlineCount() {RemoteWebSocketServer.onlineCount--;}public synchronized void dataUpdate() {//业务逻辑处理}
}
优点:
- 全双工通信,相比HTTP轮询减少重复建立TCP连接的网络开销,消息实时性得到保障。
缺点: - 资源消耗大 ,如果不存在新消息,大量无效请求,浪费资源,负载提高
- 实时性滞后,当存在新消息的时候,只能在再次请求时客户端才能感知,如果缩短请求间隔,又进一步提高资源消耗
适合范围: 消息推送的处理逻辑复杂度不高,请求耗时约长则压力越高,适合极快反馈的请求
优化方向:
- 消息反馈迅速,建议加一层缓存层,以提高返回效率
- 注意大量缓存穿透现象,且注意数据库连接池配置的优化
方法五:WebSocket(基于Netty实现)
相较于方案四,基于NIO的非阻塞IO模型,单机可支持百万级连接,全双工通信,相比HTTP轮询减少80%以上的网络开销,这个方案在千万级用户、百万级并发连接的场景下经过验证,可根据实际业务需求进行调整
Socket服务
/*** WebSocket服务器启动类* 使用Netty框架实现高性能WebSocket服务*/
public class WebSocketServer {/*** 启动WebSocket服务器*/public void start() {// 创建两个EventLoopGroup实例// bossGroup负责接收客户端连接EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 通常只需1个线程// workerGroup负责处理连接的数据读写EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认CPU核心数*2try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)// 使用NIO传输通道.channel(NioServerSocketChannel.class)// 添加子处理器.childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 添加HTTP编解码器,因为WebSocket握手使用HTTP协议pipeline.addLast(new HttpServerCodec());// 聚合HTTP请求/响应pipeline.addLast(new HttpObjectAggregator(65536));// WebSocket协议处理器,指定访问路径为/ws// 处理握手、ping/pong帧等pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));// 自定义的消息处理器pipeline.addLast(new MessageHandler());}});// 绑定端口并启动服务器Channel ch = b.bind(8080).sync().channel();System.out.println("WebSocket服务器已启动,端口: 8080");// 等待服务器通道关闭ch.closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {// 优雅关闭线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}public static void main(String[] args) {new WebSocketServer().start();}
}
消息处理器
/*** 自定义消息处理器* 继承SimpleChannelInboundHandler处理WebSocket文本帧*/
public class MessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {// 维护用户ID与Channel的映射关系// 使用线程安全的ConcurrentHashMapprivate static final ConcurrentHashMap<String, Channel> userChannels = new ConcurrentHashMap<>();// Redis客户端工具类(伪代码)private static final RedisClient RedisClient = new RedisClient();/*** 处理收到的文本消息帧*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {// 1. 获取当前Channel对应的用户IDString userId = getUserId(ctx.channel());// 2. 解析收到的消息Message message = parseMessage(msg.text());// 3. 根据消息类型处理if (message.getType() == MessageType.PRIVATE) {// 私聊消息处理handlePrivateMessage(userId, message);} else if (message.getType() == MessageType.GROUP) {// 群组消息处理handleGroupMessage(message);}}/*** 处理私聊消息*/private void handlePrivateMessage(String senderId, Message message) {// 查找目标用户的ChannelChannel targetChannel = userChannels.get(message.getTargetId());if (targetChannel != null && targetChannel.isActive()) {// 用户在线,直接发送targetChannel.writeAndFlush(new TextWebSocketFrame(buildMessageJson(senderId, message.getContent())));} else {// 用户离线,存储消息RedisClient.storeOfflineMessage(message.getTargetId(), message.getContent());}}/*** 处理群组消息*/private void handleGroupMessage(Message message) {// 从Redis获取群组成员列表(伪代码)Set<String> memberIds = RedisClient.getGroupMembers(message.getGroupId());// 遍历发送给每个在线成员memberIds.forEach(memberId -> {Channel channel = userChannels.get(memberId);if (channel != null && channel.isActive()) {channel.writeAndFlush(new TextWebSocketFrame(buildGroupMessageJson(message)));}});}/*** 客户端连接建立时调用*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {// 1. 进行用户认证(伪代码)String userId = authenticate(ctx.channel());if (userId != null) {// 2. 建立用户ID与Channel的映射userChannels.put(userId, ctx.channel());// 3. 在Redis中标记用户在线状态RedisClient.setOnlineStatus(userId, true);// 4. 发送积压的离线消息(伪代码)sendOfflineMessages(userId, ctx.channel());} else {// 认证失败,关闭连接ctx.close();}}/*** 连接断开时调用*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {String userId = getUserId(ctx.channel());if (userId != null) {// 移除映射关系userChannels.remove(userId);// 更新Redis中的在线状态RedisClient.setOnlineStatus(userId, false);}}/*** 异常处理*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.close();}// 辅助方法 --------------------------------------------------private String getUserId(Channel channel) {// 实际项目中可以从Channel的attr中获取return null; // 简化实现}private Message parseMessage(String text) {// JSON解析消息return new Message(); // 简化实现}private String authenticate(Channel channel) {// 实现认证逻辑return "user123"; // 简化实现}private String buildMessageJson(String sender, String content) {// 构建JSON消息return "{\"from\":\"" + sender + "\",\"content\":\"" + content + "\"}";}private void sendOfflineMessages(String userId, Channel channel) {// 从Redis获取并发送离线消息}
}/*** 消息实体类(简化版)*/
class Message {enum MessageType { PRIVATE, GROUP }private MessageType type;private String targetId; // 接收方ID/群组IDprivate String content;// getters & setters
}