ip反查域名网站网站搭建外贸
本篇文章基于 OKHttp 4.11.0 版本阅读的。
1. 介绍
OKHttp 是由 Square 公司开源的,广泛应用于 Android 开发中,并且是 Retrofit 的底层实现。它是一个高效的 HTTP 客户端,适用于 Android 和 Java 应用程序。它支持 HTTP/2、连接池、GZIP 压缩、缓存等功能,能够帮助开发者更高效地处理网络请求。
从 Android 4.4 开始 HttpURLConnection 的底层实现采用的是OKHttp。
2. 基本使用
(1)基本使用
- 添加依赖
implementation "com.squareup.okhttp3.okhttp:$version"
- 使用OkHttp发送一个GET请求
var client = OkHttpClient()var request: Request = Request.Builder().url("https://api.example.com/data").build()var response: Response = client.newCall(request).execute()
var responseData: String = response.body.toString()
- 使用OkHttp发送一个POST请求
OkHttpClient client = new OkHttpClient();RequestBody body = new FormBody.Builder().add("key1", "value1").add("key2", "value2").build();Request request = new Request.Builder().url("https://api.example.com/submit").post(body).build();Response response = client.newCall(request).execute();
String responseData = response.body().string();
(2)调用流程
OkHttp 的工作流程可以概括为以下几个步骤:
- 创建请求:先创建一个OKHttpClient对象,然后通过 Request.Builder 构建一个 Request 对象。
- 执行请求:通过 OkHttpClient.newCall(request) 创建一个 Call 对象,调用 execute() (同步)或 enqueue() (异步)方法执行请求。
- 分发器:Dispatcher 分发任务,它内部维护队列与线程池,完成请求调配。
- 拦截器链:请求会经过一系列的拦截器(如重试、缓存、网络拦截器等),最终发送到服务器。
- 获取响应:服务器返回的响应会经过拦截器链处理,最终返回给调用者。
3. 源码阅读
3.1 创建请求
(1)首先在 OkHttpClient 构造方法中,通过 Builder 模式构建了 OkHttpClient 对象。作为全局的客户端对象,负责管理请求的配置(如超时、缓存、拦截器等)。
// OkHttpClient.kt
...
constructor() : this(Builder())class Builder constructor() {internal var dispatcher: Dispatcher = Dispatcher()internal var connectionPool: ConnectionPool = ConnectionPool()internal val interceptors: MutableList<Interceptor> = mutableListOf()internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()internal var retryOnConnectionFailure = trueinternal var authenticator: Authenticator = Authenticator.NONEinternal var followRedirects = trueinternal var followSslRedirects = true...
}
...
(2)然后通过 Request.Builder 构建一个 Request 对象,也是用了 build 建造者模式。封装了 HTTP 请求到 URL、方法(GET/POST)、请求头、请求体等信息。
// Request.kt
...
open class Builder {internal var url: HttpUrl? = nullinternal var method: Stringinternal var headers: Headers.Builderinternal var body: RequestBody? = null/** A mutable map of tags, or an immutable empty map if we don't have any. */internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()constructor() {this.method = "GET"this.headers = Headers.Builder()}internal constructor(request: Request) {this.url = request.urlthis.method = request.methodthis.body = request.bodythis.tags = if (request.tags.isEmpty()) {mutableMapOf()} else {request.tags.toMutableMap()}this.headers = request.headers.newBuilder()}...}...
3.2 执行请求 和 分发器分发请求
(1)首先通过 OkHttpClient.newCall(request) 创建一个 Call 对象,Call 是一个接口,真正的实现是在 RealCall 中。
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
(2)然后再通过 RealCall 去执行异步请求或同步请求。
- 异步请求 enqueue()
// RealCall.kt
override fun enqueue(responseCallback: Callback) {// 代码1 用 AtomicBoolean 检查 realcall 是否被用过了check(executed.compareAndSet(false, true)) { "Already Executed" }callStart()// 代码2 调用分发器执行异步请求client.dispatcher.enqueue(AsyncCall(responseCallback))}
代码1,用 AtomicBoolean 检查 realcall 是否被用过了。
代码2,调用分发器执行异步请求。这里传的是 AsyncCall,一个 runnable。
下面我们就看一下 Dispatcher 的 enqueue() 方法,
// Dispatcher.kt
...// 代码1 异步等待队列private val readyAsyncCalls = ArrayDeque<AsyncCall>()// 代码2 异步运行队列private val runningAsyncCalls = ArrayDeque<AsyncCall>()internal fun enqueue(call: AsyncCall) {synchronized(this) {// 代码3 将Call 加入到异步等待队列readyAsyncCalls.add(call)// 代码4 判断请求是不是 websocketif (!call.call.forWebSocket) {val existingCall = findExistingCallWithHost(call.host)if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)}}// 代码5promoteAndExecute()}private fun promoteAndExecute(): Boolean {this.assertThreadDoesntHoldLock()val executableCalls = mutableListOf<AsyncCall>()val isRunning: Booleansynchronized(this) {val i = readyAsyncCalls.iterator()// 代码6 迭代异步等待队列while (i.hasNext()) {val asyncCall = i.next()//代码7 所有同时请求数不能大于64if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.// 代码8 同一个host同时请求数不能大于5。if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.// 代码9 将 call 从异步等待队列移除i.remove()asyncCall.callsPerHost.incrementAndGet()executableCalls.add(asyncCall)// 代码10 将 call 加入到异步正在执行队列中runningAsyncCalls.add(asyncCall)}isRunning = runningCallsCount() > 0}for (i in 0 until executableCalls.size) {val asyncCall = executableCalls[i]// 代码11 开始执行asyncCall.executeOn(executorService)}return isRunning}...
在 enqueue() 中,先将我们的 Call 加入到异步等待队列,然后判断请求是不是 websocket,如果不是的,调用 findExistingCallWithHost() 查找有没有已经存在的 host。如果不存在调用 promoteAndExecute(),迭代异步等待队列。这里有两个限制,所有同时请求数不能大于64。同一个host同时请求数不能大于5。如果两个限制条件都满足,将 call 从异步等待队列移除,并加入到异步正在执行队列中,然后将请求任务交给线程池去执行请求。
接着,我们再看一下 AsyncCall 的 run() 方法,这里是分发器异步请求分发流程,
// RealCall.kt
...
internal inner class AsyncCall(private val responseCallback: Callback) : Runnable {// 代码1@Volatile var callsPerHost = AtomicInteger(0)private setfun reuseCallsPerHostFrom(other: AsyncCall) {this.callsPerHost = other.callsPerHost}val host: Stringget() = originalRequest.url.hostval request: Requestget() = originalRequestval call: RealCallget() = this@RealCall/*** Attempt to enqueue this async call on [executorService]. This will attempt to clean up* if the executor has been shut down by reporting the call as failed.*/fun executeOn(executorService: ExecutorService) {client.dispatcher.assertThreadDoesntHoldLock()var success = falsetry {// 代码2 将异步任务放到线程池中executorService.execute(this)success = true} catch (e: RejectedExecutionException) {val ioException = InterruptedIOException("executor rejected")ioException.initCause(e)noMoreExchanges(ioException)responseCallback.onFailure(this@RealCall, ioException)} finally {if (!success) {client.dispatcher.finished(this) // This call is no longer running!}}}override fun run() {threadName("OkHttp ${redactedUrl()}") {var signalledCallback = falsetimeout.enter()try {// 代码3 调用责任分发请求val response = getResponseWithInterceptorChain()signalledCallback = trueresponseCallback.onResponse(this@RealCall, response)} catch (e: IOException) {if (signalledCallback) {// Do not signal the callback twice!Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)} else {responseCallback.onFailure(this@RealCall, e)}} catch (t: Throwable) {cancel()if (!signalledCallback) {val canceledException = IOException("canceled due to $t")canceledException.addSuppressed(t)responseCallback.onFailure(this@RealCall, canceledException)}throw t} finally {// 代码4 请求结束回调client.dispatcher.finished(this)}}}}
将异步任务放到线程池中后,通过 getResponseWithInterceptorChain() 执行请求,请求完成之后调用分发器 dispatch 的 finished() 方法
internal fun finished(call: AsyncCall) {call.callsPerHost.decrementAndGet()finished(runningAsyncCalls, call)}private fun <T> finished(calls: Deque<T>, call: T) {val idleCallback: Runnable?synchronized(this) {if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")idleCallback = this.idleCallback}val isRunning = promoteAndExecute()if (!isRunning && idleCallback != null) {idleCallback.run()}}
在 finish() 方法中,对请求对 host 数减1,并去启动执行任务的方法。
我们再看一下异步请求中的线程池,
// Dispatcher.ktprivate var executorServiceOrNull: ExecutorService? = null@get:Synchronized@get:JvmName("executorService") val executorService: ExecutorServiceget() {if (executorServiceOrNull == null) {executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))}return executorServiceOrNull!!}
Dispatcher 中的线程池是一个核心线程数为 0,最大线程数没有上限,当有任务加入都有新建线程,这是为了能让新来的任务及时执行,而不是等待。
- 同步请求 executed()
// RealCall.ktoverride fun execute(): Response {// 代码1 同 enqueue() 用 AtomicBoolean 检查 realcall 是否被用过了check(executed.compareAndSet(false, true)) { "Already Executed" }timeout.enter()callStart()try {// 代码2 调用分发器执行同步请求client.dispatcher.executed(this)// 代码3 通过拦截器执行请求return getResponseWithInterceptorChain()} finally {// 代码4 请求完执行同步队列的finished。client.dispatcher.finished(this)}}
代码1,同异步请求。
代码2,调用分发器执行同步请求,这里传入的是 RealCall。
代码3,调用 getResponseWithInterceptorChain() 方法,通过拦截器执行请求,后面再看。先看分发器的逻辑。
代码4,请求完执行同步队列的finished。
接着看分发器的 executed() 实现,
// Dispatcher.kt
// 代码1 同步正在运行队列
private val runningSyncCalls = ArrayDeque<RealCall>()/** Used by [Call.execute] to signal it is in-flight. */@Synchronized internal fun executed(call: RealCall) {// 代码2 将 Call 添加到同步正在运行队列中runningSyncCalls.add(call)}/** Used by [Call.execute] to signal completion. */internal fun finished(call: RealCall) {finished(runningSyncCalls, call)}private fun <T> finished(calls: Deque<T>, call: T) {val idleCallback: Runnable?synchronized(this) {if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")idleCallback = this.idleCallback}// 代码3 执行一次异步等待队列到异步正在执行队列的检查val isRunning = promoteAndExecute()if (!isRunning && idleCallback != null) {idleCallback.run()}}
在 Dispatcher 的 executed() 中逻辑很简单,就是将 Call 添加到同步正在运行队列中。同时在同步请求的 finish() 方法中也会执行一次异步等待队列到异步正在执行队列的检查。
3.3 OkHttp 拦截器责任链设计模式
- 先复习一下责任链模式:
它是对象行为型模式,为请求创建了一个接收者对象的链,在处理请求的时候执行过滤(各司其职)。
责任链上的处理者负责处理请求,客户只需要将请求发送到责任链即可,无须关心请求到处理细节和请求到传递,所以职责链将请求的发送者和请求的处理者解耦了。 - 接着看 RealCall 的 getResponseWithInterceptorChain() 方法,在请求需要执行时,通过 getResponseWithInterceptorChain() 获得请求的结果:Response。
// RealCall.kt
@Throws(IOException::class)internal fun getResponseWithInterceptorChain(): Response {// Build a full stack of interceptors.val interceptors = mutableListOf<Interceptor>()interceptors += client.interceptorsinterceptors += RetryAndFollowUpInterceptor(client)interceptors += BridgeInterceptor(client.cookieJar)interceptors += CacheInterceptor(client.cache)interceptors += ConnectInterceptorif (!forWebSocket) {interceptors += client.networkInterceptors}interceptors += CallServerInterceptor(forWebSocket)val chain = RealInterceptorChain(call = this,interceptors = interceptors,index = 0,exchange = null,request = originalRequest,connectTimeoutMillis = client.connectTimeoutMillis,readTimeoutMillis = client.readTimeoutMillis,writeTimeoutMillis = client.writeTimeoutMillis)var calledNoMoreExchanges = falsetry {val response = chain.proceed(originalRequest)if (isCanceled()) {response.closeQuietly()throw IOException("Canceled")}return response} catch (e: IOException) {calledNoMoreExchanges = truethrow noMoreExchanges(e) as Throwable} finally {if (!calledNoMoreExchanges) {noMoreExchanges(null)}}}
在 getResponseWithInterceptorChain() 方法中,默认有五大拦截器,也可以自定义拦截器,在 OkHttpClient 中 addInterceptor() 自定义应用拦截器(在请求发送前和响应返回后执行)或addNetworkInterceptor() 自定义网络拦截器(在请求发送到网络之前执行)。
将一系列拦截器生成 RealInterceptorChain 拦截器链,通过 proceed(request) 方法将请求传递给下一个拦截器。
-
addInterceptor() 和 addNetworkInterceptor() 的区别:
getResponseWithInterceptorChain() 中先去创建一个集合对象,addInterceptor() 自定义拦截器在第一个添加到集合中的,addNetworkInterceptor() 自定义网络拦截器是在倒数第二添加的。并且 networkInterceptor 拦截器只能在 http 请求中添加,如果socket 请求中不会添加。
比如:添加自定义日志拦截器,
如果是自定义拦截器,那打印的是用户提交的请求;
如果是自定义网络拦截器,打印的是真正的网络请求的日志; -
默认的五大拦截器:
- 重试重定向拦截器 RetryAndFollowUpInterceptor:在交给下一个拦截器之前,负责判断用户是否取消了请求;在获得了结果之后,会根据响应码判断是否需要重定向,如果满足条件那么就会重启执行所有拦截器。
- 桥接拦截器 BridgeInterceptor:在交给下一个拦截器之前,负责将 HTTP 协议必备的请求头加入其中(如:Host)并添加一些默认的行为(如:GZIP 压缩);在获得了结果后,调用保存 cookie 接口并解析 GZIP 数据。
- 缓存拦截器 CacheInterceptor:在交给下一个拦截器之前,读取并判断是否使用缓存;获得结果后判断是否缓存。
- 连接拦截器 ConnectInterceptor:在交给下一个拦截器之前,负责找到或者新建一个连接,并获得对应的 socket 流;在获得结果后不进行额外的处理。
- 请求服务器拦截器 CallServerInterceptor:进行真正的与服务器的通信,向服务器发送数据;解析读取的响应数据。
3.4 五大拦截器
(1)重试重定向拦截器 RetryAndFollowUpInterceptor
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChainvar request = chain.requestval call = realChain.callvar followUpCount = 0var priorResponse: Response? = nullvar newExchangeFinder = truevar recoveredFailures = listOf<IOException>()// 代码1while (true) {// 代码2 创建一个ExchangeFinder对象,获取连接(ConnectInterceptor中使用)。call.enterNetworkInterceptorExchange(request, newExchangeFinder)var response: Responsevar closeActiveExchange = true// 是否进行重试逻辑开始try {// 如果请求取消了 抛出异常if (call.isCanceled()) {throw IOException("Canceled")}try {// 代码3 将请求交给了下一个拦截器response = realChain.proceed(request)newExchangeFinder = true} catch (e: RouteException) {// The attempt to connect via a route failed. The request will not have been sent.if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {throw e.firstConnectException.withSuppressed(recoveredFailures)} else {recoveredFailures += e.firstConnectException}newExchangeFinder = falsecontinue} catch (e: IOException) {// An attempt to communicate with a server failed. The request may have been sent.if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {throw e.withSuppressed(recoveredFailures)} else {recoveredFailures += e}newExchangeFinder = falsecontinue}// --------是否进行重试逻辑结束// Attach the prior response if it exists. Such responses never have a body.if (priorResponse != null) {response = response.newBuilder().priorResponse(priorResponse.newBuilder().body(null).build()).build()}// 是否进行重定向逻辑开始val exchange = call.interceptorScopedExchange// 代码4val followUp = followUpRequest(response, exchange)if (followUp == null) {if (exchange != null && exchange.isDuplex) {call.timeoutEarlyExit()}closeActiveExchange = falsereturn response}val followUpBody = followUp.bodyif (followUpBody != null && followUpBody.isOneShot()) {closeActiveExchange = falsereturn response}response.body?.closeQuietly()if (++followUpCount > MAX_FOLLOW_UPS) {throw ProtocolException("Too many follow-up requests: $followUpCount")}request = followUppriorResponse = response// ------------是否进行重定向逻辑结束} finally {call.exitNetworkInterceptorExchange(closeActiveExchange)}}}// 是否重试private fun recover(e: IOException,call: RealCall,userRequest: Request,requestSendStarted: Boolean): Boolean {// okhttpclient配置不重试if (!client.retryOnConnectionFailure) return false// 不重试:// 1. 如果是 IO 异常(非http2中断异常)表示请求可能发出// 2. 如果请求体只能被使用一次(默认是false)if (requestSendStarted && requestIsOneShot(e, userRequest)) return false// 异常不重试:协议异常、IO中断异常(除Socket读写超时之外),ssl认证异常if (!isRecoverable(e, requestSendStarted)) return false// 是否有更多的路线if (!call.retryAfterFailure()) return false// For failure recovery, use the same route selector with a new connection.return true}private fun requestIsOneShot(e: IOException, userRequest: Request): Boolean {val requestBody = userRequest.bodyreturn (requestBody != null && requestBody.isOneShot()) ||e is FileNotFoundException}private fun isRecoverable(e: IOException, requestSendStarted: Boolean): Boolean {// If there was a protocol problem, don't recover.if (e is ProtocolException) {return false}// If there was an interruption don't recover, but if there was a timeout connecting to a route// we should try the next route (if there is one).if (e is InterruptedIOException) {return e is SocketTimeoutException && !requestSendStarted}// Look for known client-side or negotiation errors that are unlikely to be fixed by trying// again with a different route.if (e is SSLHandshakeException) {// If the problem was a CertificateException from the X509TrustManager,// do not retry.if (e.cause is CertificateException) {return false}}if (e is SSLPeerUnverifiedException) {// e.g. a certificate pinning error.return false}// An example of one we might want to retry with a different route is a problem connecting to a// proxy and would manifest as a standard IOException. Unless it is one we know we should not// retry, we return true and try a new route.return true}@Throws(IOException::class)private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {val route = exchange?.connection?.route()val responseCode = userResponse.codeval method = userResponse.request.methodwhen (responseCode) {// 响应码 407,代理需要授权,如付费代理,需要验证身份HTTP_PROXY_AUTH -> {val selectedProxy = route!!.proxy// 需要是HTTP请求if (selectedProxy.type() != Proxy.Type.HTTP) {throw ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy")}return client.proxyAuthenticator.authenticate(route, userResponse)}// 响应码 401,服务器需要授权,如某些接口需要登陆才能使用(不安全,基本上没用了)HTTP_UNAUTHORIZED -> return client.authenticator.authenticate(route, userResponse)// 响应码 3XX 重定向响应HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {return buildRedirectRequest(userResponse, method)}// 响应码 408 请求超时HTTP_CLIENT_TIMEOUT -> {if (!client.retryOnConnectionFailure) {// 如果应用层指示我们不要重试请求,则直接返回nullreturn null}val requestBody = userResponse.request.body// 如果请求体是一次性的,则不能重试if (requestBody != null && requestBody.isOneShot()) {return null}val priorResponse = userResponse.priorResponseif (priorResponse != null && priorResponse.code == HTTP_CLIENT_TIMEOUT) {// 如果之前已经尝试过重试并且再次超时,则放弃重试return null}// 如果服务器指定了非零的重试等待时间,则不进行重试if (retryAfter(userResponse, 0) > 0) {return null}return userResponse.request}// 响应码 503 服务不可用HTTP_UNAVAILABLE -> {val priorResponse = userResponse.priorResponseif (priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE) {// We attempted to retry and got another timeout. Give up.return null}if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {// specifically received an instruction to retry without delayreturn userResponse.request}return null}// 响应码 421 从当前客户端所在的 IP 地址到服务器的连接数超过了服务器许可的最大范围HTTP_MISDIRECTED_REQUEST -> {// OkHttp can coalesce HTTP/2 connections even if the domain names are different. See// RealConnection.isEligible(). If we attempted this and the server returned HTTP 421, then// we can retry on a different connection.val requestBody = userResponse.request.bodyif (requestBody != null && requestBody.isOneShot()) {return null}if (exchange == null || !exchange.isCoalescedConnection) {return null}exchange.connection.noCoalescedConnections()return userResponse.request}else -> return null}}/*** 构建基于用户响应的重定向请求。* * 该函数用于处理重定向逻辑,根据客户端配置和响应的具体信息判断是否需要跟随重定向。如果条件满足,它将构建一个新的请求对象用于重定向。* * @param userResponse 原始请求的响应对象,包含重定向所需的信息,例如重定向URL。* @param method 原始请求的HTTP方法,可能会影响重定向请求的构造方式。* @return 如果条件满足,返回一个新的重定向请求对象;否则返回null。*/private fun buildRedirectRequest(userResponse: Response, method: String): Request? {// 检查客户端是否允许重定向if (!client.followRedirects) return null// 从响应头中获取重定向位置val location = userResponse.header("Location") ?: return null// 不支持重定向到无效协议的情况val url = userResponse.request.url.resolve(location) ?: return null// 如果配置禁止跨SSL和非SSL重定向,则检查协议一致性val sameScheme = url.scheme == userResponse.request.url.schemeif (!sameScheme && !client.followSslRedirects) return null// 大多数重定向不需要请求体,根据方法判断是否保留或修改请求体val requestBuilder = userResponse.request.newBuilder()if (HttpMethod.permitsRequestBody(method)) {val responseCode = userResponse.codeval maintainBody = HttpMethod.redirectsWithBody(method) ||responseCode == HTTP_PERM_REDIRECT ||responseCode == HTTP_TEMP_REDIRECTif (HttpMethod.redirectsToGet(method) && responseCode != HTTP_PERM_REDIRECT && responseCode != HTTP_TEMP_REDIRECT) {requestBuilder.method("GET", null)} else {val requestBody = if (maintainBody) userResponse.request.body else nullrequestBuilder.method(method, requestBody)}if (!maintainBody) {requestBuilder.removeHeader("Transfer-Encoding")requestBuilder.removeHeader("Content-Length")requestBuilder.removeHeader("Content-Type")}}// 跨主机重定向时,移除所有身份验证头信息if (!userResponse.request.url.canReuseConnectionFor(url)) {requestBuilder.removeHeader("Authorization")}return requestBuilder.url(url).build()}private fun retryAfter(userResponse: Response, defaultDelay: Int): Int {val header = userResponse.header("Retry-After") ?: return defaultDelay// https://tools.ietf.org/html/rfc7231#section-7.1.3// currently ignores a HTTP-date, and assumes any non int 0 is a delayif (header.matches("\\d+".toRegex())) {return Integer.valueOf(header)}return Integer.MAX_VALUE}companion object {// 重定向最大次数限制private const val MAX_FOLLOW_UPS = 20}
}
- 重试限制(请求)
代码1,用了一个 while(true) 死循环。
代码2,创建一个ExchangeFinder对象,获取连接(ConnectInterceptor中使用)。
代码3,调用 realChain.proceed(request),将请求交给了下一个拦截器。如果这里出现了异常就会判断是否需要重试。
具体可以看 recover() 方法,有4种情况不重试:
1.okhttpclient配置不重试
2.如果请求体只能被使用一次
3.异常不重试:协议异常、IO中断异常(除Socket读写超时之外),ssl认证异常
4.是否有更多的路线
- 重定向规则(响应)
根据得到的 response 判断要不要重定向,判断 followUpRequest() 返回的是否为空,如果是空,不需要重定向,直接返回 response;否则需要重定向。
如果重定向次数超过 20 次,也不会重定向。否则执行 while 中下一次。
followUpRequest() 中主要是根据响应码判断是否要重定向。
响应码 | 说明 | 重定向条件 |
---|---|---|
407 | 代理需要授权,如付费代理,需要验证身份 | 通过proxyAuthenticator获得到了Request。例:添加Proxy-Authorization请求头 |
401 | 服务器需要授权,如某些接口需要登陆 | 通过authenticator获得到了Request。例:添加Authorization请求头 |
3XX | 重定向响应 | OkHttpClient配置允许重定向 |
408 | 请求超时。 | 1、用户允许自动重试(默认允许) 2、本次请求的结果不是响应408的重试结果 3、服务器未响应Retry-After(稍后重试),或者响应Retry-After:0。 |
503 | 服务不可用 | 1、本次请求的结果不是响应503的重试结果 2、服务器明确响应Retry-After:0,立即重试 |
421 | 从当前客户端所在的IP地址到服务器的连接数超过了服务器许可的最大范围 | 自动再次使用另一个连接对象发起请求 |
(2)桥接拦截器
桥接拦截器的作用是补全请求头和响应后的处理。
- 补全请求头
请求头 | 说明 |
---|---|
Content-Type | 请求体类型,如:application/x-www-form-urlencoded |
Content-Length/Transfer-Encoding | 请求体解析方式 |
Host | 请求的主机站点 |
Connection: Keep-Alive | 默认保持长连接 |
Accept-Encoding: gzip | 接收响应体使用gzip压缩 |
Cookie | Cookie身份识别 |
User-Agent | 用户信息,如:操作系统、浏览器等 |
- 响应后处理
得到响应:- 读取 Set-Cookie 响应头并调用接口告知用户,在下次请求则会读取对应的数据设置进入请求头,默认 CookieJar 无实现;
- 响应头 Content-Encoding 为 gzip,使用 GzipSource 包装解析。
(3)缓存拦截器
作用:缓存 HTTP 响应,减少重复请求。
- 缓存规则:
Http 的缓存我们可以按照行为将他们分为:强缓存和协商缓存。- 命中强缓存时,浏览器并不会将请求发送给服务器。强缓存是利用 Http 的返回头中的 Expires 或者 Cache-Control 两个字段来控制的,用来表示资源的缓存时间;
- 若未命中强缓存,则浏览器会将请求发送至服务器。服务器根据 http 头信息中的 Last-Modify/If-Modify-Since或Etag/If-None-Match 来判断是否命中了协商缓存。如果命中,则 http 返回码为 304,客户端从缓存中加载资源。
- 缓存策略
拦截器通过 CacheStrategy 判断使用缓存或发起网络请求。此对象中的 networkRequest 与 cacheResponse 分别代表需要发起请求或者直接使用缓存。
networkRequest | cacheResponse | 说明 |
---|---|---|
Null | Not Null | 直接使用缓存 |
Not Null | Null | 向服务器发起请求 |
Null | Null | 要求使用缓存,但是没有缓存,okHttp直接返回504 |
Not Null | Not Null | 发起请求,若得到响应为304,则更新缓存响应并返回 |
即:networkRequest 存在则优先发起网络请求,否则使用 cacheResponse 缓存,若都不存在则请求失败。
(4)连接拦截器
object ConnectInterceptor : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {val realChain = chain as RealInterceptorChain// 代码1val exchange = realChain.call.initExchange(chain)val connectedChain = realChain.copy(exchange = exchange)return connectedChain.proceed(realChain.request)}
}
- 新建连接 realChain.call.initExchange(chain)
在 ConnectInterceptor 中调用了 initExchange() 方法,
// RealCall.kt
internal fun initExchange(chain: RealInterceptorChain): Exchange {synchronized(this) {check(expectMoreExchanges) { "released" }check(!responseBodyOpen)check(!requestBodyOpen)}val exchangeFinder = this.exchangeFinder!!// 代码2val codec = exchangeFinder.find(client, chain)val result = Exchange(this, eventListener, exchangeFinder, codec)this.interceptorScopedExchange = resultthis.exchange = resultsynchronized(this) {this.requestBodyOpen = truethis.responseBodyOpen = true}if (canceled) throw IOException("Canceled")return result}
initExchange() 方法又调用了 exchangeFinder.find() 方法,返回 ExchangeCodec 对象。
// ExchangeFinder.ktfun find(client: OkHttpClient,chain: RealInterceptorChain): ExchangeCodec {try {// 代码3 生成 RealConnection 对象,是对 Socket 的封装val resultConnection = findHealthyConnection(connectTimeout = chain.connectTimeoutMillis,readTimeout = chain.readTimeoutMillis,writeTimeout = chain.writeTimeoutMillis,pingIntervalMillis = client.pingIntervalMillis,connectionRetryEnabled = client.retryOnConnectionFailure,doExtensiveHealthChecks = chain.request.method != "GET")// 代码4return resultConnection.newCodec(client, chain)} catch (e: RouteException) {trackFailure(e.lastConnectException)throw e} catch (e: IOException) {trackFailure(e)throw RouteException(e)}}
代码3,生成 RealConnection 对象,是对 Socket 的封装。
代码4,调用 newCodec() 方法,生成 ExchangeCodec 对象。
@Throws(SocketException::class)internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {val socket = this.socket!!val source = this.source!!val sink = this.sink!!val http2Connection = this.http2Connection// 代码5 判断使用Http2还是Http1return if (http2Connection != null) {Http2ExchangeCodec(client, this, chain, http2Connection)} else {socket.soTimeout = chain.readTimeoutMillis()source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)Http1ExchangeCodec(client, this, source, sink)}}
在 newCodec() 中,代码5 判断使用Http2还是Http1。
因为Http是基于TCP/IP协议,最终还是调用connect()方法通过socket建立连接
- 连接池
在 ExchangeFinder 中,生成 RealConnection 对象之前,会先判断能不能拿到连接对象复用
// ExchangeFinder.kt@Throws(IOException::class)private fun findHealthyConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean,doExtensiveHealthChecks: Boolean): RealConnection {while (true) {val candidate = findConnection(connectTimeout = connectTimeout,readTimeout = readTimeout,writeTimeout = writeTimeout,pingIntervalMillis = pingIntervalMillis,connectionRetryEnabled = connectionRetryEnabled)// Confirm that the connection is good.if (candidate.isHealthy(doExtensiveHealthChecks)) {return candidate}// If it isn't, take it out of the pool.candidate.noNewExchanges()// Make sure we have some routes left to try. One example where we may exhaust all the routes// would happen if we made a new connection and it immediately is detected as unhealthy.if (nextRouteToTry != null) continueval routesLeft = routeSelection?.hasNext() ?: trueif (routesLeft) continueval routesSelectionLeft = routeSelector?.hasNext() ?: trueif (routesSelectionLeft) continuethrow IOException("exhausted all routes")}}@Throws(IOException::class)private fun findConnection(connectTimeout: Int,readTimeout: Int,writeTimeout: Int,pingIntervalMillis: Int,connectionRetryEnabled: Boolean): RealConnection {if (call.isCanceled()) throw IOException("Canceled")// Attempt to reuse the connection from the call.val callConnection = call.connection // This may be mutated by releaseConnectionNoEvents()!if (callConnection != null) {var toClose: Socket? = nullsynchronized(callConnection) {if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {toClose = call.releaseConnectionNoEvents()}}// If the call's connection wasn't released, reuse it. We don't call connectionAcquired() here// because we already acquired it.if (call.connection != null) {check(toClose == null)return callConnection}// The call's connection was released.toClose?.closeQuietly()eventListener.connectionReleased(call, callConnection)}// We need a new connection. Give it fresh stats.refusedStreamCount = 0connectionShutdownCount = 0otherFailureCount = 0// Attempt to get a connection from the pool.if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}// Nothing in the pool. Figure out what route we'll try next.val routes: List<Route>?val route: Routeif (nextRouteToTry != null) {// Use a route from a preceding coalesced connection.routes = nullroute = nextRouteToTry!!nextRouteToTry = null} else if (routeSelection != null && routeSelection!!.hasNext()) {// Use a route from an existing route selection.routes = nullroute = routeSelection!!.next()} else {// Compute a new route selection. This is a blocking operation!var localRouteSelector = routeSelectorif (localRouteSelector == null) {localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)this.routeSelector = localRouteSelector}val localRouteSelection = localRouteSelector.next()routeSelection = localRouteSelectionroutes = localRouteSelection.routesif (call.isCanceled()) throw IOException("Canceled")// Now that we have a set of IP addresses, make another attempt at getting a connection from// the pool. We have a better chance of matching thanks to connection coalescing.if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {val result = call.connection!!eventListener.connectionAcquired(call, result)return result}route = localRouteSelection.next()}// Connect. Tell the call about the connecting call so async cancels work.val newConnection = RealConnection(connectionPool, route)call.connectionToCancel = newConnectiontry {newConnection.connect(connectTimeout,readTimeout,writeTimeout,pingIntervalMillis,connectionRetryEnabled,call,eventListener)} finally {call.connectionToCancel = null}call.client.routeDatabase.connected(newConnection.route())// If we raced another call connecting to this host, coalesce the connections. This makes for 3// different lookups in the connection pool!if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {val result = call.connection!!nextRouteToTry = routenewConnection.socket().closeQuietly()eventListener.connectionAcquired(call, result)return result}synchronized(newConnection) {connectionPool.put(newConnection)call.acquireConnectionNoEvents(newConnection)}eventListener.connectionAcquired(call, newConnection)return newConnection}
connectionPool: RealConnectionPool 就是连接池,其实就是一个对象池。
class RealConnectionPool(taskRunner: TaskRunner,/** The maximum number of idle connections for each address. */private val maxIdleConnections: Int,keepAliveDuration: Long,timeUnit: TimeUnit
) {
...
private val cleanupTask = object : Task("$okHttpName ConnectionPool") {override fun runOnce() = cleanup(System.nanoTime())}private val connections = ConcurrentLinkedQueue<RealConnection>()...fun put(connection: RealConnection) {connection.assertThreadHoldsLock()connections.add(connection)cleanupQueue.schedule(cleanupTask)}}
// ConnectionPool.kt
class ConnectionPool internal constructor(internal val delegate: RealConnectionPool
) {constructor(maxIdleConnections: Int,keepAliveDuration: Long,timeUnit: TimeUnit) : this(RealConnectionPool(taskRunner = TaskRunner.INSTANCE,maxIdleConnections = maxIdleConnections,keepAliveDuration = keepAliveDuration,timeUnit = timeUnit))constructor() : this(5, 5, TimeUnit.MINUTES)}
连接池就是一个装载 RealConnection 的ConcurrentLinkedQueue 队列。
在put方法中,除了往队列中添加 connection,还启动了一个周期性任务 cleanupTask,在任务中调用cleanup(),清除无效连接。
在默认连接池中,最大允许的空闲连接数是 5,连接最大允许的空闲时间 5 分钟。
1、连接池中 连接对象 闲置了多久 超过了 5 分钟没用的连接,就清理掉
2、连接池中 存放了 大量的 空闲连接对象,超过 5个空闲连接,如何清理?
把空闲时间最长的连接一个个清理掉,至少不超过 5 个(LRU思想)
connection:keep-alive
(5)请求服务拦截器 CallServerInterceptor
/*** 此拦截器是拦截链中的最后一个拦截器。它负责向服务器发起网络请求。** @param forWebSocket 指示此拦截器是否用于 WebSocket 连接*/
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {@Throws(IOException::class)override fun intercept(chain: Interceptor.Chain): Response {// 将传入的 Chain 转换为 RealInterceptorChain,以获取更详细的上下文信息val realChain = chain as RealInterceptorChainval exchange = realChain.exchange!!val request = realChain.requestval requestBody = request.bodyval sentRequestMillis = System.currentTimeMillis()var invokeStartEvent = truevar responseBuilder: Response.Builder? = nullvar sendRequestException: IOException? = nulltry {// 写入请求头到服务器exchange.writeRequestHeaders(request)if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {// 如果请求方法允许带有请求体,并且请求体不为空,则处理请求体if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {// 如果请求头中包含 "Expect: 100-continue",则等待服务器返回 "HTTP/1.1 100 Continue"exchange.flushRequest()responseBuilder = exchange.readResponseHeaders(expectContinue = true)exchange.responseHeadersStart()invokeStartEvent = false}if (responseBuilder == null) {if (requestBody.isDuplex()) {// 准备一个双工请求体,以便应用程序稍后发送请求体exchange.flushRequest()val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()requestBody.writeTo(bufferedRequestBody)} else {// 如果 "Expect: 100-continue" 的期望已满足,则写入请求体val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()requestBody.writeTo(bufferedRequestBody)bufferedRequestBody.close()}} else {// 如果没有收到 "HTTP/1.1 100 Continue" 响应,则不发送请求体exchange.noRequestBody()if (!exchange.connection.isMultiplexed) {// 如果 "Expect: 100-continue" 的期望未满足,防止 HTTP/1 连接被重用exchange.noNewExchangesOnConnection()}}} else {// 如果请求方法不允许带有请求体或请求体为空,则不发送请求体exchange.noRequestBody()}if (requestBody == null || !requestBody.isDuplex()) {// 完成请求发送exchange.finishRequest()}} catch (e: IOException) {if (e is ConnectionShutdownException) {throw e // 请求未发送,因此没有响应可读取}if (!exchange.hasFailure) {throw e // 请求发送失败,不要尝试读取响应}sendRequestException = e}try {if (responseBuilder == null) {// 读取响应头responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!if (invokeStartEvent) {exchange.responseHeadersStart()invokeStartEvent = false}}var response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()var code = response.codeif (shouldIgnoreAndWaitForRealResponse(code)) {// 如果响应码为 100 或者在 102 到 199 之间,则忽略并等待实际响应responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!if (invokeStartEvent) {exchange.responseHeadersStart()}response = responseBuilder.request(request).handshake(exchange.connection.handshake()).sentRequestAtMillis(sentRequestMillis).receivedResponseAtMillis(System.currentTimeMillis()).build()code = response.code}// 结束响应头读取exchange.responseHeadersEnd(response)response = if (forWebSocket && code == 101) {// 如果是 WebSocket 升级连接,确保拦截器看到非空响应体response.newBuilder().body(EMPTY_RESPONSE).build()} else {response.newBuilder().body(exchange.openResponseBody(response)).build()}if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||"close".equals(response.header("Connection"), ignoreCase = true)) {// 如果请求或响应头中包含 "Connection: close",则关闭连接exchange.noNewExchangesOnConnection()}if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {// 如果响应码为 204 或 205,但响应体长度不为零,则抛出异常throw ProtocolException("HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")}return response} catch (e: IOException) {if (sendRequestException != null) {sendRequestException.addSuppressed(e)throw sendRequestException}throw e}}/*** 判断是否应忽略当前响应并等待实际响应。** @param code HTTP 响应码* @return 如果应忽略当前响应并等待实际响应,则返回 true;否则返回 false*/private fun shouldIgnoreAndWaitForRealResponse(code: Int): Boolean = when {// 如果服务器发送了 100-continue,即使我们没有请求它,也应再次尝试读取实际响应状态code == 100 -> true// 处理 Processing (102) 和 Early Hints (103),以及任何新的 1xx 状态码,但不包括 100 和 101code in (102 until 200) -> trueelse -> false}
}
请求头中 Expect 为 100-continue:
- 使用场景:一般出现于上传大容量请求体或者需要验证。代表了先询问服务器是否愿意接收发送请求体数据。
- OkHttp 的做法:
- 如果服务器允许则返回100, 客户端继续发送请求体。
- 如果服务器不允许则直接返回给用户。
- 同时服务器也可能会忽略此请求头,一致无法读取应答,此时抛出超时异常。
4. 其它
4.1 OKHttp 如何处理大文件上传和下载?
- 大文件上传:可以通过 RequestBody 的 create 方法创建流式请求体,避免一次性加载大文件到内存中。
实现步骤:- 创建 RequestBody,使用 File 或 InputStream 作为数据源,避免一次性加载大文件到内存中。
- 将 RequestBody 封装到 MultipartBody 中,支持文件上传。
- 构建 Request 并执行上传。
// 1. 创建文件
File file = new File("path/to/large/file.zip");// 2. 创建 RequestBody
RequestBody requestBody = new MultipartBody.Builder().setType(MultipartBody.FORM).addFormDataPart("file", file.getName(), RequestBody.create(file, MediaType.parse("application/octet-stream"))).build();// 3. 构建 Request
Request request = new Request.Builder().url("https://example.com/upload").post(requestBody).build();// 4. 执行上传
OkHttpClient client = new OkHttpClient();
try (Response response = client.newCall(request).execute()) {if (response.isSuccessful()) {System.out.println("Upload successful!");} else {System.out.println("Upload failed: " + response.code());}
}
- 大文件下载:可以通过 ResponseBody 的 source 方法获取输入流,逐步写入文件,避免内存溢出。
实现步骤:- 构建 Request,设置下载 URL。
- 执行请求并获取 ResponseBody。
- 使用 BufferedSink 将响应体写入文件。
// 1. 构建 Request
Request request = new Request.Builder().url("https://example.com/largefile.zip").build();// 2. 执行下载
OkHttpClient client = new OkHttpClient();
try (Response response = client.newCall(request).execute()) {if (response.isSuccessful()) {// 3. 获取 ResponseBodyResponseBody body = response.body();if (body != null) {// 4. 创建文件输出流File file = new File("path/to/save/largefile.zip");try (BufferedSink sink = Okio.buffer(Okio.sink(file))) {// 5. 将响应体写入文件sink.writeAll(body.source());}System.out.println("Download successful!");}} else {System.out.println("Download failed: " + response.code());}
}
- 断点续传:通过设置 Range 请求头,下载文件的指定部分。记录已下载的文件大小,从断点处继续下载。
实现步骤:- 检查本地已下载的文件大小。
- 设置 Range 请求头,从断点处开始下载。
- 将下载的数据追加到本地文件中。
// 1. 检查本地文件大小
File file = new File("path/to/save/largefile.zip");
long fileSize = file.exists() ? file.length() : 0;// 2. 构建 Request,设置 Range 请求头
Request request = new Request.Builder().url("https://example.com/largefile.zip").header("Range", "bytes=" + fileSize + "-") // 从断点处开始下载.build();// 3. 执行下载
OkHttpClient client = new OkHttpClient();
try (Response response = client.newCall(request).execute()) {if (response.isSuccessful()) {ResponseBody body = response.body();if (body != null) {// 4. 将下载的数据追加到本地文件try (BufferedSink sink = Okio.buffer(Okio.appendingSink(file))) {sink.writeAll(body.source());}System.out.println("Download successful!");}} else {System.out.println("Download failed: " + response.code());}
}
- 扩展:可以提到 OKHttp 的 ProgressListener 可以用于监控上传和下载的进度。
4.2 OKHttp 如何处理 SSL/TLS?
-
OKHttp 默认支持 SSL/TLS,并且会自动处理证书验证。开发者可以通过 OkHttpClient.Builder 自定义 SSL 配置,例如设置自定义的信任管理器(TrustManager)或证书(X509Certificate)。
-
扩展: 可以提到 OKHttp 支持 HTTP/2 的 ALPN(应用层协议协商),能够自动选择最佳的协议进行通信。
4.3 OKHttp如何复用TCP连接
- OKHttp中有一个连接池,连接池就是一个对象池,用了一个 ConcurrentLinkedQueue 缓存所有的有效连接对象,当我们需要一个连接发起请求时,我们先去连接池中查找。
- 能够满足复用连接的对象,一定是和本次请求的域名、端口、设置的代理、设置的DNS解析等参数一定是相同的,才能复用。
- 连接池也会去清理垃圾连接。如超过了5分钟没用过的连接,还有超过了5个闲置连接后,从最久闲置的连接开始执行清理
5. 总结
OkHttp是一个功能强大且灵活的HTTP客户端库,适用于各种网络请求场景。
核心设计思想是链式处理,通过拦截器机制将请求和响应的处理过程分解为多个独立的步骤。它主要由OkHttpClient、Request、Response、Call和Interceptor等组件组成。工作流程包括创建请求、执行请求、经过拦截器链处理、获取响应等。拦截器链是OkHttp的核心机制,内置了重试、缓存、连接、网络等拦截器。此外,OkHttp通过连接池复用连接,通过调度器管理异步请求的并发数量。它的性能优化措施包括HTTP/2支持、GZIP压缩和缓存等。