当前位置: 首页 > news >正文

深入解析Java Socket服务器实现:从基础到优雅停机

本文将详细解析一个基于Java Socket实现的服务器程序,涵盖线程池管理、心跳检测、优雅停机等关键特性,并最终提供完整代码实现。

1. 架构概述

这个Socket服务器实现具有以下核心特性:

  1. 基于Java原生Socket API实现
  2. 使用线程池处理客户端连接
  3. 内置心跳检测机制
  4. 支持优雅停机
  5. 限制最大连接数

2. 核心组件解析

2.1 启动入口 - RelayTask

@Component
public class RelayTask implements CommandLineRunner {@Autowiredprivate SocketService socketService;@Overridepublic void run(String... args) throws Exception {socketService.apply();}
}

这是一个Spring Boot应用的启动类,实现了CommandLineRunner接口,在应用启动后自动执行SocketServiceapply()方法启动Socket服务器。

2.2 Socket服务主类 - SocketService

SocketService是整个Socket服务器的核心实现类,采用单例模式(通过Spring的@Service注解实现)。

关键配置参数
private static final int PORT = 5555;
private static final int MAX_CLIENTS = 10;
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测

这些参数定义了服务器的基本配置:
• 监听端口:5555
• 最大客户端连接数:10
• 心跳检测间隔:30秒

线程管理
private static final ExecutorService threadPool = Executors.newFixedThreadPool(MAX_CLIENTS);
private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

使用两个线程池:

  1. 主线程池:固定大小(等于最大客户端数),处理客户端连接
  2. 调度线程池:单线程,用于心跳检测
客户端管理
private static final ConcurrentHashMap<String, ClientHandler> activeClients = new ConcurrentHashMap<>();

使用线程安全的ConcurrentHashMap存储活跃客户端,键为客户端ID,值为对应的处理器实例。

2.3 客户端处理器 - ClientHandler

ClientHandler是处理单个客户端连接的核心内部类,实现了Runnable接口。

关键字段
private final Socket clientSocket;
private final String clientId;
private final PrintWriter out;
private final BufferedReader in;
private volatile long lastActivityTime;
private volatile boolean running = true;

clientSocket: 客户端Socket连接
clientId: 由客户端IP和端口组成的唯一标识
out/in: 输出/输入流
lastActivityTime: 记录最后活动时间(用于心跳检测)
running: 控制处理循环的开关

核心方法
public void run() {try {String inputLine;while (running && (inputLine = in.readLine()) != null) {lastActivityTime = System.currentTimeMillis();processClientMessage(inputLine);}} catch (SocketException e) {// 处理异常} finally {// 清理资源}
}

这是客户端处理的主循环,不断读取客户端消息并处理,同时更新最后活动时间。

2.4 心跳检测机制

private void startHeartbeatChecker() {scheduler.scheduleAtFixedRate(() -> {long currentTime = System.currentTimeMillis();activeClients.forEach((id, handler) -> {if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {System.out.printf("客户端[%s]心跳超时,强制断开%n", id);handler.disconnect();activeClients.remove(id);}});}, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次
}

心跳检测器每10秒检查一次所有客户端,如果发现某个客户端超过30秒没有活动,则强制断开连接。

2.5 优雅停机实现

Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("\n接收到关闭信号,开始优雅停机...");shutdownServer(); // 执行自定义关闭逻辑
}));

通过注册JVM关闭钩子,在服务器进程收到终止信号时执行自定义的关闭逻辑:

  1. 关闭心跳检测线程
  2. 通知所有客户端服务器即将关闭
  3. 断开所有客户端连接
  4. 关闭线程池

3. 完整代码实现

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.*;@Component
public class RelayTask implements CommandLineRunner {@Autowiredprivate SocketService socketService;@Overridepublic void run(String... args) throws Exception {socketService.apply();}
}@Service
public class SocketService {// 服务器配置private static final int PORT = 5555;private static final int MAX_CLIENTS = 10;private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测// 线程池和客户端集合private static final ExecutorService threadPool =Executors.newFixedThreadPool(MAX_CLIENTS);private static final ConcurrentHashMap<String, ClientHandler> activeClients =new ConcurrentHashMap<>();private static final ScheduledExecutorService scheduler =Executors.newSingleThreadScheduledExecutor();public void apply() {Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("\n接收到关闭信号,开始优雅停机...");shutdownServer(); // 执行自定义关闭逻辑}));System.out.printf("Socket服务器启动,监听端口: %d,最大连接数: %d%n", PORT, MAX_CLIENTS);// 启动心跳检测线程startHeartbeatChecker();try (ServerSocket serverSocket = new ServerSocket(PORT)) {while (true) {try {Socket clientSocket = serverSocket.accept();// 检查是否达到最大连接数if (activeClients.size() >= MAX_CLIENTS) {rejectClient(clientSocket);continue;}// 创建客户端处理器ClientHandler handler = new ClientHandler(clientSocket);String clientId = handler.getClientId();// 添加到活跃客户端列表activeClients.put(clientId, handler);threadPool.execute(handler);System.out.printf("客户端[%s]已连接,当前连接数: %d/%d%n",clientId, activeClients.size(), MAX_CLIENTS);} catch (IOException e) {System.err.println("接受客户端连接时出错: " + e.getMessage());}}} catch (IOException e) {System.err.println("服务器启动失败: " + e.getMessage());} finally {shutdownServer();}}// 客户端处理器private class ClientHandler implements Runnable {private final Socket clientSocket;private final String clientId;private final PrintWriter out;private final BufferedReader in;private volatile long lastActivityTime;private volatile boolean running = true;public ClientHandler(Socket socket) throws IOException {this.clientSocket = socket;this.clientId = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();this.out = new PrintWriter(socket.getOutputStream(), true);this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));this.lastActivityTime = System.currentTimeMillis();// 发送欢迎消息sendMessage("SERVER|欢迎连接到服务器,你的ID: " + clientId);}public String getClientId() {return clientId;}public void sendMessage(String message) {out.println(message);}public void disconnect() {running = false;try {clientSocket.close();} catch (IOException e) {// 忽略关闭异常}}@Overridepublic void run() {try {String inputLine;while (running && (inputLine = in.readLine()) != null) {lastActivityTime = System.currentTimeMillis();// 处理客户端消息processClientMessage(inputLine);}} catch (SocketException e) {System.out.printf("客户端[%s]异常断开: %s%n", clientId, e.getMessage());} catch (IOException e) {System.err.printf("与客户端[%s]通信出错: %s%n", clientId, e.getMessage());} finally {// 客户端断开处理activeClients.remove(clientId);System.out.printf("客户端[%s]已断开,剩余连接数: %d%n",clientId, activeClients.size());disconnect();}}// 处理消息逻辑private void processClientMessage(String message) {System.out.printf("收到客户端[%s]消息: %s%n", clientId, message);}}// ---------- 服务器管理方法 ----------// 拒绝新连接(达到最大连接数时)private void rejectClient(Socket clientSocket) throws IOException {PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);out.println("SERVER|ERROR|服务器已达最大连接数,请稍后再试");clientSocket.close();System.out.println("已拒绝新连接(达到最大连接数)");}// 启动心跳检测线程private void startHeartbeatChecker() {scheduler.scheduleAtFixedRate(() -> {long currentTime = System.currentTimeMillis();activeClients.forEach((id, handler) -> {if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {System.out.printf("客户端[%s]心跳超时,强制断开%n", id);handler.disconnect();activeClients.remove(id);}});}, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次}// 关闭服务器private void shutdownServer() {System.out.println("服务器关闭中...");// 关闭心跳检测线程if (scheduler != null) {scheduler.shutdownNow();System.out.println("心跳检测线程已关闭");}// 通知所有客户端activeClients.forEach((id, handler) -> {handler.sendMessage("SERVER|WARN|服务器即将关闭");handler.disconnect();});// 关闭线程池threadPool.shutdown();try {if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {threadPool.shutdownNow();}} catch (InterruptedException e) {threadPool.shutdownNow();Thread.currentThread().interrupt();}System.out.println("服务器已关闭");}// 获取当前连接数(可用于监控)public int getCurrentConnections() {return activeClients.size();}
}

4. 扩展建议

这个基础实现可以进一步扩展:

  1. 协议扩展:实现更复杂的消息协议,如JSON或Protobuf
  2. 认证机制:添加客户端认证功能
  3. 消息广播:实现向所有客户端广播消息的功能
  4. 性能监控:添加连接数、吞吐量等监控指标
  5. 配置外部化:将端口、最大连接数等参数移到配置文件中

5. 总结

本文详细解析了一个功能完善的Java Socket服务器实现,涵盖了线程池管理、心跳检测、优雅停机等关键特性。这个实现既保持了简洁性,又具备了生产环境所需的核心功能,可以作为开发更复杂网络应用的坚实基础。

相关文章:

  • ICP 减少的是 不必要 的回表,而不是 所有 回表
  • 《Java 并发编程实践》阅读笔记(一):线程重要性
  • 空洞卷积(膨胀卷积/扩张卷积)本质理解
  • Java 本地缓存的实现:常见的四种方式
  • 逻辑过期怎么设计
  • PclSharp ——pcl的c#nuget包
  • 自己的账号
  • 细说STM32单片机FreeRTOS任务管理API函数vTaskList()的使用方法
  • 软件测试|App测试相关面试题(3)
  • 2025年第二期DAMA认证考试通知已发布!
  • kali下maven 的安装与配置
  • 自动化构建工具:makemakefile
  • 求简单交错序列前N项和
  • 【Linux我做主】探秘gcc/g++和动静态库
  • RestControllerAdvice 和 ControllerAdvice 两个注解的区别与联系
  • 二十、FTP云盘
  • Operator 开发入门系列(一):Hello World
  • 【Java学习笔记】标识符和保留字
  • NLP高频面试题(四十七)——探讨Transformer中的注意力机制:MHA、MQA与GQA
  • 火山云如何运营
  • 全文丨中华人民共和国传染病防治法
  • 海尔智家一季度营收791亿元:净利润增长15%,海外市场收入增超12%
  • 李铁案二审今日宣判
  • 中国体育报关注徐梦桃、王曼昱、盛李豪等获评全国先进工作者:为建设体育强国再立新功
  • 一季度我国服务进出口总额19741.8亿元,同比增长8.7%
  • 从咖啡节到话剧、演唱会,上海虹口“文旅商体展”联动促消费