C#數據交互服務器(二)

網絡數據IO處理

TChannel作為連接通道,服務端使用TService管理通道,客戶端只需使用TChannel即可

namespace SelfFramework.Network
{
    /// <summary>
    /// Server-side TCP listener. Accepts incoming sockets, wraps each one in a
    /// <c>TChannel</c>, tracks the channels by id and optionally runs a periodic
    /// heartbeat check that disconnects silent channels.
    /// </summary>
    /// <remarks>
    /// Socket completions are marshalled onto <see cref="threadAsyncContext"/> so
    /// that channel state is only touched from the thread that pumps that context.
    /// </remarks>
    public class TService
    {
        private static readonly log4net.ILog Log = LogHelper.GetLogger();

        private Socket acceptor;
        private IPEndPoint localAddress;
        internal INetEventListener eventListener { private set; get; }
        internal ConcurrentDictionary<long, TChannel> channelTable { private set; get; }
        private ConcurrentDictionary<EndPoint, TChannel> addressTable;

        private readonly SocketAsyncEventArgs innArgs;
        internal readonly ThreadSynchronizationContext threadAsyncContext;

        #region Heartbeat
        private System.Timers.Timer heartTimer;
        private readonly int interval_Time = 20 << 10;   // timer interval in milliseconds (20 * 1024)
        private readonly int timeOut = 60;               // heartbeat timeout before disconnecting; presumably seconds, in the unit of Util.GetUtcNow() - TODO confirm
        #endregion

        private long acceptIdGenerater = 1;

        /// <summary>
        /// Binds and starts listening on <paramref name="ipEndPoint"/>, then queues the
        /// first accept on <paramref name="asyncContext"/>.
        /// </summary>
        /// <param name="asyncContext">Context on which accept completions are processed.</param>
        /// <param name="ipEndPoint">Local address to bind the listening socket to.</param>
        /// <param name="eventListener">Receiver of connection events.</param>
        internal TService(ThreadSynchronizationContext asyncContext, IPEndPoint ipEndPoint,INetEventListener eventListener)
        {
            this.eventListener = eventListener;
            this.channelTable = new ConcurrentDictionary<long, TChannel>();
            this.addressTable = new ConcurrentDictionary<EndPoint, TChannel>();
            this.localAddress = ipEndPoint;
            this.threadAsyncContext = asyncContext;
            this.innArgs = new SocketAsyncEventArgs();
            this.innArgs.Completed += OnComplete;

            this.acceptor = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            this.acceptor.Bind(ipEndPoint);
            this.acceptor.Listen(1000);

            Log.Info("The server is started succeeded.");
            this.threadAsyncContext.PostNext(this.AcceptAsync);
        }

        /// <summary>
        /// Stops the heartbeat timer, closes the listening socket and disposes every
        /// channel that is still registered.
        /// </summary>
        internal void Dispose()
        {
            // Stop the timer first so no further heartbeat checks get scheduled.
            if (this.heartTimer != null)
            {
                this.heartTimer.Stop();
                this.heartTimer.Dispose();
                this.heartTimer = null;
            }

            this.acceptor?.Close();
            this.acceptor = null;
            this.innArgs.Dispose();
            this.localAddress = null;
            this.eventListener = null;

            // Iterate over a key snapshot: channel.Dispose() removes the channel from
            // channelTable. TryGetValue tolerates a channel that vanished in between.
            foreach (long id in this.channelTable.Keys.ToArray())
            {
                if (this.channelTable.TryGetValue(id, out TChannel channel))
                {
                    channel.Dispose();
                }
            }
            this.channelTable.Clear();
            this.addressTable.Clear();
        }

        /// <summary>
        /// Raw completion callback of the accept <see cref="SocketAsyncEventArgs"/>;
        /// forwards the result to the synchronization context.
        /// </summary>
        private void OnComplete(object sender, SocketAsyncEventArgs e)
        {
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Accept:
                    this.threadAsyncContext.Post(() => { this.OnAcceptComplete(e); });
                    break;
                default:
                    throw new Exception($"socket error: {e.LastOperation}");
            }
        }

        /// <summary>
        /// Handles one finished accept: creates and registers a channel for the new
        /// socket, notifies the listener and starts the next accept.
        /// </summary>
        private void OnAcceptComplete(SocketAsyncEventArgs e)
        {
            // The service was disposed while this completion was in flight.
            if (this.acceptor == null)
            {
                return;
            }

            if (e.SocketError != SocketError.Success)
            {
                Log.ErrorFormat("Accept faild:>{0}", e.SocketError);

                // A single failed accept must not stop the listener for good. Retry on
                // a later context tick (rather than immediately) so a persistent error
                // cannot spin this thread. OperationAborted means the listening socket
                // was closed, so there is nothing left to re-arm.
                if (e.SocketError != SocketError.OperationAborted)
                {
                    this.threadAsyncContext.PostNext(this.AcceptAsync);
                }
                return;
            }

            try
            {
                // NOTE(review): nothing in this class ever adds to addressTable, so the
                // "reconnect" branch is currently unreachable; if it is ever enabled,
                // the freshly accepted socket must be handed to the existing channel
                // or closed.
                if (addressTable.TryGetValue(e.AcceptSocket.RemoteEndPoint, out TChannel channel)) // TCP reconnect
                {
                    Log.Warn("tcp 重新連接");
                    this.eventListener.OnPeerConnected(channel);
                }
                else
                {
                    long id = this.CreateAcceptChannelId(0);
                    TChannel newChannel = new TChannel(id, e.AcceptSocket, this);
                    this.channelTable.TryAdd(id, newChannel);
                    this.eventListener.OnPeerConnected(newChannel);
                }
            }
            catch (Exception exception)
            {
                Log.ErrorFormat("Accept exception:>{0}", exception.Message);
            }

            // Start the next accept.
            this.AcceptAsync();
        }

        /// <summary>
        /// Arms the listening socket for the next connection. Does nothing once the
        /// service has been disposed.
        /// </summary>
        private void AcceptAsync()
        {
            if (this.acceptor == null)
            {
                return;
            }

            // The args object is reused, so the previously accepted socket must be cleared.
            this.innArgs.AcceptSocket = null;
            if (this.acceptor.AcceptAsync(this.innArgs))
            {
                return;     // pending: OnComplete will be raised later
            }
            OnAcceptComplete(this.innArgs);     // completed synchronously
        }

        /// <summary>Calls <c>Update</c> on every registered channel.</summary>
        public void Update()
        {
            foreach (var item in channelTable.Values)
            {
                item.Update();
            }
        }

        /// <summary>Returns the channel registered under <paramref name="id"/>, or null.</summary>
        public TChannel GetChannel(long id)
        {
            channelTable.TryGetValue(id, out TChannel channel);
            return channel;
        }

        /// <summary>Unregisters the channel with the given id from both lookup tables.</summary>
        internal void RemoveChannel(long id)
        {
            // remoteAddress comes from an "as IPEndPoint" cast and may be null;
            // ConcurrentDictionary.TryRemove throws on a null key.
            if (channelTable.TryRemove(id, out TChannel channel) && channel.remoteAddress != null)
            {
                addressTable.TryRemove(channel.remoteAddress, out _);
            }
        }

        /// <summary>
        /// Starts the periodic heartbeat check. Calling it again while the timer is
        /// already running has no effect.
        /// </summary>
        public void StartCheckHeartBeat()
        {
            if (heartTimer != null)
            {
                return;
            }

            heartTimer = new System.Timers.Timer(interval_Time)
            {
                AutoReset = true
            };

            // Elapsed is raised on a timer thread. Disconnecting a channel disposes its
            // socket and event args, so the check is marshalled onto the same context
            // that processes all other socket callbacks.
            heartTimer.Elapsed += (object sender, System.Timers.ElapsedEventArgs e) =>
            {
                this.threadAsyncContext.Post(() => this.CheckHeartBeats());
            };

            heartTimer.Start();
            Log.Info("Enabling the Heartbeat Timer");
        }

        /// <summary>
        /// Disconnects every channel whose last heartbeat is older than
        /// <see cref="timeOut"/>.
        /// </summary>
        private void CheckHeartBeats()
        {
            long nowTime = Util.GetUtcNow();
            foreach (TChannel channel in channelTable.Values)
            {
                if (nowTime - channel.lastHeartBeatTime > timeOut)
                {
                    Log.WarnFormat("Heart broken, channel id->{0}", channel.channelId);
                    channel.OnDisconnect(DisconnectReason.HeartBroken,SocketError.Success);
                }
            }
        }

        /// <summary>
        /// Produces the next channel id: an incrementing counter in the upper 32 bits
        /// combined with <paramref name="localConn"/> in the lower 32 bits.
        /// </summary>
        private long CreateAcceptChannelId(uint localConn)
        {
            return (++this.acceptIdGenerater << 32) | localConn;
        }
    }
}

namespace SelfFramework.Network
{
    /// <summary>
    /// Tells how a channel obtained its socket.
    /// </summary>
    public enum ChannelType
    {
        /// <summary>The channel created its own socket and connects out to a remote end point.</summary>
        Connect = 0,

        /// <summary>The channel wraps a socket that a listening service accepted.</summary>
        Accept = 1,
    }

    /// <summary>
    /// One TCP connection. Used on the server for every accepted socket (owned by a
    /// <c>TService</c>) and on the client as the single outgoing connection.
    /// Receives bytes into a <c>PacketParser</c> and sends queued packets one at a time.
    /// </summary>
    /// <remarks>
    /// Socket completions are posted to <see cref="threadAsyncContext"/>; all state
    /// changes are expected to happen on the thread that pumps that context.
    /// </remarks>
    public class TChannel
    {
        public long channelId { private set; get; }
        private Socket socket;
        private TService service;       // owning service; only set for accepted (server-side) channels
        public IPEndPoint remoteAddress { private set; get; }
        public ChannelType channelType { private set; get; }

        public bool IsUsing { private set; get; }       // false once the channel has been disposed
        public bool IsConnected { private set; get; }
        public bool IsSending { private set; get; }     // true while a send on outArgs is in flight

        private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs();   // receive operations
        private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs();   // connect and send operations
        private ThreadSynchronizationContext threadAsyncContext;
        internal INetEventListener eventListener;
        private readonly PacketParser packetParser;
        private ConcurrentQueue<byte[]> sendBuffers = new ConcurrentQueue<byte[]>();

        internal byte[] temp_IV = new byte[AppConst.BlockSize>>3];  // scratch buffer for the per-packet IV

        internal double lastHeartBeatTime;
        private static readonly log4net.ILog Log = LogHelper.GetLogger();

        /// <summary>
        /// Wraps a socket accepted by <paramref name="service"/> and queues the first
        /// receive on the service's synchronization context.
        /// </summary>
        internal TChannel(long channelId, Socket socket, TService service)
        {
            this.channelId = channelId;
            this.service = service;
            this.socket = socket;
            this.socket.NoDelay = true;
            this.remoteAddress = socket.RemoteEndPoint as IPEndPoint;
            this.channelType = ChannelType.Accept;

            this.innArgs.Completed += this.OnComplete;
            this.outArgs.Completed += this.OnComplete;
            this.threadAsyncContext = service.threadAsyncContext;
            this.eventListener = service.eventListener;
            this.packetParser = new PacketParser(this);

            this.IsConnected = true;
            this.IsSending = false;
            this.IsUsing = true;
            this.lastHeartBeatTime = Util.GetUtcNow();

            // Start receiving on the next context tick.
            this.service.threadAsyncContext.PostNext(this.StartRecv);
        }

        /// <summary>
        /// Creates a client channel and queues an asynchronous connect to
        /// <paramref name="ipEndPoint"/>.
        /// </summary>
        internal TChannel(ThreadSynchronizationContext threadAsyncContext, IPEndPoint ipEndPoint, INetEventListener eventListener)
        {
            this.channelId = 1;
            this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            this.socket.NoDelay = true;
            this.remoteAddress = ipEndPoint;
            this.channelType = ChannelType.Connect;

            this.innArgs.Completed += this.OnComplete;
            this.outArgs.Completed += this.OnComplete;
            this.threadAsyncContext = threadAsyncContext;
            this.eventListener = eventListener;
            this.packetParser = new PacketParser(this);

            this.IsConnected = false;
            this.IsSending = false;
            this.IsUsing = true;

            this.threadAsyncContext.PostNext(this.ConnectAsync);
        }

        /// <summary>
        /// Closes the socket and releases the event args. A second call only logs a warning.
        /// </summary>
        internal void Dispose()
        {
            if (IsUsing == false)
            {
                Log.Warn("The channel has been dispose.");
                return;
            }

            if (this.channelType==ChannelType.Accept)
            {
                this.service.RemoveChannel(this.channelId);
            }
            this.IsUsing = false;
            this.IsConnected = false;
            this.IsSending = false;

            try
            {
                this.socket.Shutdown(SocketShutdown.Both);
            }
            catch (SocketException ex)
            {
                // Shutdown throws when the socket never connected (e.g. a failed
                // ConnectAsync) or the peer already reset it. Cleanup must continue.
                Log.DebugFormat("socket shutdown skipped:>{0}", ex.SocketErrorCode);
            }
            catch (ObjectDisposedException)
            {
                // Already closed elsewhere; nothing left to shut down.
            }

            this.socket.Close();
            this.innArgs.Dispose();
            this.outArgs.Dispose();
            this.innArgs = null;
            this.outArgs = null;
            this.socket = null;
            this.channelId = 0;
        }

        /// <summary>
        /// Notifies the listener that the peer is gone and disposes the channel.
        /// Does nothing if the channel was already disposed, so the listener is
        /// notified at most once.
        /// </summary>
        public void OnDisconnect(DisconnectReason disconnectReason, SocketError socketError)
        {
            if (!this.IsUsing)
            {
                return;
            }

            this.eventListener.OnPeerDisconnected(this, new DisconnectInfo(disconnectReason, socketError));
            this.Dispose();
        }

        /// <summary>
        /// Raw completion callback for both event args; forwards each result to the
        /// synchronization context.
        /// </summary>
        private void OnComplete(object sender, SocketAsyncEventArgs e)
        {
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Connect:
                    this.threadAsyncContext.Post(() => OnConnectComplete(e));
                    break;
                case SocketAsyncOperation.Receive:
                    this.threadAsyncContext.Post(() => OnRecvComplete(e));
                    break;
                case SocketAsyncOperation.Send:
                    this.threadAsyncContext.Post(() => OnSendComplete(e));
                    break;
                case SocketAsyncOperation.Disconnect:
                    this.threadAsyncContext.Post(() => OnDisconnectComplete(e));
                    break;
                default:
                    throw new Exception($"socket error: {e.LastOperation}");
            }
        }

        /// <summary>Starts the asynchronous connect to <see cref="remoteAddress"/>.</summary>
        internal void ConnectAsync()
        {
            if (!this.IsUsing)
            {
                return;     // disposed before the queued connect ran
            }

            this.outArgs.RemoteEndPoint = this.remoteAddress;
            if (this.socket.ConnectAsync(this.outArgs))
            {
                return;
            }
            OnConnectComplete(this.outArgs);
        }

        private void OnConnectComplete(SocketAsyncEventArgs e)
        {
            if (!this.IsUsing)
            {
                return;
            }

            if (e.SocketError != SocketError.Success)
            {
                this.OnError(e);
                return;
            }

            // Mark the channel connected before notifying the listener, so a Send()
            // issued from inside OnPeerConnected is not rejected as "disconnected".
            e.RemoteEndPoint = null;
            this.IsConnected = true;
            this.lastHeartBeatTime = Util.GetUtcNow();

            this.eventListener.OnPeerConnected(this);

            this.StartRecv();
        }

        private void OnDisconnectComplete(SocketAsyncEventArgs e)
        {
            this.OnDisconnect(DisconnectReason.Disconnect, e.SocketError);
        }

        /// <summary>Arms a receive into the parser's buffer.</summary>
        private void StartRecv()
        {
            // The listener may have disposed the channel while a packet was being dispatched.
            if (!this.IsUsing)
            {
                return;
            }

            try
            {
                this.innArgs.SetBuffer(this.packetParser.buffer, 0, this.packetParser.buffer.Length);
            }
            catch (Exception e)
            {
                Log.ErrorFormat("recv set buffer error:>{0}", e.Message);
                this.OnError(innArgs);
                return;
            }

            if (this.socket.ReceiveAsync(this.innArgs))
            {
                return;
            }
            this.OnRecvComplete(this.innArgs);
        }

        private void OnRecvComplete(SocketAsyncEventArgs e)
        {
            // Completions of operations aborted by Dispose() arrive here afterwards.
            if (!this.IsUsing)
            {
                return;
            }

            if (e.SocketError != SocketError.Success)
            {
                this.OnError(e);
                return;
            }

            int bytesTransferred = e.BytesTransferred;
            if (bytesTransferred == 0)      // zero bytes: the peer closed the connection
            {
                this.OnDisconnectComplete(e);
                return;
            }

            try
            {
                this.packetParser.Parse(bytesTransferred);      // split the stream into packets
            }
            catch (Exception ee)
            {
                Log.Error($"ip: {this.remoteAddress} {ee}");
                this.OnError(e);
                return;
            }

            this.StartRecv();
        }

        /// <summary>
        /// Sends the next queued buffer, or clears <see cref="IsSending"/> when the
        /// queue is empty. Only called when no send is in flight.
        /// </summary>
        private void StartSend()
        {
            if (!this.IsUsing)
            {
                return;
            }

            try
            {
                // Nothing left to send.
                if (!sendBuffers.TryDequeue(out byte[] sendBuffer))
                {
                    this.IsSending = false;
                    return;
                }

                this.IsSending = true;
                this.outArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length);

                if (this.socket.SendAsync(this.outArgs))
                {
                    return;     // pending: OnSendComplete continues with the next buffer
                }

                OnSendComplete(this.outArgs);
            }
            catch (Exception e)
            {
                Log.ErrorFormat("start send exception:>{0}", e.Message);
                OnError(outArgs);
            }
        }

        private void OnSendComplete(SocketAsyncEventArgs e)
        {
            if (!this.IsUsing)
            {
                return;
            }

            if (e.SocketError != SocketError.Success)
            {
                this.OnError(e);
                return;
            }

            if (e.BytesTransferred == 0)
            {
                this.OnError(e);
                return;
            }

            this.StartSend();
        }

        /// <summary>Reports a socket error to the listener and disconnects the channel.</summary>
        private void OnError(SocketAsyncEventArgs e)
        {
            if (!this.IsUsing)
            {
                return;
            }

            // Read the error before the callback: the listener may dispose the channel
            // (and with it the event args).
            SocketError socketError = e.SocketError;
            this.eventListener.OnNetworkError(this, socketError, e.LastOperation);

            this.OnDisconnect(DisconnectReason.Exception, socketError);
        }

        /// <summary>
        /// Builds a packet (head + body) for <paramref name="protocol"/> and
        /// <paramref name="data"/> and queues it; the bytes go out on a later
        /// <see cref="Update"/>. Dropped with a warning when not connected.
        /// </summary>
        /// <param name="protocol">Protocol identifier written at the start of the body.</param>
        /// <param name="data">Optional payload appended after the protocol; may be null.</param>
        public void Send(string protocol, byte[] data)
        {
            if (IsConnected == false)
            {
                Log.WarnFormat("The connection has been disconnected and cannot be sent");
                return;
            }

            NetDataWriter dataWriter = new NetDataWriter();
            dataWriter.Reset(NetConstants.TotalHeadSize);   // write the body first, leaving room for the head

            dataWriter.Put(protocol);
            if (data!=null)
            {
                dataWriter.Put(data);
            }
            if (AppConst.isEncrypt)
            {
                byte[] bodyData = Util.EncryptBytes(dataWriter.RawData, NetConstants.TotalHeadSize, dataWriter.WritePos - NetConstants.TotalHeadSize, NetConstants.Key, out temp_IV);

                dataWriter.Reset(NetConstants.TotalHeadSize);   // rewrite the body with its encrypted form
                dataWriter.Append(bodyData, 0, bodyData.Length);
            }

            int totalSize = dataWriter.WritePos;       // now fill in the head
            dataWriter.Reset(0);
            dataWriter.Put(totalSize);      // total packet size
            if (AppConst.isEncrypt)
            {
                dataWriter.Append(temp_IV, 0, temp_IV.Length);
            }
            dataWriter.Reset(totalSize, false);     // presumably moves the write position back to the packet end without clearing - TODO confirm against NetDataWriter

            sendBuffers.Enqueue(dataWriter.CopyData());
        }

        /// <summary>
        /// Drives sending; expected to be called periodically. Starts a send only when
        /// the channel is connected and no send is already in flight, because outArgs
        /// cannot be reused while an operation on it is still pending.
        /// </summary>
        public void Update()
        {
            if (!this.IsUsing || !this.IsConnected || this.IsSending)
            {
                return;
            }

            this.StartSend();
        }
    }
}

分包處理,一個數據包接收完畢后去掉head部分交由應用層處理

namespace SelfFramework.Network
{
    /// <summary>
    /// Which part of the current packet the parser is collecting.
    /// </summary>
    public enum ParserState
    {
        /// <summary>Collecting the packet head; this is the default state.</summary>
        PacketHead = 0,

        /// <summary>Head complete; collecting the remaining bytes of the packet.</summary>
        PacketBody = 1
    }

    /// <summary>
    /// Reassembles length-prefixed packets from the byte stream of one channel.
    /// The socket receives into <see cref="buffer"/>; <see cref="Parse"/> copies the
    /// bytes into <see cref="dataWriter"/> until a whole packet (head + body) is
    /// present and then hands the body to the channel's event listener.
    /// </summary>
    public class PacketParser
    {
        // The head starts with a 4-byte Int32 holding the total packet size; when
        // encryption is on, the IV is read from the bytes right after it.
        private const int SizeFieldLength = sizeof(int);

        private TChannel channel;

        internal byte[] buffer;             // receive buffer filled by the socket
        private ParserState state;
        internal NetDataWriter dataWriter;  // accumulates the packet currently being assembled
        private int offset;                 // read position inside buffer
        private int totalPacketSize;        // size of the current packet, head included

        /// <param name="channel">Channel whose listener receives completed packets.</param>
        /// <param name="recvBufferSize">Size in bytes of the receive buffer.</param>
        internal PacketParser(TChannel channel,int recvBufferSize = 8192)
        {
            this.channel = channel;
            this.buffer = new byte[recvBufferSize];
            this.dataWriter = new NetDataWriter();
            this.state = ParserState.PacketHead;
        }

        /// <summary>
        /// Consumes the first <paramref name="bytesTransferred"/> bytes of
        /// <see cref="buffer"/>, dispatching every packet that becomes complete.
        /// Partial packets are kept and continued on the next call.
        /// </summary>
        /// <param name="bytesTransferred">Number of valid bytes at the start of the buffer.</param>
        /// <exception cref="Exception">The announced packet size is smaller than the head.</exception>
        internal void Parse(int bytesTransferred)
        {
            this.offset = 0;
            while (bytesTransferred>0)
            {
                switch (this.state)
                {
                    case ParserState.PacketHead:
                        {
                            // Head bytes taken from this read.
                            int recvHeadSize = Math.Min(bytesTransferred, NetConstants.TotalHeadSize - this.dataWriter.WritePos);
                            this.dataWriter.Append(this.buffer, this.offset, recvHeadSize);
                            bytesTransferred -= recvHeadSize;
                            this.offset += recvHeadSize;
                            if (this.dataWriter.WritePos < NetConstants.TotalHeadSize)   // head still incomplete
                            {
                                return;
                            }

                            // NOTE(review): the size is only checked against the head
                            // size; there is no upper bound on what a peer may announce.
                            this.totalPacketSize = BitConverter.ToInt32(this.dataWriter.RawData, 0);
                            if (this.totalPacketSize < NetConstants.TotalHeadSize) // a packet is head + body
                            {
                                throw new Exception($"recv packet size error, 可能是外網(wǎng)探測(cè)端口: {this.totalPacketSize}");
                            }

                            if (this.dataWriter.WritePos >= this.totalPacketSize)
                            {
                                // Head-only packet: it is already complete. Dispatch now
                                // instead of waiting for more bytes to arrive, which
                                // would delay it until the next receive.
                                this.CompletePacket();
                            }
                            else
                            {
                                this.state = ParserState.PacketBody;    // continue with the body
                            }
                            break;
                        }
                    case ParserState.PacketBody:
                        {
                            // Body bytes taken from this read; the packet may still be incomplete.
                            int recvBodySize = Math.Min(bytesTransferred, this.totalPacketSize - dataWriter.WritePos);
                            this.dataWriter.Append(this.buffer, this.offset, recvBodySize);
                            bytesTransferred -= recvBodySize;
                            this.offset += recvBodySize;
                            if (this.dataWriter.WritePos < this.totalPacketSize) // more data still to come
                            {
                                return;
                            }

                            this.CompletePacket();
                            break;
                        }
                    default:
                        throw new ArgumentOutOfRangeException();
                }
            }
        }

        /// <summary>Dispatches the assembled packet and resets the parser for the next head.</summary>
        private void CompletePacket()
        {
            this.Dispatcher();
            this.dataWriter.Reset(0);
            this.state = ParserState.PacketHead;
        }

        /// <summary>
        /// Strips the head, decrypts the body when encryption is enabled and passes it
        /// to the channel's listener.
        /// </summary>
        private void Dispatcher()
        {
            byte[] bodyData = dataWriter.CopyData(NetConstants.TotalHeadSize); // body only, head removed
            if (AppConst.isEncrypt) // body has to be decrypted
            {
                // The IV sits in the head right after the size field.
                Buffer.BlockCopy(dataWriter.RawData, SizeFieldLength, channel.temp_IV, 0, channel.temp_IV.Length);
                bodyData = Util.DecryptBytes(bodyData, 0, bodyData.Length, NetConstants.Key, channel.temp_IV);
            }
            this.channel.eventListener.OnNetworkReceive(this.channel, bodyData);
        }
    }
}

線程同步,將IO完成的事件全部加入一個隊列,統一放在一個線程內執行

namespace SelfFramework.Network
{
    /// <summary>
    /// Synchronization context that funnels callbacks onto one designated thread.
    /// Callbacks posted from other threads are queued and run when that thread calls
    /// <see cref="Update"/>.
    /// </summary>
    public class ThreadSynchronizationContext : SynchronizationContext
    {
        /// <summary>
        /// Shared instance, bound to whichever thread first triggers this type's
        /// static initialization.
        /// </summary>
        public static ThreadSynchronizationContext Instance { get; } = new ThreadSynchronizationContext(Thread.CurrentThread.ManagedThreadId);
        private static readonly log4net.ILog Log = LogHelper.GetLogger();

        // Managed id of the thread that is allowed to run callbacks inline.
        private readonly int threadId;

        // Pending callbacks (socket send/receive completions), drained by Update.
        private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();

        /// <param name="threadId">Managed thread id of the thread that will call <see cref="Update"/>.</param>
        public ThreadSynchronizationContext(int threadId)
        {
            this.threadId = threadId;
        }

        /// <summary>
        /// Runs at most one queued callback. Exceptions thrown by the callback are
        /// logged and swallowed.
        /// </summary>
        public void Update()
        {
            if (this.queue.TryDequeue(out Action pending))
            {
                try
                {
                    pending();
                }
                catch (Exception e)
                {
                    Log.Error(e);
                }
            }
        }

        /// <summary>Adapts the framework callback signature to <see cref="Post(Action)"/>.</summary>
        public override void Post(SendOrPostCallback callback, object state)
        {
            Action wrapped = () => callback(state);
            this.Post(wrapped);
        }

        /// <summary>
        /// Runs <paramref name="action"/> immediately when called on the owning thread
        /// (logging any exception); otherwise queues it for a later <see cref="Update"/>.
        /// </summary>
        public void Post(Action action)
        {
            bool onOwningThread = Thread.CurrentThread.ManagedThreadId == this.threadId;
            if (!onOwningThread)
            {
                this.queue.Enqueue(action);
                return;
            }

            try
            {
                action();
            }
            catch (Exception ex)
            {
                Log.Error(ex);
            }
        }

        /// <summary>
        /// Always queues <paramref name="action"/>, even on the owning thread, so it
        /// runs on a later <see cref="Update"/> instead of inline.
        /// </summary>
        public void PostNext(Action action) => this.queue.Enqueue(action);
    }
}

C#數據交互服務器(一)
C#數據交互服務器(三)
Rijndael加密算法

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容