NIO學習_Client

NIO客戶端

寫這個程序的初衷是在儒猿的分布式小文件系統(tǒng)所受到的啟發(fā),當時老師設計和我的想法有點出入,我按照自己實現(xiàn)的思路實現(xiàn)了一套邏輯,分布式小文件系統(tǒng)客戶端考慮的要點是高效的傳輸文件,在這個基礎上就要求我們必須維持和DataNode的長連接,減少和DataNode連接損耗,在此,我們就需要在第一個請求建立連接后,將連接緩存在客戶端,方便下次直接使用。

先說說主要的一些集合容器的作用:

  • waitingConnectHosts 等待連接Host的緩存
  • connections 所有的連接集合,這里主要是方便使用Host獲取到SelectionKey進行感知狀態(tài)變換
  • connectState 所有連接狀態(tài)緩存,當一個請求嘗試連接的時候,如果在該集合中發(fā)現(xiàn)連接狀態(tài)是SUCCESS,那么就會直接獲取連接進行文件傳輸
  • waitingRequests 等待發(fā)送的請求隊列,當客戶端進行請求提交的時候,請求首先會進入該隊列進行緩存
  • toSendRequests 當時機恰當?shù)臅r候,會將waitingRequests的請求拉取緩存到toSendRequests中,請求在該隊列是在客戶端最后一個緩存隊列,之后就是發(fā)送了
  • unfinishedResponses 未完成的響應,因為TCP的拆包問題,在一個Read事件中可能無法將一個請求進行完整的解析,這就要求我們將未讀完的請求緩存起來,等待下次Read事件進行追加讀取,完成整個響應解析。
  • finishedResponses 已完成響應緩存,在該緩存中存儲是已經(jīng)完成響應但未被客戶端獲取的
  • callBackMap 回調(diào)緩存,當傳入回調(diào)函數(shù)的時候會進入其中

核心類是NetworkManager,這個是管理所有連接的管理器,在這個類里面負責請求的解析,緩存,響應解析,回調(diào)函數(shù)調(diào)用

主要的完整流程如下:


image.png

這里主要說下要點:

  1. 在這里處理了粘包和拆包的問題,主要定義好該次請求的數(shù)據(jù)長度,例如文件上傳請求,請求ID + 請求類型 + 文件名長度 + 文件長度 + 文件名 + 文件內(nèi)容,如果請求ID為UUID,請求類型為int類型代表,文件名和文件長度都用int代表,那么該次請求總的大小為32 + 4 + 4 + 4 + fileNameLength + fileSize

  2. 響應的解析也是按照服務端定義的響應類型處理的

  3. 存在一個問題,所有的請求最后實例化為ByteBuffer,在高并發(fā)情況下,內(nèi)存占用是個問題

NetworkManager
package org.zymf.nio.example3.client;

import org.zymf.nio.example3.constant.Constant;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * @author zhuyuemufeng
 * @version 1.0
 * @description: 通訊管理器
 * @date 2021-07-20 21:29
 */
public class NetworkManager {

    private Selector selector;

    private ClientHandleThread moniter;

    // 等待建立連接的機器
    private ConcurrentLinkedQueue<Host> waitingConnectHosts;

    // 所有的連接
    private Map<Host, SelectionKey> connections;

    // 每個數(shù)據(jù)節(jié)點的連接狀態(tài)
    private Map<Host, Integer> connectState;

    //等待發(fā)送的請求
    private Map<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> waitingRequests;

    // 馬上準備要發(fā)送的網(wǎng)絡請求
    private Map<Host,  ConcurrentLinkedQueue<NetWorkClientRequest>> toSendRequests;

    // 未完成解析的響應
    private Map<Host, NetWorkClientRespond> unfinishedResponses;

    // 已經(jīng)完成請求的響應
    private Map<Host, Map<String, NetWorkClientRespond>> finishedResponses;

    private Map<String,NetWorkRespondCallBack> callBackMap;

    public NetworkManager() {
        try {
            this.selector = Selector.open();
            this.moniter = new ClientHandleThread();
            this.waitingConnectHosts = new ConcurrentLinkedQueue<>();
            this.connections = new ConcurrentHashMap<>();
            this.connectState = new ConcurrentHashMap<>();
            this.waitingRequests = new ConcurrentHashMap<>();
            toSendRequests = new ConcurrentHashMap<>();
            this.finishedResponses = new ConcurrentHashMap<>();
            this.unfinishedResponses = new ConcurrentHashMap<>();
            this.callBackMap = new ConcurrentHashMap<>();
            moniter.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** 
     * @description: 嘗試連接 
     * @param: * @param: host 
     * @return: void 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:53
     */
    public void tryConnect(Host host) throws Exception {
        if (!connectState.containsKey(host) || connectState.get(host) == Constant.FAIL_CONNECT){
            waitingConnectHosts.offer(host);
            connectState.put(host, Constant.WAITING_CONNECT);
        }
    }

    /** 
     * @description: 驗證是否完成連接 
     * @param: * @param: host
     * @param: sync 同步等待完成
     * @return: int 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:53
     */
    public int finishConnect(Host host,boolean sync) throws Exception {
        boolean containsKey = connectState.containsKey(host);
        if (!containsKey){
            throw new RuntimeException("該連接不存在");
        }
        int status = connectState.get(host);
        if (Constant.WAITING_CONNECT == status && sync){
            while (true){
                if (Constant.WAITING_CONNECT != connectState.get(host)){
                    return connectState.get(host);
                }
                Thread.sleep(200);
            }
        }
        return status;
    }

    /** 
     * @description: 發(fā)送請求 
     * @param: * @param: request 
     * @return: void 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:52
     */
    public void sendRequest(NetWorkClientRequest request) {
        Host host = request.getHost();
        waitingRequests.get(host).offer(request);
    }

    /**
     * @description: 發(fā)送回調(diào)請求
     * @param: * @param: request
     * @return: void
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:52
     */
    public void sendCallBackRequest(NetWorkClientRequest request) {
        Host host = request.getHost();
        waitingRequests.get(host).offer(request);
        callBackMap.put(request.getRequestId(),request.getNetWorkRespondCallBack());
    }

    /** 
     * @description: 同步返回響應結果,如果還沒返回就進行等待
     * @param: * @param: request 
     * @return: org.zymf.nio.example3.client.NetWorkClientRespond 
     * @author zhuyuemufeng
     * @date: 2021-07-21 8:52
     */
    public NetWorkClientRespond waitResponseSync(NetWorkClientRequest request) throws Exception {
        Host host = request.getHost();
        Map<String, NetWorkClientRespond> respondMap = finishedResponses.get(host);
        NetWorkClientRespond respond = null;
        while ((respond = respondMap.get(request.getRequestId())) == null){
            Thread.sleep(200);
        }
        return respond;
    }

    class ClientHandleThread extends Thread {

        @Override
        public void run() {
            while (true) {
                //連接注冊,狀態(tài)更新
                registerConnect();
                //準備請求,改變連接關注事件
                prepareSendRequest();
                //事件監(jiān)聽
                poll();
            }
        }

        /**
         * @description: 注冊連接
         * @param: * @param:
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:33
         */
        private void registerConnect() {
            Host host = null;
            while ((host = waitingConnectHosts.poll()) != null) {
                try {
                    SocketChannel channel = SocketChannel.open();
                    channel.configureBlocking(false);
                    channel.connect(new InetSocketAddress(host.getIp(), host.getPort()));
                    channel.register(selector, SelectionKey.OP_CONNECT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * @description: 準備請求
         * @param: * @param:
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:22
         */
        private void prepareSendRequest(){
            for (Map.Entry<Host, ConcurrentLinkedQueue<NetWorkClientRequest>> node : waitingRequests.entrySet()) {
                //該連接已成功連接,并且有請求任務
                int count = 0;
                if (!node.getValue().isEmpty() && Constant.SUCCESS_CONNECT == connectState.get(node.getKey())){
                    System.out.println(">>>>>>>>>>>>準備請求");
                    ConcurrentLinkedQueue<NetWorkClientRequest> value = node.getValue();
                    ConcurrentLinkedQueue<NetWorkClientRequest> toSend = toSendRequests.get(node.getKey());
                    NetWorkClientRequest request = null;
                    while (count < Constant.MAX_SEND_REQUEST_SIZE && (request = value.poll()) != null){
                        count++;
                        System.out.println(">>>>>>>>>>>>加入toSendRequests請求池");
                        toSend.offer(request);
                    }
                    if (count != 0){
                        SelectionKey key = connections.get(node.getKey());
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                }
            }
        }

        /**
         * @description: 請求/響應讀寫
         * @param: * @param:
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:33
         */
        private void poll() {
            try {
                int select = selector.select(Constant.POLL_BLOCK_MAX_TIME);
                if (select > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        SocketChannel channel = (SocketChannel) key.channel();
                        if (key.isConnectable()) {
                            System.out.println(">>>>>>>>>>>>觸發(fā)Connect操作");
                            finishConnect(key,channel);
                        }
                        if (key.isWritable()){
                            System.out.println(">>>>>>>>>>>>觸發(fā)Write操作");
                            sendRequest(key,channel);
                        }
                        if (key.isReadable()){
                            System.out.println(">>>>>>>>>>>>觸發(fā)Read操作");
                            readResponse(key, channel);
                        }
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * @description: 完成連接
         * @param: * @param: key
         * @param: channel
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:11
         */
        private void finishConnect(SelectionKey key,SocketChannel channel) {
            Host host = null;
            try {
                host = getHostByChannel(channel);
                if (channel.finishConnect()) {
                    System.out.println(host + ">>>>>>>>>>>>完成連接操作");
                    //修改連接狀態(tài)
                    connectState.put(host,Constant.SUCCESS_CONNECT);
                    System.out.println(host + ">>>>>>>>>>>>完成連接狀態(tài)重置");
                    //初始化請求隊列
                    waitingRequests.put(host, new ConcurrentLinkedQueue<>());
                    System.out.println(host + ">>>>>>>>>>>>完成初始化請求隊列");
                    //初始化發(fā)送請求隊列
                    toSendRequests.put(host, new ConcurrentLinkedQueue<>());
                    System.out.println(host + ">>>>>>>>>>>>完成初始化發(fā)送請求隊列");
                    //初始化響應集合
                    finishedResponses.put(host,new ConcurrentHashMap<>());
                    System.out.println(host + ">>>>>>>>>>>>完成初始化響應集合");
                    //增加連接映射關系
                    connections.put(host,key);
                    System.out.println(host + ">>>>>>>>>>>>完成加入連接集合");
                }
            } catch (Exception e) {
                e.printStackTrace();
                connectState.put(host, Constant.FAIL_CONNECT);
            }
        }

        /**
         * @description: 請求發(fā)送
         * @param: * @param: key
         * @param: channel
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:34
         */
        private void sendRequest(SelectionKey key, SocketChannel channel) {
            try {
                Host host = getHostByChannel(channel);
                ConcurrentLinkedQueue<NetWorkClientRequest> netWorkClientRequests = toSendRequests.get(host);
                NetWorkClientRequest request = null;
                while ((request = netWorkClientRequests.poll()) != null){
                    ByteBuffer buffer = request.getBuffer();
                    buffer.flip();
                    while (buffer.hasRemaining()){
                        channel.write(buffer);
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
            //取消關注寫事件
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }

        /**
         * @description: 響應解析
         * @param: * @param: key
         * @param: channel
         * @return: void
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:40
         */
        private void readResponse(SelectionKey key, SocketChannel channel) {
            try {
                Host host = getHostByChannel(channel);
                NetWorkClientRespond respond = unfinishedResponses.get(host);
                if (respond == null){
                    respond = new NetWorkClientRespond();
                    unfinishedResponses.put(host,respond);
                }
                //讀取請求ID
                if (respond.getRequestBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getRequestBuffer());
                    if (respond.getRequestBuffer().hasRemaining()){
                        return;
                    }
                }
                if (respond.getRequestId() == null){
                    respond.getRequestBuffer().flip();
                    respond.setRequestId(new String(respond.getRequestBuffer().array()));
                }

                //讀取響應狀態(tài)
                if (respond.getStatusBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getStatusBuffer());
                    if (respond.getStatusBuffer().hasRemaining()){
                        return;
                    }
                }
                if (respond.getStatus() == null){
                    respond.getStatusBuffer().flip();
                    respond.setStatus(respond.getStatusBuffer().getInt(0));
                }

                //讀取響應長度
                if (respond.getContentLengthBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getContentLengthBuffer());
                    if (respond.getContentLengthBuffer().hasRemaining()){
                        return;
                    }
                }
                if (respond.getContentLength() == null){
                    respond.getContentLengthBuffer().flip();
                    respond.setContentLength(respond.getContentLengthBuffer().getInt(0));
                }

                //讀取響應內(nèi)容
                if (respond.getByteBuffer() == null){
                    respond.setByteBuffer(ByteBuffer.allocate(respond.getContentLength()));
                }
                if (respond.getByteBuffer().hasRemaining()){
                    tryBestRead(channel,respond.getByteBuffer());
                }
                if (!respond.getByteBuffer().hasRemaining()){
                    respond.getByteBuffer().flip();
                    respond.setFinished(true);
                    unfinishedResponses.remove(host);
                    if (callBackMap.containsKey(respond.getRequestId())){
                        callBackMap.get(respond.getRequestId()).process(respond);
                    }else {
                        finishedResponses.get(host).put(respond.getRequestId(),respond);
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        /** 
         * @description: 嘗試讀滿數(shù)據(jù) 
         * @param: * @param: channel
         * @param: buffer 
         * @return: void 
         * @author zhuyuemufeng
         * @date: 2021-07-21 8:51
         */
        private void tryBestRead(SocketChannel channel,ByteBuffer buffer) throws Exception {
            int count = 0;
            while ((count = channel.read(buffer)) > 0){}
        }

        /**
         * @description: 從channel中獲取Host
         * @param: * @param: channel
         * @return: org.zymf.nio.example3.client.Host
         * @author zhuyuemufeng
         * @date: 2021-07-21 5:46
         */
        private Host getHostByChannel(SocketChannel channel){
            InetSocketAddress remoteAddress = null;
            Host host = null;
            try {
                remoteAddress = (InetSocketAddress) channel.getRemoteAddress();
                host = new Host(remoteAddress.getHostName(), remoteAddress.getPort());
            }catch (Exception e){
                throw new RuntimeException(e);
            }
            return host;
        }
    }
}
NetWorkClientRequest
package org.zymf.nio.example3.client;

import java.nio.ByteBuffer;

/**
 * @author zhuyuemufeng
 * @version 1.0
 * @description: 客戶端發(fā)送請求
 * @date 2021-07-20 20:52
 */
public class NetWorkClientRequest {

    private String requestId;

    private Host host;

    private long sendTime;

    private ByteBuffer buffer;

    private boolean sync;

    private NetWorkRespondCallBack netWorkRespondCallBack;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Host getHost() {
        return host;
    }

    public void setHost(Host host) {
        this.host = host;
    }

    public long getSendTime() {
        return sendTime;
    }

    public void setSendTime(long sendTime) {
        this.sendTime = sendTime;
    }

    public ByteBuffer getBuffer() {
        return buffer;
    }

    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public boolean isSync() {
        return sync;
    }

    public void setSync(boolean sync) {
        this.sync = sync;
    }

    public NetWorkRespondCallBack getNetWorkRespondCallBack() {
        return netWorkRespondCallBack;
    }

    public void setNetWorkRespondCallBack(NetWorkRespondCallBack netWorkRespondCallBack) {
        this.netWorkRespondCallBack = netWorkRespondCallBack;
    }
}
testLoadFileDemo
        Long start = System.currentTimeMillis();
        NetworkManager manager = new NetworkManager();
        List<NetWorkClientRequest> sendRequest = new ArrayList<>();
        for (int i = 1; i < 301; i++) {
            Host host = new Host("localhost", 7899);
            //嘗試連接
            manager.tryConnect(host);
            //等待連接完成
            manager.finishConnect(host, true);
            String requestId = UUID.randomUUID().toString().replace("-", "");
            ByteBuffer fileUpload = RequestBufferBuilder.createFileUpload(new File("E:\\oss\\netty\\img\\" + i + ".jpg")
                    , i + ".jpg",
                    requestId);
            NetWorkClientRequest netWorkClientRequest = new NetWorkClientRequest();
            netWorkClientRequest.setBuffer(fileUpload);
            netWorkClientRequest.setHost(host);
            netWorkClientRequest.setRequestId(requestId);
            netWorkClientRequest.setSendTime(System.currentTimeMillis());
            //發(fā)送請求
            manager.sendRequest(netWorkClientRequest);
            sendRequest.add(netWorkClientRequest);
        }
        for (NetWorkClientRequest request : sendRequest) {
            //等待響應返回
            NetWorkClientRespond respond = manager.waitResponseSync(request);
            System.out.println(respond.getStatus());
           /*  System.out.println(respond.getRequestId());
            System.out.println(new String(respond.getByteBuffer().array()));*/
        }
        long l = System.currentTimeMillis() - start;
        System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>: " + l);
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容