当前位置: 首页 > news >正文

Nacos源码—9.Nacos升级gRPC分析八

大纲

10.gRPC客户端初始化分析

11.gRPC客户端的心跳机制(健康检查)

12.gRPC服务端如何处理客户端的建立连接请求

13.gRPC服务端如何映射各种请求与对应的Handler处理类

14.gRPC简单介绍

12.gRPC服务端如何处理客户端的建立连接请求

(1)gRPC服务端是如何启动的

(2)connectionId如何绑定Client对象的

(1)gRPC服务端是如何启动的

BaseRpcServer类有一个被@PostConstruct修饰的start()方法,该方法会调用BaseGrpcServer的startServer()方法来启动gRPC服务端。

在BaseGrpcServer的startServer()方法中,首先会调用BaseGrpcServer的addServices()方法添加服务,然后会使用建造者模式通过ServerBuilder创建gRPC框架的Server对象,最后启动gRPC框架的Server服务端,即启动一个NettyServer服务端。

//abstract rpc server.
public abstract class BaseRpcServer {...//Start sever. 启动gRPC服务端@PostConstructpublic void start() throws Exception {String serverName = getClass().getSimpleName();Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());//调用BaseGrpcServer.startServer()方法启动gRPC服务端startServer();Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());Runtime.getRuntime().addShutdownHook(new Thread(() -> {Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);try {BaseRpcServer.this.stopServer();Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);} catch (Exception e) {Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);}}));}//get service port.public int getServicePort() {return EnvUtil.getPort() + rpcPortOffset();}...
}//Grpc implementation as a rpc server.
public abstract class BaseGrpcServer extends BaseRpcServer {...@Overridepublic void startServer() throws Exception {final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();//server interceptor to set connection id. 定义请求拦截器ServerInterceptor serverInterceptor = new ServerInterceptor() {@Overridepublic <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers, ServerCallHandler<T, S> next) {Context ctx = Context.current().withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID)).withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP)).withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT)).withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {Channel internalChannel = getInternalChannel(call);ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);}return Contexts.interceptCall(ctx, call, headers, next);}};//1.调用BaseGrpcServer.addServices()方法添加服务addServices(handlerRegistry, serverInterceptor);//2.创建一个gRPC框架的Server对象,使用了建造者模式server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor()).maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry).compressorRegistry(CompressorRegistry.getDefaultInstance()).decompressorRegistry(DecompressorRegistry.getDefaultInstance()).addTransportFilter(new ServerTransportFilter() {@Overridepublic Attributes transportReady(Attributes transportAttrs) {InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);InetSocketAddress localAddress = (InetSocketAddress) transportAttrs.get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);int remotePort = remoteAddress.getPort();int localPort = localAddress.getPort();String remoteIp = remoteAddress.getAddress().getHostAddress();Attributes attrWrapper = transportAttrs.toBuilder().set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort).set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort).set(TRANS_KEY_LOCAL_PORT, localPort).build();String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);return attrWrapper;}@Overridepublic void transportTerminated(Attributes transportAttrs) {String connectionId = null;try {connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);} catch (Exception e) {//Ignore}if (StringUtils.isNotBlank(connectionId)) {Loggers.REMOTE_DIGEST.info("Connection transportTerminated,connectionId = {} ", connectionId);connectionManager.unregister(connectionId);}}}).build();//3.启动gRPC框架的Serverserver.start();}...
}

(2)connectionId如何绑定Client对象的

BaseGrpcServer的startServer()方法在执行addServices()方法添加服务时,就会对connectionId与Client对象进行绑定。

绑定会由GrpcBiStreamRequestAcceptor的requestBiStream()方法触发。具体就是会调用ConnectionManager.register()方法来实现绑定,即先通过执行"connections.put(connectionId, connection)"代码,将connectionId和connection连接对象,放入到ConnectionManager的connections这个Map属性中。再执行ClientConnectionEventListenerRegistry的notifyClientConnected()方法,把Connection连接对象包装成Client对象。

将Connection连接对象包装成Client对象时,又会继续调用ConnectionBasedClientManager的clientConnected()方法,该方法便会根据connectionId创建出一个Client对象,然后将其放入到ConnectionBasedClientManager的clients这个Map中,从而实现connectionId与Client对象的关联。

//Grpc implementation as a rpc server.
public abstract class BaseGrpcServer extends BaseRpcServer {...private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {//unary common call register.final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.UNARY).setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();//对gRPC客户端请求的服务进行映射处理final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));//构建ServerServiceDefinition服务final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();//添加服务到gRPC的请求流程中handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));//bi stream register.//处理客户端连接对象的关联//也就是调用GrpcBiStreamRequestAcceptor.requestBiStream()方法对ConnectionId与Client对象进行绑定final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));}...
}@Service
public class GrpcBiStreamRequestAcceptor extends BiRequestStreamGrpc.BiRequestStreamImplBase {...@Overridepublic StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) {StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() {...@Overridepublic void onNext(Payload payload) {...//创建连接信息对象,把一些元信息放入到这个对象中ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(),remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(),setUpRequest.getClientVersion(), appName, setUpRequest.getLabels());metaInfo.setTenant(setUpRequest.getTenant());//把连接信息包装到连接对象中Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get());connection.setAbilities(setUpRequest.getAbilities());boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted();//ConnectionManager.register()方法,会将connectionId和连接对象进行绑定if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) {...}...}...};return streamObserver;}...
}@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {//存储connectionId对应的Connection对象Map<String, Connection> connections = new ConcurrentHashMap<>();...//register a new connect.public synchronized boolean register(String connectionId, Connection connection) {if (connection.isConnected()) {if (connections.containsKey(connectionId)) {return true;}if (!checkLimit(connection)) {return false;}if (traced(connection.getMetaInfo().clientIp)) {connection.setTraced(true);}//将connectionId与Connection连接对象进行绑定connections.put(connectionId, connection);connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement();//把Connection连接对象包装成Client对象clientConnectionEventListenerRegistry.notifyClientConnected(connection);Loggers.REMOTE_DIGEST.info("new connection registered successfully, connectionId = {},connection={} ", connectionId, connection);return true;}return false;}...
}@Service
public class ClientConnectionEventListenerRegistry {final List<ClientConnectionEventListener> clientConnectionEventListeners = new ArrayList<ClientConnectionEventListener>();//notify where a new client connectedpublic void notifyClientConnected(final Connection connection) {for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {try {//调用ConnectionBasedClientManager.clientConnected()方法clientConnectionEventListener.clientConnected(connection);} catch (Throwable throwable) {Loggers.REMOTE.info("[NotifyClientConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable);}}}...
}@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();@Overridepublic void clientConnected(Connection connect) {if (!RemoteConstants.LABEL_MODULE_NAMING.equals(connect.getMetaInfo().getLabel(RemoteConstants.LABEL_MODULE))) {return;}//把Connection对象中的信息取出来,放到ClientAttributes对象中ClientAttributes attributes = new ClientAttributes();attributes.addClientAttribute(ClientConstants.CONNECTION_TYPE, connect.getMetaInfo().getConnectType());attributes.addClientAttribute(ClientConstants.CONNECTION_METADATA, connect.getMetaInfo());//传入connectionId和连接信息clientConnected(connect.getMetaInfo().getConnectionId(), attributes);}@Overridepublic boolean clientConnected(String clientId, ClientAttributes attributes) {String type = attributes.getClientAttribute(ClientConstants.CONNECTION_TYPE);ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);//这里的clientId就是connectionId,根据connectionId创建出Client对象return clientConnected(clientFactory.newClient(clientId, attributes));}@Overridepublic boolean clientConnected(final Client client) {//最后将connectionId与Client对象进行绑定,放入到ConnectionBasedClientManager的clients这个Map中clients.computeIfAbsent(client.getClientId(), s -> {Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());return (ConnectionBasedClient) client;});return true;}...
}

(3)总结

13.gRPC服务端如何映射各种请求与对应的Handler处理类

gRPC服务端会如何处理客户端请求,如何找到对应的Handler处理类。

在gRPC服务端启动时,会调用BaseGrpcServer的startServer()方法,其中就会执行到BaseGrpcServer的addServices()方法。在BaseGrpcServer的addServices()方法中,就会进行请求与Handler映射,也就是调用GrpcRequestAcceptor的request()方法进行请求与Handler映射。

在GrpcRequestAcceptor的request()方法中,首先会从请求对象中获取请求type,然后会通过请求type获取一个Handler对象,最后调用RequestHandler的模版方法handleRequest(),从而调用具体Handler对象的handle()方法。

//Grpc implementation as a rpc server.
public abstract class BaseGrpcServer extends BaseRpcServer {@Autowiredprivate GrpcRequestAcceptor grpcCommonRequestAcceptor;...private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {//unary common call register.final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.UNARY).setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();//对gRPC客户端发出的请求进行Handler处理类的映射处理final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall((request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));//构建ServerServiceDefinition服务final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();//添加服务到gRPC的请求流程中handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));//bi stream register.//处理客户端连接对象的关联//也就是调用GrpcBiStreamRequestAcceptor.requestBiStream()方法对ConnectionId与Client对象进行绑定final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall((responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder().setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME)).setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build())).setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));}...
}@Service
public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {...@Overridepublic void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {...//首先从请求对象中获取请求typeString type = grpcRequest.getMetadata().getType();...//然后通过请求type获取一个Handler对象RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);...//最后调用RequestHandler的模版方法handleRequest(),从而调用具体Handler对象的handle()方法Response response = requestHandler.handleRequest(request, requestMeta);...}...
}public abstract class RequestHandler<T extends Request, S extends Response> {@Autowiredprivate RequestFilters requestFilters;//Handler request.public Response handleRequest(T request, RequestMeta meta) throws NacosException {for (AbstractRequestFilter filter : requestFilters.filters) {try {Response filterResult = filter.filter(request, meta, this.getClass());if (filterResult != null && !filterResult.isSuccess()) {return filterResult;}} catch (Throwable throwable) {Loggers.REMOTE.error("filter error", throwable);}}//调用具体Handler的handle()方法return handle(request, meta);}//Handler request.public abstract S handle(T request, RequestMeta meta) throws NacosException;
}@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();@Autowiredprivate TpsMonitorManager tpsMonitorManager;//Get Request Handler By request Type.public RequestHandler getByRequestType(String requestType) {return registryHandlers.get(requestType);}@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {//获取全部继承了RequestHandler类的实现类Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);Collection<RequestHandler> values = beansOfType.values();for (RequestHandler requestHandler : values) {Class<?> clazz = requestHandler.getClass();boolean skip = false;while (!clazz.getSuperclass().equals(RequestHandler.class)) {if (clazz.getSuperclass().equals(Object.class)) {skip = true;break;}clazz = clazz.getSuperclass();}if (skip) {continue;}try {Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {TpsControl tpsControl = method.getAnnotation(TpsControl.class);String pointName = tpsControl.pointName();TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);}} catch (Exception e) {//ignore.}Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);}}
}

14.gRPC简单介绍

(1)gRPC是什么

(2)gRPC的特性

(3)gRPC和Dubbo的区别

(1)gRPC是什么

gRPC是一个高性能、开源和通用的RPC框架。gRPC基于ProtoBuf序列化协议开发,且支持众多开发语言。gRPC是面向服务端和移动端,基于HTTP 2设计的,带来诸如双向流、流控、头部压缩、单TCP连接上的多复用请求等特。这些特性使得其在移动设备上表现更好,更省电和节省空间占用。

(2)gRPC的特性

一.gRPC可以跨语言使用

二.基于IDL(接口定义语言Interface Define Language)文件定义服务

通过proto3工具生成指定语言的数据结构、服务端接口以及客户端Stub。

三.通信协议基于标准的HTTP 2设计

支持双向流、消息头压缩、单TCP的多路复用、服务端推送等特性,这些特性使得gRPC在移动端设备上更加省电和节省网络流量。

四.序列化支持ProtoBuf和JSON

ProtoBuf是一种语言无关的高性能序列化框架,它是基于HTTP2和ProtoBuf的,这保障了gRPC调用的高性能。

五.安装简单,扩展方便

使用gRPC框架每秒可达到百万RPC。

(3)gRPC和Dubbo的区别

一.通讯协议

gRPC基于HTTP 2.0,Dubbo基于TCP。

二.序列化

gRPC使用ProtoBuf,Dubbo使用Hession2等基于Java的序列化技术。

三.服务注册与发现

gRPC是应用级别的服务注册,Dubbo2.0及之前的版本都是基于更细力度的服务来进行注册,Dubbo3.0之后转向应用级别的服务注册。

四.编程语言

gRPC可以使用任何语言(HTTP和ProtoBuf天然就是跨语言的),而Dubbo只能使用在构建在JVM之上的语言。

五.服务治理

gRPC自身的服务治理能力很弱,只能基于HTTP连接维度进行容错,而Dubbo可以基于服务维度进行治理。

总结:gRPC的优势在于跨语言、跨平台,但服务治理能力弱。Dubbo服务治理能力强,但受编程语言限制无法跨语言使用。

相关文章:

  • 物理:从人出生和死亡的角度来讨论基本粒子的创生和湮灭以及是否守恒?
  • Spark 缓存(Caching)
  • 配置Nginx启用Https
  • C++中void*知识详解和注意事项
  • LORA: LOW-RANK ADAPTATION OF LARGE LANGUAGE MODELS
  • Pytorch张量和损失函数
  • Java详解LeetCode 热题 100(15):LeetCode 189. 轮转数组(Rotate Array)详解
  • 工程类论文查重困局破解:基于知识图谱的跨学科语义重构技术实证研究
  • 通讯录程序
  • 利用比较预言机处理模糊的偏好数据
  • Java Spring MVC -01
  • [Java实战]Spring Boot 3 整合 Apache Shiro(二十一)
  • 多模态融合【十九】——MRFS: Mutually Reinforcing Image Fusion and Segmentation
  • GOOSE 协议中MAC配置
  • CVE-2025-31258 macOS远程视图服务沙箱逃逸漏洞PoC已公开
  • JAVA研发+前后端分离,ZKmall开源商城B2C商城如何保障系统性能?
  • 使用scp命令拷贝hadoop100中文件到其他虚拟机中
  • 深度学习之优化器【从梯度下降到自适应学习率算法】(pytorch版)
  • C语言| extern的用法作用
  • TB67S109AFTG, TB67S109AFNG是一款采用PWM斩波器的两相双极步进电机驱动器内置有时钟输入解码器。采用BiCD工艺制造
  • 图讯丨习近平出席中国-拉美和加勒比国家共同体论坛第四届部长级会议开幕式
  • 工人日报:“鼠标手”被纳入职业病,劳动保障网越织越密
  • 93岁南开退休教授陈生玺逝世,代表作《明清易代史独见》多次再版
  • 网信部门曝光网络谣言典型案例,“AI预测彩票号码百分百中奖”等在列
  • 香港将展“天方奇毯”,从地毯珍品看伊斯兰艺术
  • 媒体谈法院就“行人相撞案”道歉:执法公正,普法莫拉开“距离”