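Outline

1. Implementing an HTTP server with Netty

2. Implementing WebSocket with Netty

3. A message push system implemented with Netty

(1) Overview of the WebSocket-based message push system

(2) The PushServer of the message push system

(3) Connection management wrapper

(4) Ping-pong probing

(5) Pushing to all connections

(6) HTTP response and handshake

(7) The operations client of the message push system

(8) Connecting the operations client to PushServer

(9) The operations client's handler

(10) Sending push messages from the operations client

(11) Receiving push messages in the browser client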

3. A message push system implemented with Netty

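(1) Overview of the WebSocket-based message push system

The system has three parts. An operations system uses a Netty client to open a long-lived WebSocket connection to PushServer, and each browser client opens its own long-lived WebSocket connection to PushServer as well. When the operations system has its Netty client send a push message to PushServer, PushServer forwards that message to the browser clients.

To run the full flow: start PushServer, open several browser clients and watch their consoles, then start the operations client and type a message into its console. That completes one end-to-end message push.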
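The flow above can be modelled without any networking. The following is a minimal JDK-only sketch of the fan-out step, not the Netty implementation: `PushHubSketch`, `connect`, `disconnect` and `push` are hypothetical names, and each "connection" is just a callback. As in the real system shown later, every registered connection receives the message, including the one that sent it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for PushServer; it models only the fan-out step. */
final class PushHubSketch {

    // connection id -> callback that "delivers" a message to that connection
    private final Map<String, Consumer<String>> connections = new ConcurrentHashMap<>();

    void connect(String id, Consumer<String> deliver) {
        connections.put(id, deliver);
    }

    void disconnect(String id) {
        connections.remove(id);
    }

    /** Delivers the message to every registered connection and returns how many received it. */
    int push(String message) {
        int delivered = 0;
        for (Consumer<String> deliver : connections.values()) {
            deliver.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        PushHubSketch hub = new PushHubSketch();
        hub.connect("browser-1", m -> System.out.println("browser-1 got: " + m));
        hub.connect("browser-2", m -> System.out.println("browser-2 got: " + m));
        hub.connect("operations", m -> System.out.println("operations got: " + m));
        hub.push("hello from operations");
    }
}
```

In the Netty version, the map of callbacks corresponds to the set of open channels and `push` corresponds to one write to all of them.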

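(2) The PushServer of the message push system

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
// Logger and LogManager come from the project's logging library

public class NettyPushServer {

    private static final Logger logger = LogManager.getLogger(NettyPushServer.class);
    private static final int DEFAULT_PORT = 8998;

    private int port;

    public NettyPushServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        logger.info("Netty Push Server is starting.");
        EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            // "DEBUG" is the logger name passed to LoggingHandler
                            .addLast("logging", new LoggingHandler("DEBUG"))
                            // Decode HTTP requests and encode HTTP responses
                            .addLast("http-codec", new HttpServerCodec())
                            // Aggregate HTTP message parts into one FullHttpRequest (up to 64 KB)
                            .addLast("aggregator", new HttpObjectAggregator(65536))
                            .addLast("http-chunked", new ChunkedWriteHandler())
                            .addLast("netty-push-server-handler", new NettyPushServerHandler());
                    }
                });
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            logger.info("Netty Push Server is started, listened[" + port + "].");
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossEventLoopGroup.shutdownGracefully();
            workerEventLoopGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        NettyPushServer nettyHttpServer = new NettyPushServer(DEFAULT_PORT);
        nettyHttpServer.start();
    }
}

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {

    private static final Logger logger = LogManager.getLogger(NettyPushServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("Client Connection Established: " + ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("Client Disconnected: " + ctx.channel());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // The first message on a connection is the HTTP handshake request; later ones are WebSocket frames
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    // Filled in by steps (4) and (5)
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
    }

    // Filled in by step (6)
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
    }
}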

(3) Connection management wrapper

// Manages the connections
public class ChannelManager {

    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // Keyed by ChannelId.asShortText(), which is short but not guaranteed to be globally unique
    private static final ConcurrentHashMap<String, ChannelId> channelIds = new ConcurrentHashMap<String, ChannelId>();

    public static void add(Channel channel) {
        channelGroup.add(channel);
        channelIds.put(channel.id().asShortText(), channel.id());
    }

    public static void remove(Channel channel) {
        channelGroup.remove(channel);
        channelIds.remove(channel.id().asShortText());
    }

    public static Channel get(String id) {
        ChannelId channelId = channelIds.get(id);
        // Unknown id: return null rather than passing a null key to find()
        return channelId == null ? null : channelGroup.find(channelId);
    }

    public static void pushToAllChannels(TextWebSocketFrame webSocketFrame) {
        channelGroup.writeAndFlush(webSocketFrame);
    }
}

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
    ...
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        logger.info("Client Connection Established: " + ctx.channel());
        // Register the connection as soon as it is established
        ChannelManager.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("Client Disconnected: " + ctx.channel());
        // Unregister the connection when it goes away
        ChannelManager.remove(ctx.channel());
    }
    ...
}

(4) Ping-pong probing

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
    ...
    private WebSocketServerHandshaker webSocketServerHandshaker;
    ...
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
        // The client sent a ping frame; clients ping the server repeatedly
        // to check that the long-lived connection is still alive and usable
        if (webSocketFrame instanceof PingWebSocketFrame) {
            logger.info("Receive ping frame from client: " + ctx.channel());
            // Echo the ping payload back; retain() because SimpleChannelInboundHandler
            // releases the inbound frame once channelRead0 returns
            WebSocketFrame pongWebSocketFrame = new PongWebSocketFrame(webSocketFrame.content().retain());
            // Flushed by channelReadComplete()
            ctx.channel().write(pongWebSocketFrame);
            return;
        }
        // The client asked to close this WebSocket connection
        if (webSocketFrame instanceof CloseWebSocketFrame) {
            logger.info("Receive close WebSocket request from client: " + ctx.channel());
            webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) webSocketFrame).retain());
            return;
        }
        ...
    }
    ...
}

(5) Pushing to all connections

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
    ...
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            handleWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {
        // The client sent a ping frame; clients ping the server repeatedly
        // to check that the long-lived connection is still alive and usable
        if (webSocketFrame instanceof PingWebSocketFrame) {
            logger.info("Receive ping frame from client: " + ctx.channel());
            WebSocketFrame pongWebSocketFrame = new PongWebSocketFrame(webSocketFrame.content().retain());
            ctx.channel().write(pongWebSocketFrame);
            return;
        }
        // The client asked to close this WebSocket connection
        if (webSocketFrame instanceof CloseWebSocketFrame) {
            logger.info("Receive close WebSocket request from client: " + ctx.channel());
            webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) webSocketFrame).retain());
            return;
        }
        // The client sent a frame that is not a text frame
        if (!(webSocketFrame instanceof TextWebSocketFrame)) {
            logger.error("Netty Push Server only support text frame, does not support other type frame.");
            String errorMsg = String.format("%s type frame is not supported.", webSocketFrame.getClass().getName());
            throw new UnsupportedOperationException(errorMsg);
        }
        // The client sent a text frame (TextWebSocketFrame)
        String request = ((TextWebSocketFrame) webSocketFrame).text();
        logger.info("Receive text frame[" + request + "] from client: " + ctx.channel());
        // Build the response
        TextWebSocketFrame response = new TextWebSocketFrame(request);
        // Send it to every registered connection, the sender included
        ChannelManager.pushToAllChannels(response);
    }
    ...
}

(6) HTTP response and handshake

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {
    ...
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        // Reject anything that is not a well-formed WebSocket upgrade request.
        // Header values are compared case-insensitively.
        if (!request.decoderResult().isSuccess() || (!"websocket".equalsIgnoreCase(request.headers().get("Upgrade")))) {
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
            sendHttpResponse(ctx, request, response);
            return;
        }
        logger.info("Receive handshake request from client: " + ctx.channel());
        // Perform the handshake
        WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8998/push", null, false);
        webSocketServerHandshaker = factory.newHandshaker(request);
        if (webSocketServerHandshaker == null) {
            // The client asked for a WebSocket version the server does not support
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            webSocketServerHandshaker.handshake(ctx.channel(), request);
            logger.info("Netty push server handshake with client: " + ctx.channel());
        }
    }

    // HTTP response; RESPONSE_CODE_OK is a constant defined in the elided part of this class
    private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {
        if (response.status().code() != RESPONSE_CODE_OK) {
            // Use the status line as the response body
            ByteBuf byteBuf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(byteBuf);
            logger.info("Http Response is not ok: " + byteBuf.toString(CharsetUtil.UTF_8));
            byteBuf.release();
        }
        ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);
        if (response.status().code() != RESPONSE_CODE_OK) {
            // Close the connection after an error response has been written
            channelFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }
    ...
}
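For context on what the handshake step produces: under RFC 6455, the server answers the client's `Sec-WebSocket-Key` header with a `Sec-WebSocket-Accept` value derived from it. The sketch below computes that value with the JDK only. It illustrates the protocol rule and is not Netty's code; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper showing how Sec-WebSocket-Accept is derived (RFC 6455). */
final class WebSocketAcceptSketch {

    // Fixed GUID defined by RFC 6455 and appended to the client's key
    private static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns base64(SHA-1(key + GUID)) for the given Sec-WebSocket-Key header value. */
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + WEBSOCKET_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a required algorithm on every Java platform
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // Sample key from RFC 6455; prints s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        System.out.println(acceptFor("dGhlIHNhbXBsZSBub25jZQ=="));
    }
}
```

A client verifies this value before treating the connection as upgraded, which is why a plain HTTP response cannot be mistaken for a successful WebSocket handshake.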

(7) The operations client of the message push system

public class OperationNettyClient {

    private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);
    private static final String WEB_SOCKET_SCHEME = "ws";
    private static final String WSS_SCHEME = "wss";
    private static final String LOCAL_HOST = "127.0.0.1";
    private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");

    private static URI uri;
    private static String scheme;
    private static String host;
    private static int port;
    private static SslContext sslContext;

    private EventLoopGroup eventLoopGroup;

    public void start() throws Exception {
        //...
    }

    public static void main(String[] args) throws Exception {
        uri = new URI(PUSH_SERVER_URI);
        scheme = getScheme(uri);
        host = getHost(uri);
        port = getPort(uri, scheme);
        checkScheme(scheme);
        initSslContext(scheme);
    }

    private static String getScheme(URI pushServerUri) {
        return pushServerUri.getScheme() == null ? WEB_SOCKET_SCHEME : pushServerUri.getScheme();
    }

    private static String getHost(URI pushServerUri) {
        return pushServerUri.getHost() == null ? LOCAL_HOST : pushServerUri.getHost();
    }

    private static int getPort(URI pushServerUri, String scheme) {
        int port;
        // URI.getPort() returns -1 when the URI has no explicit port
        if (pushServerUri.getPort() == -1) {
            if (WEB_SOCKET_SCHEME.equals(scheme)) {
                port = 80;
            } else if (WSS_SCHEME.equals(scheme)) {
                port = 443;
            } else {
                port = -1;
            }
        } else {
            port = pushServerUri.getPort();
        }
        return port;
    }

    // Check that the scheme is ws or wss
    private static void checkScheme(String scheme) {
        if (!WEB_SOCKET_SCHEME.equals(scheme) && !WSS_SCHEME.equals(scheme)) {
            logger.error("Only Support ws or wss scheme.");
            throw new RuntimeException("Only Support ws or wss scheme.");
        }
    }

    // If the WebSocket uses SSL (wss), initialize the corresponding sslContext
    private static void initSslContext(String scheme) throws Exception {
        boolean enableSSL = WSS_SCHEME.equals(scheme);
        if (enableSSL) {
            // InsecureTrustManagerFactory trusts every certificate; suitable for local testing only
            sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslContext = null;
        }
    }
}
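The port fallback in `getPort` relies on `java.net.URI.getPort()` returning -1 when the URI carries no explicit port. A small JDK-only sketch of the same rule follows; `PushUriSketch` and `effectivePort` are hypothetical names, and the example hosts are placeholders.

```java
import java.net.URI;

/** Hypothetical helper mirroring the default-port rule used by the operations client. */
final class PushUriSketch {

    /** Returns the explicit port if present, else 80 for ws, 443 for wss, and -1 otherwise. */
    static int effectivePort(URI uri) {
        if (uri.getPort() != -1) {
            return uri.getPort();
        }
        // A missing scheme is treated as ws, as in the client above
        String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
        if ("ws".equals(scheme)) {
            return 80;
        }
        if ("wss".equals(scheme)) {
            return 443;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(effectivePort(URI.create("ws://127.0.0.1:8998/push")));
        System.out.println(effectivePort(URI.create("ws://example.com/push")));
        System.out.println(effectivePort(URI.create("wss://example.com/push")));
    }
}
```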

(8) Connecting the operations client to PushServer

public class OperationNettyClient {

    private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);
    private static final String WEB_SOCKET_SCHEME = "ws";
    private static final String WSS_SCHEME = "wss";
    private static final String LOCAL_HOST = "127.0.0.1";
    private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");
    private static final String INPUT_MESSAGE_QUIT = "quit";
    private static final String INPUT_MESSAGE_CLOSE = "close";
    private static final String INPUT_MESSAGE_PING = "ping";

    private static URI uri;
    private static String scheme;
    private static String host;
    private static int port;
    private static SslContext sslContext;

    private EventLoopGroup eventLoopGroup;

    public Channel start() throws Exception {
        logger.info("Operation Netty Client is connecting.");
        eventLoopGroup = new NioEventLoopGroup();
        WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(
            uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        final OperationNettyClientHandler operationNettyClientHandler = new OperationNettyClientHandler(webSocketClientHandshaker);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline channelPipeline = ch.pipeline();
                    // Only present for wss
                    if (sslContext != null) {
                        channelPipeline.addLast(sslContext.newHandler(ch.alloc(), host, port));
                    }
                    channelPipeline.addLast(new HttpClientCodec())
                        .addLast(new HttpObjectAggregator(65536))
                        .addLast(WebSocketClientCompressionHandler.INSTANCE)
                        .addLast(operationNettyClientHandler);
                }
            });
        // Use the resolved host so the LOCAL_HOST fallback applies when the URI has no host
        Channel channel = bootstrap.connect(host, port).sync().channel();
        logger.info("Operation Netty Client connected to push server.");
        // Wait until the WebSocket handshake has finished
        operationNettyClientHandler.channelFuture().sync();
        return channel;
    }

    public void shutdownGracefully() {
        eventLoopGroup.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        uri = new URI(PUSH_SERVER_URI);
        scheme = getScheme(uri);
        host = getHost(uri);
        port = getPort(uri, scheme);
        checkScheme(scheme);
        initSslContext(scheme);
        OperationNettyClient operationNettyClient = new OperationNettyClient();
        try {
            // At this step the client only connects; message input is added in step (10)
            Channel channel = operationNettyClient.start();
        } finally {
            operationNettyClient.shutdownGracefully();
        }
    }
    ...
}

(9) The operations client's handler

public class OperationNettyClientHandler extends SimpleChannelInboundHandler<Object> {

    private static final Logger logger = LogManager.getLogger(OperationNettyClientHandler.class);

    private WebSocketClientHandshaker webSocketClientHandshaker;
    // Completed when the WebSocket handshake succeeds or fails
    private ChannelFuture channelFuture;

    public OperationNettyClientHandler(WebSocketClientHandshaker webSocketClientHandshaker) {
        this.webSocketClientHandshaker = webSocketClientHandshaker;
    }

    public ChannelFuture channelFuture() {
        return channelFuture;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        channelFuture = ctx.newPromise();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Start the WebSocket handshake as soon as the TCP connection is up
        webSocketClientHandshaker.handshake(ctx.channel());
        logger.info("Operation Netty Client send WebSocket handshake request.");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.info("netty client disconnected.");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        Channel channel = ctx.channel();
        // Until the handshake completes, the only expected message is the server's HTTP response
        if (!webSocketClientHandshaker.isHandshakeComplete()) {
            try {
                webSocketClientHandshaker.finishHandshake(channel, (FullHttpResponse) msg);
                logger.info("Netty Client connected.");
                ((ChannelPromise) channelFuture).setSuccess();
            } catch (WebSocketHandshakeException e) {
                logger.error("WebSocket handshake failed.", e);
                ((ChannelPromise) channelFuture).setFailure(e);
            }
            return;
        }
        // After the handshake, a plain HTTP response is unexpected
        if (msg instanceof FullHttpResponse) {
            throw new IllegalStateException("Not Supported HTTP Response.");
        }
        WebSocketFrame webSocketFrame = (WebSocketFrame) msg;
        if (webSocketFrame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;
            logger.info("Receives text frame: " + textWebSocketFrame.text());
        } else if (webSocketFrame instanceof PongWebSocketFrame) {
            logger.info("Receives pong frame: " + webSocketFrame);
        } else if (webSocketFrame instanceof CloseWebSocketFrame) {
            logger.info("Receives close WebSocket frame, Netty Client is closing.");
            channel.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("Operation Netty client handler exception caught.", cause);
        // Only fail the handshake promise if it has not been completed yet
        if (!channelFuture.isDone()) {
            ((ChannelPromise) channelFuture).setFailure(cause);
        }
        ctx.close();
    }
}
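The handler above uses a promise as a one-shot signal: `start()` blocks on it, the handshake completes it, and `exceptionCaught` fails it only if it is still pending. The same pattern can be sketched with the JDK's `CompletableFuture`. This is an analogy under that assumption, not Netty's `ChannelPromise`, and `HandshakeGateSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical one-shot gate: completed once, by either handshake success or the first error. */
final class HandshakeGateSketch {

    private final CompletableFuture<Void> handshakeDone = new CompletableFuture<>();

    /** The caller waits on this, as start() waits on channelFuture(). */
    CompletableFuture<Void> future() {
        return handshakeDone;
    }

    /** Returns true if this call completed the gate. */
    boolean onHandshakeSuccess() {
        return handshakeDone.complete(null);
    }

    /** Returns true if this call failed the gate; false if it was already completed. */
    boolean onError(Throwable cause) {
        return handshakeDone.completeExceptionally(cause);
    }

    public static void main(String[] args) {
        HandshakeGateSketch gate = new HandshakeGateSketch();
        gate.onHandshakeSuccess();
        // A later error does not overwrite the successful result
        System.out.println(gate.onError(new IllegalStateException("late error")));
        System.out.println(gate.future().isCompletedExceptionally());
    }
}
```

One difference worth noting: `completeExceptionally` simply returns false on an already-completed future, whereas Netty's `setFailure` throws if the promise is already done, which is why the handler checks `isDone()` first.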

(10) Sending push messages from the operations client

public class OperationNettyClient {
    ...
    public void waitInputMessage(Channel channel) throws Exception {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            logger.info("Wait for input message.");
            String message = bufferedReader.readLine();
            // readLine() returns null at end of input; treat that like "quit"
            if (message == null || INPUT_MESSAGE_QUIT.equals(message)) {
                break;
            } else if (INPUT_MESSAGE_CLOSE.equals(message)) {
                // Ask the server to close the WebSocket and wait for the channel to close
                channel.writeAndFlush(new CloseWebSocketFrame());
                channel.closeFuture().sync();
                break;
            } else if (INPUT_MESSAGE_PING.equals(message)) {
                WebSocketFrame webSocketFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));
                channel.writeAndFlush(webSocketFrame);
            } else {
                // Any other input is sent as the push message
                WebSocketFrame webSocketFrame = new TextWebSocketFrame(message);
                channel.writeAndFlush(webSocketFrame);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        uri = new URI(PUSH_SERVER_URI);
        scheme = getScheme(uri);
        host = getHost(uri);
        port = getPort(uri, scheme);
        checkScheme(scheme);
        initSslContext(scheme);
        OperationNettyClient operationNettyClient = new OperationNettyClient();
        try {
            Channel channel = operationNettyClient.start();
            // Entry point for sending messages from the operations client
            operationNettyClient.waitInputMessage(channel);
        } finally {
            operationNettyClient.shutdownGracefully();
        }
    }
    ...
}
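The input loop is easier to check when the decision of what to do with a line is separated from the I/O. Below is a JDK-only sketch of that decision as a pure function. `InputCommandSketch`, `Action` and `classify` are hypothetical names, and mapping end of input (a null line) to QUIT is an assumption of this sketch.

```java
/** Hypothetical classifier for console input lines typed into the operations client. */
final class InputCommandSketch {

    enum Action { QUIT, CLOSE, PING, TEXT }

    /** Maps one console line to the action the client would take. */
    static Action classify(String line) {
        // null means the input stream has ended; assumed here to mean quit
        if (line == null || "quit".equals(line)) {
            return Action.QUIT;
        }
        if ("close".equals(line)) {
            return Action.CLOSE;
        }
        if ("ping".equals(line)) {
            return Action.PING;
        }
        // Everything else is sent as a text frame
        return Action.TEXT;
    }

    public static void main(String[] args) {
        for (String line : new String[] {"ping", "close", "quit", "system maintenance at 22:00"}) {
            System.out.println(line + " -> " + classify(line));
        }
    }
}
```

Note that matching is exact and case-sensitive, so a line such as "Ping" would be pushed to every client as ordinary text.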

(11) Receiving push messages in the browser client

<!DOCTYPE html>
<html lang="en">
<head>
    <meta http-equiv="content-type" content="text/html; charset=utf-8" />
    <title>WebSocket page</title>
</head>
<body onload="connectServer();">
<script type="text/javascript">
    var websocket;
    function connectServer() {
        if ("WebSocket" in window) {
            console.log("your browser supports websocket!");
            // Same address and path that PushServer uses for the handshake
            websocket = new WebSocket("ws://localhost:8998/push");
            websocket.onopen = function() {
                console.log("established connection with push server.");
            };
            websocket.onmessage = function(ev) {
                var response = ev.data;
                console.log("receives push message from netty server: " + response);
            };
        }
    }
</script>
</body>
</html>
