【Netty】- 聊天室1
Channel和Session
【场景
】:用户A发送消息给用户B
Channel(网络连接通道)
用户A的Channel:
- 表示用户A当前与服务器的活跃TCP连接
- 通过该Channel,A可以主动向服务器发送消息(如channelA.writeAndFlush(msg))
- 状态:在消息发送时必须是活跃的(channelA.isActive() == true)
用户B的Channel:
- 表示用户B当前与服务器的活跃TCP连接
- 如果用户B在线,服务器会通过该Channel将消息转发给B(如channelB.writeAndFlush(msg))
- 状态:如果B离线,channelB可能不存在或已关闭(channelB == null || !channelB.isActive())
channel是物理连接,类似于打电话时的专属线路:A和服务器有一条电话线;B和服务器有另一条电话线,彼此独立。
channel只能是客户端和服务器之间的通道,客户端和客户端之间是没有channel的
channel类似于快递员:
- 用户A的快递员(ChannelA)只负责把包裹(消息)从A送到服务器
- 用户B的快递员(ChannelB)只负责把包裹(消息)从B送到服务器
- A和B的快递员不会直接交换包裹,而是通过快递中心(服务器)中转。
Session(会话状态)
用户A的Session:
- 存储A的会话信息(如用户ID、群组列表、登录状态等)
- 通过Session可以获取A的Channel(例如:sessionA.getChannel())
- 关键作用:验证A是否有权限发送消息给B(如好友关系、群组成员校验)
用户B的Session:
- 存储B的会话信息(即使用户B离线,Session可能仍存在)
- 如果B离线,Session可能标记为inactive,但保留用户数据(如未读消息队列)
- 关键作用:判断B是否在线,若离线则缓存消息
消息发送过程
- 用户A发送消息
- 用户A通过自己的channelA发送消息到服务器
- 服务器通过sessionA验证A的身份和权限
- 服务器处理消息
- 查询sessionB获取用户B的状态(在线or离线)
- 如果B在线,直接通过channelB转发消息;如果B离线,存入消息队列
- 用户B接收消息
- 如果B在线,消息通过channelB推送到B的客户端
- 如果B离线,消息暂存,下次登录时通过sessionB恢复未读消息
相当于每个用户都会有自己独立的channel
用户A -> 服务器:channelA(A的专属连接)
用户B -> 服务器:channeB(B的专属连接)
登录、业务消息发送
客户端:
@Slf4j
public class ChatClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);/*WAIT_FOR_LOGIN:倒计时锁(让线程之间通信)当数字减为0,就可以向下运行;数字没有减成0,就会阻塞等待*/CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);/*LOGIN:用来记录登录状态false - 暂未登录true - 已登录*/AtomicBoolean LOGIN = new AtomicBoolean(false);try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(group);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder()); // 封装(防止半包问题)ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(new MessageCodec());/*登录*/ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {// 连接建立后触发(向服务器发送消息)@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception { // system in线程// 新开一个线程:接收用户在控制台的输入,向服务器发送各种消息new Thread(()->{Scanner sc = new Scanner(System.in);System.out.println("请输入用户名:");String username = sc.nextLine();System.out.println("请输入密码:");String pwd = sc.nextLine();// 1. 构造消息对象LoginRequestMessage msg = new LoginRequestMessage(username, pwd);// 2. 发送消息ctx.writeAndFlush(msg);try {WAIT_FOR_LOGIN.await(); // 等待} catch (InterruptedException e) {e.printStackTrace();}// 3. 使用共享变量LOGIN判断是否登录成功if(!LOGIN.get()) { // 失败ctx.close();return;}// 4. 登录成功,进入菜单,选择功能while(true) {// 显示菜单System.out.println("==================================");System.out.println("send [username] [content]");System.out.println("gsend [group name] [content]");System.out.println("gcreate [group name] [m1,m2,m3...]");System.out.println("gmembers [group name]");System.out.println("gjoin [group name]");System.out.println("gquit [group name]");System.out.println("quit");System.out.println("==================================");String command = new Scanner(System.in).nextLine();String[] s = command.split(" ");switch(s[0]) {case "send": // 向某一个人发送消息ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));// username 发给 s[1]用户,内容是s[2]break;case "gsend": // 往群里发消息ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));break;case "gcreate": // 创建群聊Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));set.add(username); // 加入自己ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));break;case "gmembers": // 查看群成员ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));break;case "gjoin": // 拉人进群ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));break;case "gquit": // 退出群ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));break;case "quit": // 退出菜单ctx.channel().close();break;}}}, "system in").start();}// 接收服务器返回的消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // nio线程log.debug("msg:{}", msg);if(msg instanceof LoginResponseMessage) {LoginResponseMessage response = (LoginResponseMessage) msg;LOGIN.set(response.isSuccess());log.debug("LOGIN.get():{}", LOGIN.get());}WAIT_FOR_LOGIN.countDown(); // 让计数 - 1(唤醒system in线程)}});}});Channel channel = bootstrap.connect("localhost", 8080).sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("client error", e);} finally {group.shutdownGracefully();}}
}
登录成功后进入菜单,失败后提示失败。
处理登录的handler:
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> { // 对LoginRequestMessage消息感兴趣@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {String username = msg.getUsername();String password = msg.getPassword();LoginResponseMessage res;if (UserServiceFactory.getUserService().login(username, password)) {// 登录成功res = new LoginResponseMessage(true, "登录成功");SessionFactory.getSession().bind(ctx.channel(), username); // 保存channel和username的对应关系} else {res = new LoginResponseMessage(false, "用户名或密码不正确");}ctx.writeAndFlush(res); // 将消息返回给客户端}
}
服务器:
@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler(); // 登录handlertry {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtocolFrameDecoder()); // 封装(防止半包问题)ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(new MessageCodec());/*登录*/ch.pipeline().addLast(LOGIN_HANDLER); // 处理登录的handler(自定义)}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}
登录成功后,需要保存当前用户和服务器之间建立的channel
单聊消息处理
单聊Handler:
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {String to = msg.getTo();Channel channelReceiver = SessionFactory.getSession().getChannel(to); // 根据用户名找到接收方的channelif(channelReceiver != null) { // 对方在线(channel没断开)channelReceiver.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent())); // 向接收方发消息}else { // 对方不在线ctx.channel().writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或不在线")); // 向发送方发消息}}
}
需要在服务器端加入这个单聊的handler(后续消息处理都要添加)
群聊建群处理
建群其实并没有创建handler,只是把“群名 - 群成员”的关系加入到map集合中。
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {String groupName = msg.getGroupName(); // 群名Set<String> members = msg.getMembers(); // 群成员GroupSession groupSession = GroupSessionFactory.getGroupSession(); // 群管理器/*创建群:其实就是把群名 - 群成员的对应关系放到map中(此时还没有涉及到channel)*/Group group = groupSession.createGroup(groupName, members); // 如果group不存在,返回null,说明创建成功if(group == null) { // 成功// 向群成员发送拉群消息members.forEach(memberName -> {// 获取群成员的channelChannel memberChannel = SessionFactory.getSession().getChannel(memberName);if(memberChannel != null) {// 向群成员发送消息memberChannel.writeAndFlush(new GroupCreateResponseMessage(true, "你已被拉入" + groupName + "群"));}else {// 提示建群人,此人未上线ctx.writeAndFlush(new GroupCreateResponseMessage(false, memberName + "成员不在线"));}});// 向创建群的人发送成功消息ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));}else {ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "创建失败")); // 给创建者一个提示}}
}
往群里发消息
向群里所有的成员发送消息
@ChannelHandler.Sharable
public class GroupChatRequestMssageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {String from = msg.getFrom(); // 发送人String groupName = msg.getGroupName(); // 发哪个群String content = msg.getContent(); // 发送的内容List<Channel> membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName); // 所有在线群成员的channelmembersChannel.forEach(channel -> {// 向所有的群成员发这条消息if(!channel.equals(ctx.channel())) { // 排除发送者自己channel.writeAndFlush(new GroupChatResponseMessage(from, content));}});}
}
查看群成员
查看群成员其实就是向调用这条指令的用户发送这个群里所有的群成员
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {String groupName = msg.getGroupName();Set<String> members = GroupSessionFactory.getGroupSession().getMembers(groupName);ctx.writeAndFlush(members);}
}
拉人进群
拉人进群其实就是往建群时map(群名 - 群成员)的value(群成员)中添加一个新的人
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {String groupName = msg.getGroupName(); // 群名String addUserName = msg.getUsername(); // 要加入的人// 判断新添加的用户是否在线Channel addUserChannel = SessionFactory.getSession().getChannel(addUserName);if(addUserChannel == null){ // 不在线ctx.writeAndFlush( addUserName + "不存在或不在线");}else { // 在线GroupSessionFactory.getGroupSession().joinMember(groupName, addUserName);// 把用户添加到群聊中ctx.writeAndFlush("已经将" + addUserName + "添加到群聊中"); // 向拉群的人发消息addUserChannel.writeAndFlush("您已加入" + groupName + "群聊中"); // 向新进群的人发消息}}
}
退出群
推出群其实就是往建群时map(群名 - 群成员)的value(群成员)中删除这个人
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {String groupName = msg.getGroupName();String removeUsername = msg.getUsername();// 把这个人推出群聊GroupSessionFactory.getGroupSession().removeMember(groupName, removeUsername);}
}
退出处理(正常断开、异常断开)
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {/*** 当连接断开时触发(正常断开)* @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {SessionFactory.getSession().unbind(ctx.channel()); // 将当前退出用户的channel从会话管理中移除log.debug("{}已经正常断开", ctx.channel());}/*** 当出现异常时触发(异常断开)* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {SessionFactory.getSession().unbind(ctx.channel()); // 将当前退出用户的channel从会话管理中移除log.debug("{}已经异常断开,异常是:{}", ctx.channel(), cause.getMessage());}
}
断开的时候需要解绑当前用户的channel和username的对应关系,否则服务器还会认为这个用户在线(因为channel并没有解绑)
连接假死
网络设备出现故障时(网卡、机房出问题了),此时底层的TCP已经断开了,但是应用程序没有感知到,仍然占用资源。
服务器可以连接的数是有限的
检测连接假死的手段:空闲状态检测器(其实就是个handler)
ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime));
- readerIdleTime(读空闲检测,单位:秒)
- writerIdleTime = 0(写空闲检测)
- 0 - 禁用
- 1 - 启用
- allIdleTime = 0(读写空闲检测)
- 0 - 禁用
- 1 - 启用
流程:
- 客户端连接服务器后,IdleStateHandler开始计时
- 如果在
readerIdleTime
秒内没有收到数据,就会触发READER_IDLE事件- ChannelDuplexHandler会捕获触发的事件,执行业务逻辑
- 如果收到数据,会重新开始
readerIdleTime
秒倒计时
服务端的代码修改如下:
@Slf4j
public class ChatServer {public static void main(String[] args) {NioEventLoopGroup boss = new NioEventLoopGroup();NioEventLoopGroup worker = new NioEventLoopGroup();LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler(); // 登录handlerChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();// 单聊handlerGroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler(); // 创建群聊handlerGroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler(); // 拉人进群handlerGroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler(); // 查看群成员handlerGroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler(); // 退群handlerGroupChatRequestMssageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMssageHandler(); // 群聊handlerQuitHandler QUIT_HANDLER = new QuitHandler(); // 退出处理handlertry {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {/*连接假死(用来判断是不是“读空闲时间过长”或“写空闲时间过长”)5s内如果没收到channel读入的数据,就会触发事件(IdleState#READER_IDLE) ==> 此时会触发下边的自定义Handler里的userEventTriggered()方法*/ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));// 双向handler(可以同时作为入站和出站处理器)ch.pipeline().addLast(new ChannelDuplexHandler() { // 每隔5s触发一次读空闲事件// 用来触发特殊事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {IdleStateEvent event = (IdleStateEvent) evt;// 触发读空闲事件if (event.state() == IdleState.READER_IDLE) {log.debug("已经5s没有读到数据");}super.userEventTriggered(ctx, evt);}});ch.pipeline().addLast(new ProtocolFrameDecoder()); // 封装(防止半包问题)ch.pipeline().addLast(LOGGING_HANDLER);ch.pipeline().addLast(new MessageCodec());/*登录*/ch.pipeline().addLast(LOGIN_HANDLER); // 处理登录的handler(自定义)/*向某一个人发送消息*/ch.pipeline().addLast(CHAT_HANDLER);/*创建群聊*/ch.pipeline().addLast(GROUP_CREATE_HANDLER);/*往群里发消息*/ch.pipeline().addLast(GROUP_JOIN_HANDLER);/*查看群成员*/ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);/*拉人进群*/ch.pipeline().addLast(GROUP_QUIT_HANDLER);/*退出群*/ch.pipeline().addLast(GROUP_CHAT_HANDLER);/*退出菜单*/ch.pipeline().addLast(QUIT_HANDLER);}});Channel channel = serverBootstrap.bind(8080).sync().channel();channel.closeFuture().sync();} catch (InterruptedException e) {log.error("server error", e);} finally {boss.shutdownGracefully();worker.shutdownGracefully();}}
}