Netty的简单使用
一、基本概念
1.1、IO模型
1.同步阻塞
户空间切换到内核空间( 此时调用进程挂起,阻塞等待,称之为阻塞 )。
2.同步非阻塞
用户空间切换到内核空间( 此时调用进程立即返回,称之为非阻塞,然后不断轮询 )。
3.多路复用
一个线程可以监视多个文件描述符。
4.信号驱动
在使用信号驱动 I/O 时,当数据准备就绪后,内核通过发送一个 SIGIO 信号通知应用进程,应用进程就可以开始读取数据了。
5.异步IO
从内核缓冲区拷贝数据到用户态缓冲区的过程由OS异步完成,应用进程只需要在指定的数组中引用数据即可。
1.2、零拷贝
主要技术是 DMA数据传输、内存区域映射
1.减少数据在内核缓冲区和用户进程缓冲区之间反复的 I/O 拷贝操作;
2.减少用户进程地址空间和内核地址空间之间因为上下文切换而带来的 CPU 开销。
1.3、Netty特性
提高服务器吞吐量
1.基于NIO模型的高性能网络通信框架,对NIO模型做了封装,提供简单易用的API。
2.优化了零拷贝、高性能无锁队列、内存池、支持多种通信协议(HTTP、WebSocket)、自带编解码解决TCP粘包/黏包问题;支持多种传输类型(阻塞、非阻塞、epoll、poll等)、安全性好有完整的(SSL/TLS) 。
1.4、Netty核心组件
网络通信层
Bootstrap: 负责客户端启动,链接远程的 NettyServcer
ServerBootStrap: 服务端的监听
Channel: 网络通信载体
事件调度层(事件调度器)
EventLoopGroup: 本质是线程池,负责接受IO请求,分配线程去执行任务。
EventLoop: 执行任务的线程。
服务编排层
ChannelPipeline:负责处理多个 ChannelHandler,把多个 ChannelHandler 组成链。
ChannelHandler:针对IO数据的处理器,
ChannelHandlerContext:保存 ChannelHandler 的上下文信息。
1.5、Reactor模型支持
Reactor:负责将IO事件分发给Handler
Acceptor:处理客户端连接请求
Handlers:执行业务逻辑的读写
单线程单Reactor模型:线程易阻塞
多线程单Reactor模型:Reactor易阻塞
单线程多Reactor模型:主从Reactor模型(MainReactor、SubReactor)
二、重要组件
2.1、引导器(组件组装器)
Bootstrap: Netty 服务端的组件组装工厂类
ServerBootstrap: Netty 客户端的组件组装工厂类
Bootstrap的启动流程
1.配置 EventLoopGroup 线程组。
2.配置 Channel 的类型。
3.设置 ServerSocketChannel 对应的 Handler。
4.设置网络监听的端口。
5.设置 SocketChannel 对应的 Handler。
6.配置 Channel 参数。
7.启动 Netty Server。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;import java.net.InetSocketAddress;public class Test {public static void main(String[]args) throws InterruptedException {int port = 8080;// 创建一个服务端的启动器ServerBootstrap bootstrap = new ServerBootstrap();// boss线程池EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);// worker线程池EventLoopGroup workerLoopGroup = new NioEventLoopGroup(10);// 分配线程池bootstrap.group(bossLoopGroup, workerLoopGroup);// 设置父Channel类型bootstrap.channel(NioServerSocketChannel.class);// 设置父Channel的Handlerbootstrap.handler(new LoggingHandler(LogLevel.INFO)); // LoggingHandler是自定义的Handler// 设置父Channel监听端口bootstrap.localAddress(new InetSocketAddress(port));// 配置父Channel参数bootstrap.option(ChannelOption.SO_KEEPALIVE, true);bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);// 配置子Channel参数bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);bootstrap.childOption(ChannelOption.TCP_NODELAY, true);// 配置子Channel的Pipelinebootstrap.childHandler(new ChannelInitializer() {// 建立客户端连接时,会创建一个子通道并初始化protected void initChannel(SocketChannel ch) throws Exception {// 向子通道的流水线添加Handler业务处理器ch.pipeline().addLast(new XXXXHandler());}});// 绑定并启动ChannelFuture channelFuture = bootstrap.bind().sync();System.out.printf(" 服务器启动成功,监听端口: " + channelFuture.channel().localAddress());// 等待Channel关闭ChannelFuture closeFuture = channelFuture.channel().closeFuture();closeFuture.sync();// 另外还需要注意,关闭Channel后,需要释放Reactor线程池资源:// 释放掉所有资源,包括创建的反应器线程workerLoopGroup.shutdownGracefully();bossLoopGroup.shutdownGracefully();}
}
2.2、线程池组 与 线程
EventLoopGroup/NioEventLoopGroup(默认线程数是CPU * 2)
常用参数
Threads:内部的 NioEventLoop 数量,默认CPU核数2倍。
Executor:默认为Netty自定义的 ThreadPerTaskExecutor,为每个任务创建一个线程处理。
EventExecutorChooserFactory:用于创建Chooser(负载均衡器)对象,用于从NioEventLoopGroup中选择一个EventLoop,默认采用round-robin算法。
SelectorProvider:Java NIO提供的工具类,SelectorProvider使用了 JDK 的 SPI 机制来创建 Selector、ServerSocketChannel、SocketChannel 等对象。
SelectStrategyFactory:用来创建 SelectStrategy(控制EventLoop轮询方式的策略) 的工厂,默认为DefaultSelectStrategy。
RejectedExecutionHandler:线程池的任务拒绝策略,默认抛 RejectedExecutionException 异常。
EventLoopTaskQueueFactory:任务队列工厂类,默认线程安全的 MpscUnboundedArrayQueue 无锁队列。
2.3、EventLoop/NioEventLoop
需要负责IO事件和非IO事件,内部有一个线程一直在执行 Selector 的 select 方法或者正在处理 SelectedKeys。有其它任务 taskQueue队列中(线程安全,默认容量16)。
2.4、Channel 与 ChannelPipeline
每个 Channel 会绑定一个 ChannelPipeline(有Head和Tail两个节点),每个 ChannelPipeline 以双向链表的形式包含多个 ChannelHandlerContext,
每个 ChannelHandler(ChannelInboundHandler、ChannelOutboundHandler)都对应一个 ChannelHandlerContext。
Inbound 事件和 Outbound 事件的传播方向相反,Inbound 事件的传播方向为 Head -> Tail,而 Outbound 事件传播方向是 Tail -> Head。
异常事件的处理顺序与 ChannelHandler 的添加顺序相同,依次向后传播,与 Inbound 事件和 Outbound 事件无关。
Channel的子类
NioSocketChannel:TCP Socket 传输通道。
NioServerSocketChannel:TCP Socket 服务器端监听通道。
NioDatagramChannel:UDP传输通道。
NioSctpChannel:Sctp传输通道。
NioSctpServerChannel:Sctp服务器端监听通道。
OioSocketChannel:同步阻塞式 TCP Socket 传输通道。
OioServerSocketChannel:同步阻塞式 TCP Socket 服务器端监听通道。
OioDatagramChannel:同步阻塞式UDP传输通道。
OioSctpChannel:同步阻塞式Sctp传输通道。
OioSctpServerChannel:同步阻塞式Sctp服务器端监听通道。
2.5、ChannelHandler
业务处理的处理器编写业务逻辑。
ChannelInboundHandler(入站处理器)
channelRegistered: Channel被注册到EventLoop
channelUnregistered: Channel从EventLoop中取消注册
channelActive: Channel处于就绪状态,可以被读写
channelInactive: Channel处于非就绪状态
channelRead: Channel可以从远端读取到数据
channelReadComplete: Channel读取数据完成
userEventTriggered: 用户事件触发时
channelWritabilityChanged: Channel的写状态发生变化
ChannelOutboundHandler(出站处理器)
bind: 监听地址(IP+端口)绑定:完成底层JavaIO通道的地址绑定。
connect:连接服务端:完成底层JavaIO通道的服务器端的连接操作。
disconnect:断开服务器连接:断开底层JavaIO通道的服务器端连接。
close: 主动关闭通道:关闭底层的通道,例如服务器端的新连接监听通道。
write: 写数据到底层:完成Netty通道向底层JavaIO通道的数据写入操作。此方法仅仅是触发一下操作而已,并不是完成实际的数据写入操作。
flush: 清空缓冲区数据,将数据写到对端。
ChannelInitializer:Pipieline流水线中装配业务处理器。
2.6、Encoder/Decoder
Decoder解码器:将二进制类型数据解码成Java对象。
ByteToMessageDecoder/ReplayingDecoder:将字节流解码为消息对象。
MessageToMessageDecoder:将一种消息类型解码为另外一种消息类型。
Encoder编码器: 把Java对象,编码为最终的二进制数据。
MessageToByteEncoder:对象编码成字节流;
MessageToMessageEncoder:一种消息类型编码成另外一种消息类型。
2.7、ByteBuf
Netty中大量使用了堆外内存,以提升性能。
1.容量可以按需动态扩展,类似于 StringBuffer。
2.读写采用了不同的指针,读写模式可以随意切换,不需要调用 flip 方法。
3.通过内置的复合缓冲类型可以实现零拷贝。
4.支持引用计数。
5.支持缓存池。
Netty 实现零拷贝机制非常重要的数据结构,由多个 ByteBuf 组合而成的虚拟 Buffer 对象,内部保存着每个 ByteBuf 的引用关系。
三、案例
3.1、服务端
import com.base.netty03.handler.ServerHeartbeatHandler;
import com.base.netty03.handler.WebSocketServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;import java.util.concurrent.TimeUnit;/*** 服务端封装*/
public class WebSocketServer {/*** boss 线程组用于处理连接工作*/private EventLoopGroup bossGroup;/*** work 线程组用于数据处理*/private EventLoopGroup workerGroup;private WebSocketServerHandler webSocketServerHandler;public WebSocketServer() {//创建两个线程组 boosGroup、workerGroupbossGroup = new NioEventLoopGroup();workerGroup = new NioEventLoopGroup();webSocketServerHandler = new WebSocketServerHandler();}public void start(int port, String name) {try {// 创建服务端的启动对象,设置参数ServerBootstrap bootstrap = new ServerBootstrap();// 设置两个线程组 boosGroup 和 workerGroupbootstrap.group(bossGroup, workerGroup)// 设置服务端通道实现类型.channel(NioServerSocketChannel.class)// 设置线程队列得到连接个数.option(ChannelOption.SO_BACKLOG, 128)// 设置保持活动连接状态.childOption(ChannelOption.SO_KEEPALIVE, true)// 使用匿名内部类的形式初始化通道对象.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel channel) throws Exception {// 给pipeline管道设置处理器channel.pipeline().addLast(new HttpServerCodec());channel.pipeline().addLast(new HttpObjectAggregator(65536));channel.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket", null, false, 65536, false, false, false));channel.pipeline().addLast(new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS));channel.pipeline().addLast(new ServerHeartbeatHandler());channel.pipeline().addLast(webSocketServerHandler);}});// 给 workerGroup 的 EventLoop 对应的管道设置处理器// 绑定端口号,启动服务端ChannelFuture channelFuture = bootstrap.bind(port).sync();System.out.println(name + " 已启动");// 定时发送心跳startHeartbeat();// 对通道关闭进行监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {}}/*** 开启心跳监测*/public void startHeartbeat() {HeartbeatThread thread = new HeartbeatThread(webSocketServerHandler.getChannelGroup());thread.start();}public void send(String text) {for (Channel channel : webSocketServerHandler.getChannelGroup()) {channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(text, CharsetUtil.UTF_8)));}}public boolean hasClient() {return webSocketServerHandler.getChannelGroup().size() > 0;}
}
3.2、服务端消息处理
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;/*** 服务端消息处理*/
@ChannelHandler.Sharable
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {private ChannelGroup channelGroup;public WebSocketServerHandler() {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}public ChannelGroup getChannelGroup() {return channelGroup;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {if (msg instanceof PongWebSocketFrame) {System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的心跳:PONG");}if (msg instanceof TextWebSocketFrame) {TextWebSocketFrame frame = (TextWebSocketFrame) msg;System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发来的消息:" + frame.text());// 测试转发消息for (Channel channel : channelGroup) {if (!ctx.channel().id().toString().equals(channel.id().toString())) {channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(frame.text(), CharsetUtil.UTF_8)));System.out.println("服务端向客户端 " + channel.id().toString() + " 转发消息:" + frame.text());}}}}@Overridepublic void channelActive(ChannelHandlerContext ctx) {channelGroup.add(ctx.channel());System.out.println("客户端" + ctx.channel().id().toString() + "已连接");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {channelGroup.remove(ctx.channel());System.out.println("客户端" + ctx.channel().id() + "已断开");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}
}
3.3、客户端封装
import com.base.netty03.handler.ClientHeartbeatHandler;
import com.base.netty03.handler.WebSocketClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.timeout.IdleStateHandler;import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;/*** 客户端封装*/
public class WebSocketClient {/*** 非阻塞线程池*/private NioEventLoopGroup eventExecutors;/*** TCP 传输通道*/private Channel channel;/*** 心跳类*/private HeartbeatThread heartbeatThread;public WebSocketClient() {eventExecutors = new NioEventLoopGroup();}public Channel getChannel() {return channel;}public void connect(String ip, int port, String name) {try {WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://" + ip + ":" + port + "/websocket"), WebSocketVersion.V13, null, false, new DefaultHttpHeaders());WebSocketClientHandler handler = new WebSocketClientHandler(handshaker);ClientHeartbeatHandler heartbeatHandler = new ClientHeartbeatHandler();// 创建bootstrap对象,配置参数Bootstrap bootstrap = new Bootstrap();// 设置线程组bootstrap.group(eventExecutors)// 设置客户端的通道实现类型.channel(NioSocketChannel.class)// 使用匿名内部类初始化通道.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 添加客户端通道的处理器ch.pipeline().addLast(new HttpClientCodec());ch.pipeline().addLast(new HttpObjectAggregator(65536));ch.pipeline().addLast(new WebSocketClientProtocolHandler(handshaker, true, false));ch.pipeline().addLast(new IdleStateHandler(5, 2, 0, TimeUnit.SECONDS));ch.pipeline().addLast(heartbeatHandler);ch.pipeline().addLast(handler);}});// 连接服务端ChannelFuture channelFuture = bootstrap.connect(ip, port);// 在连接关闭后尝试重连channelFuture.channel().closeFuture().addListener(future -> {try {if (heartbeatThread != null && heartbeatThread.isAlive()) {System.out.println("停止发送心跳线程");heartbeatThread.close();}Thread.sleep(2000);System.out.println("重新连接服务端");connect(ip, port, name); // 重新连接服务端} catch (Exception e) {e.printStackTrace();}});channelFuture.sync();channel = channelFuture.channel();System.out.println(name + " 已启动");// 定时发送心跳heartbeatThread = new HeartbeatThread(channel);heartbeatThread.start();// 对通道关闭进行监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException | URISyntaxException e) {e.printStackTrace();} finally {}}
}
3.4、客户端消息处理
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;/*** 客户端消息处理*/
@ChannelHandler.Sharable
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {private WebSocketClientHandshaker handshaker;private ChannelPromise handshakeFuture;public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {this.handshaker = handshaker;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {if (!handshaker.isHandshakeComplete()) {try {handshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);handshakeFuture.setSuccess();} catch (WebSocketHandshakeException e) {handshakeFuture.setFailure(e);}return;}if (msg instanceof PongWebSocketFrame) {System.out.println("收到服务端" + ctx.channel().remoteAddress() + "发来的心跳:PONG");}if (msg instanceof TextWebSocketFrame) {TextWebSocketFrame frame = (TextWebSocketFrame) msg;System.out.println("收到服务端" + ctx.channel().remoteAddress() + "发来的消息:" + frame.text()); // 接收服务端发送过来的消息}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {handshakeFuture = ctx.newPromise();}@Overridepublic void channelActive(ChannelHandlerContext ctx) {}@Overridepublic void channelInactive(ChannelHandlerContext ctx) {System.out.println("客户端下线");}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}
}
3.5、心跳类
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;/*** 定时发送心跳的类*/
public class HeartbeatThread extends Thread {private ChannelGroup channelGroup;private boolean running = true;public HeartbeatThread(Channel channel) {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);channelGroup.add(channel);}public HeartbeatThread(ChannelGroup channelGroup) {this.channelGroup = channelGroup;}@Overridepublic void run() {while (running) {try {if (channelGroup.size() > 0) {System.out.println("发送心跳");for (Channel channel : channelGroup) {channel.writeAndFlush(new PingWebSocketFrame());}}Thread.sleep(5000);} catch (Exception e) {e.printStackTrace();}}}public void close() {running = false;}
}/*** 服务端心跳事件处理*/
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) { // 读空闲System.out.println("断开与客户端的连接, channel id=" + ctx.channel().id());ctx.channel().close();} else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲} else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲}}}
}import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;/*** 客户端心跳事件处理*/
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) { // 读空闲System.out.println("断开与服务端的连接");ctx.channel().close();} else if (event.state() == IdleState.WRITER_IDLE) { // 写空闲} else if (event.state() == IdleState.ALL_IDLE) { // 读写空闲}}}
}
3.6、模拟器
/*** 服务端测试主机*/
public class WebSocketServerHost {public static void main(String[] args) {WebSocketServer webSocketServer = new WebSocketServer();SendDataToClientThread thread = new SendDataToClientThread(webSocketServer);thread.start();webSocketServer.start(40005, "WebSocket服务端");}
}class SendDataToClientThread extends Thread {private WebSocketServer webSocketServer;private int index = 1;public SendDataToClientThread(WebSocketServer webSocketServer) {this.webSocketServer = webSocketServer;}@Overridepublic void run() {try {while (index <= 5) {if (webSocketServer.hasClient()) {String msg = "服务端发送的测试消息, index = " + index;webSocketServer.send(msg);index++;}Thread.sleep(8000);}} catch (Exception e) {e.printStackTrace();}}
}import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.CharsetUtil;/*** 客户端测试主机*/
public class WebSocketClientHost {public static void main(String[] args) {WebSocketClient webSocketClient = new WebSocketClient();SendDataToServerThread thread = new SendDataToServerThread(webSocketClient);thread.start();webSocketClient.connect("127.0.0.1", 40005, "WebSocket客户端");}
}class SendDataToServerThread extends Thread {private WebSocketClient webSocketClient;private int index = 1;public SendDataToServerThread(WebSocketClient webSocketClient) {this.webSocketClient = webSocketClient;}@Overridepublic void run() {try {while (index <= 5) {Channel channel = webSocketClient.getChannel();if (channel != null && channel.isActive()) {String msg = "客户端发送的测试消息, index = " + index;channel.writeAndFlush(new TextWebSocketFrame(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)));index++;}Thread.sleep(8000);}} catch (Exception e) {e.printStackTrace();}}
}