当前位置: 首页 > news >正文

gRPC从0到1系列【17】

文章目录

  • 双向流式RPC
      • 6.2.4 客户端代码
      • 6.2.5 测试

双向流式RPC

6.2.4 客户端代码

package cn.tcmeta.chat.grpc;import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;import java.util.Scanner;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @author: laoren* @description: 双向流式RPC客户端实现* @version: 1.0.0* 双向流式RPC客户端实现* 演示全双工通信:客户端和服务器都可以随时发送消息**/
public class BidirectionalChatClient {private final ManagedChannel channel;private final ChatServiceGrpc.ChatServiceStub asyncStub;private volatile StreamObserver<ChatMessage> requestObserver;private final AtomicInteger messageCounter = new AtomicInteger(0);private final AtomicInteger receivedMessageCount = new AtomicInteger(0);private volatile boolean connected = false;public BidirectionalChatClient(String host, int port) {channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();this.asyncStub = ChatServiceGrpc.newStub(channel);}/*** 关闭连接** @throws InterruptedException InterruptedException*/public void shutdown() throws InterruptedException {connected = false;if (requestObserver != null) {synchronized (this) {if (requestObserver != null) {try {requestObserver.onCompleted();} catch (IllegalStateException e) {// 流已经关闭,忽略异常System.out.println("gRPC stream already closed");}requestObserver = null;}}}channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);}/*** 开始聊天** @param userId 用户ID*/public void startChat(String userId) {System.out.println("=== 启动双向流式聊天 ===");System.out.println("用户ID: " + userId);System.out.println("输入 'quit' 退出聊天");System.out.println("输入 'status' 查看连接状态");System.out.println("输入 'help' 查看帮助");System.out.println("========================\n");CountDownLatch finishLatch = new CountDownLatch(1);long startTime = System.currentTimeMillis();requestObserver = asyncStub.bidirectionalChat(new StreamObserver<>() {@Overridepublic void onNext(ChatResponse response) {receivedMessageCount.incrementAndGet();long currentTime = System.currentTimeMillis();long elapsed = currentTime - startTime;System.out.println("\n--- 收到服务器消息 (+" + elapsed + "ms) ---");System.out.println("消息ID: " + response.getMessageId());System.out.println("状态: " + response.getStatus());System.out.println("内容: " + response.getResponseMessage());System.out.println("类型: " + response.getCode());System.out.print("\n> "); // 重新显示输入提示// 特殊消息处理handleSpecialResponse(response);}@Overridepublic void onError(Throwable t) {System.err.println("双向流连接错误: " + t.getMessage());connected = false;finishLatch.countDown();}@Overridepublic void onCompleted() {long totalTime = System.currentTimeMillis() - startTime;System.out.println("\n=== 服务器关闭连接 ===");System.out.println("总共接收 " + receivedMessageCount.get() + " 条消息");System.out.println("连接持续时间: " + totalTime + "ms");connected = false;finishLatch.countDown();}});connected = true;// 启动消息发送线程startMessageSendingThread(userId, finishLatch);// 等待连接结束try {finishLatch.await();} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("聊天客户端已退出");}/*** 开始发送消息的线程*/private void startMessageSendingThread(String userId, CountDownLatch finishLatch) {Thread sendThread = new Thread(() -> {Scanner scanner = new Scanner(System.in);while (connected && !Thread.currentThread().isInterrupted()) {try {System.out.print("> ");String input = scanner.nextLine().trim();if (input.isEmpty()) {continue;}// 处理命令if (handleCommand(input, finishLatch)) {continue;}// 发送聊天消息sendChatMessage(userId, input);} catch (Exception e) {System.err.println("发送消息时发生错误: " + e.getMessage());break;}}scanner.close();});sendThread.setDaemon(true);sendThread.start();}/*** 处理特殊命令*/private boolean handleCommand(String input, CountDownLatch finishLatch) {switch (input.toLowerCase()) {case "quit":case "exit":System.out.println("正在断开连接...");connected = false;if (requestObserver != null) {requestObserver.onCompleted();}finishLatch.countDown();return true;case "status":System.out.println("连接状态: " + (connected ? "已连接" : "已断开"));System.out.println("发送消息数: " + messageCounter.get());System.out.println("接收消息数: " + receivedMessageCount.get());return true;case "help":printHelp();return true;case "image":sendImageMessage("user123", "测试图片");return true;case "file":sendFileMessage("user123", "document.pdf");return true;case "system":sendSystemMessage("user123", "系统状态检查");return true;default:return false;}}/*** 打印帮助信息*/private void printHelp() {System.out.println("\n=== 可用命令 ===");System.out.println("quit/exit    - 退出聊天");System.out.println("status       - 查看连接状态");System.out.println("help         - 显示帮助");System.out.println("image        - 发送测试图片消息");System.out.println("file         - 发送测试文件消息");System.out.println("system       - 发送测试系统消息");System.out.println("其他输入     - 作为普通文本消息发送");System.out.println("================\n");}/*** 发送聊天消息*/private void sendChatMessage(String userId, String message) {long messageId = messageCounter.incrementAndGet();ChatMessage chatMessage = ChatMessage.newBuilder().setUserId(userId).setMessage(message).setTimestamp(System.currentTimeMillis()).setType(ChatMessage.MessageType.TEXT).build();requestObserver.onNext(chatMessage);System.out.println("已发送消息 [" + messageId + "]: " + message);}/*** 发送图片消息*/private void sendImageMessage(String userId, String imageDescription) {ChatMessage imageMessage = ChatMessage.newBuilder().setUserId(userId).setMessage(imageDescription).setTimestamp(System.currentTimeMillis()).setType(ChatMessage.MessageType.IMAGE).build();requestObserver.onNext(imageMessage);System.out.println("已发送图片消息: " + imageDescription);}/*** 发送文件消息*/private void sendFileMessage(String userId, String filename) {ChatMessage fileMessage = ChatMessage.newBuilder().setUserId(userId).setMessage(filename).setTimestamp(System.currentTimeMillis()).setType(ChatMessage.MessageType.FILE).build();requestObserver.onNext(fileMessage);System.out.println("已发送文件消息: " + filename);}/*** 发送系统消息*/private void sendSystemMessage(String userId, String systemMessage) {ChatMessage sysMessage = ChatMessage.newBuilder().setUserId(userId).setMessage(systemMessage).setTimestamp(System.currentTimeMillis()).setType(ChatMessage.MessageType.SYSTEM).build();requestObserver.onNext(sysMessage);System.out.println("已发送系统消息: " + systemMessage);}/*** 自动聊天演示(用于测试)*/public void startAutoChat(String userId, int messageCount, long intervalMs) {System.out.println("=== 自动聊天演示 ===");System.out.println("用户: " + userId);System.out.println("消息数: " + messageCount);System.out.println("间隔: " + intervalMs + "ms");CountDownLatch finishLatch = new CountDownLatch(1);AtomicInteger sentCount = new AtomicInteger(0);requestObserver = asyncStub.bidirectionalChat(new StreamObserver<>() {@Overridepublic void onNext(ChatResponse response) {System.out.println("收到: " + response.getResponseMessage());}@Overridepublic void onError(Throwable t) {System.err.println("自动聊天错误: " + t.getMessage());finishLatch.countDown();}@Overridepublic void onCompleted() {System.out.println("自动聊天完成");finishLatch.countDown();}});connected = true;// 自动发送消息new Thread(() -> {try {String[] autoMessages = {"你好,这是一个自动测试消息","今天的天气怎么样?","请告诉我系统状态","我需要帮助","这是一个很长的测试消息用于验证系统处理能力"};for (int i = 0; i < messageCount && connected; i++) {String message = autoMessages[i % autoMessages.length] + " [" + (i + 1) + "]";sendChatMessage(userId, message);sentCount.incrementAndGet();Thread.sleep(intervalMs);}// 发送完成信号if (connected) {requestObserver.onCompleted();}} catch (InterruptedException e) {Thread.currentThread().interrupt();}}).start();try {finishLatch.await(2, TimeUnit.MINUTES);} catch (InterruptedException e) {Thread.currentThread().interrupt();}connected = false;System.out.println("自动聊天演示结束,发送 " + sentCount.get() + " 条消息");}/*** 处理特殊服务器响应*/private void handleSpecialResponse(ChatResponse response) {switch (response.getCode()) {case INFO:if (response.getStatus().equals("HEARTBEAT")) {System.out.print("[心跳] "); // 简略显示心跳}break;case WARNING:System.out.print("[警告] ");break;case ERROR:System.out.print("[错误] ");break;case SUCCESS:if (response.getStatus().equals("BROADCAST")) {System.out.print("[广播] ");}break;}}public static void main(String[] args) throws InterruptedException {BidirectionalChatClient client = new BidirectionalChatClient("localhost", 8080);try {// 交互式聊天client.startChat("interactive_user");// 自动聊天演示(取消注释以测试)// client.startAutoChat("auto_user", 10, 1000);} finally {client.shutdown();}}
}

✅ 代码分析

  1. 基础通信模块
  • gRPC连接管理: 通过 ManagedChannel 建立与服务器的连接,支持明文传输
  • 异步存根初始化: 使用 ChatServiceGrpc.newStub() 创建异步客户端存根
  • 流观察器管理: 维护 requestObserver 用于双向流通信
  1. 聊天会话管理模块
  • 交互式聊天 (startChat):
    • 提供命令行交互界面
    • 支持用户实时输入和消息发送
    • 处理服务器响应和连接状态变化
  • 自动聊天演示 (startAutoChat):
    • 自动发送预定义消息序列
    • 支持配置消息数量和发送间隔
    • 用于性能测试和演示
  1. 消息发送模块
  • 文本消息发送 (sendChatMessage): 发送普通文本聊天消息
  • 图片消息发送 (sendImageMessage): 发送图片类型消息
  • 文件消息发送 (sendFileMessage): 发送文件类型消息
  • 系统消息发送 (sendSystemMessage): 发送系统级别消息
  • 命令消息处理 (handleCommand): 处理特殊命令如 quit、status、help 等
  1. 消息接收与处理模块

    • 服务器响应处理 (onNext): 接收并显示服务器返回的消息
    • 错误处理 (onError): 处理连接异常和错误情况
    • 连接完成处理 (onCompleted): 处理服务器主动关闭连接
    • 特殊响应处理 (handleSpecialResponse): 根据消息类型进行特殊标记显示
  2. 用户交互模块

    • 命令行界面: 提供交互式命令行输入输出
    • 帮助系统 (printHelp): 显示可用命令和使用说明
    • 状态查询: 实时显示连接状态和消息统计
    • 多线程输入: 使用独立线程处理用户输入,避免阻塞
  3. 资源管理模块

    • 连接关闭 (shutdown): 优雅关闭gRPC流和通道连接
    • 线程管理: 使用守护线程处理消息发送
    • 计数器管理: 维护发送和接收消息的计数统计
    • 同步控制: 使用 CountDownLatch 和 synchronized 确保线程安全

6.2.5 测试

✅ 1. 客户端日志

=== 启动双向流式聊天 ===
用户ID: interactive_user
输入 'quit' 退出聊天
输入 'status' 查看连接状态
输入 'help' 查看帮助
========================--- 收到服务器消息 (+417ms) ---
消息ID: WELCOME_1
状态: CONNECTED
内容: 欢迎来到双向流式RPC聊天室! 连接ID: 1
类型: INFO--- 收到服务器消息 (+10425ms) ---
消息ID: HEARTBEAT_1_1759281086742
状态: HEARTBEAT
内容: 连接状态正常
类型: INFO> [心跳] help=== 可用命令 ===
quit/exit    - 退出聊天
status       - 查看连接状态
help         - 显示帮助
image        - 发送测试图片消息
file         - 发送测试文件消息
system       - 发送测试系统消息
其他输入     - 作为普通文本消息发送
================--- 收到服务器消息 (+20418ms) ---
消息ID: HEARTBEAT_1_1759281096743
状态: HEARTBEAT
内容: 连接状态正常
类型: INFO> [心跳] status
连接状态: 已连接
发送消息数: 0
接收消息数: 3
> help=== 可用命令 ===
quit/exit    - 退出聊天
status       - 查看连接状态
help         - 显示帮助
image        - 发送测试图片消息
file         - 发送测试文件消息
system       - 发送测试系统消息
其他输入     - 作为普通文本消息发送
================--- 收到服务器消息 (+30413ms) ---
消息ID: HEARTBEAT_1_1759281106737
状态: HEARTBEAT
内容: 连接状态正常
类型: INFO> [心跳] image
已发送图片消息: 测试图片--- 收到服务器消息 (+32076ms) ---
消息ID: IMG_1_1_1
状态: PROCESSING
内容: 开始分析图片内容 (25%)
类型: INFO--- 收到服务器消息 (+32587ms) ---
消息ID: IMG_1_1_COMPLETE
状态: COMPLETED
内容: 图片分析完成,检测到3个主要对象
类型: SUCCESS> 
--- 收到服务器消息 (+32588ms) ---
消息ID: IMG_1_1_2
状态: PROCESSING
内容: 检测图片中的对象 (50%)
类型: INFO--- 收到服务器消息 (+33093ms) ---
消息ID: IMG_1_1_COMPLETE
状态: COMPLETED
内容: 图片分析完成,检测到3个主要对象
类型: SUCCESS> 
--- 收到服务器消息 (+33094ms) ---
消息ID: IMG_1_1_3
状态: PROCESSING
内容: 生成图片描述 (75%)
类型: INFO--- 收到服务器消息 (+33601ms) ---
消息ID: IMG_1_1_COMPLETE
状态: COMPLETED
内容: 图片分析完成,检测到3个主要对象
类型: SUCCESS> 
--- 收到服务器消息 (+33601ms) ---
消息ID: IMG_1_1_4
状态: PROCESSING
内容: 分析完成 (100%)
类型: INFO--- 收到服务器消息 (+34104ms) ---
消息ID: IMG_1_1_COMPLETE
状态: COMPLETED
内容: 图片分析完成,检测到3个主要对象
类型: SUCCESS> help=== 可用命令 ===
quit/exit    - 退出聊天
status       - 查看连接状态
help         - 显示帮助
image        - 发送测试图片消息
file         - 发送测试文件消息
system       - 发送测试系统消息
其他输入     - 作为普通文本消息发送
================.--- 收到服务器消息 (+40411ms) ---
消息ID: HEARTBEAT_1_1759281116735
状态: HEARTBEAT
内容: 连接状态正常
类型: INFO> [心跳] e--- 收到服务器消息 (+40644ms) ---
消息ID: TEXT_1_2
状态: PROCESSED
内容: 已收到您的消息: "sfile"
类型: SUCCESS> file
已发送文件消息: document.pdf--- 收到服务器消息 (+44869ms) ---
消息ID: FILE_1_3_10
状态: UPLOADING
内容: 文件上传进度: 10%
类型: INFO--- 收到服务器消息 (+45075ms) ---
消息ID: FILE_1_3_20
状态: UPLOADING
内容: 文件上传进度: 20%
类型: INFO// ..........
--- 收到服务器消息 (+46521ms) ---
消息ID: FILE_1_3_90
状态: UPLOADING
内容: 文件上传进度: 90%
类型: INFO--- 收到服务器消息 (+46734ms) ---
消息ID: FILE_1_3_100
状态: UPLOADING
内容: 文件上传进度: 100%
类型: INFO> help=== 可用命令 ===
quit/exit    - 退出聊天
status       - 查看连接状态
help         - 显示帮助
image        - 发送测试图片消息
file         - 发送测试文件消息
system       - 发送测试系统消息
其他输入     - 作为普通文本消息发送
================--- 收到服务器消息 (+50408ms) ---
消息ID: HEARTBEAT_1_1759281126734
状态: HEARTBEAT
内容: 连接状态正常
类型: INFO> [心跳] system
已发送系统消息: 系统状态检查--- 收到服务器消息 (+52468ms) ---
消息ID: SYS_1_4
状态: ACKNOWLEDGED
内容: 系统消息已确认: 系统状态检查
类型: SUCCESS> 天气
已发送消息 [2]: 天气
--- 收到服务器消息 (+56198ms) ---
消息ID: TEXT_1_5
状态: PROCESSED
内容: 已收到您的消息: "天气"
类型: SUCCESS--- 收到服务器消息 (+60416ms) ---
消息ID: HEARTBEAT_1_1759281136741
状态: HEARTBEAT
内容: 连接状态正常
类型: INFO> [心跳] help=== 可用命令 ===
quit/exit    - 退出聊天
status       - 查看连接状态
help         - 显示帮助
image        - 发送测试图片消息
file         - 发送测试文件消息
system       - 发送测试系统消息
其他输入     - 作为普通文本消息发送
================
--- 收到服务器消息 (+70408ms) ---
消息ID: HEARTBEAT_1_1759281146734
状态: HEARTBEAT
内容: 连接状态正常
类型: INFO> [心跳] quit
正在断开连接...
聊天客户端已退出--- 收到服务器消息 (+75178ms) ---
消息ID: GOODBYE_1
状态: DISCONNECTED
内容: 感谢使用! 统计: 5条消息, 74秒时长, 平均14953.20 ms/消息
类型: INFO=== 服务器关闭连接 ===
总共接收 30 条消息
连接持续时间: 75184msProcess finished with exit code 0

✅ 2. 服务器端日志

=== 双向流式RPC连接建立, 连接ID: 1 ===发送欢迎消息到连接 1
向连接 1 发送心跳
向连接 1 发送心跳
向连接 1 发送心跳双向流连接 1 - 收到第 1 条消息
用户: user123
消息: 测试图片
类型: IMAGE
时间: +31663ms双向流连接 1 - 收到第 2 条消息
用户: interactive_user
消息: sfile
类型: TEXT
时间: +40235ms双向流连接 1 - 收到第 3 条消息
用户: user123
消息: document.pdf
类型: FILE
时间: +44460ms向连接 1 发送心跳双向流连接 1 - 收到第 4 条消息
用户: user123
消息: 系统状态检查
类型: SYSTEM
时间: +52059ms双向流连接 1 - 收到第 5 条消息
用户: interactive_user
消息: 天气
类型: TEXT
时间: +55789ms向连接 1 发送心跳=== 双向流连接 1 关闭 ===
总共处理 5 条消息
连接持续时间: 74766ms=== 连接已关闭, 删除连接 1 ===conn_1
正在关闭gRPC服务器...
服务器已关闭
http://www.dtcms.com/a/442119.html

相关文章:

  • 浅谈内存DDR——DDR4性能优化技术
  • 静态网页模板网站电商运营培训班
  • mysqldump导入备份数据到阿里云RDS会报错吗
  • QT肝8天16--加载动态菜单
  • Spring Boot整合缓存——Redis缓存!超详细!
  • 湘潭做网站品牌磐石网络wordpress 柚子皮
  • 前端实战开发(二):React + Canvas 网络拓扑图开发:6 大核心问题与完整解决方案
  • 【C语言数据结构】第2章:线性表(2)--线性表的顺序存储结构
  • 计算机操作系统--进程:共享内存和管道的差异
  • 深圳移动网站建设公司上海建筑工程有限公司
  • 【Linux】入门指南:基础指令详解Part One
  • 使用 Docker 部署 Nginx 教程
  • 重庆做网站微信的公司上海平面网站
  • 整站优化seo公司哪家好千峰网课
  • C语言指针应用的经典案例
  • C++篇(11)继承
  • 小迪web自用笔记54
  • 网站logo如何做清晰佛山seo优化电话
  • 词袋模型BoW
  • 数据驱动AI实战:从统计学习方法到业务落地的核心方法论
  • 网站开发需求大吗第一次做怎么放进去视频网站
  • display vlan verbose 概念及题目
  • 深度学习写作:model与module; 试验与实验
  • 企业 网站 程序微信小程序开发平台
  • ViT实战二:Cls token
  • AI + 制造:从技术试点到产业刚需的 2025 实践图鉴
  • JVM内存模型剖析
  • 山东网站制作哪家好网站优化方案和实施
  • 工作中使用到的单词(软件开发)_第五版
  • Vue3 Router高级用法—菜单动态渲染