Okhttp

Posted by ooftf on April 19, 2021

Okhttp 网络请求流程 (4.9.0)

1
 OkHttpClient.Builder().build().newCall(Request.Builder().build()).execute()
  1. 构建HttpClient
    • 配置拦截器
    • 设置超时时间
    • 设置SSL认证
    • 设置Dispatcher调度器
    • 重试
    • 缓存
    • cookie
    • dns
    • 代理
  2. 构建Request
    • URL 部分
    • Header 部分
    • RequestBody 部分
  3. 生成Call对象 实际是RealCall
  4. 发起请求execute()或enqueue()
    • execute
      调用Dispatcher.exexute 将Call添加到Dispatcher.runningSyncCalls 中,然后调用getResponseWithInterceptorChain()执行网络请求
    • enqueue
      1. enqueue:将Call封装成AsyncCall,调用 Dispatcher.enqueue方法
      2. Dispatcher.enqueue(): 中 将 AsyncCall 添加到 readyAsyncCalls,获取到当前host请求数量,添加到AsyncCall;调用promoteAndExecute()
      3. promoteAndExecute():
        1. 从readyAsyncCalls中遍历获取等待请求的AsyncCall
        2. 判断正在进行的网络请求个数是否>=maxRequests(默认64),如果>=结束循环。
        3. 如果没有判断当前AsyncCall callsPerHost(默认5)是否大于maxRequestsPerHost 如果>= 结束本次循环,执行下一个 AsyncCall,
        4. 如果有符合上述两个条件的AsyncCall,从 readyAsyncCalls 移除; callsPerHost 加一;添加到 runningAsyncCalls,添加到局部变量executableCalls中,
        5. 使用线程池executorService 遍历执行executableCalls
      4. AsyncCall.run: 调用 getResponseWithInterceptorChain() 获取都response 回调给 Callback,最终调用dispatcher.finished(this)结束本次调用

有上述流程可知promoteAndExecute是用来尝试执行等待队列符合条件的AsyncCall,一共有四处调用

  1. Dispatcher.enqueue
  2. Dispatcher.finished
  3. setMaxRequests
  4. setCallsPerHost

无论是异步请求还是同步请求,真正的网络请求网络请求是在getResponseWithInterceptorChain()中执行的

  1. 装载拦截器
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
     val interceptors = mutableListOf<Interceptor>()
     interceptors += client.interceptors
     interceptors += RetryAndFollowUpInterceptor(client)
     interceptors += BridgeInterceptor(client.cookieJar)
     interceptors += CacheInterceptor(client.cache)
     interceptors += ConnectInterceptor
     if (!forWebSocket) {
       interceptors += client.networkInterceptors
     }
     interceptors += CallServerInterceptor(forWebSocket)
    
  2. 执行拦截器责任链
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    
      val chain = RealInterceptorChain(
         call = this,
         interceptors = interceptors,
         index = 0,
         exchange = null,
         request = originalRequest,
         connectTimeoutMillis = client.connectTimeoutMillis,
         readTimeoutMillis = client.readTimeoutMillis,
         writeTimeoutMillis = client.writeTimeoutMillis
     )
      val response = chain.proceed(originalRequest)
    

    从上述代码可知,系统先执行用户自定义拦截器又执行的Okhttp自带的五个系统拦截器

系统拦截器

  • RetryAndFollowUpInterceptor 负责重试/重定向
  • BridgeInterceptor 负责将原始Requset转换给发送给服务端的Request以及将Response转化成对调用方友好的Response,具体就是对request添加Content-Type、Content-Length、Connection、Accept-Encoding等请求头以及对返回结果进行解压等。
  • CacheInterceptor
    负责读取缓存以及更新缓存。
  • ConnectInterceptor
    负责与服务器建立连接
  • CallServerInterceptor
    负责从服务器读取响应的数据 主要的工作就是把请求的Request写入到服务端,然后从服务端读取Response。
    • 写入请求头
    • 写入请求体
    • 读取响应头
    • 读取响应体

AsyncCall 异步请求Call封装

  • 网络回调 responseCallback
  • callsPerHost
  • request
  • call
  • 实现了Runnable

Dispatcher 调度器

  • 决定了网络请求的执行时机和执行线程,管理最大可执行任务数和每个Host最大可执行任务数
  • 三个任务集合
    1. readyAsyncCalls (异步等待队列)
    2. runningAsyncCalls (正在执行的异步网络请求队列)
    3. runningSyncCalls (正在执行的同步网络请求队列)
  • 线程池executorService
    1
    2
    3
    
        ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                    SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
              }
    

    类解析

  • Http2ExchangeCodec 负责网络请求的数据交换,写入请求头、写入请求体、读取响应头、读取响应体
  • Exchange 像是 Http2ExchangeCodec 的一个装饰器,大部分实际操作都是 Http2ExchangeCodec 完成的 ;主要在 CallServerInterceptor 内被使用
  • RealConnection 链接
    • connectSocket 创建 Socket
  • RealConnectionPool 管理链接池
  • ConnectInterceptor 负责网络链接,创建 Exchange ,创建或复用 Connection 都在其方法内
    • 代码如下
      1
      2
      3
      4
      5
      6
      
      override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        val exchange = realChain.call.initExchange(chain)
        val connectedChain = realChain.copy(exchange = exchange)
        return connectedChain.proceed(realChain.request)
      }
      

      主要业务逻辑在 realChain.call.initExchange(chain),其initExchange代码如下

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      
      internal fun initExchange(chain: RealInterceptorChain): Exchange {
        ...
        val exchangeFinder = this.exchangeFinder!!
        val codec = exchangeFinder.find(client, chain)
        val result = Exchange(this, eventListener, exchangeFinder, codec)
        this.interceptorScopedExchange = result
        this.exchange = result
        ...
        return result
      }
      

      分析可知每次 Exchange 都是一个新的对象,但是 codec 是可以复用,exchangeFinder对象 是在 RetryAndFollowUpInterceptor 内创建的;
      ExchangeFinder.find -> ExchangeFinder.findHealthyConnection -> ExchangeFinder.findConnection

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      
      private fun findConnection(
        connectTimeout: Int,
        readTimeout: Int,
        writeTimeout: Int,
        pingIntervalMillis: Int,
        connectionRetryEnabled: Boolean
      ): RealConnection {
        ...
        val callConnection = call.connection 
        ...
        if (call.connection != null) {
          check(toClose == null)
          return callConnection
        }
        ...
          
        if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
          val result = call.connection!!
          eventListener.connectionAcquired(call, result)
          return result
        }
        ...
            
        // Connect. Tell the call about the connecting call so async cancels work.
        val newConnection = RealConnection(connectionPool, route)
        call.connectionToCancel = newConnection
        try {
          newConnection.connect(
              connectTimeout,
              readTimeout,
              writeTimeout,
              pingIntervalMillis,
              connectionRetryEnabled,
              call,
              eventListener
          )
        } finally {
          call.connectionToCancel = null
        }
        ...
        synchronized(newConnection) {
          connectionPool.put(newConnection)
          call.acquireConnectionNoEvents(newConnection)
        }
        ...
        return newConnection
      }
      

      从上述代码可知,我一共有3中方式 获取到 Connection :

      1. 复用 Call 中 Connection
      2. 从 connectionPool 中获取
      3. 创建一个新的 connection 并添加到 connectionPool 中
      4. RealConnection.connect 方法内就是创建Socket的部分

      我们主要分析从 connectionPool 中获取

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      
      fun callAcquirePooledConnection(
        address: Address,
        call: RealCall,
        routes: List<Route>?,
        requireMultiplexed: Boolean
      ): Boolean {
        for (connection in connections) {
          synchronized(connection) {
            if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
            if (!connection.isEligible(address, routes)) return@synchronized
            call.acquireConnectionNoEvents(connection)
            return true
          }
        }
        return false
      }
      

      主要就是从 connections 遍历获取,判断 connection 是否支持复用,是否符合复用条件(主要是host)。最后调用 call.acquireConnectionNoEvents(connection) 添加到 Call 中

  • Route 根据代理设置,DNS设置等,决定了本次网络请求所请求对接的主机