【RelayMQ】基于 Java 实现轻量级消息队列(九)
目录
一. 建立连接
1.1 服务器启动
1.2 客户端创建连接
1.3 连接初始化
二. 自定义网络协议
2.1 网络协议格式
2.2 请求类型
2.3 方法实现
2.3.1 Channel
2.3.2 交换机
2.3.3 队列
2.3.4 绑定
2.3.5 发送消息
2.3.6 订阅消息
2.3.7 手动应答
2.4 阻塞唤醒机制
三. 请求处理流程
3.1 客户端发送请求
3.2 服务器接收和解析请求
3.3 服务器处理请求并生成响应
3.4 客户端接收和处理响应
这篇文章主要介绍网络通信模块
一. 建立连接
1.1 服务器启动
-
构建出一个线程池
-
当有客户端连接时,返回一个Socket对象用于与客户端通信
-
同时线程池中创建出一个线程, 负责处理客户端的连接
/**
 * Runs the server accept loop.
 *
 * <p>Prints a startup message, creates a cached thread pool, and then, while
 * {@code runnable} is true, accepts client sockets and submits one task per
 * socket that calls {@code processConnection}. A {@link SocketException}
 * raised inside the loop ends it and prints a "server stopped" message.
 *
 * @throws IOException if accepting a connection fails with a non-socket I/O error
 */
public void start() throws IOException {
    System.out.println("启动");
    // Pool whose workers each serve one accepted client connection.
    executorService = Executors.newCachedThreadPool();
    try {
        while (runnable) {
            final Socket accepted = serverSocket.accept();
            final Runnable handler = () -> {
                try {
                    processConnection(accepted);
                } catch (IOException ioFailure) {
                    throw new RuntimeException(ioFailure);
                }
            };
            executorService.submit(handler);
        }
    } catch (SocketException stopped) {
        System.out.println("服务器停止运行");
    }
}
1.2 客户端创建连接
/**
 * Creates a new {@code Connection} to the configured {@code host} and {@code port}.
 *
 * @return the newly constructed connection
 * @throws IOException if the connection cannot be established
 */
public Connection newConnection() throws IOException {
    return new Connection(host, port);
}
1.3 连接初始化
- 设置一个扫描线程不断访问缓冲区
- 如果缓冲区不为空, 就一直读取数据
- 将读取到的数据根据协议进行处理
- 将响应内容进行处理解析
public Connection(String host, int port) throws IOException {socket = new Socket(host, port);inputStream = socket.getInputStream();outputStream = socket.getOutputStream();dataInputStream = new DataInputStream(inputStream);dataOutputStream = new DataOutputStream(outputStream);callbackPool = Executors.newFixedThreadPool(4);//启动扫描线程,用于接收和处理服务器响应Thread thread = new Thread(()->{try{while (!socket.isClosed()){Response response = readResponse();dispatchResponse(response);}} catch (SocketException e){System.out.println("连接正常断开");} catch (IOException | ClassNotFoundException | MqException e) {System.out.println("连接异常断开");e.printStackTrace();}});thread.start();
}
二. 自定义网络协议
2.1 网络协议格式
- type: 操作的类型, 不同的type数据对应不同的操作, 服务器根据这个值来决定如何处理请求。
- length: 记录 payload 的字节长度, 接收方据此确定一条消息的边界, 主要用于解决 TCP 协议中的粘包问题。
- payload: 包含了具体的操作数据
2.2 请求类型
请求类型 | 操作描述 | 对应方法 |
---|---|---|
0x1 | 创建Channel | Channel.createChannel() |
0x2 | 删除Channel | Channel.close() |
0x3 | 创建交换机 | Channel.exchangeDeclare() |
0x4 | 删除交换机 | Channel.exchangeDelete() |
0x5 | 创建队列 | Channel.queueDeclare() |
0x6 | 删除队列 | Channel.queueDelete() |
0x7 | 创建绑定 | Channel.queueBind() |
0x8 | 删除绑定 | Channel.queueUnbind() |
0x9 | 发送消息 | Channel.basicPublic() |
0xa | 订阅消息 | Channel.basicConsume() |
0xb | 手动应答 | Channel.basicAck() |
0xc | 消息推送(服务器主动) | - |
2.3 方法实现
2.3.1 Channel
/** 创建Channel* */public boolean createChannel() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//等待服务器的回复BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}/** 删除Channel* */public boolean close() throws IOException {BasicArguments basicArguments = new BasicArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload = BinaryTool.toBytes(basicArguments);Request request = new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns = waitResult(basicArguments.getRid());return basicReturns.isOk();}
2.3.2 交换机
/** 创建交换机* */public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,Map<String,Object> arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);Request request = new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());return basicReturns.isOk();}/** 删除交换机* */public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();exchangeDeleteArguments.setRid(generateRid());exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);Request request = new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());return basicReturns.isOk();}
2.3.3 队列
/** 增加队列* */public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments) throws IOException {QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();queueDeclareArguments.setRid(generateRid());queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);byte[] payload = BinaryTool.toBytes(queueDeclareArguments);Request request = new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());return basicReturns.isOk();}/** 删除队列* */public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();queueDeleteArguments.setRid(generateRid());queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setQueueName(queueName);byte[] payload = BinaryTool.toBytes(queueDeleteArguments);Request request = new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());return basicReturns.isOk();}
2.3.4 绑定
/** 创建绑定* */public boolean queueBind(String queueName,String exchangeName,String bindingKey) throws IOException {QueueBindArguments queueBindArguments = new QueueBindArguments();queueBindArguments.setRid(generateRid());queueBindArguments.setChannelId(channelId);queueBindArguments.setQueueName(queueName);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setBindingKey(bindingKey);byte[] payload = BinaryTool.toBytes(queueBindArguments);Request request = new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(queueBindArguments.getRid());return basicReturns.isOk();}/** 删除绑定* */public boolean queueUnbind(String queueName,String exchangeName) throws IOException {QueueUnBindArguments queueUnBindArguments = new QueueUnBindArguments();queueUnBindArguments.setRid(generateRid());queueUnBindArguments.setChannelId(channelId);queueUnBindArguments.setQueueName(queueName);queueUnBindArguments.setExchangeName(exchangeName);byte[] payload = BinaryTool.toBytes(queueUnBindArguments);Request request = new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(queueUnBindArguments.getRid());return basicReturns.isOk();}
2.3.5 发送消息
/** 发送消息* */public boolean basicPublic(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {BasicPublicArguments basicPublicArguments = new BasicPublicArguments();basicPublicArguments.setRid(generateRid());basicPublicArguments.setChannelId(channelId);basicPublicArguments.setExchangeName(exchangeName);basicPublicArguments.setRoutingKey(routingKey);basicPublicArguments.setBasicProperties(basicProperties);basicPublicArguments.setBody(body);byte[] payload = BinaryTool.toBytes(basicPublicArguments);Request request = new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(basicPublicArguments.getRid());return basicReturns.isOk();}
2.3.6 订阅消息
/** 订阅消息* */public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {if(this.consumer != null){throw new MqException("不能重复设置回调函数");}this.consumer = consumer;BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId);basicConsumeArguments.setQueueName(queueName);basicConsumeArguments.setAutoAck(autoAck);byte[] payload = BinaryTool.toBytes(basicConsumeArguments);Request request = new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());return basicReturns.isOk();}
2.3.7 手动应答
/** 手动应答* */public boolean basicAck(String queueName,String messageId) throws IOException {BasicAckArguments basicAckArguments = new BasicAckArguments();basicAckArguments.setRid(generateRid());basicAckArguments.setChannelId(channelId);basicAckArguments.setQueueName(queueName);basicAckArguments.setMessageId(messageId);byte[] payload = BinaryTool.toBytes(basicAckArguments);Request request = new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);//发送请求connection.writeRequest(request);//接收返回的响应BasicReturns basicReturns = waitResult(basicAckArguments.getRid());return basicReturns.isOk();}
2.4 阻塞唤醒机制
引入阻塞唤醒机制, 保证客户端可以可靠获取服务器的响应
private BasicReturns waitResult(String rid) {BasicReturns basicReturns = null;while((basicReturns = basicReturnsMap.get(rid)) ==null){synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}//读取走了就需要删除哈希表中的值basicReturnsMap.remove(rid);return basicReturns;}public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(),basicReturns);synchronized (this){//返回的响应数据回来了,唤醒之前等待的线程notifyAll();}}
三. 请求处理流程
3.1 客户端发送请求
/**
 * Writes one request frame to the server: type (int), payload length (int),
 * then the payload bytes, followed by a flush.
 *
 * <p>The whole frame is written while holding a lock on the output stream.
 * Several channels and the callback pool threads may send through this
 * connection, and interleaving the parts of two frames would corrupt the
 * stream.
 *
 * @param request the request to send
 * @throws IOException if writing to the socket fails
 */
public void writeRequest(Request request) throws IOException {
    synchronized (dataOutputStream) {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
    }
    System.out.println("发送请求成功");
}
3.2 服务器接收和解析请求
private void processConnection(Socket clientSocket) throws IOException {try (InputStream inputStream = clientSocket.getInputStream();OutputStream outputStream = clientSocket.getOutputStream()) {try (DataInputStream dataInputStream = new DataInputStream(inputStream);DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {while (true) {// 1. 读取请求并解析Request request = readRequest(dataInputStream);// 2. 根据请求计算响应Response response = process(request, clientSocket);// 3. 把响应写回客户端writeResponse(dataOutputStream, response);}}}
}
3.3 服务器处理请求并生成响应
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {
// 1. payload的内容根据type来决定,计算响应,必须先对请求进行更近一步的解析(将去取的内容全部转换为BasicArguments(读取到的内容全是BasicArguments的子类))BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload());System.out.println("rid = " + basicArguments.getRid() + ",channelId= " + basicArguments.getChannelId());
// 2. 根据type的值,进行区分请求要干什么boolean ok = true;if (request.getType() == 0x1) {//创建channelsessions.put(basicArguments.getChannelId(), clientSocket);} else if (request.getType() == 0x2) {//删除channelsessions.remove(basicArguments.getChannelId());} else if (request.getType() == 0x3) {ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments;ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(), arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x4) {ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments;ok = virtualHost.exchangeDelete(arguments.getExchangeName());} else if (request.getType() == 0x5) {QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments;ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(), arguments.isExclusive(),arguments.isAutoDelete(), arguments.getArguments());} else if (request.getType() == 0x6) {QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments;ok = virtualHost.queueDelete(arguments.getQueueName());} else if (request.getType() == 0x7) {QueueBindArguments arguments = (QueueBindArguments) basicArguments;ok = virtualHost.queueBind(arguments.getQueueName(),arguments.getExchangeName(),arguments.getBindingKey());} else if (request.getType() == 0x8) {QueueUnBindArguments arguments = (QueueUnBindArguments) basicArguments;ok = virtualHost.queueUnbind(arguments.getQueueName(),arguments.getExchangeName());} else if (request.getType() == 0x9) {BasicPublicArguments arguments = (BasicPublicArguments) basicArguments;ok = virtualHost.basicPublic(arguments.getExchangeName(),arguments.getRoutingKey(),arguments.getBasicProperties(),arguments.getBody());} else if (request.getType() == 0xa) {//实现订阅消息BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments;ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), 
arguments.isAutoAck(),new Consumer() {//回调函数要做的事:将消息推送给客户端@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {//这里consumerTag就是对应的channelId,根据channelId可以得到socket对象,从而发送给对应的客户端(返回给socket对象)
// 1.根据channelId查找socket对象(consumerTag的值应该和channelId的值一样)Socket clientSocket = sessions.get(consumerTag);if(clientSocket == null||clientSocket.isClosed()){throw new MqException("订阅消息的客户端已经关闭");}
// 2.构造响应数据//返回的具体数据/描述SubScribeReturns subScribeReturns = new SubScribeReturns();//只有一个请求对应一个响应的过程,才需要rid匹配subScribeReturns.setRid("");subScribeReturns.setChannelId(consumerTag);subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);//将数据转为2进制,然后写入Respond中并返回byte[] payload = BinaryTool.toBytes(subScribeReturns);Response response = new Response();response.setType(0xc);response.setLength(payload.length);response.setPayload(payload);// 3,将数据写回给客户端DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream,response);}});} else if (request.getType() == 0xb) {BasicAckArguments arguments = (BasicAckArguments) basicArguments;ok = virtualHost.basicAck(arguments.getQueueName(),arguments.getMessageId());}else {throw new MqException("未知的type");}
// 3.构造响应BasicReturns basicReturns = new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payLoad = BinaryTool.toBytes(basicReturns);Response response = new Response();response.setType(request.getType());response.setLength(payLoad.length);response.setPayload(payLoad);return response;}
3.4 客户端接收和处理响应
/**
 * Routes one response received from the server to the channel it belongs to.
 *
 * <p>Any type other than 0xc is an ordinary reply: it is decoded as
 * {@code BasicReturns} and stored on its channel through {@code putReturns}.
 * Type 0xc is a server push: it is decoded as {@code SubScribeReturns} and
 * the channel's consumer callback is submitted to {@code callbackPool};
 * exceptions thrown by the callback are printed, not propagated.
 *
 * @param response the response read from the socket
 * @throws MqException            if the channel named in the response is not in {@code channelMap}
 * @throws IOException            declared for payload deserialization
 * @throws ClassNotFoundException declared for payload deserialization
 */
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
    if (response.getType() != 0xc) {
        // Ordinary reply: hand it to the channel that is waiting for it.
        final BasicReturns reply = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
        final Channel target = channelMap.get(reply.getChannelId());
        if (target == null) {
            throw new MqException("该消息对应的Channel在客户端不存在");
        }
        target.putReturns(reply);
        return;
    }

    // Server push: run the consumer callback on the callback pool.
    final SubScribeReturns delivery = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
    final Channel target = channelMap.get(delivery.getChannelId());
    if (target == null) {
        throw new MqException("该消息对应的Channel在客户端不存在");
    }
    callbackPool.submit(() -> {
        try {
            target.getConsumer().handleDelivery(
                    delivery.getConsumerTag(),
                    delivery.getBasicProperties(),
                    delivery.getBody());
        } catch (Exception callbackFailure) {
            callbackFailure.printStackTrace();
        }
    });
}
如果类型为 0xc, 表示这条消息是服务器主动推送给客户端的, 此时把消费者回调提交给回调线程池(callbackPool)异步执行, 避免阻塞负责接收响应的扫描线程。