NIO客戶端
寫這個程序的初衷是在儒猿的分布式小文件系統中所受到的啟發。當時老師的設計和我的想法有點出入,於是我按照自己的思路實現了一套邏輯。分布式小文件系統客戶端考慮的要點是高效地傳輸文件,在這個基礎上就要求我們必須維持和DataNode的長連接,減少和DataNode的連接損耗。為此,我們需要在第一個請求建立連接後,將連接緩存在客戶端,方便下次直接使用。
先說說主要的一些集合容器的作用:
- waitingConnectHosts 等待連接Host的緩存
- connections 所有的連接集合,這裡主要是方便使用Host獲取到SelectionKey,以便感知並變更連接關注的事件狀態
- connectState 所有連接狀態緩存,當一個請求嘗試連接的時候,如果在該集合中發現連接狀態是SUCCESS,那麼就會直接獲取連接進行文件傳輸
- waitingRequests 等待發送的請求隊列,當客戶端進行請求提交的時候,請求首先會進入該隊列進行緩存
- toSendRequests 當時機恰當的時候,會將waitingRequests的請求拉取緩存到toSendRequests中,該隊列是請求在客戶端的最後一個緩存隊列,之後就是發送了
- unfinishedResponses 未完成的響應,因為TCP的拆包問題,在一個Read事件中可能無法將一個響應完整地解析,這就要求我們將未讀完的響應緩存起來,等待下次Read事件進行追加讀取,完成整個響應解析。
- finishedResponses 已完成響應緩存,在該緩存中存儲的是已經完成解析但尚未被客戶端獲取的響應
- callBackMap 回調緩存,當請求傳入回調函數的時候,回調函數會進入其中
核心類是NetworkManager,這個是管理所有連接的管理器,在這個類裡面負責請求的解析、緩存,響應解析,以及回調函數調用
主要的完整流程如下:

image.png
這里主要說下要點:
在這裡處理了粘包和拆包的問題,主要是定義好該次請求的數據長度。例如文件上傳請求的格式為:請求ID + 請求類型 + 文件名長度 + 文件長度 + 文件名 + 文件內容。如果請求ID為UUID(去掉連字符後為32個字符),請求類型用int表示,文件名長度和文件長度也都用int表示,那麼該次請求總的大小為 32 + 4 + 4 + 4 + fileNameLength + fileSize
響應的解析也是按照服務端定義的響應類型處理的
存在一個問題:所有的請求最後都實例化為ByteBuffer,在高並發情況下,內存占用是個問題
NetworkManager
package org.zymf.nio.example3.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.zymf.nio.example3.constant.Constant;
/**
 * Client-side NIO connection manager.
 *
 * <p>One instance owns a {@link Selector} and a single I/O thread
 * ({@link ClientHandleThread}). Callers hand hosts and requests to the manager through
 * concurrent queues; the I/O thread is the only thread that touches channels and
 * selection keys. Each channel's {@link SelectionKey} carries the caller-supplied
 * {@link Host} as its attachment, so every per-host map is keyed by exactly the object
 * the caller used (no reverse DNS lookup is involved).
 *
 * <p>Typical usage: {@link #tryConnect(Host)}, {@link #finishConnect(Host, boolean)},
 * then {@link #sendRequest(NetWorkClientRequest)} followed by
 * {@link #waitResponseSync(NetWorkClientRequest)}, or
 * {@link #sendCallBackRequest(NetWorkClientRequest)} for callback delivery.
 *
 * @author zhuyuemufeng
 * @version 1.0
 * @date 2021-07-20 21:29
 */
public class NetworkManager {

    /** Selector multiplexing all client channels. */
    private final Selector selector;
    /** The single I/O thread driving connects, writes and reads. */
    private final ClientHandleThread monitor;
    /** Hosts submitted by {@link #tryConnect(Host)} that still need a channel. */
    private final ConcurrentLinkedQueue<Host> waitingConnectHosts;
    /** Selection key of the live connection of each host. */
    private final Map<Host, SelectionKey> connections;
    /** Connection state of each host, one of the {@code Constant.*_CONNECT} values. */
    private final Map<Host, Integer> connectState;
    /** Requests submitted by callers and not yet scheduled for sending. */
    private final Map<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> waitingRequests;
    /** Requests scheduled for sending on the next writable event. */
    private final Map<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> toSendRequests;
    /** Response currently being assembled for each host (TCP may split it across reads). */
    private final Map<Host, NetWorkClientRespond> unfinishedResponses;
    /** Completed responses, per host and request id, waiting to be fetched by a caller. */
    private final Map<Host, Map<String, NetWorkClientRespond>> finishedResponses;
    /** Callbacks by request id; an entry is removed when its response is delivered. */
    private final Map<String, NetWorkRespondCallBack> callBackMap;
    /** Request buffer that was only partially written to a host's socket so far. */
    private final Map<Host, ByteBuffer> inFlightWrites;

    /**
     * Creates the manager and starts its I/O thread.
     *
     * @throws IllegalStateException if the selector cannot be opened
     */
    public NetworkManager() {
        this.selector = openSelector();
        this.waitingConnectHosts = new ConcurrentLinkedQueue<>();
        this.connections = new ConcurrentHashMap<>();
        this.connectState = new ConcurrentHashMap<>();
        this.waitingRequests = new ConcurrentHashMap<>();
        this.toSendRequests = new ConcurrentHashMap<>();
        this.finishedResponses = new ConcurrentHashMap<>();
        this.unfinishedResponses = new ConcurrentHashMap<>();
        this.callBackMap = new ConcurrentHashMap<>();
        this.inFlightWrites = new ConcurrentHashMap<>();
        this.monitor = new ClientHandleThread();
        // Started last so the I/O thread only ever sees fully initialised fields.
        this.monitor.start();
    }

    /** Opens the selector, failing fast instead of leaving the manager half-built. */
    private static Selector openSelector() {
        try {
            return Selector.open();
        } catch (IOException e) {
            throw new IllegalStateException("Unable to open NIO selector", e);
        }
    }

    /**
     * Requests a connection to {@code host} unless one is already pending or established.
     * A host whose previous attempt failed (or whose connection was lost) is retried.
     *
     * @param host the remote endpoint to connect to
     * @throws Exception never thrown by this implementation; kept for source compatibility
     */
    public void tryConnect(Host host) throws Exception {
        synchronized (connectState) {
            Integer state = connectState.get(host);
            if (state != null && state.intValue() != Constant.FAIL_CONNECT) {
                return;
            }
            // Publish WAITING before the host becomes visible to the I/O thread; otherwise
            // a fast connect could store SUCCESS first and be overwritten by WAITING.
            connectState.put(host, Constant.WAITING_CONNECT);
        }
        waitingConnectHosts.offer(host);
        // Do not wait for the select timeout before the connect is started.
        selector.wakeup();
    }

    /**
     * Reports the connection state of {@code host}.
     *
     * @param host the host previously passed to {@link #tryConnect(Host)}
     * @param sync when {@code true}, block (polling every 200 ms) until the state is no
     *             longer {@code Constant.WAITING_CONNECT}
     * @return the current {@code Constant.*_CONNECT} state
     * @throws RuntimeException if {@code host} was never passed to {@link #tryConnect(Host)}
     * @throws Exception        if the waiting thread is interrupted
     */
    public int finishConnect(Host host, boolean sync) throws Exception {
        Integer known = connectState.get(host);
        if (known == null) {
            throw new RuntimeException("該連接不存在");
        }
        int status = known;
        if (!sync) {
            return status;
        }
        while (Constant.WAITING_CONNECT == status) {
            Thread.sleep(200);
            status = connectState.get(host);
        }
        return status;
    }

    /**
     * Queues a request for sending; the response is later fetched with
     * {@link #waitResponseSync(NetWorkClientRequest)}.
     *
     * @param request the request; its buffer must be in write mode (it is flipped before
     *                being written to the socket)
     * @throws IllegalStateException if the request's host has never been connected
     */
    public void sendRequest(NetWorkClientRequest request) {
        enqueue(request);
    }

    /**
     * Queues a request whose response is delivered to the request's callback on the I/O
     * thread instead of being stored for {@link #waitResponseSync(NetWorkClientRequest)}.
     *
     * @param request the request; must carry a non-null callback
     * @throws NullPointerException  if the request has no callback
     * @throws IllegalStateException if the request's host has never been connected
     */
    public void sendCallBackRequest(NetWorkClientRequest request) {
        NetWorkRespondCallBack callBack = request.getNetWorkRespondCallBack();
        if (callBack == null) {
            throw new NullPointerException("request has no NetWorkRespondCallBack");
        }
        String requestId = request.getRequestId();
        // Register the callback before the request can possibly be sent, so a fast
        // response is never misrouted into finishedResponses.
        callBackMap.put(requestId, callBack);
        try {
            enqueue(request);
        } catch (RuntimeException e) {
            callBackMap.remove(requestId);
            throw e;
        }
    }

    /**
     * Blocks (polling every 200 ms) until the response for {@code request} has been fully
     * received, then returns it. The response is removed from the cache, so each response
     * can be fetched once. There is no timeout: if the response never arrives this method
     * does not return.
     *
     * @param request a request previously passed to {@link #sendRequest(NetWorkClientRequest)}
     * @return the completed response
     * @throws IllegalStateException if the request's host has never been connected
     * @throws Exception             if the waiting thread is interrupted
     */
    public NetWorkClientRespond waitResponseSync(NetWorkClientRequest request) throws Exception {
        Host host = request.getHost();
        Map<String, NetWorkClientRespond> respondMap = finishedResponses.get(host);
        if (respondMap == null) {
            throw new IllegalStateException("No established connection for host " + host);
        }
        String requestId = request.getRequestId();
        NetWorkClientRespond respond;
        // remove() instead of get(): responses hold whole payload buffers and would
        // otherwise accumulate for the lifetime of the manager.
        while ((respond = respondMap.remove(requestId)) == null) {
            Thread.sleep(200);
        }
        return respond;
    }

    /** Adds the request to its host's waiting queue and nudges the I/O thread. */
    private void enqueue(NetWorkClientRequest request) {
        Host host = request.getHost();
        ConcurrentLinkedQueue<NetWorkClientRequest> queue = waitingRequests.get(host);
        if (queue == null) {
            throw new IllegalStateException("No established connection for host " + host
                    + "; call tryConnect and finishConnect first");
        }
        queue.offer(request);
        selector.wakeup();
    }

    /**
     * The I/O thread: registers pending connects, schedules queued requests and then
     * services selector events, forever.
     */
    class ClientHandleThread extends Thread {

        @Override
        public void run() {
            while (true) {
                try {
                    // Register new connections and update their state.
                    registerConnect();
                    // Move queued requests to the send queues and enable OP_WRITE.
                    prepareSendRequest();
                    // Service connect / write / read events.
                    poll();
                } catch (RuntimeException e) {
                    // An unexpected failure must not kill the only I/O thread.
                    e.printStackTrace();
                }
            }
        }

        /**
         * Opens a non-blocking channel for every host waiting to be connected and
         * registers it with the selector, attaching the host to the selection key.
         * A host whose channel cannot be set up is marked as failed.
         */
        private void registerConnect() {
            Host host;
            while ((host = waitingConnectHosts.poll()) != null) {
                SocketChannel channel = null;
                try {
                    channel = SocketChannel.open();
                    channel.configureBlocking(false);
                    boolean connected =
                            channel.connect(new InetSocketAddress(host.getIp(), host.getPort()));
                    if (connected) {
                        // A local connection may complete immediately, in which case no
                        // OP_CONNECT event is needed to finish it.
                        SelectionKey key = channel.register(selector, SelectionKey.OP_READ, host);
                        onConnected(host, key);
                    } else {
                        channel.register(selector, SelectionKey.OP_CONNECT, host);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    closeQuietly(channel);
                    // Without this a caller blocked in finishConnect(host, true) never returns.
                    connectState.put(host, Constant.FAIL_CONNECT);
                }
            }
        }

        /**
         * Moves up to {@code Constant.MAX_SEND_REQUEST_SIZE} waiting requests of every
         * connected host into its send queue and enables write interest on its key.
         */
        private void prepareSendRequest() {
            for (Map.Entry<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> node
                    : waitingRequests.entrySet()) {
                Host host = node.getKey();
                ConcurrentLinkedQueue<NetWorkClientRequest> waiting = node.getValue();
                if (waiting.isEmpty()) {
                    continue;
                }
                Integer state = connectState.get(host);
                if (state == null || state.intValue() != Constant.SUCCESS_CONNECT) {
                    continue;
                }
                SelectionKey key = connections.get(host);
                if (key == null || !key.isValid()) {
                    continue;
                }
                System.out.println(">>>>>>>>>>>>準備請求");
                ConcurrentLinkedQueue<NetWorkClientRequest> toSend = toSendRequests.get(host);
                int count = 0;
                NetWorkClientRequest request;
                while (count < Constant.MAX_SEND_REQUEST_SIZE && (request = waiting.poll()) != null) {
                    count++;
                    System.out.println(">>>>>>>>>>>>加入toSendRequests請求池");
                    toSend.offer(request);
                }
                if (count != 0) {
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                }
            }
        }

        /**
         * Waits up to {@code Constant.POLL_BLOCK_MAX_TIME} for events and dispatches them.
         * A failure while handling one key does not prevent the other keys from being
         * handled.
         */
        private void poll() {
            try {
                selector.select(Constant.POLL_BLOCK_MAX_TIME);
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
                        // A handler may cancel the key, so validity is re-checked each time.
                        if (key.isValid() && key.isConnectable()) {
                            System.out.println(">>>>>>>>>>>>觸發(fā)Connect操作");
                            finishConnect(key, channel);
                        }
                        if (key.isValid() && key.isWritable()) {
                            System.out.println(">>>>>>>>>>>>觸發(fā)Write操作");
                            sendRequest(key, channel);
                        }
                        if (key.isValid() && key.isReadable()) {
                            System.out.println(">>>>>>>>>>>>觸發(fā)Read操作");
                            readResponse(key, channel);
                        }
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * Completes a pending non-blocking connect. On failure the channel is released
         * and the host is marked as failed so that waiting callers are unblocked.
         *
         * @param key     the key that signalled OP_CONNECT; its attachment is the host
         * @param channel the channel of {@code key}
         */
        private void finishConnect(SelectionKey key, SocketChannel channel) {
            Host host = (Host) key.attachment();
            try {
                if (channel.finishConnect()) {
                    onConnected(host, key);
                }
            } catch (Exception e) {
                e.printStackTrace();
                dropConnection(key, channel, host);
            }
        }

        /**
         * Sets up the per-host structures of a freshly connected channel and only then
         * publishes the SUCCESS state, so a caller that observes SUCCESS can always
         * enqueue a request.
         */
        private void onConnected(Host host, SelectionKey key) {
            System.out.println(host + ">>>>>>>>>>>>完成連接操作");
            // Existing queues and response maps are kept on reconnect: callers may hold
            // references to them or still have requests queued.
            waitingRequests.putIfAbsent(host, new ConcurrentLinkedQueue<NetWorkClientRequest>());
            System.out.println(host + ">>>>>>>>>>>>完成初始化請求隊列");
            toSendRequests.putIfAbsent(host, new ConcurrentLinkedQueue<NetWorkClientRequest>());
            System.out.println(host + ">>>>>>>>>>>>完成初始化發(fā)送請求隊列");
            finishedResponses.putIfAbsent(host, new ConcurrentHashMap<String, NetWorkClientRespond>());
            System.out.println(host + ">>>>>>>>>>>>完成初始化響應集合");
            connections.put(host, key);
            System.out.println(host + ">>>>>>>>>>>>完成加入連接集合");
            // Drop OP_CONNECT now that the connection is established; keep write interest
            // only if requests scheduled on an earlier connection are still queued.
            boolean pendingWrites = !toSendRequests.get(host).isEmpty();
            key.interestOps(pendingWrites
                    ? SelectionKey.OP_READ | SelectionKey.OP_WRITE
                    : SelectionKey.OP_READ);
            connectState.put(host, Constant.SUCCESS_CONNECT);
            System.out.println(host + ">>>>>>>>>>>>完成連接狀態(tài)重置");
        }

        /**
         * Writes scheduled requests to the socket without blocking. If the socket buffer
         * fills up, the partially written buffer is remembered and write interest stays
         * enabled so that the remainder goes out on the next writable event. Write
         * interest is cleared once everything scheduled has been written.
         *
         * @param key     the key that signalled OP_WRITE; its attachment is the host
         * @param channel the channel of {@code key}
         */
        private void sendRequest(SelectionKey key, SocketChannel channel) {
            Host host = (Host) key.attachment();
            try {
                ConcurrentLinkedQueue<NetWorkClientRequest> scheduled = toSendRequests.get(host);
                ByteBuffer current = inFlightWrites.get(host);
                while (true) {
                    if (current == null) {
                        NetWorkClientRequest next = scheduled.poll();
                        if (next == null) {
                            break;
                        }
                        current = next.getBuffer();
                        // Switch the caller-filled buffer from write mode to read mode.
                        current.flip();
                    }
                    channel.write(current);
                    if (current.hasRemaining()) {
                        // Socket buffer is full: resume on the next OP_WRITE event.
                        inFlightWrites.put(host, current);
                        return;
                    }
                    inFlightWrites.remove(host);
                    current = null;
                }
            } catch (IOException e) {
                e.printStackTrace();
                dropConnection(key, channel, host);
                return;
            } catch (RuntimeException e) {
                // A malformed request (for example a missing buffer) is dropped.
                e.printStackTrace();
            }
            // Stop watching for writability until new requests are scheduled.
            if (key.isValid()) {
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
        }

        /**
         * Reads as much of the current response as is available: request id, status,
         * content length, then content. Each header field is parsed exactly once; a
         * response that is still incomplete stays cached until the next read event.
         * A completed response goes to its callback if one is registered, otherwise into
         * {@code finishedResponses}. Read failures, end of stream and malformed responses
         * close the connection.
         *
         * @param key     the key that signalled OP_READ; its attachment is the host
         * @param channel the channel of {@code key}
         */
        private void readResponse(SelectionKey key, SocketChannel channel) {
            Host host = (Host) key.attachment();
            try {
                NetWorkClientRespond respond = unfinishedResponses.get(host);
                if (respond == null) {
                    respond = new NetWorkClientRespond();
                    unfinishedResponses.put(host, respond);
                }
                // Each stage is gated on the parsed value, not on hasRemaining(): after
                // flip() a full buffer reports remaining bytes again, and re-reading into
                // it would overwrite the header with later bytes of the stream.
                if (respond.getRequestId() == null) {
                    ByteBuffer idBuffer = respond.getRequestBuffer();
                    if (!tryBestRead(channel, idBuffer)) {
                        return;
                    }
                    idBuffer.flip();
                    respond.setRequestId(new String(idBuffer.array(), StandardCharsets.UTF_8));
                }
                if (respond.getStatus() == null) {
                    ByteBuffer statusBuffer = respond.getStatusBuffer();
                    if (!tryBestRead(channel, statusBuffer)) {
                        return;
                    }
                    statusBuffer.flip();
                    respond.setStatus(statusBuffer.getInt(0));
                }
                if (respond.getContentLength() == null) {
                    ByteBuffer lengthBuffer = respond.getContentLengthBuffer();
                    if (!tryBestRead(channel, lengthBuffer)) {
                        return;
                    }
                    lengthBuffer.flip();
                    respond.setContentLength(lengthBuffer.getInt(0));
                }
                if (respond.getByteBuffer() == null) {
                    respond.setByteBuffer(ByteBuffer.allocate(respond.getContentLength()));
                }
                ByteBuffer content = respond.getByteBuffer();
                if (!tryBestRead(channel, content)) {
                    return;
                }
                content.flip();
                respond.setFinished(true);
                unfinishedResponses.remove(host);
                deliver(host, respond);
            } catch (Exception e) {
                e.printStackTrace();
                dropConnection(key, channel, host);
            }
        }

        /** Hands a completed response to its callback, or stores it for a waiting caller. */
        private void deliver(Host host, NetWorkClientRespond respond) {
            // remove() so that a callback is held only until its response has arrived.
            NetWorkRespondCallBack callBack = callBackMap.remove(respond.getRequestId());
            if (callBack == null) {
                finishedResponses.get(host).put(respond.getRequestId(), respond);
                return;
            }
            try {
                callBack.process(respond);
            } catch (Exception e) {
                // A failing callback must not tear down a healthy connection.
                e.printStackTrace();
            }
        }

        /**
         * Reads from the channel until the buffer is full or no more bytes are available.
         *
         * @param channel the channel to read from
         * @param buffer  the buffer to fill
         * @return {@code true} if the buffer has no remaining space
         * @throws IOException if reading fails or the peer has closed the connection
         */
        private boolean tryBestRead(SocketChannel channel, ByteBuffer buffer) throws IOException {
            while (buffer.hasRemaining()) {
                int count = channel.read(buffer);
                if (count < 0) {
                    throw new IOException("Connection closed by remote peer");
                }
                if (count == 0) {
                    break;
                }
            }
            return !buffer.hasRemaining();
        }

        /**
         * Releases a failed or closed connection and marks the host as failed, which
         * allows {@link NetworkManager#tryConnect(Host)} to reconnect. A partially
         * written request and a partially read response of that connection are discarded.
         */
        private void dropConnection(SelectionKey key, SocketChannel channel, Host host) {
            key.cancel();
            closeQuietly(channel);
            if (connections.get(host) == key) {
                connections.remove(host);
            }
            unfinishedResponses.remove(host);
            inFlightWrites.remove(host);
            connectState.put(host, Constant.FAIL_CONNECT);
        }

        /** Closes the channel, ignoring a failure to do so. */
        private void closeQuietly(SocketChannel channel) {
            if (channel == null) {
                return;
            }
            try {
                channel.close();
            } catch (IOException ignored) {
                // The channel is being discarded anyway; nothing more can be done.
            }
        }
    }
}
NetWorkClientRequest
package org.zymf.nio.example3.client;
import java.nio.ByteBuffer;
/**
 * A request submitted by the client: a mutable bean bundling the target host, the
 * request id, the encoded payload and optional delivery settings.
 *
 * @author zhuyuemufeng
 * @version 1.0
 * @date 2021-07-20 20:52
 */
public class NetWorkClientRequest {

    /** Identifier used to match the response with this request. */
    private String requestId;

    public String getRequestId() {
        return this.requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** Remote endpoint this request is addressed to. */
    private Host host;

    public Host getHost() {
        return this.host;
    }

    public void setHost(Host host) {
        this.host = host;
    }

    /** Timestamp supplied by the creator of the request; not interpreted by this class. */
    private long sendTime;

    public long getSendTime() {
        return this.sendTime;
    }

    public void setSendTime(long sendTime) {
        this.sendTime = sendTime;
    }

    /** Encoded request bytes. */
    private ByteBuffer buffer;

    public ByteBuffer getBuffer() {
        return this.buffer;
    }

    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** Flag supplied by the creator of the request; not interpreted by this class. */
    private boolean sync;

    public boolean isSync() {
        return this.sync;
    }

    public void setSync(boolean sync) {
        this.sync = sync;
    }

    /** Callback associated with this request; may be unset. */
    private NetWorkRespondCallBack netWorkRespondCallBack;

    public NetWorkRespondCallBack getNetWorkRespondCallBack() {
        return this.netWorkRespondCallBack;
    }

    public void setNetWorkRespondCallBack(NetWorkRespondCallBack netWorkRespondCallBack) {
        this.netWorkRespondCallBack = netWorkRespondCallBack;
    }
}
testLoadFileDemo
// Demo: upload 300 image files to a local server over one cached connection and
// print the total elapsed time in milliseconds.
long startedAt = System.currentTimeMillis();
NetworkManager manager = new NetworkManager();
List<NetWorkClientRequest> submitted = new ArrayList<>();
for (int index = 1; index <= 300; index++) {
    Host target = new Host("localhost", 7899);
    // Ask for a connection (only the first call actually connects) ...
    manager.tryConnect(target);
    // ... and block until the attempt has finished.
    manager.finishConnect(target, true);

    // The request id is a UUID without dashes (32 characters).
    String requestId = UUID.randomUUID().toString().replace("-", "");
    File source = new File("E:\\oss\\netty\\img\\" + index + ".jpg");
    ByteBuffer payload = RequestBufferBuilder.createFileUpload(source, index + ".jpg", requestId);

    NetWorkClientRequest upload = new NetWorkClientRequest();
    upload.setBuffer(payload);
    upload.setHost(target);
    upload.setRequestId(requestId);
    upload.setSendTime(System.currentTimeMillis());

    // Queue the request for sending and remember it so its response can be awaited.
    manager.sendRequest(upload);
    submitted.add(upload);
}
for (NetWorkClientRequest pending : submitted) {
    // Block until the response to this request has arrived, then print its status.
    // (The request id and the body are available from the response as well.)
    NetWorkClientRespond respond = manager.waitResponseSync(pending);
    System.out.println(respond.getStatus());
}
long elapsedMillis = System.currentTimeMillis() - startedAt;
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>: " + elapsedMillis);