当前位置: 首页 > news >正文

Envoy 源码解析(三):Envoy 发送数据到服务端、Envoy 收到服务端响应

本文基于 Envoy 1.31.0 版本进行源码学习

5、Envoy 发送数据到服务端

在 HTTP 请求头部解码完成后,需要根据 HTTP 请求头部内容与监听器路由配置信息匹配 Cluster 并通过负载均衡来确定 Cluster 的实例地址,然后工作线程将创建与该实例地址的连接并将 HTTP 请求经过编码操作后发送出去

1)、路由匹配

L7 路由过滤器总是作为 L7 过滤器的最后一个进行创建,并在处理 HTTP 请求时调用过滤器的 Filter::decodeHeaders 方法:

// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
	...
  // Determine if there is a route entry or a direct response for the request.
  // 匹配路由项
  route_ = callbacks_->route();
	...
  // A route entry matches for the request.
  // 获取当前路由项
  route_entry_ = route_->routeEntry();
  ...
  // 根据路由项匹配 Cluster
  Upstream::ThreadLocalCluster* cluster =
      config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
  ...
  // 获取 Cluster 的上游连接池
  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
  ...
  // 创建用于请求重试的对象 retry_state_
  retry_state_ = createRetryState(
      route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, route_stats_context_,
      config_->factory_context_, callbacks_->dispatcher(), route_entry_->priority());
	...
  // 创建上游 HTTP 请求
  UpstreamRequestPtr upstream_request =
      std::make_unique<UpstreamRequest>(*this, std::move(generic_conn_pool), can_send_early_data,
                                        can_use_http3, false /*enable_half_close*/);
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
  // 处理上游 HTTP 请求
  upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
 	...
  if (end_stream) {
    onRequestComplete();
  }
  // 不再继续执行其他 L7 过滤器
  return Http::FilterHeadersStatus::StopIteration;
}

路由处理过程如下图所示:

路由处理过程包括以下几个步骤:

  1. 首先根据请求头部内容匹配虚拟主机 virtualHost,然后根据 RDS 配置设置 URL 条件来找到 Cluster:

    // source/common/router/router.cc
    Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
      ...
      // Determine if there is a route entry or a direct response for the request.
      // 匹配路由项
      route_ = callbacks_->route();
      ...
    }
    

    route 方法调用 ActiveStream::refreshCachedRoute 方法,在配置文件对象 snapped_route_config_ 中计算路由匹配规则,snapped_route_config_ 对象是在执行 ActiveStream::decodeHeaders 方法时生成的:

    // source/common/http/conn_manager_impl.cc
    void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPtr&& headers,
                                                            bool end_stream) {
        ...
        snapped_route_config_ = connection_manager_.config_->routeConfigProvider()->configCast();
    }
    

    refreshCachedRoute 方法根据当前 HTTP 请求的头部 request_headers_ 进行后续路由处理:

    // source/common/http/conn_manager_impl.cc
    void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
      		...
          // 根据 HTTP 请求头进行后续路由处理
          route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
                                               stream_id_);
        	...
    }
    

    路由处理代码如下,首先根据请求头部数据 headers 在配置中查找虚拟机主机 virtualHost:

    // source/common/router/config_impl.cc
    RouteConstSharedPtr RouteMatcher::route(const RouteCallback& cb,
                                            const Http::RequestHeaderMap& headers,
                                            const StreamInfo::StreamInfo& stream_info,
                                            uint64_t random_value) const {
      // 根据 HTTP 请求头在配置中查找虚拟主机 virtualHost                                 
      const VirtualHostImpl* virtual_host = findVirtualHost(headers);
      if (virtual_host) {
        return virtual_host->getRouteFromEntries(cb, headers, stream_info, random_value);
      } else {
        return nullptr;
      }
    }
    

    RouteMatcher::findVirtualHost 方法首先从请求头部 headers 中取出 Host 值,然后在配置文件 virtual_hosts_ 列表中查找,如果匹配不到项目,则继续尝试使用 findWildcardVirtualHost 方法从通配符规则中查找路由

    // source/common/router/config_impl.cc
    const VirtualHostImpl* RouteMatcher::findVirtualHost(const Http::RequestHeaderMap& headers) const {
      ...
      // 从 HTTP 请求头中取出 Host 值
      absl::string_view host_header_value = headers.getHostValue();
      ...
      // 在配置文件 virtual_hosts_ 列表中查找匹配项(通过先用 hash 命中精确匹配 virtualHost)
      const std::string host = absl::AsciiStrToLower(host_header_value);
      const auto iter = virtual_hosts_.find(host);
      if (iter != virtual_hosts_.end()) {
        return iter->second.get();
      }
      // 如果匹配不到,则继续尝试使用 findWildcardVirtualHost 方法通过后缀或前缀通配符规则匹配 virtualHost(遍历逐个进行匹配)
      if (!wildcard_virtual_host_suffixes_.empty()) {
        const VirtualHostImpl* vhost = findWildcardVirtualHost(
            host, wildcard_virtual_host_suffixes_,
            [](absl::string_view h, int l) -> absl::string_view { return h.substr(h.size() - l); });
        if (vhost != nullptr) {
          return vhost;
        }
      }
      if (!wildcard_virtual_host_prefixes_.empty()) {
        const VirtualHostImpl* vhost = findWildcardVirtualHost(
            host, wildcard_virtual_host_prefixes_,
            [](absl::string_view h, int l) -> absl::string_view { return h.substr(0, l); });
        if (vhost != nullptr) {
          return vhost;
        }
      }
      return default_virtual_host_.get();
    }
    

    虚拟主机 virtualHost 匹配完成后,route 操作调用 VirtualHostImpl::getRouteFromEntries 方法,再根据请求头部 headers 内容轮询配置文件 routes_ 列表来查找 Cluster

    // source/common/router/config_impl.cc
    RouteConstSharedPtr VirtualHostImpl::getRouteFromRoutes(
        const RouteCallback& cb, const Http::RequestHeaderMap& headers,
        const StreamInfo::StreamInfo& stream_info, uint64_t random_value,
        absl::Span<const RouteEntryImplBaseConstSharedPtr> routes) const {
      // 根据 HTTP 请求头轮询配置文件 routes 列表来查找 Cluster
      for (auto route = routes.begin(); route != routes.end(); ++route) {
        if (!headers.Path() && !(*route)->supportsPathlessHeaders()) {
          continue;
        }
        RouteConstSharedPtr route_entry = (*route)->matches(headers, stream_info, random_value);
        if (route_entry == nullptr) {
          continue;
        }
    
        if (cb == nullptr) {
          return route_entry;
        }
    
        RouteEvalStatus eval_status = (std::next(route) == routes.end())
                                          ? RouteEvalStatus::NoMoreRoutes
                                          : RouteEvalStatus::HasMoreRoutes;
        RouteMatchStatus match_status = cb(route_entry, eval_status);
        if (match_status == RouteMatchStatus::Accept) {
          return route_entry;
        }
        if (match_status == RouteMatchStatus::Continue &&
            eval_status == RouteEvalStatus::NoMoreRoutes) {
          ENVOY_LOG(debug,
                    "return null when route match status is Continue but there is no more routes");
          return nullptr;
        }
      }
    
      ENVOY_LOG(debug, "route was resolved but final route list did not match incoming request");
      return nullptr;
    }
    
  2. route_entry_->clusterName 变量用于保存 Cluster的名称,即上游服务名称,为了减少多线程竞争,每个工作线程中都有基于线程本地存储的 ThreadLocalCluster 对象,用于查找 Cluster,并通过 getThreadLocalCluster 方法进行查询

    // source/common/router/router.cc
    void Filter::chargeUpstreamCode(uint64_t response_status_code,
                                    const Http::ResponseHeaderMap& response_headers,
                                    Upstream::HostDescriptionConstSharedPtr upstream_host,
                                    bool dropped) {
      ...
      // 根据路由项匹配 Cluster
      Upstream::ThreadLocalCluster* cluster =
          config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
      ...
    }
    

小结:RDS 路由匹配时,先匹配域名维度,再匹配一个域名下的多个路径

  1. RDS 里面的多个域名:先用 hash 命中精确匹配 virtualHost,然后通过后缀或前缀通配符规则遍历逐个进行匹配 virtualHost
  2. 同一个域名下的多个路径:严格按配置顺序一一匹配的。除此之外,还支持通过 generic matching API 来进行树形的匹配,以及支持通过 route scope 控制匹配的范围
2)、获取连接池

在根据路由项匹配到 Cluster 后,工作线程通过 createConnPool 方法获取上游 Cluster 的连接池

// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
  ...
  // 获取 Cluster 的上游连接池
  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
	...
}

这里工作线程调用 GenericGenericConnPoolFactory::createGenericConnPool 方法,创建对应协议的连接池对象,以 HttpConnPool 对象举例,HttpConnPool 构造方法调用 ClusterEntry::httpConnPool 方法来创建连接池

// source/common/upstream/cluster_manager_impl.cc
absl::optional<HttpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
    ResourcePriority priority, absl::optional<Http::Protocol> protocol,
    LoadBalancerContext* context) {
	...
  auto pool = httpConnPoolImpl(priority, protocol, context, false);
  ...
}

ClusterEntry::httpConnPoolImpl 代码如下:

// source/common/upstream/cluster_manager_impl.cc
Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
    ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
    LoadBalancerContext* context, bool peek) {
  // 调用负载均衡器的 chooseHost 方法,在当前 Cluster 的实例中挑选一个最合适的目标主机
  HostConstSharedPtr host = (peek ? peekAnotherHost(context) : chooseHost(context));
  ...
  // 创建目标主机的连接池容器
  ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);

  // Note: to simplify this, we assume that the factory is only called in the scope of this
  // function. Otherwise, we'd need to capture a few of these variables by value.
  // 根据目标主机 Host 支持的协议类型计算散列值 hash_key,并在目标主机已创建的活跃连接池中通过 getPool 进行连接池查找
  ConnPoolsContainer::ConnPools::PoolOptRef pool =
      container.pools_->getPool(priority, hash_key, [&]() {
        // 如果连接池不存在,调用 allocateConnPool 方法创建新连接池
        auto pool = parent_.parent_.factory_.allocateConnPool(
            parent_.thread_local_dispatcher_, host, priority, upstream_protocols,
            alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr,
            have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
            parent_.parent_.time_source_, parent_.cluster_manager_state_, quic_info_);

        pool->addIdleCallback([&parent = parent_, host, priority, hash_key]() {
          parent.httpConnPoolIsIdle(host, priority, hash_key);
        });

        return pool;
      });

  if (pool.has_value()) {
    return &(pool.value().get());
  } else {
    return nullptr;
  }
}

ClusterEntry::httpConnPoolImpl 方法逻辑如下:

  1. 此时 peek 标志为 false,负载均衡过程中的 ClusterEntry::httpConnPoolImpl 方法调用 chooseHost 方法,在当前 Cluster 的实例中挑选一个最合适的目标主机 Host 并获取目标主机 Host

  2. 执行 getHttpConnPoolsContainer 方法来创建目标主机的连接池容器

    // source/common/upstream/cluster_manager_impl.cc
    ClusterManagerImpl::ThreadLocalClusterManagerImpl::ConnPoolsContainer*
    ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
        const HostConstSharedPtr& host, bool allocate) {
      auto container_iter = host_http_conn_pool_map_.find(host);
      if (container_iter == host_http_conn_pool_map_.end()) {
        if (!allocate) {
          return nullptr;
        }
        container_iter =
            host_http_conn_pool_map_.try_emplace(host, thread_local_dispatcher_, host).first;
      }
    
      return &container_iter->second;
    }
    

    getHttpConnPoolsContainer 方法将在 ThreadLocalClusterManagerImpl 对象内使用 host_http_conn_pool_map_ 散列表进行连接池查找,因此每个线程的目标主机都拥有各自独立的上游连接池

  3. 根据目标主机 Host 支持的协议类型计算散列值 hash_key,并在目标主机已创建的活跃连接池中通过 getPool 进行连接池查找,如果连接池不存在,调用 allocateConnPool 方法创建新连接池

  4. allocateConnPool 方法中根据 HTTP 类型创建对应的连接池,若类型为 HTTP1,allocateConnPool 方法将创建 FixedHttpConnPoolImpl 类型连接池实例,并在其构造方法中进行父类 HttpConnPoolImplBase 初始化,然后在父类 HttpConnPoolImplBase 中再进行祖父类 ConnPoolImplBase 初始化:

    // source/common/conn_pool/conn_pool_base.cc
    ConnPoolImplBase::ConnPoolImplBase(
        Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
        Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
        const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
        Upstream::ClusterConnectivityState& state)
        : state_(state), host_(host), priority_(priority), dispatcher_(dispatcher),
          socket_options_(options), transport_socket_options_(transport_socket_options),
          upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {}
    

    在 ConnPoolImplBase 构造方法中,将创建 upstream_ready_cb_ 事件回调方法,该回调方法将被注册于线程调度内,并在每个请求处理结束时被触发,表示当前请求已处理完成,可以用于处理其他待发送请求

3)、上游请求重试逻辑

在 L7 路由过滤器的 decodeHeaders 中,连接池创建完毕后将创建用于请求重试的对象 retry_state_,以及当前请求的上游对象 upstream_request:

// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
  ...
  // 创建用于请求重试的对象 retry_state_
  retry_state_ = createRetryState(
      route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, route_stats_context_,
      config_->factory_context_, callbacks_->dispatcher(), route_entry_->priority());
	...
  // 创建上游 HTTP 请求
  UpstreamRequestPtr upstream_request =
      std::make_unique<UpstreamRequest>(*this, std::move(generic_conn_pool), can_send_early_data,
                                        can_use_http3, false /*enable_half_close*/);
  ...
  return Http::FilterHeadersStatus::StopIteration;
}

上面的代码中首先创建了 retry_state_ 对象,用于请求重试。这里的重试与某一目标主机的连接池内的重试的区别是,连接池内的重试只会对当前已经选定的目标主机地址进行连接重试,如果目标主机确实存在连接问题或无法响应,应用请求将不会连接这个 Cluster 的其他主机地址。而路由内的重试 retry_state_ 对象,则是当某个目标主机始终由于连接问题或长期响应错误码(如5xx)而无法连接时,上游请求将有机会在 Cluster 内选择其他可用的主机地址并重新发送请求。请求重试策略如下图所示:

下面先来看下路由内的重试执行流程:

// source/common/router/router.cc
void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
                               UpstreamRequest& upstream_request, bool end_stream) {
  		...
      const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
          *headers, *downstream_headers_,
          [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
           had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
              bool disable_early_data) -> void {
            doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
          });
      ...
}

当收到上游响应数据或遇到上游访问失败时,onUpstreamHeaders 方法被调用,该方法内执行 shouldRetryHeaders 方法,根据响应头部的 headers 内容判断是否需要重新选择主机。一旦判定为需要重新选择主机,则调用 doRetry 方法进行主机选择及请求发送的重试。相关代码如下:

// source/common/router/retry_state_impl.cc
RetryStatus RetryStateImpl::shouldRetryHeaders(const Http::ResponseHeaderMap& response_headers,
                                               const Http::RequestHeaderMap& original_request,
                                               DoRetryHeaderCallback callback) {
  // This may be overridden in wouldRetryFromHeaders().
  bool disable_early_data = false;
  const RetryDecision retry_decision =
      wouldRetryFromHeaders(response_headers, original_request, disable_early_data);

  // Yes, we will retry based on the headers - try to parse a rate limited reset interval from the
  // response.
  if (retry_decision == RetryDecision::RetryWithBackoff && !reset_headers_.empty()) {
    const auto backoff_interval = parseResetInterval(response_headers);
    if (backoff_interval.has_value() && (backoff_interval.value().count() > 1L)) {
      ratelimited_backoff_strategy_ = std::make_unique<JitteredLowerBoundBackOffStrategy>(
          backoff_interval.value().count(), random_);
    }
  }
  // 进行重试
  return shouldRetry(retry_decision,
                     [disable_early_data, callback]() { callback(disable_early_data); });
}

wouldRetryFromHeaders 方法根据响应头部状态码判断是否匹配设置的重试条件,retry_on_ 变量是 Envoy 的重试策略配置

// source/common/router/retry_state_impl.cc
RetryState::RetryDecision
RetryStateImpl::wouldRetryFromHeaders(const Http::ResponseHeaderMap& response_headers,
                                      const Http::RequestHeaderMap& original_request,
                                      bool& disable_early_data) {
  ...
  uint64_t response_status = Http::Utility::getResponseStatus(response_headers);
  // 根据响应头部状态码判断是否匹配设置的重试条件,retry_on_ 变量是 Envoy 的重试策略配置
  if (retry_on_ & RetryPolicy::RETRY_ON_5XX) {
    if (Http::CodeUtility::is5xx(response_status)) {
      return RetryDecision::RetryWithBackoff;
    }
  }
	...
  return RetryDecision::NoRetry;
}

然后,shouldRetry 方法将保存外部设置的回调方式,这里为 onRetry 方法。接着调用 enableBackoffTimer 方法创建重试定时器,并在定时器时间到达后调用 onRetry 回调方法:

// source/common/router/retry_state_impl.cc
void RetryStateImpl::enableBackoffTimer() {
  if (!retry_timer_) {
    // 在定时器时间到达后调用 onRetry 回调方法
    retry_timer_ = dispatcher_.createTimer([this]() -> void { backoff_callback_(); });
  }

  if (ratelimited_backoff_strategy_ != nullptr) {
    // If we have a backoff strategy based on rate limit feedback from the response we use it.
    retry_timer_->enableTimer(
        std::chrono::milliseconds(ratelimited_backoff_strategy_->nextBackOffMs()));

    // The strategy is only valid for the response that sent the ratelimit reset header and cannot
    // be reused.
    ratelimited_backoff_strategy_.reset();

    cluster_.trafficStats()->upstream_rq_retry_backoff_ratelimited_.inc();

  } else {
    // Otherwise we use a fully jittered exponential backoff algorithm.
    retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));

    cluster_.trafficStats()->upstream_rq_retry_backoff_exponential_.inc();
  }
}

接下来 doRetry 方法将进行类似路由 decodeHeaders 方法的处理,区别是 doRetry 方法将在已经得到的路由结果基础上选择目标主机并创建连接池,向上游发送请求:

// source/common/router/router.cc
void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) {
  	...
    // 选择目标主机及创建连接池
    generic_conn_pool = createConnPool(*cluster);
  ...
  UpstreamRequestPtr upstream_request =
      std::make_unique<UpstreamRequest>(*this, std::move(generic_conn_pool), can_send_early_data,
                                        can_use_http3, false /*enable_tcp_tunneling*/);
  ...
  // 向上游发送请求
  upstream_requests_.front()->acceptHeadersFromRouter(
      !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_);
  ...
}

这样不论在请求刚完成路由处理后,还是在当前连接池内所有主机连接失败,返回路由处理并进行连接重试时,工作线程都将执行UpstreamRequest::acceptHeadersFromRouter 方法处理上游连接

4)、发送上游请求

UpstreamRequest::acceptHeadersFromRouter 为上游请求发送的入口点。上游请求发送流程如下图所示:

在这里插入图片描述

UpstreamRequest::acceptHeadersFromRouter 方法调用 HttpConnPool::newStream 方法,newStream 方法使用连接池的 newStream 方法创建连接池内可取消请求对象的 handle,并将上游请求 upstreamRequest 对象当作上下游请求的关联对象 upstreamToDownstream,然后将该对象作为将来上游响应的解码器保存,其中回调方法为 HttpConnPool 对象的引用。相关代码如下:

// source/extensions/upstreams/http/http/upstream_request.cc
void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
  callbacks_ = callbacks;
  // It's possible for a reset to happen inline within the newStream() call. In this case, we
  // might get deleted inline as well. Only write the returned handle out if it is not nullptr to
  // deal with this case.
  Envoy::Http::ConnectionPool::Cancellable* handle =
      pool_data_.value().newStream(callbacks->upstreamToDownstream(), *this,
                                   callbacks->upstreamToDownstream().upstreamStreamOptions());
  if (handle) {
    conn_pool_stream_handle_ = handle;
  }
}

HttpConnPoolImplBase::newStream 方法创建请求包装对象 HttpAttachContext,该对象保存上游请求 upstreamRequest 及 HttpConnPool 的关联关系。然后传入 ConnPoolImplBase::newStreamImpl 方法,newStreamImpl 方法将首先检查已就绪连接列表 ready_clients_ 中是否有可用连接,如果有,则直接一个连接并执行 attachStreamToClient 方法,将当前请求上下文 context 关联到该连接:

// source/common/conn_pool/conn_pool_base.cc
ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context,
                                                             bool can_send_early_data) {
  ...
  // 可用连接不为空
  if (!ready_clients_.empty()) {
    ActiveClient& client = *ready_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
    // 处理当前请求
    attachStreamToClient(client, context);
    // Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }
	...

  if (!host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
    ENVOY_LOG(debug, "max pending streams overflow");
    onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                  context);
    host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
    // 无法将新请求放入等待队列
    return nullptr;
  }
  // 创建一个新的待处理流,并将其添加到 pending_streams_ 中
  ConnectionPool::Cancellable* pending = newPendingStream(context, can_send_early_data);
  ...
  // This must come after newPendingStream() because this function uses the
  // length of pending_streams_ to determine if a new connection is needed.
  // 调用 tryCreateNewConnections 方法检查 pending_streams_ 的长度,以此来判断是否需要创建新的连接
  const ConnectionResult result = tryCreateNewConnections();
  ...
  return pending;
}

如果没有可用连接,且此时上游请求等待队列中的请求量已经超过设置的阈值,则放弃当前请求,返回 Overflow 标志,这样下游连接将收到一个带有 Overflow 标志的失败响应。如果上游请求等待队列中的请求量还未超过设置的额阈值,工作线程则调用 newPendingStream 方法将当前请求保存到等待队列中,并调用 tryCreateNewConnections 方法创建新连接,创建的新连接以异步方式建立。在新连接建立完成后,将通过事件方式通知上游连接池继续处理等待队列中的请求。相关代码如下:

// source/common/http/conn_pool_base.cc
ConnectionPool::Cancellable*
HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
                                       bool can_send_early_data) {
  Http::ResponseDecoder& decoder = *typedContext<HttpAttachContext>(context).decoder_;
  Http::ConnectionPool::Callbacks& callbacks = *typedContext<HttpAttachContext>(context).callbacks_;
  ENVOY_LOG(debug,
            "queueing stream due to no available connections (ready={} busy={} connecting={})",
            ready_clients_.size(), busy_clients_.size(), connecting_clients_.size());
  Envoy::ConnectionPool::PendingStreamPtr pending_stream(
      new HttpPendingStream(*this, decoder, callbacks, can_send_early_data));
  // 将请求放入连接池待处理请求队列 pending_streams_ 中
  return addPendingStream(std::move(pending_stream));
}

addPendingStream 方法将请求放入连接池待处理请求队列 pending_streams_ 中

ConnPoolImplBase::tryCreateNewConnections 会尝试3次创建新链接,处理过程代码如下:

// source/common/conn_pool/conn_pool_base.cc
ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
  ConnPoolImplBase::ConnectionResult result;
  // Somewhat arbitrarily cap the number of connections preconnected due to new
  // incoming connections. The preconnect ratio is capped at 3, so in steady
  // state, no more than 3 connections should be preconnected. If hosts go
  // unhealthy, and connections are not immediately preconnected, it could be that
  // many connections are desired when the host becomes healthy again, but
  // overwhelming it with connections is not desirable.
  // 尝试3次创建新连接
  for (int i = 0; i < 3; ++i) {
    result = tryCreateNewConnection();
    if (result != ConnectionResult::CreatedNewConnection) {
      break;
    }
  }
  ASSERT(!is_draining_for_deletion_ || result != ConnectionResult::CreatedNewConnection);
  return result;
}

ConnPoolImplBase::ConnectionResult
ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
  	...
    ActiveClientPtr client = instantiateActiveClient();
    ...
    LinkedList::moveIntoList(std::move(client), owningList(client->state()));
    return can_create_connection ? ConnectionResult::CreatedNewConnection
                                 : ConnectionResult::CreatedButRateLimited;
    ...
}

instantiateActiveClient 是创建新连接的主要方法,在连接创建成功后,将根据连接的当前状态将其放入可用连接 ready_clients_、已用连接 busy_clients_、正在建立连接 connecting_clients_ 的列表中。如果连接建立较慢,首先将连接放入 connecting_clients_ 列表中。如果连接建立很快,其可以马上变成 ready_clients_。如果已经有待处理请求,则会在 onUpstreamReady 回调方法中关联待处理请求并将其立即变成 busy_clients_

对于 HTTP1 的请求,instantiateActiveClient 方法通过前面连接池的 FixedHttpConnPoolImpl 构造方法内保存的回调对象创建新连接:

// source/common/http/http1/conn_pool.cc
ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
                 Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
                 const Network::ConnectionSocket::OptionsSharedPtr& options,
                 const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
                 Upstream::ClusterConnectivityState& state) {
  return std::make_unique<FixedHttpConnPoolImpl>(
      std::move(host), std::move(priority), dispatcher, options, transport_socket_options,
      random_generator, state,
      [](HttpConnPoolImplBase* pool) {
        // 通过连接池内的回调方法创建上游连接对象
        return std::make_unique<ActiveClient>(*pool, absl::nullopt);
      },
      [](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) {
        // 创建客户端连接解码器
        CodecClientPtr codec{new CodecClientProd(
            CodecType::HTTP1, std::move(data.connection_), data.host_description_,
            pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions())};
        return codec;
      },
      std::vector<Protocol>{Protocol::Http11}, absl::nullopt, nullptr);
}

client_fn_ 变量用于保存 Http1::ActiveClient 构造方法:

// source/common/http/http1/conn_pool.cc
ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
                           OptRef<Upstream::Host::CreateConnectionData> data)
    // HTTP1 连接每次只能处理一个并发请求
    : Envoy::Http::ActiveClient(parent, parent.host()->cluster().maxRequestsPerConnection(),
                                /* effective_concurrent_stream_limit */ 1,
                                /* configured_concurrent_stream_limit */ 1, data) {
  parent.host()->cluster().trafficStats()->upstream_cx_http1_total_.inc();
}

可以看到,对于 HTTP1,每条上游连接每次只能处理一个 HTTP 请求。相应地,一条上游连接可以同时处理多个 HTTP2 的请求

新上游连接的父类 Http::ActiveClient 构造方法将创建 TCP 网络连接及解码器对象:

// source/common/http/conn_pool_base.h
class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
public:
  ActiveClient(HttpConnPoolImplBase& parent, uint64_t lifetime_stream_limit,
               uint64_t effective_concurrent_stream_limit,
               uint64_t configured_concurrent_stream_limit,
               OptRef<Upstream::Host::CreateConnectionData> opt_data)
      : Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit,
                                            effective_concurrent_stream_limit,
                                            configured_concurrent_stream_limit) {
    ...
    // The static cast makes sure we call the base class host() and not
    // HttpConnPoolImplBase::host which is of a different type.
    // 创建 TCP 网络连接
    Upstream::Host::CreateConnectionData data =
        static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
            parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
    // 创建解码器对象
    initialize(data, parent);
  }

ActiveClient 对象的父类 ConnectionPool::ActiveClient 构造方法将创建连接超时检测定时器 connect_timer_。当新连接一直无法建立成功时,可以关闭并销毁该连接:

// source/common/conn_pool/conn_pool_base.cc
ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
                           uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit)
    : parent_(parent), remaining_streams_(translateZeroToUnlimited(lifetime_stream_limit)),
      configured_stream_limit_(translateZeroToUnlimited(effective_concurrent_streams)),
      concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
      // 创建连接超时检测定时器 connect_timer_。当新连接一直无法建立成功时,可以关闭并销毁该连接
      connect_timer_(parent_.dispatcher().createTimer([this]() { onConnectTimeout(); })) {
  ...
}

void ActiveClient::onConnectTimeout() {
  ENVOY_CONN_LOG(debug, "connect timeout", *this);
  parent_.host()->cluster().trafficStats()->upstream_cx_connect_timeout_.inc();
  timed_out_ = true;
  close();
}

HostImplBase::createConnection 方法调用 DispatcherImpl::createClientConnection 方法来创建网络连接,并将网络事件注册到当前工作线程的 Dispatcher 内:

// source/common/upstream/upstream_impl.cc
Host::CreateConnectionData HostImplBase::createConnection(
    Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
    const Network::Address::InstanceConstSharedPtr& address,
    const SharedConstAddressVector& address_list_or_null,
    Network::UpstreamTransportSocketFactory& socket_factory,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    HostDescriptionConstSharedPtr host) {
  auto source_address_selector = cluster.getUpstreamLocalAddressSelector();

  Network::ClientConnectionPtr connection;
  ...
    auto upstream_local_address =
        source_address_selector->getUpstreamLocalAddress(address, options);
    // 创建网络连接,并将网络事件注册到当前工作线程的 dispatcher 内
    connection = dispatcher.createClientConnection(
        address, upstream_local_address.address_,
        socket_factory.createTransportSocket(transport_socket_options, host),
        upstream_local_address.socket_options_, transport_socket_options);
 		...
  return {std::move(connection), std::move(host)};
}

ActiveClient 执行 initialize 方法调用 parent.createCodecClient 创建上游编解码器,原理类似于接收下游连接时创建解码器的过程。创建完成后将返回 CodecClientProd 对象,用于上游响应的解码:

// source/common/http/conn_pool_base.h
  void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
    real_host_description_ = data.host_description_;
    // 调用 parent.createCodecClient 创建上游编解码器
    codec_client_ = parent.createCodecClient(data);
    codec_client_->addConnectionCallbacks(*this);
    Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
    codec_client_->setConnectionStats(
        {traffic_stats.upstream_cx_rx_bytes_total_, traffic_stats.upstream_cx_rx_bytes_buffered_,
         traffic_stats.upstream_cx_tx_bytes_total_, traffic_stats.upstream_cx_tx_bytes_buffered_,
         &traffic_stats.bind_errors_, nullptr});
  }

CodecClient 为 CodecClientProd 的基类。CodecClientProd 构造方法首先调用父类构造方法 CodecClient::CodecClient,将自己注册为 L4 过滤器并响应网络连接事件 onEvent。同时也将自己注册为 L4 网络过滤器来进行数据的接收处理,当收到目标主机 Host 返回的上游响应时,将触发 CodecClient::onData 回调方法。相关代码如下:

// source/common/http/codec_client.cc
CodecClient::CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
                         Upstream::HostDescriptionConstSharedPtr host,
                         Event::Dispatcher& dispatcher)
    : type_(type), host_(host), connection_(std::move(connection)),
      idle_timeout_(host_->cluster().idleTimeout()) {
  if (type_ != CodecType::HTTP3) {
    // Make sure upstream connections process data and then the FIN, rather than processing
    // TCP disconnects immediately. (see https://github.com/envoyproxy/envoy/issues/1679 for
    // details)
    connection_->detectEarlyCloseWhenReadDisabled(false);
  }
  // 注册为连接建立成功事件的回调方法
  connection_->addConnectionCallbacks(*this);
  // 注册自己为 L4 读数据过滤器
  connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});

  if (idle_timeout_) {
    idle_timer_ = dispatcher.createTimer([this]() -> void { onIdleTimeout(); });
    enableIdleTimer();
  }

  // We just universally set no delay on connections. Theoretically we might at some point want
  // to make this configurable.
  connection_->noDelay(true);
}

CodecClientProd::CodecClientProd 还根据HTTP类型创建 codec_ 内部的编解码对象 Http1::ClientConnectionImpl,类似于处理下游连接时注册 Node.js 库的 onMessageBegin、onHeadersComplete 等回调方法

CodecReadFilter 为 L4 过滤器的轻量包装器,将调用 CodecClient::onData 方法,并在响应处理完成后结束过滤器迭代

CodecClient::onData 类似下游连接时的 ServerConnection 处理,这里将使用 codec_->dispatch 解析从目标主机 Host 返回响应数据:

// source/common/http/codec_client.cc
void CodecClient::onData(Buffer::Instance& data) {
  const Status status = codec_->dispatch(data);
	...
}

不但 CodecClient 将自己注册为网络连接事件的 L4 回调处理器,ActiveClient 也将自己注册为 L4 回调处理器,此时 ActiveClient 用于通知连接池更新连接状态,并决定是否处理其他等待请求。ActiveClient::initialize 方法中通过 addConnectionCallbacks 方法将 ActiveClient 设置为 CodecClient 的 L4 回调处理器:

// source/common/http/conn_pool_base.h
  void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
    real_host_description_ = data.host_description_;
    // 调用 parent.createCodecClient 创建上游编解码器
    codec_client_ = parent.createCodecClient(data);
    // 将自己注册为 L4 回调处理器,此时 ActiveClient 用于通知连接池更新连接状态,
    // 并决定是否处理其他等待请求
    codec_client_->addConnectionCallbacks(*this);
    Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
    codec_client_->setConnectionStats(
        {traffic_stats.upstream_cx_rx_bytes_total_, traffic_stats.upstream_cx_rx_bytes_buffered_,
         traffic_stats.upstream_cx_tx_bytes_total_, traffic_stats.upstream_cx_tx_bytes_buffered_,
         &traffic_stats.bind_errors_, nullptr});
  }

在新连接握手过程完成后,将调用连接池更新连接状态回调方法:

// source/common/http/conn_pool_base.h
  void onEvent(Network::ConnectionEvent event) override {
    parent_.onConnectionEvent(*this, codec_client_->connectionFailureReason(), event);
  }

ConnPoolImplBase::onConnectionEvent 回调方法用于判断新建立连接 ActiveClient 携带的网络事件状态,当连接建立成功时状态为 Network::ConnectionEvent::Connected

// source/common/conn_pool/conn_pool_base.cc
void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
                                         Network::ConnectionEvent event) {
  switch (event) {
  ...
  case Network::ConnectionEvent::Connected: {
    ...
    if (client.state() == ActiveClient::State::Connecting ||
        client.state() == ActiveClient::State::ReadyForEarlyData) {
      // 设置当前连接状态
      transitionActiveClientState(client,
                                  (client.currentUnusedCapacity() > 0 ? ActiveClient::State::Ready
                                                                      : ActiveClient::State::Busy));
    }
    ...
    if (client.readyForStream()) {
      // 关联已挂起请求并开始处理
      onUpstreamReady();
    }
    ...
    break;
  }
  ...
  }
}

调用 transitionActiveClientState 方法设置当前连接状态,并调用 onUpstreamReady 方法轮询所有待处理请求,onUpstreamReady 方法从立即可用连接列表头部取出一个连接,并与一个待处理请求进行关联,同时从待处理请求列表中删除该请求:

// source/common/conn_pool/conn_pool_base.cc
void ConnPoolImplBase::onUpstreamReady() {
  while (!pending_streams_.empty() && !ready_clients_.empty()) {
    // 从立即可用连接列表头部取出一个连接
    ActiveClientPtr& client = ready_clients_.front();
    ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
    // Pending streams are pushed onto the front, so pull from the back.
    // 将该可用连接与一个待处理请求进行关联
    attachStreamToClient(*client, pending_streams_.back()->context());
    state_.decrPendingStreams(1);
    // 从待处理请求列表中删除该请求
    pending_streams_.pop_back();
  }
  if (!pending_streams_.empty()) {
    tryCreateNewConnections();
  }
}

6、Envoy 收到服务端响应

1)、接收响应数据

在 Envoy 完成请求发送任务后,后端的应用将根据接收的请求进行业务处理,然后将处理结果作为响应数据发送回 Envoy。Envoy 接收上游响应的处理流程如下图所示:

CodecClient 将自己注册为 L4 读过滤器,当收到服务端返回的响应时,L4 连接的 onFileEvent 回调方法被触发,经过 ClientConnection 的 onReadReady、onRead 处理后,将调用 CodecClient::onData 回调方法。这里介绍的上游响应处理流程与下游请求处理流程的区别是,下游的 L4 过滤器是在监听器接收连接阶段执行 createNetworkFilterChain 方法创建的过滤器处理链,并在该方法内调用每个过滤器工厂类的 createFilterFactoryFromProtoTyped 方法来创建过滤器工厂。然后过滤器工厂将在收到用户请求数据时创建过滤器对象并通过 addReadFilter 方法将过滤器对象加入过滤器管理器

工作线程在创建上游连接时,上游连接关联的 CodecClient 对象构造方法中调用了 addReadFilter 方法来直接将自身对象添加为 L4 读过滤器。因此在收到上游发送回来的响应字节流后将进入 L4 网络过滤器 CodecClient::onData 方法:

// source/common/http/codec_client.cc
void CodecClient::onData(Buffer::Instance& data) {
  const Status status = codec_->dispatch(data);
  ...
}

因此当收到上游响应时,CodecClient::onData 方法调用 codec_->dispatch 触发 Node.js HTTP 解析库的回调方法。对于 HTTP1,这个 codec_ Http1::ClientConnectionImpl 对象:

// source/common/http/http1/codec_impl.cc
Http::Status ClientConnectionImpl::dispatch(Buffer::Instance& data) {
  Http::Status status = ConnectionImpl::dispatch(data);
  if (status.ok() && data.length() > 0) {
    // The HTTP/1.1 codec pauses dispatch after a single response is complete. Extraneous data
    // after a response is complete indicates an error.
    return codecProtocolError("http/1.1 protocol error: extraneous data after response complete");
  }
  return status;
}

在 HTTP 头部接收完毕后,ClientConnectionImpl::onHeadersCompleteBase 被执行,这里将检查状态码 statusCode,完成检查后调用 UpstreamRequest::decodeHeaders 方法将上游响应向下游反向发送:

// source/common/http/http1/codec_impl.cc
Envoy::StatusOr<CallbackResult> ClientConnectionImpl::onHeadersCompleteBase() {
  		...
      pending_response_.value().decoder_->decodeHeaders(std::move(headers), false);
    	...
}

UpstreamRequest::decodeHeaders 方法根据响应头部填充 stream_info_ 内容,用于定期观测数据的收集上报,然后调用 Filter::onUpstreamHeaders 方法找到对应该上游请求的原始下游请求并继续向下游推送:

// source/common/router/upstream_request.cc
void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
  ...
  stream_info_.setResponseCode(static_cast<uint32_t>(response_code));
	...
  parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream);
}
2)、发送响应数据

此时上游响应进入 L7 过滤器 Filter::onUpstreamHeaders 回调方法,Filter::onUpstreamHeaders 方法在前面介绍过,其包含重试、HTTP 重定向、响应有效性判断等响应预处理工作,响应预处理完成后调用 ActiveStreamDecoderFilter::encodeHeaders 方法。响应发送处理流程如下图所示:

在上游接收的响应经过 CodecClient 内的 HTTP 解码器处理后,将调用 L7 路由过滤器 Filter::onUpstreamHeaders 方法向下游发送:

// source/common/router/router.cc
void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
                               UpstreamRequest& upstream_request, bool end_stream) {
  		...
      const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
          *headers, *downstream_headers_,
          [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
           had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
              bool disable_early_data) -> void {
            doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
          });
      ...
  if (route_entry_->internalRedirectPolicy().enabled() &&
      route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode(
          static_cast<Http::Code>(response_code)) &&
      setupRedirect(*headers)) { // 处理 HTTP 重定向
    return;
    // If the redirect could not be handled, fail open and let it pass to the
    // next downstream.
  }
	...
  callbacks_->encodeHeaders(std::move(headers), end_stream,
                            StreamInfo::ResponseCodeDetails::get().ViaUpstream);
}

ActiveStreamDecoderFilter::encodeHeaders 方法为包装类型的方法,其调用 L7 过滤器管理器的 FilterManager::encodeHeaders 方法。如果下游配置了 L7 过滤器处理写请求,则这里将执行过滤器的 encodeHeaders 方法,完成过滤器处理后执行 ActiveStream::encodeHeaders 方法。变量 filter_manager_callbacks_ 保存的是原始下游请求对象 ActiveStream,接下来将通过原始下游请求对象 ActiveStream 将响应数据发送到下游连接。相关代码如下:

// source/common/http/filter_manager.cc
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers,
                                  bool end_stream) {
  ...
  for (; entry != encoder_filters_.end(); entry++) {
    ...
    // 执行注册的其他 L7 过滤器 encode 处理逻辑
    FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_);
    ...
  }
	...
  // 调用下游请求对象 ActiveStream 的 encodeHeaders 发送响应
  filter_manager_callbacks_.encodeHeaders(headers, modified_end_stream);
  ...
}

ActiveStream::encodeHeaders 方法内进行了发送前响应头部 headers 内容的调整,对于 HTTP1,将调用 ResponseEncoderImpl::encodeHeaders 方法进行头部数据处理:

// source/common/http/conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& headers,
                                                        bool end_stream) {
  ...
  // Now actually encode via the codec.
  // 调用 ResponseEncoderImpl::encodeHeaders 方法进行头部数据处理
  response_encoder_->encodeHeaders(headers, end_stream);
}

类似于上游数据发送前 RequestEnvoderImpl 的编码操作,ResponseEncoderImpl::encodeHeaders 方法也是首先将 HTTP 对象通过 copyToBuffer 等操作写入字节流 Buffer,然后调用 encodeHeadersBase 方法将字节流发送到 L4 连接对象上:

// source/common/http/http1/codec_impl.cc
void ResponseEncoderImpl::encodeHeaders(const ResponseHeaderMap& headers, bool end_stream) {
  ...
  // 调用 encodeHeadersBase 方法将字节流发送到 L4 连接对象上
  encodeHeadersBase(headers, absl::make_optional<uint64_t>(numeric_status), end_stream, false);
}

StreamEncoderImpl::encodeHeadersBase 方法通过 flushOutput 将响应数据发送到下游连接:

// source/common/http/http1/codec_impl.cc
void StreamEncoderImpl::encodeHeadersBase(const RequestOrResponseHeaderMap& headers,
                                          absl::optional<uint64_t> status, bool end_stream,
                                          bool bodiless_request) {
  	...
    flushOutput();
  	...
}

void StreamEncoderImpl::flushOutput(bool end_encode) {
  auto encoded_bytes = connection_.flushOutput(end_encode);
  bytes_meter_->addWireBytesSent(encoded_bytes);
}

当调用 L4 的 ConnectionImpl::write 方法时,并不是同步地将数据写入网络 Socket 并发送出去的,而是首先将发送数据保存到发送缓冲区 write_buffer_ 中,然后将一个网络写事件排队发送到当前线程时间队列中,等待事件被 Dispatcher 触发。另外,在将待发送数据放入队列前,会执行 L4 网络过滤器的 onWrite 回调方法,进行相应数据的处理,如应用自定义了 L4 写限流配置等:

// source/common/network/connection_impl.cc
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
  	...
    // 执行 L4 写过滤器
    FilterStatus status = filter_manager_.onWrite();
    ...

  write_end_stream_ = end_stream;
  if (data.length() > 0 || end_stream) {
    ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
    // TODO(mattklein123): All data currently gets moved from the source buffer to the write buffer.
    // This can lead to inefficient behavior if writing a bunch of small chunks. In this case, it
    // would likely be more efficient to copy data below a certain size. VERY IMPORTANT: If this is
    // ever changed, read the comment in SslSocket::doWrite() VERY carefully. That code assumes that
    // we never change existing write_buffer_ chain elements between calls to SSL_write(). That code
    // might need to change if we ever copy here.
    // 将待发送数据放入写队列
    write_buffer_->move(data);

    // Activating a write event before the socket is connected has the side-effect of tricking
    // doWriteReady into thinking the socket is connected. On macOS, the underlying write may fail
    // with a connection error if a call to write(2) occurs before the connection is completed.
    if (!connecting_) {
      // 排队发送网络写事件
      ioHandle().activateFileEvents(Event::FileReadyType::Write);
    }
  }
}

当 Dispatcher 处理网络写事件时,将调用 onWriteReady 回调方法,然后通过 onWriteReady 回调方法,然后通过 transport_socket_->doWrite 方法将待发送缓冲区的内容 write_buffer_ 发送到下游网络连接:

// source/common/network/connection_impl.cc
void ConnectionImpl::onWriteReady() {
  ...
  IoResult result = transport_socket_->doWrite(*write_buffer_, write_end_stream_);
  ...
}

从上面可以看出,响应数据路径处理步骤比请求路径处理步骤简单些,少了路由处理部分及连接池创建部分的逻辑。因为等待服务端响应时,反向发送响应的路径应该已经准备好了

参考:

《Istio权威指南(下) 云原生服务网格Istio架构与源码》

为何 Envoy 会经常全量推送路由变更,以及如何改善(上)

《Istio & Envoy 内幕》

Life of a Request

请求的生命周期

相关文章:

  • 回归预测 | Matlab实现NRBO-Transformer-LSTM多输入单输出回归预测
  • 水文传输规约 SL651的相关经验
  • Java的Selenium的特殊元素操作与定位之iframe切换
  • Spring Boot开发三板斧:高效构建企业级应用的核心技法
  • 【项目管理-高项】学习方法 整体概览
  • 优化 Web 性能:处理非合成动画(Non-Composited Animations)
  • Java的Selenium基本元素定位(findElement方法)
  • leetcode-动态规划20
  • 15-SpringBoot3入门-MyBatis-Plus基于service层的CRUD
  • 数据结构与算法学习笔记----贪心·排序不等式
  • CSS Text(文本)学习笔记
  • es基本概念
  • (蓝桥杯)岛屿个数
  • 树莓集团多方位拓展:园区服务及人才培养的协同发展
  • 博客文章:深入分析 PyMovie - 基于 Python和 MoviePy 的视频管理工具
  • YY forget password
  • 学透Spring Boot — 013. Spring Web-Flux 函数式风格的控制器
  • 用Python解锁未来交通:开发基于机器学习的流量预测系统
  • Java程序设计第1章:概述
  • LeetCode 249 解法揭秘:如何把“abc”和“bcd”分到一组?