Okhttp 网络请求流程 (4.9.0)
1
OkHttpClient.Builder().build().newCall(Request.Builder().build()).execute()
- 构建HttpClient
- 配置拦截器
- 设置超时时间
- 设置SSL认证
- 设置Dispatcher调度器
- 重试
- 缓存
- cookie
- dns
- 代理
- 构建Request
- URL 部分
- Header 部分
- RequestBody 部分
- 生成Call对象 实际是RealCall
- 发起请求execute()或enqueue()
- execute
调用Dispatcher.exexute 将Call添加到Dispatcher.runningSyncCalls 中,然后调用getResponseWithInterceptorChain()执行网络请求 - enqueue
- enqueue:将Call封装成AsyncCall,调用 Dispatcher.enqueue方法
- Dispatcher.enqueue(): 中 将 AsyncCall 添加到 readyAsyncCalls,获取到当前host请求数量,添加到AsyncCall;调用promoteAndExecute()
- promoteAndExecute():
- 从readyAsyncCalls中遍历获取等待请求的AsyncCall
- 判断正在进行的网络请求个数是否>=maxRequests(默认64),如果>=结束循环。
- 如果没有判断当前AsyncCall callsPerHost(默认5)是否大于maxRequestsPerHost 如果>= 结束本次循环,执行下一个 AsyncCall,
- 如果有符合上述两个条件的AsyncCall,从 readyAsyncCalls 移除; callsPerHost 加一;添加到 runningAsyncCalls,添加到局部变量executableCalls中,
- 使用线程池executorService 遍历执行executableCalls
- AsyncCall.run: 调用 getResponseWithInterceptorChain() 获取都response 回调给 Callback,最终调用dispatcher.finished(this)结束本次调用
- execute
有上述流程可知promoteAndExecute是用来尝试执行等待队列符合条件的AsyncCall,一共有四处调用
- Dispatcher.enqueue
- Dispatcher.finished
- setMaxRequests
- setCallsPerHost
无论是异步请求还是同步请求,真正的网络请求网络请求是在getResponseWithInterceptorChain()中执行的
- 装载拦截器
1 2 3 4 5 6 7 8 9 10
val interceptors = mutableListOf<Interceptor>() interceptors += client.interceptors interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors } interceptors += CallServerInterceptor(forWebSocket)
- 执行拦截器责任链
1 2 3 4 5 6 7 8 9 10 11
val chain = RealInterceptorChain( call = this, interceptors = interceptors, index = 0, exchange = null, request = originalRequest, connectTimeoutMillis = client.connectTimeoutMillis, readTimeoutMillis = client.readTimeoutMillis, writeTimeoutMillis = client.writeTimeoutMillis ) val response = chain.proceed(originalRequest)
从上述代码可知,系统先执行用户自定义拦截器又执行的Okhttp自带的五个系统拦截器
系统拦截器
- RetryAndFollowUpInterceptor 负责重试/重定向
- BridgeInterceptor 负责将原始Requset转换给发送给服务端的Request以及将Response转化成对调用方友好的Response,具体就是对request添加Content-Type、Content-Length、Connection、Accept-Encoding等请求头以及对返回结果进行解压等。
- CacheInterceptor
负责读取缓存以及更新缓存。 - ConnectInterceptor
负责与服务器建立连接 - CallServerInterceptor
负责从服务器读取响应的数据 主要的工作就是把请求的Request写入到服务端,然后从服务端读取Response。- 写入请求头
- 写入请求体
- 读取响应头
- 读取响应体
AsyncCall 异步请求Call封装
- 网络回调 responseCallback
- callsPerHost
- request
- call
- 实现了Runnable
Dispatcher 调度器
- 决定了网络请求的执行时机和执行线程,管理最大可执行任务数和每个Host最大可执行任务数
- 三个任务集合
- readyAsyncCalls (异步等待队列)
- runningAsyncCalls (正在执行的异步网络请求队列)
- runningSyncCalls (正在执行的同步网络请求队列)
- 线程池executorService
1 2 3
ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) }
类解析
- Http2ExchangeCodec 负责网络请求的数据交换,写入请求头、写入请求体、读取响应头、读取响应体
- Exchange 像是 Http2ExchangeCodec 的一个装饰器,大部分实际操作都是 Http2ExchangeCodec 完成的 ;主要在 CallServerInterceptor 内被使用
- RealConnection 链接
- connectSocket 创建 Socket
- RealConnectionPool 管理链接池
- ConnectInterceptor 负责网络链接,创建 Exchange ,创建或复用 Connection 都在其方法内
- 代码如下
1 2 3 4 5 6
override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.call.initExchange(chain) val connectedChain = realChain.copy(exchange = exchange) return connectedChain.proceed(realChain.request) }
主要业务逻辑在 realChain.call.initExchange(chain),其initExchange代码如下
1 2 3 4 5 6 7 8 9 10
internal fun initExchange(chain: RealInterceptorChain): Exchange { ... val exchangeFinder = this.exchangeFinder!! val codec = exchangeFinder.find(client, chain) val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result ... return result }
分析可知每次 Exchange 都是一个新的对象,但是 codec 是可以复用,exchangeFinder对象 是在 RetryAndFollowUpInterceptor 内创建的;
ExchangeFinder.find -> ExchangeFinder.findHealthyConnection -> ExchangeFinder.findConnection1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
private fun findConnection( connectTimeout: Int, readTimeout: Int, writeTimeout: Int, pingIntervalMillis: Int, connectionRetryEnabled: Boolean ): RealConnection { ... val callConnection = call.connection ... if (call.connection != null) { check(toClose == null) return callConnection } ... if (connectionPool.callAcquirePooledConnection(address, call, null, false)) { val result = call.connection!! eventListener.connectionAcquired(call, result) return result } ... // Connect. Tell the call about the connecting call so async cancels work. val newConnection = RealConnection(connectionPool, route) call.connectionToCancel = newConnection try { newConnection.connect( connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener ) } finally { call.connectionToCancel = null } ... synchronized(newConnection) { connectionPool.put(newConnection) call.acquireConnectionNoEvents(newConnection) } ... return newConnection }
从上述代码可知,我一共有3中方式 获取到 Connection :
- 复用 Call 中 Connection
- 从 connectionPool 中获取
- 创建一个新的 connection 并添加到 connectionPool 中
- RealConnection.connect 方法内就是创建Socket的部分
我们主要分析从 connectionPool 中获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
fun callAcquirePooledConnection( address: Address, call: RealCall, routes: List<Route>?, requireMultiplexed: Boolean ): Boolean { for (connection in connections) { synchronized(connection) { if (requireMultiplexed && !connection.isMultiplexed) return@synchronized if (!connection.isEligible(address, routes)) return@synchronized call.acquireConnectionNoEvents(connection) return true } } return false }
主要就是从 connections 遍历获取,判断 connection 是否支持复用,是否符合复用条件(主要是host)。最后调用 call.acquireConnectionNoEvents(connection) 添加到 Call 中
- 代码如下
- Route 根据代理设置,DNS设置等,决定了本次网络请求所请求对接的主机