Envoy Source Code Analysis (3): Envoy Sends Data to the Server, Envoy Receives the Server's Response
This article studies the source code of Envoy 1.31.0.
5. Envoy Sends Data to the Server
After the HTTP request headers have been decoded, Envoy matches a Cluster based on the request headers and the listener's route configuration, and uses load balancing to determine a concrete instance address within that Cluster. The worker thread then creates a connection to that address, encodes the HTTP request, and sends it out.
1) Route Matching
The L7 router filter is always created as the last of the L7 filters, and its Filter::decodeHeaders method is invoked when the HTTP request is processed:
// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
...
// Determine if there is a route entry or a direct response for the request.
// Match the route entry
route_ = callbacks_->route();
...
// A route entry matches for the request.
// Get the matched route entry
route_entry_ = route_->routeEntry();
...
// Look up the Cluster according to the route entry
Upstream::ThreadLocalCluster* cluster =
config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
...
// Get the upstream connection pool for the Cluster
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
...
// Create the retry_state_ object used for request retries
retry_state_ = createRetryState(
route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, route_stats_context_,
config_->factory_context_, callbacks_->dispatcher(), route_entry_->priority());
...
// Create the upstream HTTP request
UpstreamRequestPtr upstream_request =
std::make_unique<UpstreamRequest>(*this, std::move(generic_conn_pool), can_send_early_data,
can_use_http3, false /*enable_half_close*/);
LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
// Process the upstream HTTP request
upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
...
if (end_stream) {
onRequestComplete();
}
// Stop iterating over the remaining L7 filters
return Http::FilterHeadersStatus::StopIteration;
}
The route-processing flow is shown in the figure below:

Route processing consists of the following steps:
- First, match the virtual host (virtualHost) based on the request headers, then find the Cluster according to the URL matching conditions in the RDS configuration:
// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
  ...
  // Determine if there is a route entry or a direct response for the request.
  // Match the route entry
  route_ = callbacks_->route();
  ...
}
The route method calls ActiveStream::refreshCachedRoute, which evaluates the route-matching rules against the configuration object snapped_route_config_. The snapped_route_config_ object is captured while ActiveStream::decodeHeaders executes:
// source/common/http/conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPtr&& headers, bool end_stream) {
  ...
  snapped_route_config_ = connection_manager_.config_->routeConfigProvider()->configCast();
}
refreshCachedRoute performs the subsequent route processing based on the current HTTP request headers request_headers_:
// source/common/http/conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
  ...
  // Perform the subsequent route processing based on the HTTP request headers
  route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(), stream_id_);
  ...
}
The route-processing code is shown below. It first looks up the virtual host (virtualHost) in the configuration based on the request headers:
// source/common/router/config_impl.cc
RouteConstSharedPtr RouteMatcher::route(const RouteCallback& cb, const Http::RequestHeaderMap& headers,
                                        const StreamInfo::StreamInfo& stream_info, uint64_t random_value) const {
  // Look up the virtual host in the configuration based on the HTTP request headers
  const VirtualHostImpl* virtual_host = findVirtualHost(headers);
  if (virtual_host) {
    return virtual_host->getRouteFromEntries(cb, headers, stream_info, random_value);
  } else {
    return nullptr;
  }
}
RouteMatcher::findVirtualHost first extracts the Host value from the request headers and looks it up in the configured virtual_hosts_ list. If no entry matches, it falls back to findWildcardVirtualHost to match a virtual host against the wildcard rules:
// source/common/router/config_impl.cc
const VirtualHostImpl* RouteMatcher::findVirtualHost(const Http::RequestHeaderMap& headers) const {
  ...
  // Extract the Host value from the HTTP request headers
  absl::string_view host_header_value = headers.getHostValue();
  ...
  // Look for an exact match in the configured virtual_hosts_ list (hash lookup for an exact virtualHost match)
  const std::string host = absl::AsciiStrToLower(host_header_value);
  const auto iter = virtual_hosts_.find(host);
  if (iter != virtual_hosts_.end()) {
    return iter->second.get();
  }
  // If no exact match is found, fall back to findWildcardVirtualHost to match the virtualHost
  // against suffix or prefix wildcard rules (iterating over them one by one)
  if (!wildcard_virtual_host_suffixes_.empty()) {
    const VirtualHostImpl* vhost = findWildcardVirtualHost(
        host, wildcard_virtual_host_suffixes_,
        [](absl::string_view h, int l) -> absl::string_view { return h.substr(h.size() - l); });
    if (vhost != nullptr) {
      return vhost;
    }
  }
  if (!wildcard_virtual_host_prefixes_.empty()) {
    const VirtualHostImpl* vhost = findWildcardVirtualHost(
        host, wildcard_virtual_host_prefixes_,
        [](absl::string_view h, int l) -> absl::string_view { return h.substr(0, l); });
    if (vhost != nullptr) {
      return vhost;
    }
  }
  return default_virtual_host_.get();
}
After the virtual host has been matched, the route operation calls VirtualHostImpl::getRouteFromEntries, which iterates over the configured routes_ list against the request headers (via getRouteFromRoutes) to find the Cluster:
// source/common/router/config_impl.cc
RouteConstSharedPtr VirtualHostImpl::getRouteFromRoutes(
    const RouteCallback& cb, const Http::RequestHeaderMap& headers,
    const StreamInfo::StreamInfo& stream_info, uint64_t random_value,
    absl::Span<const RouteEntryImplBaseConstSharedPtr> routes) const {
  // Iterate over the configured routes list against the HTTP request headers to find the Cluster
  for (auto route = routes.begin(); route != routes.end(); ++route) {
    if (!headers.Path() && !(*route)->supportsPathlessHeaders()) {
      continue;
    }
    RouteConstSharedPtr route_entry = (*route)->matches(headers, stream_info, random_value);
    if (route_entry == nullptr) {
      continue;
    }
    if (cb == nullptr) {
      return route_entry;
    }
    RouteEvalStatus eval_status = (std::next(route) == routes.end()) ? RouteEvalStatus::NoMoreRoutes
                                                                     : RouteEvalStatus::HasMoreRoutes;
    RouteMatchStatus match_status = cb(route_entry, eval_status);
    if (match_status == RouteMatchStatus::Accept) {
      return route_entry;
    }
    if (match_status == RouteMatchStatus::Continue && eval_status == RouteEvalStatus::NoMoreRoutes) {
      ENVOY_LOG(debug, "return null when route match status is Continue but there is no more routes");
      return nullptr;
    }
  }
  ENVOY_LOG(debug, "route was resolved but final route list did not match incoming request");
  return nullptr;
}
- route_entry_->clusterName() holds the name of the Cluster, i.e. the name of the upstream service. To reduce contention between threads, each worker thread holds ThreadLocalCluster objects backed by thread-local storage, which are queried via getThreadLocalCluster to look up the Cluster:
// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
  ...
  // Look up the Cluster according to the route entry
  Upstream::ThreadLocalCluster* cluster =
      config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
  ...
}
Summary: in RDS route matching, the domain dimension is matched first, then the multiple paths under that domain
- Multiple domains in RDS: an exact virtualHost match is attempted first via a hash lookup; if that fails, the suffix and prefix wildcard rules are iterated one by one to match a virtualHost
- Multiple paths under the same domain: matched strictly one by one in configuration order. In addition, tree-style matching via the generic matching API is supported, as is limiting the matching scope via route scopes (a simplified sketch of the lookup order follows this list)
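To make the host-matching precedence concrete, here is a minimal, self-contained sketch (not Envoy code; the container names and data are invented for illustration) of the order described above: exact match by hash lookup, then suffix wildcards, then prefix wildcards, then the default virtual host.
// host_match_sketch.cc -- illustrative only, not Envoy source.
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <unordered_map>

struct VirtualHost { std::string name; };

// Exact hosts: hash lookup. Wildcard maps are keyed by the suffix/prefix string and
// grouped by length, longest first, so the most specific rule wins.
std::unordered_map<std::string, VirtualHost> exact_hosts = {{"api.example.com", {"vh_api"}}};
std::map<size_t, std::map<std::string, VirtualHost>, std::greater<size_t>> suffix_wildcards = {
    {12, {{".example.com", {"vh_wild_suffix"}}}}};
std::map<size_t, std::map<std::string, VirtualHost>, std::greater<size_t>> prefix_wildcards = {
    {4, {{"api.", {"vh_wild_prefix"}}}}};
VirtualHost default_host{"vh_default"};

const VirtualHost* findVirtualHost(const std::string& host) {
  // 1) exact match via hash lookup
  auto it = exact_hosts.find(host);
  if (it != exact_hosts.end()) return &it->second;
  // 2) suffix wildcard rules ("*.example.com"), longest suffix first
  for (const auto& [len, rules] : suffix_wildcards) {
    if (host.size() < len) continue;
    auto rule = rules.find(host.substr(host.size() - len));
    if (rule != rules.end()) return &rule->second;
  }
  // 3) prefix wildcard rules ("api.*"), longest prefix first
  for (const auto& [len, rules] : prefix_wildcards) {
    if (host.size() < len) continue;
    auto rule = rules.find(host.substr(0, len));
    if (rule != rules.end()) return &rule->second;
  }
  // 4) fall back to the default virtual host
  return &default_host;
}

int main() {
  for (const std::string host : {"api.example.com", "www.example.com", "api.other.org", "foo.bar"}) {
    std::cout << host << " -> " << findVirtualHost(host)->name << "\n";
  }
}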

2) Obtaining the Connection Pool
After the Cluster has been matched from the route entry, the worker thread obtains the upstream Cluster's connection pool via the createConnPool method:
// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
...
// Get the upstream connection pool for the Cluster
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
...
}
Here the worker thread calls GenericGenericConnPoolFactory::createGenericConnPool to create the connection pool object for the corresponding protocol. Taking the HttpConnPool object as an example, the HttpConnPool constructor calls ClusterEntry::httpConnPool to create the connection pool:
// source/common/upstream/cluster_manager_impl.cc
absl::optional<HttpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
ResourcePriority priority, absl::optional<Http::Protocol> protocol,
LoadBalancerContext* context) {
...
auto pool = httpConnPoolImpl(priority, protocol, context, false);
...
}
The code of ClusterEntry::httpConnPoolImpl is as follows:
// source/common/upstream/cluster_manager_impl.cc
Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
LoadBalancerContext* context, bool peek) {
// Call the load balancer's chooseHost method to pick the most suitable target host among the current Cluster's instances
HostConstSharedPtr host = (peek ? peekAnotherHost(context) : chooseHost(context));
...
// Create the connection pool container for the target host
ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);
// Note: to simplify this, we assume that the factory is only called in the scope of this
// function. Otherwise, we'd need to capture a few of these variables by value.
// Compute the hash_key from the protocols supported by the target host, and look up a pool among the host's existing active connection pools via getPool
ConnPoolsContainer::ConnPools::PoolOptRef pool =
container.pools_->getPool(priority, hash_key, [&]() {
// If no pool exists yet, create a new one via allocateConnPool
auto pool = parent_.parent_.factory_.allocateConnPool(
parent_.thread_local_dispatcher_, host, priority, upstream_protocols,
alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr,
have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
parent_.parent_.time_source_, parent_.cluster_manager_state_, quic_info_);
pool->addIdleCallback([&parent = parent_, host, priority, hash_key]() {
parent.httpConnPoolIsIdle(host, priority, hash_key);
});
return pool;
});
if (pool.has_value()) {
return &(pool.value().get());
} else {
return nullptr;
}
}
The logic of ClusterEntry::httpConnPoolImpl is as follows:
- At this point the peek flag is false, so during load balancing ClusterEntry::httpConnPoolImpl calls chooseHost to pick the most suitable target host among the current Cluster's instances and obtain that host
- getHttpConnPoolsContainer is executed to create the connection pool container for the target host:
// source/common/upstream/cluster_manager_impl.cc
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ConnPoolsContainer*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
    const HostConstSharedPtr& host, bool allocate) {
  auto container_iter = host_http_conn_pool_map_.find(host);
  if (container_iter == host_http_conn_pool_map_.end()) {
    if (!allocate) {
      return nullptr;
    }
    container_iter =
        host_http_conn_pool_map_.try_emplace(host, thread_local_dispatcher_, host).first;
  }
  return &container_iter->second;
}
getHttpConnPoolsContainer looks up connection pools in the host_http_conn_pool_map_ hash map held inside the ThreadLocalClusterManagerImpl object, so on each thread every target host has its own independent upstream connection pools
- The hash_key is computed from the protocols supported by the target host, and getPool looks for a pool among the host's existing active connection pools; if no pool exists, allocateConnPool is called to create a new one
- allocateConnPool creates a connection pool of the matching HTTP type. If the type is HTTP1, allocateConnPool creates a FixedHttpConnPoolImpl pool instance, whose constructor initializes its parent class HttpConnPoolImplBase, which in turn initializes the grandparent class ConnPoolImplBase:
// source/common/conn_pool/conn_pool_base.cc
ConnPoolImplBase::ConnPoolImplBase(
    Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    Upstream::ClusterConnectivityState& state)
    : state_(state), host_(host), priority_(priority), dispatcher_(dispatcher),
      socket_options_(options), transport_socket_options_(transport_socket_options),
      upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {}
The ConnPoolImplBase constructor creates the upstream_ready_cb_ event callback. This callback is registered with the thread's scheduler and is triggered when each request finishes, signalling that the current request has been fully handled and that other pending requests can now be processed (a minimal sketch of this deferral pattern follows)
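The pattern here, deferring "check whether another pending request can be served" to a callback run later by the event loop instead of invoking it inline, can be illustrated with a small standalone sketch (the ToyDispatcher and ToyPool types below are invented for illustration and are not Envoy's API):
// scheduled_callback_sketch.cc -- illustrative only.
#include <deque>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// A toy event loop: I/O events are processed first, then callbacks that were
// scheduled during this iteration are drained.
class ToyDispatcher {
public:
  void schedule(std::function<void()> cb) { scheduled_.push_back(std::move(cb)); }
  void run(std::vector<std::function<void()>> events) {
    for (auto& e : events) e();          // handle I/O events for this iteration
    while (!scheduled_.empty()) {        // then run deferred callbacks (like upstream_ready_cb_)
      auto cb = std::move(scheduled_.front());
      scheduled_.pop_front();
      cb();
    }
  }
private:
  std::deque<std::function<void()>> scheduled_;
};

class ToyPool {
public:
  explicit ToyPool(ToyDispatcher& d) : dispatcher_(d) {}
  void queueRequest(std::string name) { pending_.push_back(std::move(name)); }
  // Called when a request finishes; instead of serving the next request inline,
  // defer it so the current call stack unwinds first.
  void onStreamFinished() { dispatcher_.schedule([this]() { onUpstreamReady(); }); }
private:
  void onUpstreamReady() {
    if (!pending_.empty()) {
      std::cout << "serving pending request: " << pending_.front() << "\n";
      pending_.pop_front();
    }
  }
  ToyDispatcher& dispatcher_;
  std::deque<std::string> pending_;
};

int main() {
  ToyDispatcher dispatcher;
  ToyPool pool(dispatcher);
  pool.queueRequest("req-2");
  // Event: the in-flight request "req-1" completes during this loop iteration.
  dispatcher.run({[&pool]() { pool.onStreamFinished(); }});
}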
3) Upstream Request Retry Logic
In the L7 router filter's decodeHeaders, once the connection pool has been obtained, the retry_state_ object used for request retries is created, along with the upstream object upstream_request for the current request:
// source/common/router/router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
...
// Create the retry_state_ object used for request retries
retry_state_ = createRetryState(
route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_, route_stats_context_,
config_->factory_context_, callbacks_->dispatcher(), route_entry_->priority());
...
// Create the upstream HTTP request
UpstreamRequestPtr upstream_request =
std::make_unique<UpstreamRequest>(*this, std::move(generic_conn_pool), can_send_early_data,
can_use_http3, false /*enable_half_close*/);
...
return Http::FilterHeadersStatus::StopIteration;
}
The code above first creates the retry_state_ object used for request retries. The difference between this retry and the retry inside a particular target host's connection pool is that the pool-level retry only retries the connection to the host that has already been selected; if that host genuinely has connectivity problems or cannot respond, the request will not be sent to other hosts of the Cluster. The router-level retry driven by retry_state_, by contrast, gives the upstream request a chance to choose another available host in the Cluster and resend the request when a host keeps failing to connect or keeps returning error codes (such as 5xx). The retry policy is shown in the figure below:

Let's first look at the router-level retry flow:
// source/common/router/router.cc
void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
UpstreamRequest& upstream_request, bool end_stream) {
...
const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
*headers, *downstream_headers_,
[this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
bool disable_early_data) -> void {
doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
});
...
}
When upstream response data is received, or an upstream access failure occurs, onUpstreamHeaders is invoked. It calls shouldRetryHeaders, which decides from the response headers whether a new host needs to be selected. Once a reselection is deemed necessary, doRetry is called to reselect a host and resend the request. The relevant code is as follows:
// source/common/router/retry_state_impl.cc
RetryStatus RetryStateImpl::shouldRetryHeaders(const Http::ResponseHeaderMap& response_headers,
const Http::RequestHeaderMap& original_request,
DoRetryHeaderCallback callback) {
// This may be overridden in wouldRetryFromHeaders().
bool disable_early_data = false;
const RetryDecision retry_decision =
wouldRetryFromHeaders(response_headers, original_request, disable_early_data);
// Yes, we will retry based on the headers - try to parse a rate limited reset interval from the
// response.
if (retry_decision == RetryDecision::RetryWithBackoff && !reset_headers_.empty()) {
const auto backoff_interval = parseResetInterval(response_headers);
if (backoff_interval.has_value() && (backoff_interval.value().count() > 1L)) {
ratelimited_backoff_strategy_ = std::make_unique<JitteredLowerBoundBackOffStrategy>(
backoff_interval.value().count(), random_);
}
}
// Perform the retry
return shouldRetry(retry_decision,
[disable_early_data, callback]() { callback(disable_early_data); });
}
wouldRetryFromHeaders checks whether the response status code matches the configured retry conditions; the retry_on_ variable holds Envoy's retry policy configuration:
// source/common/router/retry_state_impl.cc
RetryState::RetryDecision
RetryStateImpl::wouldRetryFromHeaders(const Http::ResponseHeaderMap& response_headers,
const Http::RequestHeaderMap& original_request,
bool& disable_early_data) {
...
uint64_t response_status = Http::Utility::getResponseStatus(response_headers);
// Check whether the response status code matches the configured retry conditions; retry_on_ holds Envoy's retry policy configuration
if (retry_on_ & RetryPolicy::RETRY_ON_5XX) {
if (Http::CodeUtility::is5xx(response_status)) {
return RetryDecision::RetryWithBackoff;
}
}
...
return RetryDecision::NoRetry;
}
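The retry_on_ check above is a plain bitmask test. A minimal standalone sketch of the same idea (the flag values and helper function below are made up for illustration; they are not Envoy's definitions) might look like this:
// retry_on_sketch.cc -- illustrative only.
#include <cstdint>
#include <iostream>

// Hypothetical retry-on flags, one bit per condition.
constexpr uint32_t RETRY_ON_5XX = 0x1;
constexpr uint32_t RETRY_ON_GATEWAY_ERROR = 0x2;
constexpr uint32_t RETRY_ON_RETRIABLE_4XX = 0x4;

enum class RetryDecision { NoRetry, RetryWithBackoff };

RetryDecision wouldRetryFromStatus(uint32_t retry_on, uint64_t status) {
  // "5xx" covers every 500-599 response.
  if ((retry_on & RETRY_ON_5XX) && status >= 500 && status <= 599) {
    return RetryDecision::RetryWithBackoff;
  }
  // "gateway-error" is the narrower 502/503/504 subset.
  if ((retry_on & RETRY_ON_GATEWAY_ERROR) && (status == 502 || status == 503 || status == 504)) {
    return RetryDecision::RetryWithBackoff;
  }
  // "retriable-4xx" currently only covers 409 in Envoy's documented policy.
  if ((retry_on & RETRY_ON_RETRIABLE_4XX) && status == 409) {
    return RetryDecision::RetryWithBackoff;
  }
  return RetryDecision::NoRetry;
}

int main() {
  const uint32_t policy = RETRY_ON_GATEWAY_ERROR | RETRY_ON_RETRIABLE_4XX;
  for (uint64_t status : {200, 409, 500, 503}) {
    std::cout << status << " -> "
              << (wouldRetryFromStatus(policy, status) == RetryDecision::RetryWithBackoff ? "retry" : "no retry")
              << "\n";
  }
}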
shouldRetry then stores the externally supplied callback, here the onRetry method, and calls enableBackoffTimer to create the retry timer; the onRetry callback is invoked when the timer fires:
// source/common/router/retry_state_impl.cc
void RetryStateImpl::enableBackoffTimer() {
if (!retry_timer_) {
// Invoke the onRetry callback when the timer fires
retry_timer_ = dispatcher_.createTimer([this]() -> void { backoff_callback_(); });
}
if (ratelimited_backoff_strategy_ != nullptr) {
// If we have a backoff strategy based on rate limit feedback from the response we use it.
retry_timer_->enableTimer(
std::chrono::milliseconds(ratelimited_backoff_strategy_->nextBackOffMs()));
// The strategy is only valid for the response that sent the ratelimit reset header and cannot
// be reused.
ratelimited_backoff_strategy_.reset();
cluster_.trafficStats()->upstream_rq_retry_backoff_ratelimited_.inc();
} else {
// Otherwise we use a fully jittered exponential backoff algorithm.
retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
cluster_.trafficStats()->upstream_rq_retry_backoff_exponential_.inc();
}
}
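The "fully jittered exponential backoff" mentioned in the comment above can be sketched as follows. This is a minimal standalone illustration, not Envoy's JitteredExponentialBackOffStrategy implementation, and the base/max intervals are example values only:
// jittered_backoff_sketch.cc -- illustrative only.
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <random>

// Fully jittered exponential backoff: the ceiling doubles on every retry
// (capped at max_interval_ms), and the actual delay is a uniform random
// value in [0, ceiling).
class JitteredExponentialBackoff {
public:
  JitteredExponentialBackoff(uint64_t base_interval_ms, uint64_t max_interval_ms)
      : base_interval_ms_(base_interval_ms), max_interval_ms_(max_interval_ms),
        current_ceiling_ms_(base_interval_ms), rng_(std::random_device{}()) {}

  uint64_t nextBackOffMs() {
    std::uniform_int_distribution<uint64_t> dist(0, current_ceiling_ms_ - 1);
    const uint64_t delay = dist(rng_);
    current_ceiling_ms_ = std::min(current_ceiling_ms_ * 2, max_interval_ms_);
    return delay;
  }

  void reset() { current_ceiling_ms_ = base_interval_ms_; }

private:
  const uint64_t base_interval_ms_;
  const uint64_t max_interval_ms_;
  uint64_t current_ceiling_ms_;
  std::mt19937_64 rng_;
};

int main() {
  // Example values: 25ms base with a 250ms cap (Envoy's documented retry defaults
  // are of this order, but the exact numbers here are only for illustration).
  JitteredExponentialBackoff backoff(25, 250);
  for (int attempt = 1; attempt <= 5; ++attempt) {
    std::cout << "attempt " << attempt << ": wait " << backoff.nextBackOffMs() << " ms\n";
  }
}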
Next, doRetry performs processing similar to the router's decodeHeaders. The difference is that doRetry selects a target host and creates a connection pool based on the route result that has already been obtained, and then sends the request upstream:
// source/common/router/router.cc
void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) {
...
// Select the target host and create the connection pool
generic_conn_pool = createConnPool(*cluster);
...
UpstreamRequestPtr upstream_request =
std::make_unique<UpstreamRequest>(*this, std::move(generic_conn_pool), can_send_early_data,
can_use_http3, false /*enable_tcp_tunneling*/);
...
// Send the request upstream
upstream_requests_.front()->acceptHeadersFromRouter(
!callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_);
...
}
In this way, whether the request has just finished route processing, or all hosts in the current connection pool have failed and control has returned to the router for a connection retry, the worker thread always executes UpstreamRequest::acceptHeadersFromRouter to handle the upstream connection
4) Sending the Upstream Request
UpstreamRequest::acceptHeadersFromRouter is the entry point for sending the upstream request. The upstream request sending flow is shown in the figure below:
UpstreamRequest::acceptHeadersFromRouter calls HttpConnPool::newStream, which uses the connection pool's newStream method to create a handle for a cancellable request inside the pool. The upstream request (upstreamRequest) object is used as the upstream-to-downstream association object (upstreamToDownstream) and is stored as the decoder for the future upstream response, with the callbacks being a reference to the HttpConnPool object. The relevant code is as follows:
// source/extensions/upstreams/http/http/upstream_request.cc
void HttpConnPool::newStream(GenericConnectionPoolCallbacks* callbacks) {
callbacks_ = callbacks;
// It's possible for a reset to happen inline within the newStream() call. In this case, we
// might get deleted inline as well. Only write the returned handle out if it is not nullptr to
// deal with this case.
Envoy::Http::ConnectionPool::Cancellable* handle =
pool_data_.value().newStream(callbacks->upstreamToDownstream(), *this,
callbacks->upstreamToDownstream().upstreamStreamOptions());
if (handle) {
conn_pool_stream_handle_ = handle;
}
}
HttpConnPoolImplBase::newStream creates the request wrapper object HttpAttachContext, which holds the association between the upstream request (upstreamRequest) and the HttpConnPool. It is then passed into ConnPoolImplBase::newStreamImpl, which first checks whether there is an available connection in the ready connection list ready_clients_. If there is, it takes one directly and calls attachStreamToClient to attach the current request context to that connection:
// source/common/conn_pool/conn_pool_base.cc
ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context,
bool can_send_early_data) {
...
// There is an available ready connection
if (!ready_clients_.empty()) {
ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
// Handle the current request
attachStreamToClient(client, context);
// Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
tryCreateNewConnections();
return nullptr;
}
...
if (!host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
ENVOY_LOG(debug, "max pending streams overflow");
onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
context);
host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
// The new request cannot be placed into the pending queue
return nullptr;
}
// Create a new pending stream and add it to pending_streams_
ConnectionPool::Cancellable* pending = newPendingStream(context, can_send_early_data);
...
// This must come after newPendingStream() because this function uses the
// length of pending_streams_ to determine if a new connection is needed.
// Call tryCreateNewConnections, which uses the length of pending_streams_ to decide whether a new connection is needed
const ConnectionResult result = tryCreateNewConnections();
...
return pending;
}
If there is no available connection and the number of requests already waiting in the upstream pending queue exceeds the configured threshold, the current request is abandoned and the Overflow reason is returned, so the downstream connection receives a failed response flagged as Overflow. If the pending queue has not yet exceeded the threshold, the worker thread calls newPendingStream to put the current request into the pending queue and calls tryCreateNewConnections to create new connections; these connections are established asynchronously. Once a new connection has been established, the upstream connection pool is notified via an event to continue processing the requests in the pending queue. The relevant code is as follows:
// source/common/http/conn_pool_base.cc
ConnectionPool::Cancellable*
HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
bool can_send_early_data) {
Http::ResponseDecoder& decoder = *typedContext<HttpAttachContext>(context).decoder_;
Http::ConnectionPool::Callbacks& callbacks = *typedContext<HttpAttachContext>(context).callbacks_;
ENVOY_LOG(debug,
"queueing stream due to no available connections (ready={} busy={} connecting={})",
ready_clients_.size(), busy_clients_.size(), connecting_clients_.size());
Envoy::ConnectionPool::PendingStreamPtr pending_stream(
new HttpPendingStream(*this, decoder, callbacks, can_send_early_data));
// Put the request into the pool's pending request queue pending_streams_
return addPendingStream(std::move(pending_stream));
}
addPendingStream puts the request into the pool's pending request queue pending_streams_ (a condensed sketch of this newStream decision logic is given below)
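To summarize the branch just described, here is a compact standalone sketch of the decision a pool makes for a new stream (ready connection → attach; pending queue full → overflow; otherwise queue the request and try to create a connection). The types and limits are invented for illustration and are not Envoy's API:
// pool_new_stream_sketch.cc -- illustrative only.
#include <deque>
#include <iostream>
#include <string>

enum class NewStreamResult { AttachedToReadyConnection, Overflow, Queued };

struct ToyPool {
  std::deque<std::string> ready_clients;    // fully connected, idle connections
  std::deque<std::string> pending_streams;  // requests waiting for a connection
  size_t max_pending = 2;                   // hypothetical circuit-breaker limit
  size_t connecting = 0;                    // connections being established

  NewStreamResult newStream(const std::string& request) {
    if (!ready_clients.empty()) {
      // A ready connection exists: attach the request to it immediately.
      std::cout << request << " attached to " << ready_clients.front() << "\n";
      return NewStreamResult::AttachedToReadyConnection;
    }
    if (pending_streams.size() >= max_pending) {
      // Pending-request circuit breaker tripped: fail fast with Overflow.
      std::cout << request << " rejected: pending overflow\n";
      return NewStreamResult::Overflow;
    }
    // Queue the request and kick off an asynchronous connection attempt.
    pending_streams.push_back(request);
    ++connecting;
    std::cout << request << " queued; connections in progress: " << connecting << "\n";
    return NewStreamResult::Queued;
  }
};

int main() {
  ToyPool pool;
  pool.newStream("req-1");  // queued, no ready connection yet
  pool.newStream("req-2");  // queued
  pool.newStream("req-3");  // overflow: pending limit reached
  pool.ready_clients.push_back("conn-1");
  pool.newStream("req-4");  // attached to the ready connection
}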
ConnPoolImplBase::tryCreateNewConnections attempts to create new connections up to 3 times. The code is as follows:
// source/common/conn_pool/conn_pool_base.cc
ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
ConnPoolImplBase::ConnectionResult result;
// Somewhat arbitrarily cap the number of connections preconnected due to new
// incoming connections. The preconnect ratio is capped at 3, so in steady
// state, no more than 3 connections should be preconnected. If hosts go
// unhealthy, and connections are not immediately preconnected, it could be that
// many connections are desired when the host becomes healthy again, but
// overwhelming it with connections is not desirable.
// Attempt to create new connections up to 3 times
for (int i = 0; i < 3; ++i) {
result = tryCreateNewConnection();
if (result != ConnectionResult::CreatedNewConnection) {
break;
}
}
ASSERT(!is_draining_for_deletion_ || result != ConnectionResult::CreatedNewConnection);
return result;
}
ConnPoolImplBase::ConnectionResult
ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
...
ActiveClientPtr client = instantiateActiveClient();
...
LinkedList::moveIntoList(std::move(client), owningList(client->state()));
return can_create_connection ? ConnectionResult::CreatedNewConnection
: ConnectionResult::CreatedButRateLimited;
...
}
instantiateActiveClient is the main method for creating a new connection. Once the connection has been created, it is placed, according to its current state, into one of the lists: ready connections ready_clients_, in-use connections busy_clients_, or in-progress connections connecting_clients_. If the connection is slow to establish, it first goes into connecting_clients_; if it establishes quickly, it can immediately become a ready client. If there are already pending requests, the onUpstreamReady callback attaches a pending request to it and it immediately becomes a busy client
For HTTP1 requests, instantiateActiveClient creates the new connection through the callback objects saved earlier in the connection pool's FixedHttpConnPoolImpl constructor:
// source/common/http/http1/conn_pool.cc
ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
const Network::ConnectionSocket::OptionsSharedPtr& options,
const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
Upstream::ClusterConnectivityState& state) {
return std::make_unique<FixedHttpConnPoolImpl>(
std::move(host), std::move(priority), dispatcher, options, transport_socket_options,
random_generator, state,
[](HttpConnPoolImplBase* pool) {
// Create the upstream connection object via the callback stored in the pool
return std::make_unique<ActiveClient>(*pool, absl::nullopt);
},
[](Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase* pool) {
// Create the client connection codec
CodecClientPtr codec{new CodecClientProd(
CodecType::HTTP1, std::move(data.connection_), data.host_description_,
pool->dispatcher(), pool->randomGenerator(), pool->transportSocketOptions())};
return codec;
},
std::vector<Protocol>{Protocol::Http11}, absl::nullopt, nullptr);
}
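The pattern used here, a pool that stores two factory lambdas (one to create the active client, one to create the codec) and only invokes them when a connection is actually needed, can be sketched in isolation as follows (ToyFixedPool and its members are hypothetical names for illustration, not Envoy types):
// pool_factory_lambda_sketch.cc -- illustrative only.
#include <functional>
#include <iostream>
#include <memory>
#include <string>

struct ToyCodec { std::string protocol; };
struct ToyClient { std::unique_ptr<ToyCodec> codec; };

// A pool fixed to one protocol: the creation logic is injected as lambdas at
// construction time (like client_fn_ / codec_fn_ in FixedHttpConnPoolImpl)
// and invoked lazily when a new connection is required.
class ToyFixedPool {
public:
  using ClientFn = std::function<std::unique_ptr<ToyClient>(ToyFixedPool&)>;
  using CodecFn = std::function<std::unique_ptr<ToyCodec>()>;

  ToyFixedPool(ClientFn client_fn, CodecFn codec_fn)
      : client_fn_(std::move(client_fn)), codec_fn_(std::move(codec_fn)) {}

  std::unique_ptr<ToyClient> instantiateActiveClient() { return client_fn_(*this); }
  std::unique_ptr<ToyCodec> createCodec() { return codec_fn_(); }

private:
  ClientFn client_fn_;
  CodecFn codec_fn_;
};

int main() {
  ToyFixedPool pool(
      [](ToyFixedPool& p) {
        auto client = std::make_unique<ToyClient>();
        client->codec = p.createCodec();  // the client wires in the codec handed to it
        return client;
      },
      []() { return std::make_unique<ToyCodec>(ToyCodec{"HTTP/1.1"}); });

  auto client = pool.instantiateActiveClient();
  std::cout << "new client speaks " << client->codec->protocol << "\n";
}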
The client_fn_ variable stores the Http1::ActiveClient constructor:
// source/common/http/http1/conn_pool.cc
ActiveClient::ActiveClient(HttpConnPoolImplBase& parent,
OptRef<Upstream::Host::CreateConnectionData> data)
// An HTTP1 connection can only handle one concurrent request at a time
: Envoy::Http::ActiveClient(parent, parent.host()->cluster().maxRequestsPerConnection(),
/* effective_concurrent_stream_limit */ 1,
/* configured_concurrent_stream_limit */ 1, data) {
parent.host()->cluster().trafficStats()->upstream_cx_http1_total_.inc();
}
As can be seen, for HTTP1 each upstream connection can only handle one HTTP request at a time, whereas a single upstream connection can handle multiple HTTP2 requests concurrently (the sketch below illustrates this capacity accounting)
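The concurrent_stream_limit / currentUnusedCapacity idea can be illustrated with a minimal standalone model (not Envoy code; the numbers are example values only):
// stream_capacity_sketch.cc -- illustrative only.
#include <algorithm>
#include <cstdint>
#include <iostream>

// A connection tracks how many more streams it may accept. HTTP/1.1 has a
// concurrent limit of 1; HTTP/2 allows many concurrent streams per connection.
struct ToyConnection {
  const char* protocol;
  int64_t concurrent_stream_limit;  // max streams in flight at once
  int64_t remaining_streams;        // lifetime budget (maxRequestsPerConnection)
  int64_t active_streams = 0;

  int64_t currentUnusedCapacity() const {
    return std::min(remaining_streams, concurrent_stream_limit - active_streams);
  }
};

int main() {
  ToyConnection http1{"HTTP/1.1", /*concurrent=*/1, /*lifetime=*/100};
  ToyConnection http2{"HTTP/2", /*concurrent=*/100, /*lifetime=*/1000};

  http1.active_streams = 1;   // one request in flight
  http2.active_streams = 30;  // thirty multiplexed requests in flight

  // HTTP/1.1 is now "busy" (capacity 0); HTTP/2 still has room (capacity 70).
  std::cout << http1.protocol << " unused capacity: " << http1.currentUnusedCapacity() << "\n";
  std::cout << http2.protocol << " unused capacity: " << http2.currentUnusedCapacity() << "\n";
}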
The constructor of the new upstream connection's parent class Http::ActiveClient creates the TCP network connection and the codec object:
// source/common/http/conn_pool_base.h
class ActiveClient : public Envoy::ConnectionPool::ActiveClient {
public:
ActiveClient(HttpConnPoolImplBase& parent, uint64_t lifetime_stream_limit,
uint64_t effective_concurrent_stream_limit,
uint64_t configured_concurrent_stream_limit,
OptRef<Upstream::Host::CreateConnectionData> opt_data)
: Envoy::ConnectionPool::ActiveClient(parent, lifetime_stream_limit,
effective_concurrent_stream_limit,
configured_concurrent_stream_limit) {
...
// The static cast makes sure we call the base class host() and not
// HttpConnPoolImplBase::host which is of a different type.
// Create the TCP network connection
Upstream::Host::CreateConnectionData data =
static_cast<Envoy::ConnectionPool::ConnPoolImplBase*>(&parent)->host()->createConnection(
parent.dispatcher(), parent.socketOptions(), parent.transportSocketOptions());
// Create the codec object
initialize(data, parent);
}
The constructor of the ActiveClient object's parent class ConnectionPool::ActiveClient creates the connection-timeout timer connect_timer_; if a new connection never manages to establish, the connection can be closed and destroyed:
// source/common/conn_pool/conn_pool_base.cc
ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit)
: parent_(parent), remaining_streams_(translateZeroToUnlimited(lifetime_stream_limit)),
configured_stream_limit_(translateZeroToUnlimited(effective_concurrent_streams)),
concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
// Create the connection-timeout timer connect_timer_; if the new connection never establishes, it can be closed and destroyed
connect_timer_(parent_.dispatcher().createTimer([this]() { onConnectTimeout(); })) {
...
}
void ActiveClient::onConnectTimeout() {
ENVOY_CONN_LOG(debug, "connect timeout", *this);
parent_.host()->cluster().trafficStats()->upstream_cx_connect_timeout_.inc();
timed_out_ = true;
close();
}
HostImplBase::createConnection calls DispatcherImpl::createClientConnection to create the network connection and registers its network events with the current worker thread's Dispatcher:
// source/common/upstream/upstream_impl.cc
Host::CreateConnectionData HostImplBase::createConnection(
Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
const Network::Address::InstanceConstSharedPtr& address,
const SharedConstAddressVector& address_list_or_null,
Network::UpstreamTransportSocketFactory& socket_factory,
const Network::ConnectionSocket::OptionsSharedPtr& options,
Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
HostDescriptionConstSharedPtr host) {
auto source_address_selector = cluster.getUpstreamLocalAddressSelector();
Network::ClientConnectionPtr connection;
...
auto upstream_local_address =
source_address_selector->getUpstreamLocalAddress(address, options);
// Create the network connection and register its network events with the current worker thread's dispatcher
connection = dispatcher.createClientConnection(
address, upstream_local_address.address_,
socket_factory.createTransportSocket(transport_socket_options, host),
upstream_local_address.socket_options_, transport_socket_options);
...
return {std::move(connection), std::move(host)};
}
ActiveClient's initialize method calls parent.createCodecClient to create the upstream codec, following the same principle as creating the decoder when accepting a downstream connection. The returned CodecClientProd object is used to decode upstream responses:
// source/common/http/conn_pool_base.h
void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
real_host_description_ = data.host_description_;
// Call parent.createCodecClient to create the upstream codec
codec_client_ = parent.createCodecClient(data);
codec_client_->addConnectionCallbacks(*this);
Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
codec_client_->setConnectionStats(
{traffic_stats.upstream_cx_rx_bytes_total_, traffic_stats.upstream_cx_rx_bytes_buffered_,
traffic_stats.upstream_cx_tx_bytes_total_, traffic_stats.upstream_cx_tx_bytes_buffered_,
&traffic_stats.bind_errors_, nullptr});
}
CodecClient is the base class of CodecClientProd. The CodecClientProd constructor first calls the parent constructor CodecClient::CodecClient, which registers the object for network connection events (onEvent) and also registers it as an L4 network filter to receive data; when the upstream response returned by the target host arrives, the CodecClient::onData callback is triggered. The relevant code is as follows:
// source/common/http/codec_client.cc
CodecClient::CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
Upstream::HostDescriptionConstSharedPtr host,
Event::Dispatcher& dispatcher)
: type_(type), host_(host), connection_(std::move(connection)),
idle_timeout_(host_->cluster().idleTimeout()) {
if (type_ != CodecType::HTTP3) {
// Make sure upstream connections process data and then the FIN, rather than processing
// TCP disconnects immediately. (see https://github.com/envoyproxy/envoy/issues/1679 for
// details)
connection_->detectEarlyCloseWhenReadDisabled(false);
}
// Register as a callback for connection events (e.g. connection established)
connection_->addConnectionCallbacks(*this);
// Register itself as an L4 read filter
connection_->addReadFilter(Network::ReadFilterSharedPtr{new CodecReadFilter(*this)});
if (idle_timeout_) {
idle_timer_ = dispatcher.createTimer([this]() -> void { onIdleTimeout(); });
enableIdleTimer();
}
// We just universally set no delay on connections. Theoretically we might at some point want
// to make this configurable.
connection_->noDelay(true);
}
CodecClientProd::CodecClientProd also creates the internal codec object codec_ according to the HTTP type, e.g. Http1::ClientConnectionImpl, similar to how the HTTP parser library's callbacks such as onMessageBegin and onHeadersComplete are registered when handling the downstream connection
CodecReadFilter is a lightweight L4 filter wrapper that calls CodecClient::onData and stops the filter iteration once the response has been processed
CodecClient::onData works like the ServerConnection handling on the downstream side; here codec_->dispatch is used to parse the response data returned by the target host:
// source/common/http/codec_client.cc
void CodecClient::onData(Buffer::Instance& data) {
const Status status = codec_->dispatch(data);
...
}
Not only does CodecClient register itself as an L4 handler for network connection events, ActiveClient registers itself as well; in this role ActiveClient notifies the connection pool to update the connection state and decides whether other pending requests can be handled. In ActiveClient::initialize, ActiveClient is set as the CodecClient's connection callback handler via addConnectionCallbacks:
// source/common/http/conn_pool_base.h
void initialize(Upstream::Host::CreateConnectionData& data, HttpConnPoolImplBase& parent) {
real_host_description_ = data.host_description_;
// Call parent.createCodecClient to create the upstream codec
codec_client_ = parent.createCodecClient(data);
// Register itself as the connection callback handler; here ActiveClient notifies the pool to update
// the connection state and decides whether other pending requests can be handled
codec_client_->addConnectionCallbacks(*this);
Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
codec_client_->setConnectionStats(
{traffic_stats.upstream_cx_rx_bytes_total_, traffic_stats.upstream_cx_rx_bytes_buffered_,
traffic_stats.upstream_cx_tx_bytes_total_, traffic_stats.upstream_cx_tx_bytes_buffered_,
&traffic_stats.bind_errors_, nullptr});
}
Once the handshake of the new connection has completed, the pool's connection-state-update callback is invoked:
// source/common/http/conn_pool_base.h
void onEvent(Network::ConnectionEvent event) override {
parent_.onConnectionEvent(*this, codec_client_->connectionFailureReason(), event);
}
The ConnPoolImplBase::onConnectionEvent callback inspects the network event carried by the newly created ActiveClient connection; when the connection has been established successfully, the event is Network::ConnectionEvent::Connected:
// source/common/conn_pool/conn_pool_base.cc
void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
Network::ConnectionEvent event) {
switch (event) {
...
case Network::ConnectionEvent::Connected: {
...
if (client.state() == ActiveClient::State::Connecting ||
client.state() == ActiveClient::State::ReadyForEarlyData) {
// Set the current connection state
transitionActiveClientState(client,
(client.currentUnusedCapacity() > 0 ? ActiveClient::State::Ready
: ActiveClient::State::Busy));
}
...
if (client.readyForStream()) {
// Attach pending requests and start processing them
onUpstreamReady();
}
...
break;
}
...
}
}
transitionActiveClientState is called to set the current connection state, and onUpstreamReady is called to iterate over all pending requests. onUpstreamReady takes a connection from the head of the ready connection list, attaches it to a pending request, and removes that request from the pending request list:
// source/common/conn_pool/conn_pool_base.cc
void ConnPoolImplBase::onUpstreamReady() {
while (!pending_streams_.empty() && !ready_clients_.empty()) {
// Take a connection from the head of the ready connection list
ActiveClientPtr& client = ready_clients_.front();
ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
// Pending streams are pushed onto the front, so pull from the back.
// Attach the ready connection to a pending request
attachStreamToClient(*client, pending_streams_.back()->context());
state_.decrPendingStreams(1);
// Remove that request from the pending request list
pending_streams_.pop_back();
}
if (!pending_streams_.empty()) {
tryCreateNewConnections();
}
}
6. Envoy Receives the Server's Response
1) Receiving the Response Data
After Envoy has finished sending the request, the backend application performs its business processing on the received request and sends the result back to Envoy as the response. Envoy's flow for receiving the upstream response is shown in the figure below:

CodecClient registers itself as an L4 read filter. When the response returned by the server arrives, the L4 connection's onFileEvent callback is triggered, and after ClientConnection's onReadReady and onRead processing, the CodecClient::onData callback is invoked. The difference between this upstream response flow and the downstream request flow is that on the downstream side, the L4 filter chain is created by createNetworkFilterChain when the listener accepts the connection, which calls each filter factory class's createFilterFactoryFromProtoTyped method to create the filter factories; the filter factories then create the filter objects when user request data arrives and add them to the filter manager via addReadFilter
When creating the upstream connection, the worker thread instead calls addReadFilter inside the constructor of the CodecClient object associated with that connection, adding the CodecClient itself directly as an L4 read filter. Therefore, once the response byte stream sent back by the upstream arrives, it enters the L4 network filter method CodecClient::onData:
// source/common/http/codec_client.cc
void CodecClient::onData(Buffer::Instance& data) {
const Status status = codec_->dispatch(data);
...
}
So when the upstream response is received, CodecClient::onData calls codec_->dispatch, which triggers the callbacks of the HTTP parser library (originally from Node.js). For HTTP1, this codec_ is an Http1::ClientConnectionImpl object:
// source/common/http/http1/codec_impl.cc
Http::Status ClientConnectionImpl::dispatch(Buffer::Instance& data) {
Http::Status status = ConnectionImpl::dispatch(data);
if (status.ok() && data.length() > 0) {
// The HTTP/1.1 codec pauses dispatch after a single response is complete. Extraneous data
// after a response is complete indicates an error.
return codecProtocolError("http/1.1 protocol error: extraneous data after response complete");
}
return status;
}
Once the HTTP headers have been fully received, ClientConnectionImpl::onHeadersCompleteBase is executed. It checks the status code, and after the check calls UpstreamRequest::decodeHeaders to push the upstream response back toward the downstream:
// source/common/http/http1/codec_impl.cc
Envoy::StatusOr<CallbackResult> ClientConnectionImpl::onHeadersCompleteBase() {
...
pending_response_.value().decoder_->decodeHeaders(std::move(headers), false);
...
}
UpstreamRequest::decodeHeaders fills in stream_info_ from the response headers, which is used to collect and report observability data, and then calls Filter::onUpstreamHeaders to find the original downstream request corresponding to this upstream request and continue pushing the response downstream:
// source/common/router/upstream_request.cc
void UpstreamRequest::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
...
stream_info_.setResponseCode(static_cast<uint32_t>(response_code));
...
parent_.onUpstreamHeaders(response_code, std::move(headers), *this, end_stream);
}
2) Sending the Response Data
At this point the upstream response enters the L7 filter callback Filter::onUpstreamHeaders. As introduced earlier, it performs response pre-processing such as retries, HTTP redirects, and response validity checks; once pre-processing is complete, it calls ActiveStreamDecoderFilter::encodeHeaders. The response sending flow is shown in the figure below:

After the received upstream response has been processed by the HTTP decoder inside CodecClient, the L7 router filter's Filter::onUpstreamHeaders method is called to send it downstream:
// source/common/router/router.cc
void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
UpstreamRequest& upstream_request, bool end_stream) {
...
const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
*headers, *downstream_headers_,
[this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
bool disable_early_data) -> void {
doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
});
...
if (route_entry_->internalRedirectPolicy().enabled() &&
route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode(
static_cast<Http::Code>(response_code)) &&
setupRedirect(*headers)) { // Handle HTTP redirects
return;
// If the redirect could not be handled, fail open and let it pass to the
// next downstream.
}
...
callbacks_->encodeHeaders(std::move(headers), end_stream,
StreamInfo::ResponseCodeDetails::get().ViaUpstream);
}
ActiveStreamDecoderFilter::encodeHeaders is a wrapper method; it calls the L7 filter manager's FilterManager::encodeHeaders. If L7 filters are configured on the downstream to process the response path, their encodeHeaders methods are executed here, and once filter processing is complete, ActiveStream::encodeHeaders is executed. The filter_manager_callbacks_ variable holds the original downstream request object ActiveStream, through which the response data is then sent to the downstream connection. The relevant code is as follows:
// source/common/http/filter_manager.cc
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers,
bool end_stream) {
...
for (; entry != encoder_filters_.end(); entry++) {
...
// Run the encode logic of the other registered L7 filters
FilterHeadersStatus status = (*entry)->handle_->encodeHeaders(headers, (*entry)->end_stream_);
...
}
...
// Call the downstream request object ActiveStream's encodeHeaders to send the response
filter_manager_callbacks_.encodeHeaders(headers, modified_end_stream);
...
}
ActiveStream::encodeHeaders adjusts the response headers before sending. For HTTP1, ResponseEncoderImpl::encodeHeaders is called to handle the header data:
// source/common/http/conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& headers,
bool end_stream) {
...
// Now actually encode via the codec.
// Call ResponseEncoderImpl::encodeHeaders to handle the header data
response_encoder_->encodeHeaders(headers, end_stream);
}
Similar to the encoding that RequestEncoderImpl performs before upstream data is sent, ResponseEncoderImpl::encodeHeaders first writes the HTTP objects into the byte-stream Buffer via operations such as copyToBuffer, then calls encodeHeadersBase to send the byte stream to the L4 connection object:
// source/common/http/http1/codec_impl.cc
void ResponseEncoderImpl::encodeHeaders(const ResponseHeaderMap& headers, bool end_stream) {
...
// Call encodeHeadersBase to send the byte stream to the L4 connection object
encodeHeadersBase(headers, absl::make_optional<uint64_t>(numeric_status), end_stream, false);
}
StreamEncoderImpl::encodeHeadersBase sends the response data to the downstream connection via flushOutput:
// source/common/http/http1/codec_impl.cc
void StreamEncoderImpl::encodeHeadersBase(const RequestOrResponseHeaderMap& headers,
absl::optional<uint64_t> status, bool end_stream,
bool bodiless_request) {
...
flushOutput();
...
}
void StreamEncoderImpl::flushOutput(bool end_encode) {
auto encoded_bytes = connection_.flushOutput(end_encode);
bytes_meter_->addWireBytesSent(encoded_bytes);
}
When the L4 ConnectionImpl::write method is called, the data is not written to the network socket and sent synchronously. Instead, the data is first stored in the write buffer write_buffer_, and a network write event is queued into the current thread's event queue, waiting to be triggered by the Dispatcher. In addition, before the data is queued, the onWrite callbacks of the L4 network filters are executed to process the data accordingly, for example if the application has configured L4 write rate limiting:
// source/common/network/connection_impl.cc
void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through_filter_chain) {
...
// Run the L4 write filters
FilterStatus status = filter_manager_.onWrite();
...
write_end_stream_ = end_stream;
if (data.length() > 0 || end_stream) {
ENVOY_CONN_LOG(trace, "writing {} bytes, end_stream {}", *this, data.length(), end_stream);
// TODO(mattklein123): All data currently gets moved from the source buffer to the write buffer.
// This can lead to inefficient behavior if writing a bunch of small chunks. In this case, it
// would likely be more efficient to copy data below a certain size. VERY IMPORTANT: If this is
// ever changed, read the comment in SslSocket::doWrite() VERY carefully. That code assumes that
// we never change existing write_buffer_ chain elements between calls to SSL_write(). That code
// might need to change if we ever copy here.
// Move the data to be sent into the write buffer
write_buffer_->move(data);
// Activating a write event before the socket is connected has the side-effect of tricking
// doWriteReady into thinking the socket is connected. On macOS, the underlying write may fail
// with a connection error if a call to write(2) occurs before the connection is completed.
if (!connecting_) {
// Queue a network write event
ioHandle().activateFileEvents(Event::FileReadyType::Write);
}
}
}
When the Dispatcher processes the network write event, the onWriteReady callback is invoked, which then sends the contents of the write buffer write_buffer_ to the downstream network connection via transport_socket_->doWrite:
// source/common/network/connection_impl.cc
void ConnectionImpl::onWriteReady() {
...
IoResult result = transport_socket_->doWrite(*write_buffer_, write_end_stream_);
...
}
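The buffering-plus-deferred-flush pattern described above can be sketched with a small standalone model (the ToyConnection type and its event loop are invented for illustration; Envoy's real implementation lives in ConnectionImpl and the transport socket):
// buffered_write_sketch.cc -- illustrative only.
#include <deque>
#include <functional>
#include <iostream>
#include <string>

class ToyConnection {
public:
  // write() only appends to the buffer and schedules a write event;
  // nothing touches the "socket" synchronously.
  void write(const std::string& data) {
    write_buffer_ += data;
    scheduleWriteEvent();
  }

  // Simulate one event-loop iteration: run the queued events.
  void runEventLoop() {
    while (!event_queue_.empty()) {
      auto ev = std::move(event_queue_.front());
      event_queue_.pop_front();
      ev();
    }
  }

private:
  void scheduleWriteEvent() {
    if (write_event_scheduled_) return;  // coalesce multiple writes into one flush
    write_event_scheduled_ = true;
    event_queue_.push_back([this]() { onWriteReady(); });
  }

  void onWriteReady() {
    write_event_scheduled_ = false;
    // Here a real implementation would call the transport socket's doWrite();
    // this sketch just "sends" the whole buffer at once.
    std::cout << "flushing " << write_buffer_.size() << " bytes to the socket\n";
    write_buffer_.clear();
  }

  std::string write_buffer_;
  bool write_event_scheduled_ = false;
  std::deque<std::function<void()>> event_queue_;
};

int main() {
  ToyConnection conn;
  conn.write("HTTP/1.1 200 OK\r\n");
  conn.write("content-length: 2\r\n\r\nok");  // coalesced with the previous write
  conn.runEventLoop();                         // a single flush of the whole buffer
}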
As can be seen above, the response path involves fewer processing steps than the request path: it omits the route-processing and connection-pool-creation logic, because by the time Envoy is waiting for the server's response, the reverse path for sending the response back is already in place
References:
《Istio权威指南(下) 云原生服务网格Istio架构与源码》
为何 Envoy 会经常全量推送路由变更,以及如何改善(上)
《Istio & Envoy 内幕》
Life of a Request
请求的生命周期