2020-03-03 從Apache PoolingHttpClientConnectionManager看連接

從Apache PoolingHttpClientConnectionManager看連接

緣由: 微服務(wù),以及多子系統(tǒng)外部服務(wù)調(diào)用的架構(gòu)中,經(jīng)常要借用REST方式訪問,如果每次都握手連接訪問,效率低下。如果沒有dubbo這樣的RPC框架,又想提高HTTP訪問效率,那么HTTP連接池就是比較合適的方式,這樣的實現(xiàn)基于keep-alive機制,雖然這樣的長連接沒有心跳的支持,并且受限于服務(wù)提供方的keep-alive時長,但對于連續(xù)頻繁的服務(wù)訪問,對比于每次都建立tcp連接,這樣的連接復(fù)用方式還是相對高效的。

關(guān)閉TCP連接需要雙方互相揮手完成, 經(jīng)常出現(xiàn)的CLOSE_WAIT,F(xiàn)IN_WAIT,TIME_WAIT狀態(tài)就是揮手未結(jié)束的中間狀態(tài)。

PoolingHttpClientConnectionManager管理的是客戶端的http連接池。 服務(wù)端主動關(guān)閉連接時,F(xiàn)IN到client, client 本地的TCP協(xié)議棧收到FIN并ACK,但是上層應(yīng)用程序只有在Read呈現(xiàn)-1時,才回知道server不再發(fā)送數(shù)據(jù)而主動關(guān)閉了連接, 然后client調(diào)用close關(guān)閉連接。

連接池

PoolingHttpClientConnectionManager中的連接放回連接池和創(chuàng)建連接方法: releaseConnection connect

釋放連接的時機: //CloseableHttpResponse; InputStream in = response.getEntity().getContent();

中的in實際上是 org.apache.http.client.entity.LazyDecompressingInputStream

其包裝了org.apache.http.conn.EofSensorInputStream 其中有watcher org.apache.http.conn.EofSensorWatcher

//class EofSensorInputStream

@Override

? ? public void close() throws IOException {

? ? ? ? // tolerate multiple calls to close()

? ? ? ? selfClosed = true;

? ? ? ? checkClose();

? ? }

protected void checkClose() throws IOException {

if (wrappedStream != null) {

? ? try {

? ? ? ? boolean scws = true; // should close wrapped stream?

? ? ? ? if (eofWatcher != null) {

? ? ? ? ? ? scws = eofWatcher.streamClosed(wrappedStream);? //call releaseConnection finally

? ? ? ? }

? ? ? ? if (scws) {

? ? ? ? ? ? wrappedStream.close();

? ? ? ? }

? ? } finally {

? ? ? ? wrappedStream = null;

? ? }

}

}

附,基于httpClient4.5.2,httpCore4.4.4的HttpClient工具類

public? class HttpClientUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtil.class);

private static CloseableHttpClient httpclient = createHttpClientDefault();

private static final String DEFAULT_CHARSET_UTF8 = "UTF-8";

private static final String DEFAULT_CONTENT_TYPE_JSON = "application/json";

private static final int MAX_TIMEOUT = 10000;

private static final int MAX_RETRY_TIMES = 10;

private static final int MAX_THREAD_TOTAL = 50;

/**

* 發(fā)送http post請求

* @param action

* @param bodyParam

* @return

* @throws Exception

*/

public static? String post(String action, Object bodyParam) throws Exception{

return post(action, null, bodyParam, null, null);

}

/**

* 發(fā)送http post請求

* @param action

* @param bodyParam

* @return

* @throws Exception

*/

public static String post(String action, Map<String, String> headerParam, Object bodyParam) throws Exception{

return post(action, headerParam, bodyParam, null, null);

}

/**

* 發(fā)送http post請求

*

* @param action

* @return

* @throws Exception

* @throws UnsupportedEncodingException

*/

public static String post(String action, Map<String, String> headerParam, Object bodyParam, String contentType, String charSet) throws Exception{

String content_type = contentType;

if (content_type == null || "".equals(content_type)) content_type = DEFAULT_CONTENT_TYPE_JSON;

String char_set = charSet;

if (char_set == null || "".equals(char_set)) char_set = DEFAULT_CHARSET_UTF8;

String url = action;

LOGGER.info("Post請求地址:" + url);

HttpPost httpPost = new HttpPost(url);

//header參數(shù)

if (headerParam != null && headerParam.size() >0){

LOGGER.info("Post請求Header:" + JSON.toJSONString(headerParam));

for (String key : headerParam.keySet()){

httpPost.addHeader(key, headerParam.get(key));

}

}

//entity數(shù)據(jù)

if (bodyParam != null ) {

LOGGER.info("Post請求Body:" + JSON.toJSONString(bodyParam));

StringEntity entity = new StringEntity(JSON.toJSONString(bodyParam),char_set);

entity.setContentEncoding(char_set);

entity.setContentType(content_type);

httpPost.setEntity(entity);

}

String resultStr = "";

CloseableHttpResponse response = null;

try {

response = httpclient.execute(httpPost);

processCertainStatus(response.getStatusLine().getStatusCode());

resultStr = EntityUtils.toString(response.getEntity(), "utf-8");

httpPost.reset();

} catch (IOException e) {

LOGGER.error("execute http get connection", e);

} finally {

try {

if(response != null)

response.close();

} catch (IOException e) {

LOGGER.error("close http get connection", e);

}

}

LOGGER.info("Post請求返回:" + resultStr);

return resultStr;

}

/**

* 發(fā)送http get請求

* @param action

* @return

* @throws Exception

*/

public static String get(String action) throws Exception{

String url =? action;

LOGGER.info("Get請求地址:" + url);

HttpGet httpGet = new HttpGet(url);

httpGet.addHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");

CloseableHttpResponse response = null;

String resultStr = "";

try {

response = httpclient.execute(httpGet);

processCertainStatus(response.getStatusLine().getStatusCode());

resultStr = EntityUtils.toString(response.getEntity(),"utf-8");

httpGet.reset();

} catch (IOException e) {

LOGGER.error("execute http get connection", e);

} finally {

try {

if(response != null)

response.close();

} catch (IOException e) {

LOGGER.error("close http get connection", e);

}

}

LOGGER.info("Get請求返回:" + resultStr);

return resultStr;

}

private static? void processCertainStatus(int statusCode){

if(statusCode == 401){

throw new TokenInvalidException("401 token invalid!");

}

}

/**

* 發(fā)送http get請求

* @param action

* @return

* @throws Exception

*/

public static String get(String action, Map<String,String> params) throws Exception{

URIBuilder uriBuilder = new URIBuilder();

uriBuilder.setPath(action);

if (params != null){

for (String key: params.keySet()){

uriBuilder.setParameter(key, params.get(key));

}

}

return get(uriBuilder.build().toString());

}

/**

* 創(chuàng)建httpclient

* @return

*/

private static synchronized CloseableHttpClient createHttpClientDefault() {

CloseableHttpClient httpclient = null;

try {

SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null,

new TrustStrategy() {

public boolean isTrusted(

java.security.cert.X509Certificate[] chain,

String authType)

throws java.security.cert.CertificateException {

return true;

}

}).build();

? SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(sslContext, new NoopHostnameVerifier());

? ConnectionSocketFactory psf = PlainConnectionSocketFactory.getSocketFactory();?


? Registry<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create()

? ? ? ? ? ? ? ? .register("https", sslsf)

? ? ? ? ? ? ? ? .register("http", psf)

? ? ? ? ? ? ? ? .build();

RequestConfig config = RequestConfig.custom()

? .setSocketTimeout(MAX_TIMEOUT)

? .setConnectTimeout(MAX_TIMEOUT)

? .setConnectionRequestTimeout(MAX_TIMEOUT)

? .build();

//超時重試處理

HttpRequestRetryHandler retryHandler = new DefaultHttpRequestRetryHandler(MAX_RETRY_TIMES, true);

//連接管理池

PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registryBuilder);

cm.setValidateAfterInactivity(60000);

cm.setMaxTotal(MAX_THREAD_TOTAL);

cm.setDefaultMaxPerRoute(MAX_THREAD_TOTAL);

httpclient = HttpClients.custom().setConnectionManager(cm).setDefaultRequestConfig(config).setRetryHandler(retryHandler).build();

} catch (KeyManagementException e) {

LOGGER.error("KeyManagementException", e);

} catch (NoSuchAlgorithmException e) {

LOGGER.error("NoSuchAlgorithmException", e);

} catch (KeyStoreException e) {

LOGGER.error("KeyStoreException", e);

}

return httpclient;

}

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容