当前位置: 首页 > wzjs >正文

php快速建站系统矿产网站建设价格

php快速建站系统,矿产网站建设价格,做社交网站要注册哪类商标,网站建设课程 谷建1.Netty实现HTTP服务器 (1)HTTP请求消息和响应消息 HTTP请求消息由三部分组成:请求行、请求头、请求体。HTTP响应消息也由三部分组成:响应行、响应头、响应体。 (2)Netty实现的HTTP协议栈的优势 Netty实现的HTTP协议栈无论在性能还是在可靠性上&#xff…

1.Netty实现HTTP服务器

(1)HTTP请求消息和响应消息

HTTP请求消息由三部分组成:请求行、请求头、请求体。HTTP响应消息也由三部分组成:响应行、响应头、响应体。

(2)Netty实现的HTTP协议栈的优势

Netty实现的HTTP协议栈无论在性能还是在可靠性上,都表现优异,非常适合在非Web容器下的场景使用。相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。

(3)Netty实现的HTTP服务器

public class NettyHttpServer {private static final Logger logger = LogManager.getLogger(NettyHttpServer.class);private static final int DEFAULT_PORT = 8998;private int port;public NettyHttpServer(int port) {this.port = port;}public void start() throws Exception {logger.info("Netty Http Server is starting.");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()//数据进来是自上而下,数据回去时自下而上.addLast("http-decoder", new HttpRequestDecoder()).addLast("http-aggregator", new HttpObjectAggregator(65536)).addLast("http-encoder", new HttpResponseEncoder()).addLast("http-chunked", new ChunkedWriteHandler()).addLast("netty-http-server-handler", new NettyHttpServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty Http Server is started, listened[" + port + "].");channelFuture.channel().closeFuture().sync();} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {NettyHttpServer nettyHttpServer = new NettyHttpServer(DEFAULT_PORT);nettyHttpServer.start();}
}public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LogManager.getLogger(NettyHttpServerHandler.class);protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {logger.info(request);}
}

启动NettyHttpServer,然后在浏览器进行访问,就可以看到输出如下:

Netty Http Server is starting.
Netty Http Server is started, listened[8998].
HttpObjectAggregator$AggregatedFullHttpRequest(decodeResult: success, version: HTTP/1.1, content: CompositeByteBuf(ridx: 0, widx: 0, cap: 0, components=0))
GET / HTTP/1.1
Host: localhost:8998
Connection: keep-alive
sec-ch-ua: ".Not/A)Brand";v="99", "Google Chrome";v="103", "Chromium";v="103"
sec-ch-ua-mobile: ?0
sec-ch-ua-platform: "macOS"
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9
Sec-Fetch-Site: none
Sec-Fetch-Mode: navigate
Sec-Fetch-User: ?1
Sec-Fetch-Dest: document
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,id;q=0.7,ar;q=0.6
Cookie: _ga=GA1.1.629604539.1641093986
content-length: 0

(4)请求的解析处理和响应的编码处理

HTTP服务器最关键的就是请求的解析处理和响应的编码处理,比如会向ChannelPipeline中先后添加不同的解码器和编码器。

首先添加HTTP请求消息解码器HttpRequestDecoder,因为浏览器会把按照HTTP协议组织起来的请求数据序列化成字节数组发送给服务器,而HttpRequestDecoder可以按照HTTP协议从接收到的字节数组中读取出一个完整的请求数据。

然后添加HttpObjectAggregator解码器,它的作用是将多个消息转换为单一的FullHttpRequest或者FullHttpResponse。

接着添加HTTP响应消息编码器HttpResponseEncoder,它的作用是对HTTP响应消息进行编码。

以及添加ChunkedWriteHandler处理器,用来支持异步发送大的码流时也不会占用过多的内存,防止内存溢出。

最后添加NettyHttpServerHandler处理器,用于处理HTTP服务器的响应输出。

public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LogManager.getLogger(NettyHttpServerHandler.class);protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {String method = request.getMethod().name();String uri = request.getUri();logger.info("Receives Http Request: " + method + " " + uri + ".");String html = "<html><body>Hello, I am Netty Http Server.</body></html>";ByteBuf byteBuf = Unpooled.copiedBuffer(html, CharsetUtil.UTF_8);FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);response.headers().set("content-type", "text/html;charset=UTF-8");response.content().writeBytes(byteBuf);byteBuf.release();ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);}
}

2.Netty实现WebSocket

(1)HTTP协议的弊端

一.HTTP协议为半双工通信

半双工通信指数据可以在客户端和服务端两个方向传输,但是不能同时传输。它意味着同一时刻,只能有一个方向上的数据在进行传输。

二.HTTP消息冗长而繁琐

HTTP消息包含消息头、消息体、换行符等,通常情况下采用文本方式传输。相比于其他的二进制通信协议,冗长而繁琐。

(2)消息推送之Ajax短轮询

Ajax短轮询是基于HTTP短连接的。具体就是用一个定时器由浏览器对服务器发出HTTP请求,由服务器返回数据给客户端浏览器。

Ajax短轮询的代码实现简单。但由于HTTP请求的Header非常冗长,里面可用数据的比例非常低,所以比较占用带宽和服务器资源,且数据同步不及时。

(3)消息推送之WebSocket

WebSocket本质是一个TCP连接,采用的是全双工通信。

WebSocket中,浏览器和服务器只需要做一个握手动作,两者之间就会形成一条快速通道进行直接数据传输。由于WebSocket基于TCP双向全双工进行消息传递,所以在同一时刻既可以发送消息,也可以接收消息,比HTTP半双工性能好。

WebSocket通过ping/pong帧来保持链路激活,实时性很高,但是浏览器支持度低、代码实现复杂。

(4)WebSocket连接的建立

建立WebSocket连接时,需要通过客户端或者浏览器发出握手请求,这个握手请求是一个HTTP请求。该HTTP请求包含了附加头信息"Upgrade:WebSocket",表明这是一个申请协议升级的HTTP请求。

服务端解析这些附加的头信息,然后生成应答信息返回给客户端。这样客户端和服务端的WebSocket连接就建立起来了,双方可以通过这个连接通道自由地传递信息,并且这个连接会持续存在直到一方主动关闭连接。

(5)基于WebSocket协议开发NettyServer

可以基于TCP协议用Netty来开发客户端和服务端进行相互通信,但粘包半包问题需要手动进行处理。

可以基于HTTP协议用Netty来开发一个HTTP服务器,服务器接收浏览器发送过来的HTTP请求后,返回HTTP响应回浏览器。

也可以基于WebSocket协议来开发一个Netty服务器,此时前端HTML代码会基于socket协议和Netty服务器建立长连接。这样Netty服务器就可以和浏览器里运行的HTML通过WebSocket协议建立长连接,从而使得Netty服务器可以主动推送数据给浏览器里的HTML页面。

WebSocket协议底层也是基于TCP协议来实现的,只不过是在TCP协议的基础上封装了一层更高层次的WebSocket协议。

public class NettyWebSocketServer {private static final Logger logger = LogManager.getLogger(NettyWebSocketServer.class);private static final int DEFAULT_PORT = 8998;private int port;public NettyWebSocketServer(int port) {this.port = port;}public void start() throws Exception {logger.info("Netty WebSocket Server is starting.");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(1024 * 32)).addLast(new WebSocketServerProtocolHandler("/websocket")).addLast("netty-web-socket-server-handler", new NettyWebSocketServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty WebSocket Server server is started, listened[" + port + "].");channelFuture.channel().closeFuture().sync();} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {NettyWebSocketServer nettyHttpServer = new NettyWebSocketServer(DEFAULT_PORT);nettyHttpServer.start();}
}

(6)WebSocketServer的请求数据处理逻辑开发

public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger logger = LogManager.getLogger(NettyWebSocketServerHandler.class);private static ChannelGroup webSocketClients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//WebSocket网页代码里发送过来的数据String request = msg.text();logger.info("Netty Server receives request: " + request + ".");TextWebSocketFrame response = new TextWebSocketFrame("Hello, I am Netty Server.");ctx.writeAndFlush(response);}//如果一个网页WebSocket客户端跟Netty Server建立了连接之后,会触发这个方法@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {webSocketClients.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {logger.info("websocket client is closed, channel id: " + ctx.channel().id().asLongText() + "[" + ctx.channel().id().asShortText() + "]");}
}

(7)WebSocketServer的HTTP与chunk处理分析

浏览器里面运行的是HTML网页代码,WebSocket就是在HTML网页代码里嵌入WebSocket代码,可以让浏览器里的HTML网页代码跟NettyServer建立连接,并且是基于长连接发送数据。

浏览器会按照WebSocket协议(底层基于HTTP协议)组织请求数据格式,然后把数据序列化成字节数组,接着通过网络连接把字节数组传输发送给NettyServer,NettyServer会通过数据处理链条来进行处理接收到的字节数组数据。

protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline()//浏览器的字节数组数据进来以后,字节数组数据先用http协议来处理,把字节数组转换为一个HttpRequest请求对象//最后数据返回给浏览器前,又会对HttpResponse对象进行编码成字节数组.addLast(new HttpServerCodec())//chunked write用于大量数据流时的分chunk块,也就是数据实在是太大了就必须得分chunk//大量数据流进来时,可以分chunk块来读;大量数据流出去时,可以分chunk块来写;.addLast(new ChunkedWriteHandler())//如果想要让很多http不要拆分成很多段过来,可以把完整的请求数据聚合到一起再给过来.addLast(new HttpObjectAggregator(1024 * 32))//基于前面已经转换好的请求数据对象,会在这里基于WebSocket协议再次做一个处理//由于传输时是基于http协议传输过来的,而封装的内容是按webSocket协议来封装的http请求数据//所以必须在这里提取http请求里面的数据,然后按照WebSocket协议来进行解析处理,把数据提取出来作为WebSocket的数据片段.addLast(new WebSocketServerProtocolHandler("/websocket"))//响应数据输出时,顺序是反的,第一步原始数据必须先经过WebSocket协议转换//WebSocket协议数据,必须经过HTTP协议处理,但最终会encode编码成一个HTTP协议的响应数据//然后服务端将HTTP响应数据序列化的字节数组,传输给浏览器//浏览器拿到字节数组后进行反序列化,拿到一个HTTP协议响应数据,提取出内容再按照WebSocket协议来处理//最终把普通的数据给WebSocket代码.addLast("netty-web-socket-server-handler", new NettyWebSocketServerHandler());
}

(8)WebSocket网页客户端代码开发与实现

<!DOCTYPE html>
<html lang="en"><head><meta http-equiv="content-type" content="text/html; charset=utf-8" /><title>websocket网页</title></head><body onload="connectServer();"><script type="text/javascript">var websocket;function connectServer() {if ("WebSocket" in window) {console.log("Your browser supports websocket!");websocket = new WebSocket("ws://localhost:8998/websocket");websocket.onopen = function() {console.log("Send request to netty server.");websocket.send("I am websocket client.");}websocket.onmessage = function(ev) {var response = ev.data;console.log("Receive response from netty server: " + response);}}}</script></body>
</html>

启动Netty Server,然后打开该HTML即可看到console的输出。

WebSocket和Netty Server配合起来开发说明:

如果Server端要主动推送一些通知(push)给网页端正在浏览网页的用户,如推送用户可能感兴趣的商品、关注的新闻。那么在用户进入网页后可以询问用户,是否愿意收到服务端发送的xx提示和通知。如果用户愿意,那么网页里的WebSocket完全可以跟NettyServer构建一个长连接。这样NettyServer在必要时,就可以反向推送通知(push)给用户,浏览器里的网页可能会弹出一个push通知用户xx讯息。

3.Netty实现的消息推送系统

(1)基于WebSocket的消息推送系统说明

首先需要一个运营系统能够基于NettyClient和PushServer建立WebSocket长连接,然后浏览器客户端也要和PushServer建立好WebSocket长连接,接着运营系统会让NettyClient发送Push推送消息给PushServer,最后PushServer再把推送消息发送给浏览器客户端。

首先启动PushServer,然后打开多个网页客户端查看console,接着启动运营客系统在控制台输入消息,这样就可以完成一个完整的消息推送的交互了。

(2)消息推送系统的PushServer

public class NettyPushServer {private static final Logger logger = LogManager.getLogger(NettyPushServer.class);private static final int DEFAULT_PORT = 8998;private int port;public NettyPushServer(int port) {this.port = port;}public void start() throws Exception {logger.info("Netty Push Server is starting.");EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("logging", new LoggingHandler("DEBUG")).addLast("http-codec", new HttpServerCodec()).addLast("aggregator", new HttpObjectAggregator(65536)).addLast("http-chunked", new ChunkedWriteHandler()).addLast("netty-push-server-handler", new NettyPushServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(port).sync();logger.info("Netty Push Server is started, listened[" + port + "].");channelFuture.channel().closeFuture().sync();} finally {bossEventLoopGroup.shutdownGracefully();workerEventLoopGroup.shutdownGracefully();}}public static void main(String[] args) throws Exception {NettyPushServer nettyHttpServer = new NettyPushServer(DEFAULT_PORT);nettyHttpServer.start();}
}public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {private static final Logger logger = LogManager.getLogger(NettyPushServerHandler.class);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("Client Connection Established: " + ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("Client Disconnected: " + ctx.channel());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {handleHttpRequest(ctx, (FullHttpRequest) msg);} else if(msg instanceof WebSocketFrame) {handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {}private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {}
}

(3)消息推送系统的连接管理封装

//用来管理连接
public class ChannelManager {private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);private static ConcurrentHashMap<String, ChannelId> channelIds = new ConcurrentHashMap<String, ChannelId>();public static void add(Channel channel) {channelGroup.add(channel);channelIds.put(channel.id().asShortText(), channel.id());}public static void remove(Channel channel) {channelGroup.remove(channel);channelIds.remove(channel.id().asShortText());}public static Channel get(String id) {return channelGroup.find(channelIds.get(id));}public static void pushToAllChannels(TextWebSocketFrame webSocketFrame) {channelGroup.writeAndFlush(webSocketFrame);}
}public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {...@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("Client Connection Established: " + ctx.channel());ChannelManager.add(ctx.channel());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("Client Disconnected: " + ctx.channel());ChannelManager.remove(ctx.channel());}...
}

(4)消息推送系统的ping-pong探测

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {...private WebSocketServerHandshaker webSocketServerHandshaker;...protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {handleHttpRequest(ctx, (FullHttpRequest) msg);} else if(msg instanceof WebSocketFrame) {handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {//WebSocket网页客户端发送的是ping消息,它会不停的ping服务端,看看长连接是否存活和有效if (webSocketFrame instanceof PingWebSocketFrame) {logger.info("Receive ping frame from client: " + ctx.channel());WebSocketFrame pongWebSocketFrame = new PongWebSocketFrame(webSocketFrame.content().retain());ctx.channel().write(pongWebSocketFrame);return;}//WebSocket网页客户端发送一个请求过来,请求关闭这个WebSocket连接if (webSocketFrame instanceof CloseWebSocketFrame) {logger.info("Receive close WebSocket request from client: " + ctx.channel());webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) webSocketFrame).retain());return;}...}...
}

(5)消息推送系统的全连接推送

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {...protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {handleHttpRequest(ctx, (FullHttpRequest) msg);} else if(msg instanceof WebSocketFrame) {handleWebSocketFrame(ctx, (WebSocketFrame) msg);}}private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) {//WebSocket网页客户端发送的是ping消息,它会不停的ping服务端,看看长连接是否存活和有效if (webSocketFrame instanceof PingWebSocketFrame) {logger.info("Receive ping frame from client: " + ctx.channel());WebSocketFrame pongWebSocketFrame = new PongWebSocketFrame(webSocketFrame.content().retain());ctx.channel().write(pongWebSocketFrame);return;}//WebSocket网页客户端发送一个请求过来,请求关闭这个WebSocket连接if (webSocketFrame instanceof CloseWebSocketFrame) {logger.info("Receive close WebSocket request from client: " + ctx.channel());webSocketServerHandshaker.close(ctx.channel(), ((CloseWebSocketFrame) webSocketFrame).retain());return;}//WebSocket网页客户端发送请求,但它不是text文本请求if (!(webSocketFrame instanceof TextWebSocketFrame)) {logger.error("Netty Push Server only support text frame, does not support other type frame.");String errorMsg = String.format("%s type frame is not supported.", webSocketFrame.getClass().getName());throw new UnsupportedOperationException(errorMsg);}//WebSocket网页客户端发送一个文本请求过来,是TextFrame类型的String request = ((TextWebSocketFrame)webSocketFrame).text();logger.info("Receive text frame[" + request + "] from client: " + ctx.channel());//构建响应TextWebSocketFrame response = new TextWebSocketFrame(request);//发送给所有连接,全连接推送ChannelManager.pushToAllChannels(response);}...
}

(6)消息推送系统的HTTP响应和握手

public class NettyPushServerHandler extends SimpleChannelInboundHandler<Object> {...private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {if (!request.decoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST);sendHttpResponse(ctx, request, response);return;}logger.info("Receive handshake request from client: " + ctx.channel());//握手建立WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8998/push", null, false);webSocketServerHandshaker = factory.newHandshaker(request);if (webSocketServerHandshaker == null) {WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());} else {webSocketServerHandshaker.handshake(ctx.channel(), request);logger.info("Netty push server handshake with client: " + ctx.channel());}}//HTTP响应private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, DefaultFullHttpResponse response) {if (response.status().code() != RESPONSE_CODE_OK) {ByteBuf byteBuf = Unpooled.copiedBuffer(response.status().toString(), CharsetUtil.UTF_8);response.content().writeBytes(byteBuf);logger.info("Http Response is not ok: " + byteBuf.toString(CharsetUtil.UTF_8));byteBuf.release();}ChannelFuture channelFuture = ctx.channel().writeAndFlush(response);if (response.status().code() != RESPONSE_CODE_OK) {channelFuture.addListener(ChannelFutureListener.CLOSE);}}...
}

(7)消息推送系统的运营客户端

public class OperationNettyClient {private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);private static final String WEB_SOCKET_SCHEME = "ws";private static final String WSS_SCHEME = "wss";private static final String LOCAL_HOST = "127.0.0.1";private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");private static URI uri;private static String scheme;private static String host;private static int port;private static SslContext sslContext;private EventLoopGroup eventLoopGroup;public void start() throws Exception {//...}public static void main(String[] args) throws Exception {uri = new URI(PUSH_SERVER_URI);scheme = getScheme(uri);host = getHost(uri);port = getPort(uri, scheme);checkScheme(scheme);initSslContext(scheme);}private static String getScheme(URI pushServerUri) {return pushServerUri.getScheme() == null ? WEB_SOCKET_SCHEME : pushServerUri.getScheme();}private static String getHost(URI pushServerUri) {return pushServerUri.getHost() == null ? LOCAL_HOST : pushServerUri.getHost();}private static int getPort(URI pushServerUri, String scheme) {int port;if (pushServerUri.getPort() == -1) {if (WEB_SOCKET_SCHEME.equals(scheme)) {port = 80;} else if(WSS_SCHEME.equals(scheme)) {port = 443;} else {port = -1;}} else {port = pushServerUri.getPort();}return port;}//检查scheme是否是ws或wssprivate static void checkScheme(String scheme) {if (!WEB_SOCKET_SCHEME.equals(scheme) && !WSS_SCHEME.equals(scheme)) {logger.error("Only Support ws or wss scheme.");throw new RuntimeException("Only Support ws or wss scheme.");}}//如果WebSocket使用了SSL,也就是wss,那么初始化对应的sslContextprivate static void initSslContext(String scheme) throws Exception {boolean enableSSL = WSS_SCHEME.equals(scheme);if (enableSSL) {sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();} else {sslContext = null;}}
}

(8)运营客户端连接PushServer

public class OperationNettyClient {private static final Logger logger = LogManager.getLogger(OperationNettyClient.class);private static final String WEB_SOCKET_SCHEME = "ws";private static final String WSS_SCHEME = "wss";private static final String LOCAL_HOST = "127.0.0.1";private static final String PUSH_SERVER_URI = System.getProperty("url", "ws://127.0.0.1:8998/push");private static final String INPUT_MESSAGE_QUIT = "quit";private static final String INPUT_MESSAGE_CLOSE = "close";private static final String INPUT_MESSAGE_PING = "ping";private static URI uri;private static String scheme;private static String host;private static int port;private static SslContext sslContext;private EventLoopGroup eventLoopGroup;public Channel start() throws Exception {logger.info("Operation Netty Client is connecting.");eventLoopGroup = new NioEventLoopGroup();WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());final OperationNettyClientHandler operationNettyClientHandler = new OperationNettyClientHandler(webSocketClientHandshaker);Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {protected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline channelPipeline = ch.pipeline();if (sslContext != null) {channelPipeline.addLast(sslContext.newHandler(ch.alloc(), host, port));}channelPipeline.addLast(new HttpClientCodec()).addLast(new HttpObjectAggregator(65536)).addLast(WebSocketClientCompressionHandler.INSTANCE).addLast(operationNettyClientHandler);}});Channel channel = bootstrap.connect(uri.getHost(), port).sync().channel();logger.info("Operation Netty Client connected to push server.");operationNettyClientHandler.channelFuture().sync();return channel;}public void shutdownGracefully() {eventLoopGroup.shutdownGracefully();}public static void main(String[] args) throws Exception {uri = new URI(PUSH_SERVER_URI);scheme = getScheme(uri);host = getHost(uri);port = getPort(uri, scheme);checkScheme(scheme);initSslContext(scheme);OperationNettyClient operationNettyClient = new OperationNettyClient();try {Channel channel = operationNettyClient.start();} finally {operationNettyClient.shutdownGracefully();}}...
}

(9)运营客户端的Handler处理器

public class OperationNettyClientHandler extends SimpleChannelInboundHandler<Object> {private static final Logger logger = LogManager.getLogger(OperationNettyClientHandler.class);private WebSocketClientHandshaker webSocketClientHandshaker;private ChannelFuture channelFuture;public OperationNettyClientHandler(WebSocketClientHandshaker webSocketClientHandshaker) {this.webSocketClientHandshaker = webSocketClientHandshaker;}public ChannelFuture channelFuture() {return channelFuture;}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {channelFuture = ctx.newPromise();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {webSocketClientHandshaker.handshake(ctx.channel());logger.info("Operation Netty Client send WebSocket handshake request.");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("netty client disconnected.");}protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {Channel channel = ctx.channel();if (!webSocketClientHandshaker.isHandshakeComplete()) {try {webSocketClientHandshaker.finishHandshake(channel, (FullHttpResponse) msg);logger.info("Netty Client connected.");((ChannelPromise)channelFuture).setSuccess();} catch(WebSocketHandshakeException e) {logger.error("WebSocket handshake failed.", e);((ChannelPromise)channelFuture).setFailure(e);}return;}if (msg instanceof FullHttpResponse) {  FullHttpResponse response = (FullHttpResponse) msg;throw new IllegalStateException("Not Supported HTTP Response.");}WebSocketFrame webSocketFrame = (WebSocketFrame) msg;if (webSocketFrame instanceof TextWebSocketFrame) {TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;logger.info("Receives text frame: " + textWebSocketFrame.text());} else if(webSocketFrame instanceof PongWebSocketFrame) {logger.info("Receives pong frame: " + webSocketFrame);} else if(webSocketFrame instanceof CloseWebSocketFrame) {logger.info("Receives close WebSocket frame, Netty Client is closing.");channel.close();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.error("Operation Netty client handler exception caught.", cause);if (!channelFuture.isDone()) {((ChannelPromise)channelFuture).setFailure(cause);}ctx.close();}
}

(10)运营客户端发送推送消息

public class OperationNettyClient {...public void waitInputMessage(Channel channel) throws Exception {BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));while(true) {logger.info("Wait for input message.");String message = bufferedReader.readLine();if (INPUT_MESSAGE_QUIT.equals(message)) {break;} else if(INPUT_MESSAGE_CLOSE.equals(message)) {channel.writeAndFlush(new CloseWebSocketFrame());channel.closeFuture().sync();break;} else if(INPUT_MESSAGE_PING.equals(message)) {WebSocketFrame webSocketFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[] {8, 1, 8, 1}));channel.writeAndFlush(webSocketFrame);} else {WebSocketFrame webSocketFrame = new TextWebSocketFrame(message);channel.writeAndFlush(webSocketFrame);}}}public static void main(String[] args) throws Exception {uri = new URI(PUSH_SERVER_URI);scheme = getScheme(uri);host = getHost(uri);port = getPort(uri, scheme);checkScheme(scheme);initSslContext(scheme);OperationNettyClient operationNettyClient = new OperationNettyClient();try {Channel channel = operationNettyClient.start();//运营客户端发送消息入口operationNettyClient.waitInputMessage(channel);} finally {operationNettyClient.shutdownGracefully();}}...
}

(11)浏览器客户端接收推送消息

<!DOCTYPE html>
<html lang="en"><head><meta http-equiv="content-type" content="text/html; charset=utf-8" /><title>websocket网页</title></head><body onload="connectServer();"><script type="text/javascript">var websocket;function connectServer() {if ("WebSocket" in window) {console.log("your browser supports websocket!");websocket = new WebSocket("ws://localhost:8998/push");websocket.onopen = function() {console.log("established connection with push server.");}websocket.onmessage = function(ev) {var response = ev.data;console.log("receives push message from netty server: " + response);}}}</script></body>
</html>

文章转载自:东阳马生架构

原文链接:Netty基础—7.Netty实现消息推送服务 - 东阳马生架构 - 博客园

体验地址:引迈 - JNPF快速开发平台_低代码开发平台_零代码开发平台_流程设计器_表单引擎_工作流引擎_软件架构

http://www.dtcms.com/wzjs/156002.html

相关文章:

  • 太原免费网站建设企业微信scrm
  • 愿景 做中国最受欢迎的互联网网站百度账号管理
  • 宿迁网站建设公司友链交换不限内容
  • 互联网公司排名2005关键词优化软件哪家好
  • 2014网站seowin10必做的优化
  • 推广平台有哪些b2b网站南宁网络推广品牌
  • 服务类网站建设策划书百度seo优化收费标准
  • sem推广培训常州seo
  • 个人的网站软件定制开发平台
  • 石家庄网站搭建软文怎么写
  • 国外试用网站空间下载优化大师并安装
  • 国外电商平台排名谷歌seo是什么
  • 淄博网泰专业做网站上海关键词优化外包
  • 网站首页结构百度搜索优化关键词排名
  • mufen wordpress147seo工具
  • 官网网站怎么创建初学者做电商怎么入手
  • 网站做造价亚马逊开店流程及费用
  • 物流公司网站建设有什么要点广州seo关键词优化外包
  • 京东网站设计分析快手推广网站
  • 网站导航优化的描述阳江seo
  • 如何做网站呢电子商务网站建设多少钱
  • 少儿编程收费价目表广州seo招聘
  • 网站客服软件定制东莞疫情最新消息今天又封了
  • 个人怎么见个网站域名注册需要哪些条件
  • 谷歌 网站开发百度推广登录平台怎么收费
  • 武汉承接网站开发的公司dz论坛seo
  • 做视频网站 带宽网站设计公司有哪些
  • 企业在网站推广网站优化包括哪些内容
  • 建个人免费网站用哪个营销推广模式有哪些
  • 大型网站建设怎么样建网站