《从使用到源码:OkHttp3责任链模式剖析》

发布于:2025-09-08 ⋅ 阅读:(15) ⋅ 点赞:(0)

一 从使用开始

0.依赖引入

implementation ("com.squareup.okhttp3:okhttp:3.14.7")

1.创建OkHttpClient实例

  • 方式一:直接使用默认配置的Builder

//从源码可以看出,当我们直接new创建OkHttpClient实例时,会默认给我们配置好一个Builder
OkHttpClient okHttpClient = new OkHttpClient();
​
//源码
public OkHttpClient() {
    this(new Builder());
  }
    public Builder() {
      dispatcher = new Dispatcher();
      protocols = DEFAULT_PROTOCOLS;
      connectionSpecs = DEFAULT_CONNECTION_SPECS;
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;
      callTimeout = 0;
      connectTimeout = 10_000;
      readTimeout = 10_000;
      writeTimeout = 10_000;
      pingInterval = 0;
    }
  • 方式二:自定义配置Builder

OkHttpClient client = new OkHttpClient().newBuilder()
                                    .addInterceptor(new MyCookieInterceptor())
                                    .addNetworkInterceptor(new MyNetWorkInterceptor())
                                    .readTimeout(3000, TimeUnit.SECONDS)
                                    .build();

2.创建Request实例

Request request = new Request.Builder().url("https://www.xxx.com").build();

3.创建Call请求,发起同步或异步网络请求

 //同步请求
 Response execute = client.newCall(request).execute();
 //异步请求
client.newCall(request).enqueue(new Callback() {
                @Override
                public void onFailure(Call call, IOException e) {
                    //请求响应失败回调
                }
​
                @Override
                public void onResponse(Call call, Response response) throws IOException {
                    //请求响应成功回调
                }
   });

二 责任链的构建剖析

1.责任链模式介绍

什么是责任链模式?

一个请求沿着一条“链”传递,直到该“链”上的某个处理者处理它为止。

OkHttp中的责任链是如何体现的?

将一个网络请求Request沿着一条由多个拦截器组成的“链”顺序向后传递,若拦截器无法完全处理请求则将请求继续向后传递,直到某个拦截器完全处理它并返回Respose结果(不一定是成功的),再依据顺序层层向前传递。

2.OkHttp中的责任链模式剖析

前面我们已经了解了OkHttp的使用,接下来我们就从OkHttp进行同步请求的代码入手,来剖析其责任链在怎么构建起来的。

//同步请求
 Response execute = client.newCall(request).execute();

同步请求部分的代码如上,ctrl+鼠标左键点击newCall进入OkHttp的源码,向下追溯,最终发现调用到了RealCall的静态方法,将OkHttpClient对象和Request对象作为参数传递给了RealCall构造方法,构建了一个RealCall对象并返回。

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // Safely publish the Call instance to the EventListener.
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  call.transmitter = new Transmitter(client, call);
  return call;
}

再回到同步请求的代码

//同步请求
 Response execute = client.newCall(request).execute();

这次我们ctrl+鼠标左键点击execute查看同步请求的代码逻辑,此时发现走到了Call接口,execute()方法没有方法体,没关系,我们在Call接口中ctrl+h,就可以看到Call接口的继承树啦

//Call
Response execute() throws IOException;

而后你就会发现Call下只有一个子类也就是我们刚刚看过的RealCall,在RealCall中搜索execute()方法,就可以看到实际执行的到的execute()方法如下

@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  transmitter.timeoutEnter();
  transmitter.callStart();
  try {
    client.dispatcher().executed(this);
    return getResponseWithInterceptorChain();
  } finally {
    client.dispatcher().finished(this);
  }
}

此方法的作用主要有三个:(transmitter我们暂不关注,它的作用主要是管理Call的生命周期,关心的宝子自行了解哈)

  • 检查同步请求是否已经被执行过

  • client.dispatcher().executed(this),将同步请求存入ArrayDeque中,以实现发起多个同步请求时,按顺序进行同步请求

  • getResponseWithInterceptorChain(),创建责任链并执行,获取同步请求返回结果

我们进入client.dispatcher().executed(this)的executed(this)方法,可以看到如下代码:

/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
​
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);
}

可以看到在同步请求中,此方法的作用仅为记录同步请求

接下来我们进入getResponseWithInterceptorChain()方法,OkHttp责任链构建和开启就开始于这个方法:

//RealCall.class
​
Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors.
  //创建一个拦截器集合
  List<Interceptor> interceptors = new ArrayList<>();
  /**将用户即我们自己定义的拦截器加入到此集合中,即我们刚刚在自定义配置Builder中调用.addInterceptor(new         MyCookieInterceptor())传递进来的拦截器
  **/
  interceptors.addAll(client.interceptors());
  //加入一些默认的内置拦截器
  //负责失败重试以及重定向的拦截器
  interceptors.add(new RetryAndFollowUpInterceptor(client));
  //负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的拦截器
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  //负责读取缓存直接返回、更新缓存的拦截器
  interceptors.add(new CacheInterceptor(client.internalCache()));
  //负责和服务器建立连接的拦截器
  interceptors.add(new ConnectInterceptor(client));
  //判断当前请求是 WebSocket 握手请求 还是HTTP或HTTPS请求
  if (!forWebSocket) {
    /**
    是HTTP或HTTPS请求才添加用户在自定义配置Builder中调用addNetworkInterceptor(new MyNetWorkInterceptor())配置进来的网络拦截器,因为网络拦截器是专门为处理 HTTP 网络请求而生的,为WebSocket类型的请求设置此拦截器是无意义的,甚至可能因拦截 HTTP 帧而干扰 WebSocket 协议的正常交互
    **/
    interceptors.addAll(client.networkInterceptors());
  }
  //负责向服务器发送请求数据、从服务器读取响应数据的拦截器(此为最后一个执行的拦截器)
  interceptors.add(new CallServerInterceptor(forWebSocket));
  
  /**
  构建责任链chain,将我们刚刚的拦截器集合interceptors,原始的请求originalRequest(也就是我们创建的Request对象),此类自身的  实例传入(此处我们只关心这三个参数就好啦)
  **/
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
      originalRequest, this, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());
  boolean calledNoMoreExchanges = false;
  try {
    //调用proceed()方法将原始请求传递进去开启责任链
    Response response = chain.proceed(originalRequest);
    if (transmitter.isCanceled()) {
      closeQuietly(response);
      throw new IOException("Canceled");
    }
    return response;
  } catch (IOException e) {
    calledNoMoreExchanges = true;
    throw transmitter.noMoreExchanges(e);
  } finally {
    if (!calledNoMoreExchanges) {
      transmitter.noMoreExchanges(null);
    }
  }
}

上面我们分析了RealCall的getResponseWithInterceptorChain()方法,最后走到了Interceptor.Chain的proceed()方法,让我们来分析这部分源码吧

ctrl+鼠标左键点击proceed()方法,可以查看到如下源码:

/**
 * Observes, modifies, and potentially short-circuits requests going out and the corresponding
 * responses coming back in. Typically interceptors add, remove, or transform headers on the request
 * or response.
 */
public interface Interceptor {
  Response intercept(Chain chain) throws IOException;
​
  interface Chain {
    Request request();
​
    Response proceed(Request request) throws IOException;
​
    /**
     * Returns the connection the request will be executed on. This is only available in the chains
     * of network interceptors; for application interceptors this is always null.
     */
    @Nullable Connection connection();
​
    Call call();
​
    int connectTimeoutMillis();
​
    Chain withConnectTimeout(int timeout, TimeUnit unit);
​
    int readTimeoutMillis();
​
    Chain withReadTimeout(int timeout, TimeUnit unit);
​
    int writeTimeoutMillis();
​
    Chain withWriteTimeout(int timeout, TimeUnit unit);
  }
}

可以看到,proceed()方法在Interceptor接口中的Chain接口中,没有具体实现,所以我们依旧在Chain接口中ctrl+h,查看Chain接口的继承树,而后我们就又可以发现,它又是只有一个子类 RealInterceptorChain,那proceed()方法的具体实现就必然在 RealInterceptorChain中啦,如下:

//RealInterceptorChain.class
//此方法负责驱动拦截器链的执行(OkHttp 的拦截器链是核心设计,用于处理请求、响应的各种中间逻辑,如重试、缓存、网络连接等)
//每个拦截器(除最后一个)处理完毕后(但未完全处理完毕)都会回调回这个方法,在此方法中将请求交由下一个拦截器处理
@Override public Response proceed(Request request) throws IOException {
  //可以看到这里调用了RealInterceptorChain类中自己实现的同名proceed方法
  return proceed(request, transmitter, exchange);
}
​
//也就是说,这里就是proceed方法的具体实现了
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
    throws IOException {
//index变量的作用是控制拦截器的顺序执行,OkHttp的责任链就是依靠此变量来实现顺序执行的,index表示拦截器集合interceptors的索引
  //此处进行拦截器索引合法性校验
  if (index >= interceptors.size()) throw new AssertionError();
  /**
  通过记录代码调用执行到此处的次数,以统计当前拦截器链的调用次数,因为我们上面说了,每个拦截器(除最后一个)
  处理完毕后(但未完全处理完毕)都会回调回这个方法,故可以这样来统计
  **/
  calls++;
    
  //验证网络拦截器(Network Interceptor)是否修改了请求的 Host(主机)或 Port(端口)
  /**
  因为网络拦截器的职责是在网络请求建立之前和之后处理逻辑(例如,处理重定向、重试、缓存等)。一个已经建立的 TCP 连接是和特定的主机与端口绑定的。如果拦截器修改了这些信息,那么这个连接就无法被重用,这会导致不必要的连接开销,并且破坏了拦截器链的逻辑完整性。
  **/
  // If we already have a stream, confirm that the incoming request will use it.
  if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }
    
  //确保每个网络拦截器只调用一次 chain.proceed() 方法
  // If we already have a stream, confirm that this is the only call to chain.proceed().
  if (this.exchange != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }
  //创建下一个(index + 1)拦截器 链对象
  // Call the next interceptor in the chain.
  RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
      index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
  //通过index获取到当前拦截器对象
  Interceptor interceptor = interceptors.get(index);
  /**
    用当前拦截器对象调用intercept(next),将下一个拦截器 链对象传递进去,此方法为每个拦截器都有的,拦截器对请求的处理都在这个方法中进行,好了接下来就可以跳到下面看这个方法的源码了
  **/
  Response response = interceptor.intercept(next);
​
  // Confirm that the next interceptor made its required call to chain.proceed().
  if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }
​
  // Confirm that the intercepted response isn't null.
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }
​
  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }
​
  return response;
}

下面,我们以BridgeInterceptor拦截器中的intercept方法为例,来捋一下intercept方法的执行流程

//BridgeInterceptor.class  
​
@Override public Response intercept(Chain chain) throws IOException {
    //通过拦截器链对象获取到请求对象
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    /**
    下面就是,对请求对象进行此拦截器的负责的一系列请求前处理,由于本章我们只关注责任链模式的整体构建,故下面的拦截器处理逻辑,我们暂不做过多的解析
    **/
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }
​
      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }
​
    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }
​
    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive");
    }
​
    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }
​
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies));
    }
​
    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", Version.userAgent());
    }
    //到此处,此拦截器负责的,请求前的处理已经完成
    /**
    继续通过传进来的责任链对象chain调用proceed()方法,将处理后的请求传回此方法,
    如此形成一个链路使请求能够通过责任链上的拦截器能有条不紊的向后执行:除最后一个拦截器的每个拦截器在其intercept(Chain chain)方法中进行请求的前置处理完毕后都会将处理完的请求传回到RealInterceptorChain的proceed()方法,在proceed()方法中通过index来获取到下一个要执行的拦截器对象,调用到对应的intercept(Chain chain)方法,直到到调用到最后一个拦截器对象的intercept(Chain chain)方法时,也就是CallServerInterceptor的intercept(Chain chain)方法,才会真正发起网络请求,并最终返回一个Response对象,而后返回其调用处,一层层回调回去,直至最初的调用的proceed()方法。
    **/
    Response networkResponse = chain.proceed(requestBuilder.build());
​
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
​
    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);
​
    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }
​
    return responseBuilder.build();
  }

接下来我们来看最后一个拦截器的intercept(Chain chain)方法逻辑

//CallServerInterceptor.class  
@Override public Response intercept(Chain chain) throws IOException {
    //与其他拦截器一样通过拦截器链对象获取到请求对象
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();
    Request request = realChain.request();
​
    long sentRequestMillis = System.currentTimeMillis();
​
    exchange.writeRequestHeaders(request);
    //而后就是进行一些前置处理后直接发起网络请求了
    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }
​
      if (responseBuilder == null) {
        if (request.body().isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
      exchange.noRequestBody();
    }
​
    if (request.body() == null || !request.body().isDuplex()) {
      exchange.finishRequest();
    }
​
    if (!responseHeadersStarted) {
      exchange.responseHeadersStart();
    }
​
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }
​
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
​
    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
​
      code = response.code();
    }
​
    exchange.responseHeadersEnd(response);
​
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }
​
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }
​
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
    //网络请求完毕将处理完毕的结果返回
    return response;
  }

在经过最后一个拦截器的intercept(Chain chain)方法后,就会一层层将结果返回,直到回到最初的proceed()方法了,至此OkHttp的责任链模式就分析完毕啦~


网站公告

今日签到

点亮在社区的每一天
去签到