深入解析Java Socket服务器实现:从基础到优雅停机
本文将详细解析一个基于Java Socket实现的服务器程序,涵盖线程池管理、心跳检测、优雅停机等关键特性,并最终提供完整代码实现。
1. 架构概述
这个Socket服务器实现具有以下核心特性:
- 基于Java原生Socket API实现
- 使用线程池处理客户端连接
- 内置心跳检测机制
- 支持优雅停机
- 限制最大连接数
2. 核心组件解析
2.1 启动入口 - RelayTask
@Component
public class RelayTask implements CommandLineRunner {@Autowiredprivate SocketService socketService;@Overridepublic void run(String... args) throws Exception {socketService.apply();}
}
这是一个Spring Boot应用的启动类,实现了CommandLineRunner
接口,在应用启动后自动执行SocketService
的apply()
方法启动Socket服务器。
2.2 Socket服务主类 - SocketService
SocketService
是整个Socket服务器的核心实现类,采用单例模式(通过Spring的@Service
注解实现)。
关键配置参数
private static final int PORT = 5555;
private static final int MAX_CLIENTS = 10;
private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测
这些参数定义了服务器的基本配置:
• 监听端口:5555
• 最大客户端连接数:10
• 心跳检测间隔:30秒
线程管理
private static final ExecutorService threadPool = Executors.newFixedThreadPool(MAX_CLIENTS);
private static final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
使用两个线程池:
- 主线程池:固定大小(等于最大客户端数),处理客户端连接
- 调度线程池:单线程,用于心跳检测
客户端管理
private static final ConcurrentHashMap<String, ClientHandler> activeClients = new ConcurrentHashMap<>();
使用线程安全的ConcurrentHashMap
存储活跃客户端,键为客户端ID,值为对应的处理器实例。
2.3 客户端处理器 - ClientHandler
ClientHandler
是处理单个客户端连接的核心内部类,实现了Runnable
接口。
关键字段
private final Socket clientSocket;
private final String clientId;
private final PrintWriter out;
private final BufferedReader in;
private volatile long lastActivityTime;
private volatile boolean running = true;
• clientSocket
: 客户端Socket连接
• clientId
: 由客户端IP和端口组成的唯一标识
• out
/in
: 输出/输入流
• lastActivityTime
: 记录最后活动时间(用于心跳检测)
• running
: 控制处理循环的开关
核心方法
public void run() {try {String inputLine;while (running && (inputLine = in.readLine()) != null) {lastActivityTime = System.currentTimeMillis();processClientMessage(inputLine);}} catch (SocketException e) {// 处理异常} finally {// 清理资源}
}
这是客户端处理的主循环,不断读取客户端消息并处理,同时更新最后活动时间。
2.4 心跳检测机制
private void startHeartbeatChecker() {scheduler.scheduleAtFixedRate(() -> {long currentTime = System.currentTimeMillis();activeClients.forEach((id, handler) -> {if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {System.out.printf("客户端[%s]心跳超时,强制断开%n", id);handler.disconnect();activeClients.remove(id);}});}, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次
}
心跳检测器每10秒检查一次所有客户端,如果发现某个客户端超过30秒没有活动,则强制断开连接。
2.5 优雅停机实现
Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("\n接收到关闭信号,开始优雅停机...");shutdownServer(); // 执行自定义关闭逻辑
}));
通过注册JVM关闭钩子,在服务器进程收到终止信号时执行自定义的关闭逻辑:
- 关闭心跳检测线程
- 通知所有客户端服务器即将关闭
- 断开所有客户端连接
- 关闭线程池
3. 完整代码实现
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.*;@Component
public class RelayTask implements CommandLineRunner {@Autowiredprivate SocketService socketService;@Overridepublic void run(String... args) throws Exception {socketService.apply();}
}@Service
public class SocketService {// 服务器配置private static final int PORT = 5555;private static final int MAX_CLIENTS = 10;private static final long HEARTBEAT_INTERVAL = 30000; // 30秒心跳检测// 线程池和客户端集合private static final ExecutorService threadPool =Executors.newFixedThreadPool(MAX_CLIENTS);private static final ConcurrentHashMap<String, ClientHandler> activeClients =new ConcurrentHashMap<>();private static final ScheduledExecutorService scheduler =Executors.newSingleThreadScheduledExecutor();public void apply() {Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("\n接收到关闭信号,开始优雅停机...");shutdownServer(); // 执行自定义关闭逻辑}));System.out.printf("Socket服务器启动,监听端口: %d,最大连接数: %d%n", PORT, MAX_CLIENTS);// 启动心跳检测线程startHeartbeatChecker();try (ServerSocket serverSocket = new ServerSocket(PORT)) {while (true) {try {Socket clientSocket = serverSocket.accept();// 检查是否达到最大连接数if (activeClients.size() >= MAX_CLIENTS) {rejectClient(clientSocket);continue;}// 创建客户端处理器ClientHandler handler = new ClientHandler(clientSocket);String clientId = handler.getClientId();// 添加到活跃客户端列表activeClients.put(clientId, handler);threadPool.execute(handler);System.out.printf("客户端[%s]已连接,当前连接数: %d/%d%n",clientId, activeClients.size(), MAX_CLIENTS);} catch (IOException e) {System.err.println("接受客户端连接时出错: " + e.getMessage());}}} catch (IOException e) {System.err.println("服务器启动失败: " + e.getMessage());} finally {shutdownServer();}}// 客户端处理器private class ClientHandler implements Runnable {private final Socket clientSocket;private final String clientId;private final PrintWriter out;private final BufferedReader in;private volatile long lastActivityTime;private volatile boolean running = true;public ClientHandler(Socket socket) throws IOException {this.clientSocket = socket;this.clientId = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();this.out = new PrintWriter(socket.getOutputStream(), true);this.in = new BufferedReader(new InputStreamReader(socket.getInputStream()));this.lastActivityTime = System.currentTimeMillis();// 发送欢迎消息sendMessage("SERVER|欢迎连接到服务器,你的ID: " + clientId);}public String getClientId() {return clientId;}public void sendMessage(String message) {out.println(message);}public void disconnect() {running = false;try {clientSocket.close();} catch (IOException e) {// 忽略关闭异常}}@Overridepublic void run() {try {String inputLine;while (running && (inputLine = in.readLine()) != null) {lastActivityTime = System.currentTimeMillis();// 处理客户端消息processClientMessage(inputLine);}} catch (SocketException e) {System.out.printf("客户端[%s]异常断开: %s%n", clientId, e.getMessage());} catch (IOException e) {System.err.printf("与客户端[%s]通信出错: %s%n", clientId, e.getMessage());} finally {// 客户端断开处理activeClients.remove(clientId);System.out.printf("客户端[%s]已断开,剩余连接数: %d%n",clientId, activeClients.size());disconnect();}}// 处理消息逻辑private void processClientMessage(String message) {System.out.printf("收到客户端[%s]消息: %s%n", clientId, message);}}// ---------- 服务器管理方法 ----------// 拒绝新连接(达到最大连接数时)private void rejectClient(Socket clientSocket) throws IOException {PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);out.println("SERVER|ERROR|服务器已达最大连接数,请稍后再试");clientSocket.close();System.out.println("已拒绝新连接(达到最大连接数)");}// 启动心跳检测线程private void startHeartbeatChecker() {scheduler.scheduleAtFixedRate(() -> {long currentTime = System.currentTimeMillis();activeClients.forEach((id, handler) -> {if (currentTime - handler.lastActivityTime > HEARTBEAT_INTERVAL) {System.out.printf("客户端[%s]心跳超时,强制断开%n", id);handler.disconnect();activeClients.remove(id);}});}, 10, 10, TimeUnit.SECONDS); // 每10秒检查一次}// 关闭服务器private void shutdownServer() {System.out.println("服务器关闭中...");// 关闭心跳检测线程if (scheduler != null) {scheduler.shutdownNow();System.out.println("心跳检测线程已关闭");}// 通知所有客户端activeClients.forEach((id, handler) -> {handler.sendMessage("SERVER|WARN|服务器即将关闭");handler.disconnect();});// 关闭线程池threadPool.shutdown();try {if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {threadPool.shutdownNow();}} catch (InterruptedException e) {threadPool.shutdownNow();Thread.currentThread().interrupt();}System.out.println("服务器已关闭");}// 获取当前连接数(可用于监控)public int getCurrentConnections() {return activeClients.size();}
}
4. 扩展建议
这个基础实现可以进一步扩展:
- 协议扩展:实现更复杂的消息协议,如JSON或Protobuf
- 认证机制:添加客户端认证功能
- 消息广播:实现向所有客户端广播消息的功能
- 性能监控:添加连接数、吞吐量等监控指标
- 配置外部化:将端口、最大连接数等参数移到配置文件中
5. 总结
本文详细解析了一个功能完善的Java Socket服务器实现,涵盖了线程池管理、心跳检测、优雅停机等关键特性。这个实现既保持了简洁性,又具备了生产环境所需的核心功能,可以作为开发更复杂网络应用的坚实基础。