從Apache PoolingHttpClientConnectionManager看連接
緣由: 微服務,以及多子系統外部服務調用的架構中,經常要借用REST方式訪問,如果每次都握手連接訪問,效率低下。如果沒有dubbo這樣的RPC框架,又想提高HTTP訪問效率,那么HTTP連接池就是比較合適的方式,這樣的實現基于keep-alive機制,雖然這樣的長連接沒有心跳的支持,并且受限于服務提供方的keep-alive時長,但對于連續頻繁的服務訪問,對比于每次都建立tcp連接,這樣的連接復用方式還是相對高效的。
關閉TCP連接需要雙方互相揮手完成, 經常出現的CLOSE_WAIT,FIN_WAIT,TIME_WAIT狀態就是揮手未結束的中間狀態。
PoolingHttpClientConnectionManager管理的是客戶端的http連接池。 服務端主動關閉連接時,會發送FIN到client, client 本地的TCP協議棧收到FIN并ACK,但是上層應用程序只有在read()返回-1時,才會知道server不再發送數據而主動關閉了連接, 然后client調用close關閉連接。
連接池
PoolingHttpClientConnectionManager中把連接放回連接池和創建連接的方法分別是: releaseConnection 和 connect
釋放連接的時機: //CloseableHttpResponse; InputStream in = response.getEntity().getContent();
中的in實際上是 org.apache.http.client.entity.LazyDecompressingInputStream
其包裝了org.apache.http.conn.EofSensorInputStream 其中有watcher org.apache.http.conn.EofSensorWatcher
//class EofSensorInputStream
@Override
? ? public void close() throws IOException {
? ? ? ? // tolerate multiple calls to close()
? ? ? ? selfClosed = true;
? ? ? ? checkClose();
? ? }
protected void checkClose() throws IOException {
if (wrappedStream != null) {
? ? try {
? ? ? ? boolean scws = true; // should close wrapped stream?
? ? ? ? if (eofWatcher != null) {
? ? ? ? ? ? scws = eofWatcher.streamClosed(wrappedStream);? //call releaseConnection finally
? ? ? ? }
? ? ? ? if (scws) {
? ? ? ? ? ? wrappedStream.close();
? ? ? ? }
? ? } finally {
? ? ? ? wrappedStream = null;
? ? }
}
}
附,基于httpClient4.5.2,httpCore4.4.4的HttpClient工具類
public? class HttpClientUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtil.class);
private static CloseableHttpClient httpclient = createHttpClientDefault();
private static final String DEFAULT_CHARSET_UTF8 = "UTF-8";
private static final String DEFAULT_CONTENT_TYPE_JSON = "application/json";
private static final int MAX_TIMEOUT = 10000;
private static final int MAX_RETRY_TIMES = 10;
private static final int MAX_THREAD_TOTAL = 50;
/**
* 發(fā)送http post請求
* @param action
* @param bodyParam
* @return
* @throws Exception
*/
public static? String post(String action, Object bodyParam) throws Exception{
return post(action, null, bodyParam, null, null);
}
/**
* 發(fā)送http post請求
* @param action
* @param bodyParam
* @return
* @throws Exception
*/
public static String post(String action, Map<String, String> headerParam, Object bodyParam) throws Exception{
return post(action, headerParam, bodyParam, null, null);
}
/**
* 發(fā)送http post請求
*
* @param action
* @return
* @throws Exception
* @throws UnsupportedEncodingException
*/
public static String post(String action, Map<String, String> headerParam, Object bodyParam, String contentType, String charSet) throws Exception{
String content_type = contentType;
if (content_type == null || "".equals(content_type)) content_type = DEFAULT_CONTENT_TYPE_JSON;
String char_set = charSet;
if (char_set == null || "".equals(char_set)) char_set = DEFAULT_CHARSET_UTF8;
String url = action;
LOGGER.info("Post請求地址:" + url);
HttpPost httpPost = new HttpPost(url);
//header參數(shù)
if (headerParam != null && headerParam.size() >0){
LOGGER.info("Post請求Header:" + JSON.toJSONString(headerParam));
for (String key : headerParam.keySet()){
httpPost.addHeader(key, headerParam.get(key));
}
}
//entity數(shù)據(jù)
if (bodyParam != null ) {
LOGGER.info("Post請求Body:" + JSON.toJSONString(bodyParam));
StringEntity entity = new StringEntity(JSON.toJSONString(bodyParam),char_set);
entity.setContentEncoding(char_set);
entity.setContentType(content_type);
httpPost.setEntity(entity);
}
String resultStr = "";
CloseableHttpResponse response = null;
try {
response = httpclient.execute(httpPost);
processCertainStatus(response.getStatusLine().getStatusCode());
resultStr = EntityUtils.toString(response.getEntity(), "utf-8");
httpPost.reset();
} catch (IOException e) {
LOGGER.error("execute http get connection", e);
} finally {
try {
if(response != null)
response.close();
} catch (IOException e) {
LOGGER.error("close http get connection", e);
}
}
LOGGER.info("Post請求返回:" + resultStr);
return resultStr;
}
/**
* 發(fā)送http get請求
* @param action
* @return
* @throws Exception
*/
public static String get(String action) throws Exception{
String url =? action;
LOGGER.info("Get請求地址:" + url);
HttpGet httpGet = new HttpGet(url);
httpGet.addHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
CloseableHttpResponse response = null;
String resultStr = "";
try {
response = httpclient.execute(httpGet);
processCertainStatus(response.getStatusLine().getStatusCode());
resultStr = EntityUtils.toString(response.getEntity(),"utf-8");
httpGet.reset();
} catch (IOException e) {
LOGGER.error("execute http get connection", e);
} finally {
try {
if(response != null)
response.close();
} catch (IOException e) {
LOGGER.error("close http get connection", e);
}
}
LOGGER.info("Get請求返回:" + resultStr);
return resultStr;
}
private static? void processCertainStatus(int statusCode){
if(statusCode == 401){
throw new TokenInvalidException("401 token invalid!");
}
}
/**
* 發(fā)送http get請求
* @param action
* @return
* @throws Exception
*/
public static String get(String action, Map<String,String> params) throws Exception{
URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setPath(action);
if (params != null){
for (String key: params.keySet()){
uriBuilder.setParameter(key, params.get(key));
}
}
return get(uriBuilder.build().toString());
}
/**
* 創(chuàng)建httpclient
* @return
*/
private static synchronized CloseableHttpClient createHttpClientDefault() {
CloseableHttpClient httpclient = null;
try {
SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null,
new TrustStrategy() {
public boolean isTrusted(
java.security.cert.X509Certificate[] chain,
String authType)
throws java.security.cert.CertificateException {
return true;
}
}).build();
? SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier());
? ConnectionSocketFactory psf = PlainConnectionSocketFactory.getSocketFactory();?
? Registry<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create()
? ? ? ? ? ? ? ? .register("https", sslsf)
? ? ? ? ? ? ? ? .register("http", psf)
? ? ? ? ? ? ? ? .build();
RequestConfig config = RequestConfig.custom()
? .setSocketTimeout(MAX_TIMEOUT)
? .setConnectTimeout(MAX_TIMEOUT)
? .setConnectionRequestTimeout(MAX_TIMEOUT)
? .build();
//超時重試處理
HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(MAX_RETRY_TIMES, true);
//連接管理池
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder);
cm.setValidateAfterInactivity(60000);
cm.setMaxTotal(MAX_THREAD_TOTAL);
cm.setDefaultMaxPerRoute(MAX_THREAD_TOTAL);
httpclient = HttpClients.custom().setConnectionManager(cm).setDefaultRequestConfig(config).setRetryHandler(retryHandler).build();
} catch (KeyManagementException e) {
LOGGER.error("KeyManagementException", e);
} catch (NoSuchAlgorithmException e) {
LOGGER.error("NoSuchAlgorithmException", e);
} catch (KeyStoreException e) {
LOGGER.error("KeyStoreException", e);
}
return httpclient;
}
}