
How the Apache HttpClient Connection Pool Works

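In Java development we often need to call HTTP endpoints. There are several common ways to do it:

  • Java's built-in HttpURLConnection
  • OkHttpClient
  • Apache HttpClient

Apache HttpClient tends to be the one we use most, typically with code like the following: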

    private static class HttpClientPool {
        private static CloseableHttpClient httpClient;
        private static PoolingHttpClientConnectionManager HTTP_CLIENT_POOL = new PoolingHttpClientConnectionManager();

        static {
            // Maximum number of connections in the pool: 300
            HTTP_CLIENT_POOL.setMaxTotal(300);
            // Maximum number of connections per route
            HTTP_CLIENT_POOL.setDefaultMaxPerRoute(50);
            if (httpClient == null) {
                httpClient = HttpClients.custom()
                        .setConnectionManager(HTTP_CLIENT_POOL)
                        .setDefaultRequestConfig(RequestConfig.custom()
                                .setConnectTimeout(10000)
                                .setSocketTimeout(10000)
                                .build())
                        .setDefaultSocketConfig(SocketConfig.custom().setSoTimeout(10000).build())
                        .build();
            }
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    httpClient.close();
                    HTTP_CLIENT_POOL.close();
                } catch (Exception e) {
                    LoggerFactory.getLogger(com.ipinyou.mip.base.utils.HttpClientHolder.class)
                            .error("Failed to close the httpClient connection pool!", e);
                }
            }));
        }

        public static CloseableHttpClient getHttpClient() {
            return httpClient;
        }
    }

    private static HttpResponseValue httpCallWithHttpClient(String url, HttpMethodEnum method,
            Map<String, String> formData, String jsonData, Map<String, String> headers) {
        HttpResponseValue responseValue = new HttpResponseValue();
        if (url == null || StringUtils.isBlank(url)) {
            log.info("url is empty,return ");
            return responseValue;
        }
        long start = System.currentTimeMillis();
        HttpPost post = null;
        HttpGet get = null;
        try {
            // Set the request timeout to 10 minutes
            RequestConfig requestConfig = RequestConfig.custom()
                    .setConnectTimeout(HTTP_CONNECT_TIMEOUT_MS) // connect timeout
                    .setSocketTimeout(HTTP_CONNECT_TIMEOUT_MS) // data transfer (socket) timeout
                    .build();
            if (method == HttpMethodEnum.POST) {
                post = new HttpPost(url);
                post.setConfig(requestConfig);
                if (headers != null && !headers.isEmpty()) {
                    for (Map.Entry<String, String> kv : headers.entrySet()) {
                        post.addHeader(kv.getKey(), kv.getValue());
                    }
                }
                if (formData != null && !formData.isEmpty()) {
                    List<NameValuePair> nvps = new ArrayList<>();
                    formData.forEach((k, v) -> nvps.add(new BasicNameValuePair(k, v)));
                    post.setEntity(new UrlEncodedFormEntity(nvps, DEFAULT_UTF8_ENCODING));
                } else if (StringUtils.isNotBlank(jsonData)) {
                    StringEntity entity = new StringEntity(jsonData, ContentType.APPLICATION_JSON);
                    entity.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, CONTENT_TYPE_APPLICATION_JSON));
                    post.setEntity(entity);
                }
            }
            if (method == HttpMethodEnum.GET) {
                // Pass the URL here; a no-arg HttpGet has no target URI to execute against
                get = new HttpGet(url);
                get.setConfig(requestConfig);
            }
            CloseableHttpClient client = HttpClientPool.getHttpClient();
            // The response entity is decoded as UTF-8
            CloseableHttpResponse response = null;
            if (method == HttpMethodEnum.GET) {
                response = client.execute(get);
            } else {
                response = client.execute(post);
            }
            HttpEntity entity = response.getEntity();
            if (entity != null) {
                responseValue.content = EntityUtils.toString(entity, DEFAULT_UTF8_ENCODING);
            }
            // Report the actual status code instead of a hard-coded 200
            responseValue.code = response.getStatusLine().getStatusCode();
            response.close();
        } catch (Exception e) {
            log.info("httpCallWithHttpClient failed: {} ", e);
        } finally {
            log.info("httpCallWithHttpClient url=[{}],cost={} ms,", url, (System.currentTimeMillis() - start));
        }
        return responseValue;
    }

Today we'll walk through the Apache HttpClient source code.
HttpClient is an interface defined by the library.
Here we take InternalHttpClient as the example (CloseableHttpClient is an abstract class):

    public CloseableHttpResponse execute(
            final HttpHost target,
            final HttpRequest request,
            final HttpContext context) throws IOException, ClientProtocolException {
        return doExecute(target, request, context);
    }

    protected CloseableHttpResponse doExecute(
            final HttpHost target,
            final HttpRequest request,
            final HttpContext context) throws IOException, ClientProtocolException {
        Args.notNull(request, "HTTP request");
        HttpExecutionAware execAware = null;
        if (request instanceof HttpExecutionAware) {
            execAware = (HttpExecutionAware) request;
        }
        try {
            final HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(request, target);
            final HttpClientContext localcontext = HttpClientContext.adapt(
                    context != null ? context : new BasicHttpContext());
            RequestConfig config = null;
            if (request instanceof Configurable) {
                config = ((Configurable) request).getConfig();
            }
            if (config == null) {
                final HttpParams params = request.getParams();
                if (params instanceof HttpParamsNames) {
                    if (!((HttpParamsNames) params).getNames().isEmpty()) {
                        config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);
                    }
                } else {
                    config = HttpClientParamConfig.getRequestConfig(params, this.defaultConfig);
                }
            }
            if (config != null) {
                localcontext.setRequestConfig(config);
            }
            setupContext(localcontext);
            final HttpRoute route = determineRoute(target, wrapper, localcontext);
            return this.execChain.execute(route, wrapper, localcontext, execAware);
        } catch (final HttpException httpException) {
            throw new ClientProtocolException(httpException);
        }
    }

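The request is actually dispatched in InternalHttpClient. Before sending, determineRoute decides the route for this request. HttpPost and HttpGet both implement the HttpUriRequest interface; before the request is executed, the target address is read from the HttpUriRequest and wrapped in an HttpHost, which mainly holds the following: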

    public final class HttpHost implements java.lang.Cloneable, java.io.Serializable {
        public static final java.lang.String DEFAULT_SCHEME_NAME = "http";
        protected final java.lang.String hostname;
        protected final java.lang.String lcHostname;
        protected final int port;
        protected final java.lang.String schemeName;
        protected final java.net.InetAddress address;

        @Override
        public boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof HttpHost) {
                final HttpHost that = (HttpHost) obj;
                return this.lcHostname.equals(that.lcHostname)
                    && this.port == that.port
                    && this.schemeName.equals(that.schemeName)
                    && (this.address==null ? that.address== null : this.address.equals(that.address));
            } else {
                return false;
            }
        }

        @Override
        public int hashCode() {
            int hash = LangUtils.HASH_SEED;
            hash = LangUtils.hashCode(hash, this.lcHostname);
            hash = LangUtils.hashCode(hash, this.port);
            hash = LangUtils.hashCode(hash, this.schemeName);
            if (address!=null) {
                hash = LangUtils.hashCode(hash, address);
            }
            return hash;
        }
    }

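As you can see, HttpHost mainly holds the target host name, the port, and the scheme (http or https). Note in particular that it overrides equals and hashCode: two HttpHost instances are equal when their scheme, lower-cased host name, and port match (plus the resolved address, when one is set).
Next, routePlanner.determineRoute wraps the HttpHost together with some other information into an HttpRoute, which represents the route of a request: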

    public final class HttpRoute implements RouteInfo, Cloneable {
        private final HttpHost targetHost;
        private final InetAddress localAddress;
        private final List<HttpHost> proxyChain;
        private final TunnelType tunnelled;
        private final LayerType layered;
        private final boolean secure;

        @Override
        public final boolean equals(final Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof HttpRoute) {
                final HttpRoute that = (HttpRoute) obj;
                return
                    // Do the cheapest tests first
                    (this.secure    == that.secure) &&
                    (this.tunnelled == that.tunnelled) &&
                    (this.layered   == that.layered) &&
                    LangUtils.equals(this.targetHost, that.targetHost) &&
                    LangUtils.equals(this.localAddress, that.localAddress) &&
                    LangUtils.equals(this.proxyChain, that.proxyChain);
            } else {
                return false;
            }
        }

        @Override
        public final int hashCode() {
            int hash = LangUtils.HASH_SEED;
            hash = LangUtils.hashCode(hash, this.targetHost);
            hash = LangUtils.hashCode(hash, this.localAddress);
            if (this.proxyChain != null) {
                for (final HttpHost element : this.proxyChain) {
                    hash = LangUtils.hashCode(hash, element);
                }
            }
            hash = LangUtils.hashCode(hash, this.secure);
            hash = LangUtils.hashCode(hash, this.tunnelled);
            hash = LangUtils.hashCode(hash, this.layered);
            return hash;
        }
    }

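Note that HttpRoute also overrides equals and hashCode. It compares the target host, the local address, the proxy chain, and the secure, tunnelled, and layered flags, so in the common case (no proxy, no explicit local address) two HttpRoute instances are equal when their scheme, host, and port match. In other words, the limit we usually configure with setDefaultMaxPerRoute is applied per combination of scheme, host, and port.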
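To make the "one bucket per scheme, host, and port" idea concrete, here is a minimal sketch that uses only the JDK. RouteKey, fromUrl, and countPerRoute are hypothetical names invented for this illustration and are not HttpClient classes; filling in the default port (80 or 443) when the URL omits it is also an assumption of the sketch rather than something shown in the source above.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

public class RouteKeyDemo {

    /** Hypothetical, simplified stand-in for HttpRoute: scheme + lower-cased host + port. */
    static final class RouteKey {
        final String scheme;
        final String host;
        final int port;

        RouteKey(String scheme, String host, int port) {
            this.scheme = scheme.toLowerCase(Locale.ROOT);
            this.host = host.toLowerCase(Locale.ROOT);
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof RouteKey)) {
                return false;
            }
            RouteKey that = (RouteKey) o;
            return port == that.port && scheme.equals(that.scheme) && host.equals(that.host);
        }

        @Override
        public int hashCode() {
            return Objects.hash(scheme, host, port);
        }
    }

    /** Derives the key from a URL; the default-port fallback is an assumption of this sketch. */
    static RouteKey fromUrl(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        int port = uri.getPort(); // -1 when the URL carries no explicit port
        if (port < 0) {
            port = "https".equalsIgnoreCase(scheme) ? 443 : 80;
        }
        return new RouteKey(scheme, uri.getHost(), port);
    }

    /** Counts how many of the given URLs fall into each route bucket. */
    static Map<RouteKey, Integer> countPerRoute(String... urls) {
        Map<RouteKey, Integer> counts = new HashMap<>();
        for (String url : urls) {
            counts.merge(fromUrl(url), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<RouteKey, Integer> counts = countPerRoute(
                "http://Example.com/a",
                "http://example.com:80/b",
                "https://example.com/c");
        // The two http URLs share one bucket; the https URL gets its own.
        System.out.println("distinct routes: " + counts.size());
    }
}
```

With a per-route limit of 50, the two http URLs in this sketch would compete for the same 50 connections, while the https URL would have its own allowance.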

With the HttpRoute, Apache HttpClient locates the server and establishes a connection.

Next, the request is sent through ClientExecChain:

    public interface ClientExecChain {
        CloseableHttpResponse execute(
                HttpRoute route,
                HttpRequestWrapper request,
                HttpClientContext clientContext,
                HttpExecutionAware execAware) throws IOException, HttpException;
    }

Its main implementation is MainClientExec:

    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        Args.notNull(route, "HTTP route");
        Args.notNull(request, "HTTP request");
        Args.notNull(context, "HTTP context");

        AuthState targetAuthState = context.getTargetAuthState();
        if (targetAuthState == null) {
            targetAuthState = new AuthState();
            context.setAttribute(HttpClientContext.TARGET_AUTH_STATE, targetAuthState);
        }
        AuthState proxyAuthState = context.getProxyAuthState();
        if (proxyAuthState == null) {
            proxyAuthState = new AuthState();
            context.setAttribute(HttpClientContext.PROXY_AUTH_STATE, proxyAuthState);
        }

        if (request instanceof HttpEntityEnclosingRequest) {
            RequestEntityProxy.enhance((HttpEntityEnclosingRequest) request);
        }

        Object userToken = context.getUserToken();

        final ConnectionRequest connRequest = connManager.requestConnection(route, userToken);
        if (execAware != null) {
            if (execAware.isAborted()) {
                connRequest.cancel();
                throw new RequestAbortedException("Request aborted");
            } else {
                execAware.setCancellable(connRequest);
            }
        }

        final RequestConfig config = context.getRequestConfig();

        final HttpClientConnection managedConn;
        try {
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new RequestAbortedException("Request aborted", interrupted);
        } catch(final ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause == null) {
                cause = ex;
            }
            throw new RequestAbortedException("Request execution failed", cause);
        }

        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, managedConn);

        if (config.isStaleConnectionCheckEnabled()) {
            // validate connection
            if (managedConn.isOpen()) {
                this.log.debug("Stale connection check");
                if (managedConn.isStale()) {
                    this.log.debug("Stale connection detected");
                    managedConn.close();
                }
            }
        }

        final ConnectionHolder connHolder = new ConnectionHolder(this.log, this.connManager, managedConn);
        try {
            if (execAware != null) {
                execAware.setCancellable(connHolder);
            }

            HttpResponse response;
            for (int execCount = 1;; execCount++) {

                if (execCount > 1 && !RequestEntityProxy.isRepeatable(request)) {
                    throw new NonRepeatableRequestException("Cannot retry request " +
                            "with a non-repeatable request entity.");
                }

                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                if (!managedConn.isOpen()) {
                    this.log.debug("Opening connection " + route);
                    try {
                        establishRoute(proxyAuthState, managedConn, route, request, context);
                    } catch (final TunnelRefusedException ex) {
                        if (this.log.isDebugEnabled()) {
                            this.log.debug(ex.getMessage());
                        }
                        response = ex.getResponse();
                        break;
                    }
                }
                final int timeout = config.getSocketTimeout();
                if (timeout >= 0) {
                    managedConn.setSocketTimeout(timeout);
                }

                if (execAware != null && execAware.isAborted()) {
                    throw new RequestAbortedException("Request aborted");
                }

                if (this.log.isDebugEnabled()) {
                    this.log.debug("Executing request " + request.getRequestLine());
                }

                if (!request.containsHeader(AUTH.WWW_AUTH_RESP)) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Target auth state: " + targetAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, targetAuthState, context);
                }
                if (!request.containsHeader(AUTH.PROXY_AUTH_RESP) && !route.isTunnelled()) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Proxy auth state: " + proxyAuthState.getState());
                    }
                    this.authenticator.generateAuthResponse(request, proxyAuthState, context);
                }

                response = requestExecutor.execute(request, managedConn, context);

                // The connection is in or can be brought to a re-usable state.
                if (reuseStrategy.keepAlive(response, context)) {
                    // Set the idle duration of this connection
                    final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                        if (duration > 0) {
                            s = "for " + duration + " " + TimeUnit.MILLISECONDS;
                        } else {
                            s = "indefinitely";
                        }
                        this.log.debug("Connection can be kept alive " + s);
                    }
                    connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
                    connHolder.markReusable();
                } else {
                    connHolder.markNonReusable();
                }

                if (needAuthentication(targetAuthState, proxyAuthState, route, response, context)) {
                    // Make sure the response body is fully consumed, if present
                    final HttpEntity entity = response.getEntity();
                    if (connHolder.isReusable()) {
                        EntityUtils.consume(entity);
                    } else {
                        managedConn.close();
                        if (proxyAuthState.getState() == AuthProtocolState.SUCCESS
                                && proxyAuthState.getAuthScheme() != null
                                && proxyAuthState.getAuthScheme().isConnectionBased()) {
                            this.log.debug("Resetting proxy auth state");
                            proxyAuthState.reset();
                        }
                        if (targetAuthState.getState() == AuthProtocolState.SUCCESS
                                && targetAuthState.getAuthScheme() != null
                                && targetAuthState.getAuthScheme().isConnectionBased()) {
                            this.log.debug("Resetting target auth state");
                            targetAuthState.reset();
                        }
                    }
                    // discard previous auth headers
                    final HttpRequest original = request.getOriginal();
                    if (!original.containsHeader(AUTH.WWW_AUTH_RESP)) {
                        request.removeHeaders(AUTH.WWW_AUTH_RESP);
                    }
                    if (!original.containsHeader(AUTH.PROXY_AUTH_RESP)) {
                        request.removeHeaders(AUTH.PROXY_AUTH_RESP);
                    }
                } else {
                    break;
                }
            }

            if (userToken == null) {
                userToken = userTokenHandler.getUserToken(context);
                context.setAttribute(HttpClientContext.USER_TOKEN, userToken);
            }
            if (userToken != null) {
                connHolder.setState(userToken);
            }

            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // connection not needed and (assumed to be) in re-usable state
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            } else {
                return new HttpResponseProxy(response, connHolder);
            }
        } catch (final ConnectionShutdownException ex) {
            final InterruptedIOException ioex = new InterruptedIOException(
                    "Connection has been shut down");
            ioex.initCause(ex);
            throw ioex;
        } catch (final HttpException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final IOException ex) {
            connHolder.abortConnection();
            throw ex;
        } catch (final RuntimeException ex) {
            connHolder.abortConnection();
            throw ex;
        }
    }

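The method that actually sends the request is fairly long, so let's look at it in stages.

Step 1: obtain a connection for the HttpRoute above and establish the TCP connection to the server

The connection is requested with connManager.requestConnection(route, userToken);. In HttpClientBuilder, connManager is set to a PoolingHttpClientConnectionManager: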

    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        Args.notNull(route, "HTTP route");
        if (this.log.isDebugEnabled()) {
            this.log.debug("Connection request: " + format(route, state) + formatStats(route));
        }
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        return new ConnectionRequest() {

            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit tunit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                return leaseConnection(future, timeout, tunit);
            }

        };
    }

All of this boils down to leasing a connection from the pool. The actual leasing logic lives in AbstractConnPool, the parent class of the pool:

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final Future<E> future) throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + tunit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }
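The leasing loop above makes three decisions in order: reuse an idle connection of the same route, otherwise create a new one if both the per-route and the total limit allow it, otherwise wait. The following is a deliberately simplified sketch of that decision order using only counters. ToyPool, Outcome, and all method names are hypothetical and exist only for this illustration; the sketch has no locking, no expiry handling, and returns MUST_WAIT instead of blocking.

```java
import java.util.HashMap;
import java.util.Map;

public class ToyPoolDemo {

    enum Outcome { REUSED, CREATED, MUST_WAIT }

    /** Toy model: tracks counters only. No real connections, no locking, no blocking. */
    static final class ToyPool {
        private final int maxPerRoute;
        private final int maxTotal;
        private final Map<String, Integer> leased = new HashMap<>();
        private final Map<String, Integer> idle = new HashMap<>();
        private int totalLeased;
        private int totalIdle;

        ToyPool(int maxPerRoute, int maxTotal) {
            this.maxPerRoute = maxPerRoute;
            this.maxTotal = maxTotal;
        }

        Outcome lease(String route) {
            // 1. Prefer an idle connection that already belongs to this route.
            int idleHere = idle.getOrDefault(route, 0);
            if (idleHere > 0) {
                idle.put(route, idleHere - 1);
                totalIdle--;
                markLeased(route);
                return Outcome.REUSED;
            }
            // 2. Otherwise create one, but only within both limits.
            int allocatedHere = leased.getOrDefault(route, 0); // no idle entries left for this route
            int freeCapacity = Math.max(maxTotal - totalLeased, 0);
            if (allocatedHere < maxPerRoute && freeCapacity > 0) {
                // Idle connections of other routes count towards the total: drop one to make room.
                if (totalIdle > freeCapacity - 1) {
                    evictOneIdle();
                }
                markLeased(route);
                return Outcome.CREATED;
            }
            // 3. The real pool would queue the caller here and wait until the deadline.
            return Outcome.MUST_WAIT;
        }

        /** Returns a leased connection of the given route to the idle set. */
        void release(String route) {
            leased.merge(route, -1, Integer::sum);
            totalLeased--;
            idle.merge(route, 1, Integer::sum);
            totalIdle++;
        }

        int idleCount(String route) {
            return idle.getOrDefault(route, 0);
        }

        private void markLeased(String route) {
            leased.merge(route, 1, Integer::sum);
            totalLeased++;
        }

        private void evictOneIdle() {
            for (Map.Entry<String, Integer> e : idle.entrySet()) {
                if (e.getValue() > 0) {
                    e.setValue(e.getValue() - 1);
                    totalIdle--;
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyPool pool = new ToyPool(2, 3); // maxPerRoute = 2, maxTotal = 3
        System.out.println(pool.lease("a")); // CREATED
        System.out.println(pool.lease("a")); // CREATED
        System.out.println(pool.lease("a")); // MUST_WAIT (per-route limit reached)
        System.out.println(pool.lease("b")); // CREATED
        System.out.println(pool.lease("c")); // MUST_WAIT (total limit reached)
        pool.release("a");
        System.out.println(pool.lease("a")); // REUSED
    }
}
```

The sketch also shows why a busy route can evict another route's idle connection: once the total limit is nearly reached, making room for a new connection means closing an idle one elsewhere.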

When no free connection is available and neither maxPerRoute nor maxTotal has been reached, a new connection is created, ultimately by ManagedHttpClientConnectionFactory:

    public ManagedHttpClientConnection create(final HttpRoute route, final ConnectionConfig config) {
        final ConnectionConfig cconfig = config != null ? config : ConnectionConfig.DEFAULT;
        CharsetDecoder chardecoder = null;
        CharsetEncoder charencoder = null;
        final Charset charset = cconfig.getCharset();
        final CodingErrorAction malformedInputAction = cconfig.getMalformedInputAction() != null ?
                cconfig.getMalformedInputAction() : CodingErrorAction.REPORT;
        final CodingErrorAction unmappableInputAction = cconfig.getUnmappableInputAction() != null ?
                cconfig.getUnmappableInputAction() : CodingErrorAction.REPORT;
        if (charset != null) {
            chardecoder = charset.newDecoder();
            chardecoder.onMalformedInput(malformedInputAction);
            chardecoder.onUnmappableCharacter(unmappableInputAction);
            charencoder = charset.newEncoder();
            charencoder.onMalformedInput(malformedInputAction);
            charencoder.onUnmappableCharacter(unmappableInputAction);
        }
        final String id = "http-outgoing-" + Long.toString(COUNTER.getAndIncrement());
        return new LoggingManagedHttpClientConnection(
                id,
                log,
                headerlog,
                wirelog,
                cconfig.getBufferSize(),
                cconfig.getFragmentSizeHint(),
                chardecoder,
                charencoder,
                cconfig.getMessageConstraints(),
                incomingContentStrategy,
                outgoingContentStrategy,
                requestWriterFactory,
                responseParserFactory);
    }

At this point we have an HttpClientConnection, but no real connection has been established yet. The next step is:

    this.connManager.connect(
            managedConn,
            route,
            timeout > 0 ? timeout : 0,
            context);

    public void connect(
            final HttpClientConnection managedConn,
            final HttpRoute route,
            final int connectTimeout,
            final HttpContext context) throws IOException {
        Args.notNull(managedConn, "Managed Connection");
        Args.notNull(route, "HTTP route");
        final ManagedHttpClientConnection conn;
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
            conn = entry.getConnection();
        }
        final HttpHost host;
        if (route.getProxyHost() != null) {
            host = route.getProxyHost();
        } else {
            host = route.getTargetHost();
        }
        final InetSocketAddress localAddress = route.getLocalSocketAddress();
        SocketConfig socketConfig = this.configData.getSocketConfig(host);
        if (socketConfig == null) {
            socketConfig = this.configData.getDefaultSocketConfig();
        }
        if (socketConfig == null) {
            socketConfig = SocketConfig.DEFAULT;
        }
        this.connectionOperator.connect(
                conn, host, localAddress, connectTimeout, socketConfig, context);
    }

    public void connect(
            final ManagedHttpClientConnection conn,
            final HttpHost host,
            final InetSocketAddress localAddress,
            final int connectTimeout,
            final SocketConfig socketConfig,
            final HttpContext context) throws IOException {
        final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
        final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
        if (sf == null) {
            throw new UnsupportedSchemeException(host.getSchemeName() +
                    " protocol is not supported");
        }
        final InetAddress[] addresses = host.getAddress() != null ?
                new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());
        final int port = this.schemePortResolver.resolve(host);
        for (int i = 0; i < addresses.length; i++) {
            final InetAddress address = addresses[i];
            final boolean last = i == addresses.length - 1;

            Socket sock = sf.createSocket(context);
            sock.setSoTimeout(socketConfig.getSoTimeout());
            sock.setReuseAddress(socketConfig.isSoReuseAddress());
            sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
            sock.setKeepAlive(socketConfig.isSoKeepAlive());
            if (socketConfig.getRcvBufSize() > 0) {
                sock.setReceiveBufferSize(socketConfig.getRcvBufSize());
            }
            if (socketConfig.getSndBufSize() > 0) {
                sock.setSendBufferSize(socketConfig.getSndBufSize());
            }

            final int linger = socketConfig.getSoLinger();
            if (linger >= 0) {
                sock.setSoLinger(true, linger);
            }
            conn.bind(sock);

            final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connecting to " + remoteAddress);
            }
            try {
                sock = sf.connectSocket(
                        connectTimeout, sock, host, remoteAddress, localAddress, context);
                conn.bind(sock);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection established " + conn);
                }
                return;
            } catch (final SocketTimeoutException ex) {
                if (last) {
                    throw new ConnectTimeoutException(ex, host, addresses);
                }
            } catch (final ConnectException ex) {
                if (last) {
                    final String msg = ex.getMessage();
                    if ("Connection timed out".equals(msg)) {
                        throw new ConnectTimeoutException(ex, host, addresses);
                    } else {
                        throw new HttpHostConnectException(ex, host, addresses);
                    }
                }
            } catch (final NoRouteToHostException ex) {
                if (last) {
                    throw ex;
                }
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connect to " + remoteAddress + " timed out. " +
                        "Connection will be retried using another IP address");
            }
        }
    }

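These calls establish the real connection. Taking the http scheme as an example, PlainConnectionSocketFactory creates a plain Socket, which is then connected and bound to the connection: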

    public Socket createSocket(final HttpContext context) throws IOException {
        return new Socket();
    }

    @Override
    public Socket connectSocket(
            final int connectTimeout,
            final Socket socket,
            final HttpHost host,
            final InetSocketAddress remoteAddress,
            final InetSocketAddress localAddress,
            final HttpContext context) throws IOException {
        final Socket sock = socket != null ? socket : createSocket(context);
        if (localAddress != null) {
            sock.bind(localAddress);
        }
        try {
            sock.connect(remoteAddress, connectTimeout);
        } catch (final IOException ex) {
            try {
                sock.close();
            } catch (final IOException ignore) {
            }
            throw ex;
        }
        return sock;
    }

At this point the connection has really been established.

Step 2: send the request

The request is sent by HttpRequestExecutor:

    protected HttpResponse doSendRequest(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws IOException, HttpException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");

        HttpResponse response = null;

        context.setAttribute(HttpCoreContext.HTTP_CONNECTION, conn);
        context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.FALSE);

        conn.sendRequestHeader(request);
        if (request instanceof HttpEntityEnclosingRequest) {
            // Check for expect-continue handshake. We have to flush the
            // headers and wait for an 100-continue response to handle it.
            // If we get a different response, we must not send the entity.
            boolean sendentity = true;
            final ProtocolVersion ver =
                request.getRequestLine().getProtocolVersion();
            if (((HttpEntityEnclosingRequest) request).expectContinue() &&
                !ver.lessEquals(HttpVersion.HTTP_1_0)) {

                conn.flush();
                // As suggested by RFC 2616 section 8.2.3, we don't wait for a
                // 100-continue response forever. On timeout, send the entity.
                if (conn.isResponseAvailable(this.waitForContinue)) {
                    response = conn.receiveResponseHeader();
                    if (canResponseHaveBody(request, response)) {
                        conn.receiveResponseEntity(response);
                    }
                    final int status = response.getStatusLine().getStatusCode();
                    if (status < 200) {
                        if (status != HttpStatus.SC_CONTINUE) {
                            throw new ProtocolException(
                                    "Unexpected response: " + response.getStatusLine());
                        }
                        // discard 100-continue
                        response = null;
                    } else {
                        sendentity = false;
                    }
                }
            }
            if (sendentity) {
                conn.sendRequestEntity((HttpEntityEnclosingRequest) request);
            }
        }
        conn.flush();
        context.setAttribute(HttpCoreContext.HTTP_REQ_SENT, Boolean.TRUE);
        return response;
    }

First, the request headers are written:

    public void write(final T message) throws IOException, HttpException {
        Args.notNull(message, "HTTP message");
        writeHeadLine(message);
        for (final HeaderIterator it = message.headerIterator(); it.hasNext(); ) {
            final Header header = it.nextHeader();
            this.sessionBuffer.writeLine(
                    lineFormatter.formatHeader(this.lineBuf, header));
        }
        this.lineBuf.clear();
        this.sessionBuffer.writeLine(this.lineBuf);
    }

    protected void doFormatHeader(final CharArrayBuffer buffer,
                                  final Header header) {
        final String name = header.getName();
        final String value = header.getValue();
        int len = name.length() + 2;
        if (value != null) {
            len += value.length();
        }
        buffer.ensureCapacity(len);
        buffer.append(name);
        buffer.append(": ");
        if (value != null) {
            buffer.append(value);
        }
    }

As you can see, each header of the HTTP request is written on its own line, in the form Key: Value.
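As a rough illustration of the resulting wire format, the sketch below builds a request head the same way: a request line, one "Key: Value" line per header, and an empty line to end the head, each terminated by CRLF. formatHead is a hypothetical helper written for this example, not an HttpClient method, and the HTTP/1.1 version string is hard-coded as an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RequestHeadDemo {

    /** Hypothetical helper: serializes a request line and headers into the HTTP/1.1 text format. */
    static String formatHead(String method, String uri, Map<String, String> headers) {
        StringBuilder sb = new StringBuilder();
        // Request line, e.g. "POST /api HTTP/1.1"
        sb.append(method).append(' ').append(uri).append(" HTTP/1.1\r\n");
        // One "Key: Value" line per header
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(": ");
            if (h.getValue() != null) {
                sb.append(h.getValue());
            }
            sb.append("\r\n");
        }
        // An empty line marks the end of the head
        sb.append("\r\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Host", "example.com");
        headers.put("Content-Type", "application/json");
        System.out.print(formatHead("POST", "/api", headers));
    }
}
```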

After the headers, the request body is written:

    public void sendRequestEntity(final HttpEntityEnclosingRequest request)
            throws HttpException, IOException {
        Args.notNull(request, "HTTP request");
        assertOpen();
        if (request.getEntity() == null) {
            return;
        }
        this.entityserializer.serialize(
                this.outbuffer,
                request,
                request.getEntity());
    }

    public void serialize(
            final SessionOutputBuffer outbuffer,
            final HttpMessage message,
            final HttpEntity entity) throws HttpException, IOException {
        Args.notNull(outbuffer, "Session output buffer");
        Args.notNull(message, "HTTP message");
        Args.notNull(entity, "HTTP entity");
        final OutputStream outstream = doSerialize(outbuffer, message);
        entity.writeTo(outstream);
        outstream.close();
    }

    protected OutputStream doSerialize(
            final SessionOutputBuffer outbuffer,
            final HttpMessage message) throws HttpException, IOException {
        final long len = this.lenStrategy.determineLength(message);
        if (len == ContentLengthStrategy.CHUNKED) {
            return new ChunkedOutputStream(outbuffer);
        } else if (len == ContentLengthStrategy.IDENTITY) {
            // Author's note: this is the branch taken by default
            return new IdentityOutputStream(outbuffer);
        } else {
            return new ContentLengthOutputStream(outbuffer, len);
        }
    }

For the common UrlEncodedFormEntity, the parameters are joined in the form key1=value1&key2=value2, and the resulting byte array is written to the stream.
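The form body format can be approximated with the JDK alone. This is a sketch under the assumption that java.net.URLEncoder is close enough for illustration; HttpClient uses its own encoder, which may escape some characters differently. encodeForm is a hypothetical helper name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class FormBodyDemo {

    /** Hypothetical helper: joins form fields as key1=value1&key2=value2 with URL encoding. */
    static String encodeForm(Map<String, String> form) {
        StringBuilder sb = new StringBuilder();
        try {
            for (Map.Entry<String, String> e : form.entrySet()) {
                if (sb.length() > 0) {
                    sb.append('&');
                }
                sb.append(URLEncoder.encode(e.getKey(), "UTF-8"));
                sb.append('=');
                sb.append(URLEncoder.encode(e.getValue(), "UTF-8"));
            }
        } catch (UnsupportedEncodingException ex) {
            // UTF-8 is required to be available on every JVM
            throw new IllegalStateException(ex);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> form = new LinkedHashMap<>();
        form.put("name", "Tom Lee");
        form.put("q", "a&b");
        String body = encodeForm(form);
        System.out.println(body); // name=Tom+Lee&q=a%26b
        // These bytes are what would be written to the output stream as the request body
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        System.out.println(bytes.length + " bytes");
    }
}
```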

Step 3: receive the response

After the request has been sent, the response is read by doReceiveResponse:

    protected HttpResponse doReceiveResponse(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws HttpException, IOException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");
        HttpResponse response = null;
        int statusCode = 0;

        while (response == null || statusCode < HttpStatus.SC_OK) {

            response = conn.receiveResponseHeader();
            if (canResponseHaveBody(request, response)) {
                conn.receiveResponseEntity(response);
            }
            statusCode = response.getStatusLine().getStatusCode();

        } // while intermediate response

        return response;
    }

Reading the response likewise starts with the response headers, followed by the response body:

    public HttpResponse receiveResponseHeader()
            throws HttpException, IOException {
        assertOpen();
        final HttpResponse response = this.responseParser.parse();
        if (response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK) {
            this.metrics.incrementResponseCount();
        }
        return response;
    }

    public T parse() throws IOException, HttpException {
        final int st = this.state;
        switch (st) {
        case HEAD_LINE:
            try {
                this.message = parseHead(this.sessionBuffer);
            } catch (final ParseException px) {
                throw new ProtocolException(px.getMessage(), px);
            }
            this.state = HEADERS;
            //$FALL-THROUGH$
        case HEADERS:
            final Header[] headers = AbstractMessageParser.parseHeaders(
                    this.sessionBuffer,
                    this.messageConstraints.getMaxHeaderCount(),
                    this.messageConstraints.getMaxLineLength(),
                    this.lineParser,
                    this.headerLines);
            this.message.setHeaders(headers);
            final T result = this.message;
            this.message = null;
            this.headerLines.clear();
            this.state = HEAD_LINE;
            return result;
        default:
            throw new IllegalStateException("Inconsistent parser state");
        }
    }

Finally, the HttpResponse is returned.
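The two parser states shown earlier (status line first, then headers until an empty line) can be sketched without the library as follows. Head and parseHead are hypothetical names for this illustration only; the sketch works on a complete in-memory string and skips the limits, line folding, and error handling that the real parser applies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ResponseHeadDemo {

    /** Hypothetical holder for the parsed status code and headers. */
    static final class Head {
        final int statusCode;
        final Map<String, String> headers;

        Head(int statusCode, Map<String, String> headers) {
            this.statusCode = statusCode;
            this.headers = headers;
        }
    }

    /** Parses the status line, then header lines up to the first empty line. */
    static Head parseHead(String raw) {
        String[] lines = raw.split("\r\n", -1);
        // State 1: status line, e.g. "HTTP/1.1 200 OK"
        String[] parts = lines[0].split(" ", 3);
        if (parts.length < 2) {
            throw new IllegalArgumentException("Malformed status line: " + lines[0]);
        }
        int status = Integer.parseInt(parts[1]);
        // State 2: header lines in "Key: Value" form
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 1; i < lines.length; i++) {
            String line = lines[i];
            if (line.isEmpty()) {
                break; // the empty line ends the head; the body follows
            }
            int colon = line.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Malformed header: " + line);
            }
            headers.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return new Head(status, headers);
    }

    public static void main(String[] args) {
        String raw = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: 5\r\n\r\nhello";
        Head head = parseHead(raw);
        System.out.println(head.statusCode + " " + head.headers);
    }
}
```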
