網絡數據IO處理
TChannel作為連接通道,服務端使用TService管理通道,客戶端只需使用TChannel即可
namespace SelfFramework.Network
{
/// <summary>
/// Server-side TCP acceptor. Binds and listens on a local endpoint, wraps every
/// accepted socket in a <see cref="TChannel"/> and tracks the channels by id.
/// Accept completions are marshalled onto <see cref="threadAsyncContext"/>.
/// </summary>
public class TService
{
    private static readonly log4net.ILog Log = LogHelper.GetLogger();

    // Listening socket; set to null by Dispose, which doubles as the "disposed" marker.
    private Socket acceptor;
    private IPEndPoint localAddress;

    /// <summary>Listener notified when a peer connects; cleared by <see cref="Dispose"/>.</summary>
    internal INetEventListener eventListener { private set; get; }

    /// <summary>Live channels keyed by channel id.</summary>
    internal ConcurrentDictionary<long, TChannel> channelTable { private set; get; }

    // NOTE(review): nothing in this class ever adds to addressTable, so the
    // "reconnect" branch in OnAcceptComplete is never taken as written — confirm intent.
    private ConcurrentDictionary<EndPoint, TChannel> addressTable;

    // Single reusable args instance for the accept loop.
    private readonly SocketAsyncEventArgs innArgs;
    internal readonly ThreadSynchronizationContext threadAsyncContext;

    #region Heartbeat
    private System.Timers.Timer heartTimer;
    private readonly int interval_Time = 20 << 10; // timer interval in milliseconds (20 * 1024)
    // Heartbeat timeout; the original comment says seconds.
    // NOTE(review): this is compared against Util.GetUtcNow() deltas — confirm that helper returns seconds.
    private readonly int timeOut = 60;
    #endregion

    private long acceptIdGenerater = 1;

    /// <summary>
    /// Creates the listening socket, binds it to <paramref name="ipEndPoint"/> and queues the first accept.
    /// </summary>
    /// <param name="asyncContext">Context on which accept completions are executed.</param>
    /// <param name="ipEndPoint">Local endpoint to bind to.</param>
    /// <param name="eventListener">Receiver of connection events.</param>
    internal TService(ThreadSynchronizationContext asyncContext, IPEndPoint ipEndPoint, INetEventListener eventListener)
    {
        this.eventListener = eventListener;
        this.channelTable = new ConcurrentDictionary<long, TChannel>();
        this.addressTable = new ConcurrentDictionary<EndPoint, TChannel>();
        this.localAddress = ipEndPoint;
        this.threadAsyncContext = asyncContext;

        this.innArgs = new SocketAsyncEventArgs();
        this.innArgs.Completed += OnComplete;

        this.acceptor = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        this.acceptor.Bind(ipEndPoint);
        this.acceptor.Listen(1000);
        Log.Info("The server is started succeeded.");

        // The first accept is started from the context's queue rather than inline.
        this.threadAsyncContext.PostNext(this.AcceptAsync);
    }

    /// <summary>
    /// Stops the heartbeat timer, closes the listening socket and disposes every tracked channel.
    /// </summary>
    internal void Dispose()
    {
        // Stop the timer first so no further heartbeat sweeps are scheduled.
        this.StopHeartTimer();

        this.acceptor?.Close();
        this.acceptor = null;
        this.innArgs.Dispose();
        this.localAddress = null;
        this.eventListener = null;

        // Snapshot the values: TChannel.Dispose removes the channel from channelTable.
        foreach (TChannel channel in this.channelTable.Values.ToArray())
        {
            try
            {
                channel.Dispose();
            }
            catch (Exception exception)
            {
                // One failing channel must not prevent the remaining ones from being released.
                Log.ErrorFormat("Dispose channel exception:>{0}", exception.Message);
            }
        }
        this.channelTable.Clear();
        this.addressTable.Clear();
    }

    // Raised by the socket layer when an asynchronous accept finishes; hops to the context thread.
    private void OnComplete(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Accept:
                this.threadAsyncContext.Post(() => { this.OnAcceptComplete(e); });
                break;
            default:
                throw new InvalidOperationException($"socket error: {e.LastOperation}");
        }
    }

    // Handles one finished accept and re-arms the accept loop.
    private void OnAcceptComplete(SocketAsyncEventArgs e)
    {
        if (this.acceptor == null)
        {
            // The service was disposed while this completion was pending or queued.
            e.AcceptSocket?.Close();
            return;
        }

        if (e.SocketError != SocketError.Success)
        {
            Log.ErrorFormat("Accept faild:>{0}", e.SocketError);
            if (e.SocketError != SocketError.OperationAborted)
            {
                // A failed accept must not end the accept loop. Re-arm through the queue
                // (not inline) so a persistently failing accept cannot recurse without bound.
                this.threadAsyncContext.PostNext(this.AcceptAsync);
            }
            return;
        }

        try
        {
            if (addressTable.TryGetValue(e.AcceptSocket.RemoteEndPoint, out TChannel channel)) // TCP reconnect
            {
                Log.Warn("tcp 重新連接");
                this.eventListener.OnPeerConnected(channel);
            }
            else
            {
                long id = this.CreateAcceptChannelId(0);
                TChannel newChannel = new TChannel(id, e.AcceptSocket, this);
                this.channelTable.TryAdd(id, newChannel);
                this.eventListener.OnPeerConnected(newChannel);
            }
        }
        catch (Exception exception)
        {
            Log.ErrorFormat("Accept exception:>{0}", exception.Message);
        }

        // Start the next accept.
        this.AcceptAsync();
    }

    // Issues an asynchronous accept; handles the synchronous-completion case inline.
    private void AcceptAsync()
    {
        if (this.acceptor == null)
        {
            // A queued accept request can run after Dispose.
            return;
        }

        this.innArgs.AcceptSocket = null;
        if (this.acceptor.AcceptAsync(this.innArgs))
        {
            return;
        }
        OnAcceptComplete(this.innArgs);
    }

    /// <summary>Calls <see cref="TChannel.Update"/> on every tracked channel.</summary>
    public void Update()
    {
        foreach (var item in channelTable.Values)
        {
            item.Update();
        }
    }

    /// <summary>Looks up a channel by id.</summary>
    /// <param name="id">Channel id.</param>
    /// <returns>The channel, or null when the id is not tracked.</returns>
    public TChannel GetChannel(long id)
    {
        channelTable.TryGetValue(id, out TChannel channel);
        return channel;
    }

    /// <summary>Removes a channel from the id table and, if present, from the address table.</summary>
    /// <param name="id">Channel id.</param>
    internal void RemoveChannel(long id)
    {
        if (channelTable.TryRemove(id, out TChannel channel) && channel.remoteAddress != null)
        {
            // remoteAddress comes from an 'as' cast and may be null; a null key would throw.
            addressTable.TryRemove(channel.remoteAddress, out _);
        }
    }

    /// <summary>
    /// Starts the periodic heartbeat check. Calling it again replaces the previous timer.
    /// </summary>
    public void StartCheckHeartBeat()
    {
        this.StopHeartTimer();

        var timer = new System.Timers.Timer(interval_Time)
        {
            AutoReset = true
        };
        timer.Elapsed += this.OnHeartTimerElapsed;
        this.heartTimer = timer;
        timer.Start();
        Log.Info("Enabling the Heartbeat Timer");
    }

    // Timer callback. Only detects stale channels here; the disconnect itself is posted to
    // threadAsyncContext so channel state is touched on the same thread as the socket callbacks.
    private void OnHeartTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
    {
        foreach (TChannel channel in channelTable.Values)
        {
            if (!this.IsHeartBeatExpired(channel))
            {
                continue;
            }

            this.threadAsyncContext.Post(() =>
            {
                // Re-check on the context thread: the channel may have been disposed
                // or may have refreshed its heartbeat since the sweep.
                if (!channel.IsUsing || !this.IsHeartBeatExpired(channel))
                {
                    return;
                }
                Log.WarnFormat("Heart broken, channel id->{0}", channel.channelId);
                channel.OnDisconnect(DisconnectReason.HeartBroken, SocketError.Success);
            });
        }
    }

    // True when the channel's last heartbeat is older than timeOut.
    private bool IsHeartBeatExpired(TChannel channel)
    {
        long nowTime = Util.GetUtcNow();
        return nowTime - channel.lastHeartBeatTime > timeOut;
    }

    // Stops and releases the heartbeat timer, if one is running.
    private void StopHeartTimer()
    {
        System.Timers.Timer timer = this.heartTimer;
        this.heartTimer = null;
        if (timer == null)
        {
            return;
        }
        timer.Stop();
        timer.Elapsed -= this.OnHeartTimerElapsed;
        timer.Dispose();
    }

    // Builds a channel id: a pre-incremented counter in the high 32 bits, localConn in the low 32 bits.
    private long CreateAcceptChannelId(uint localConn)
    {
        return (++this.acceptIdGenerater << 32) | localConn;
    }
}
}
namespace SelfFramework.Network
{
/// <summary>
/// How a channel's socket came into existence.
/// </summary>
public enum ChannelType
{
    /// <summary>The channel opened the connection itself (client side).</summary>
    Connect = 0,

    /// <summary>The channel wraps a socket accepted by a service (server side).</summary>
    Accept = 1
}
/// <summary>
/// One TCP connection. On the server it wraps a socket accepted by <see cref="TService"/>;
/// on the client it owns its own socket and connects to a remote endpoint.
/// Socket completions are marshalled onto a <see cref="ThreadSynchronizationContext"/>.
/// </summary>
public class TChannel
{
    private static readonly log4net.ILog Log = LogHelper.GetLogger();

    /// <summary>Channel id; reset to 0 by <see cref="Dispose"/>.</summary>
    public long channelId { private set; get; }

    private Socket socket;
    private TService service; // only set for accepted (server-side) channels

    public IPEndPoint remoteAddress { private set; get; }
    public ChannelType channelType { private set; get; }

    /// <summary>True until the channel has been disposed.</summary>
    public bool IsUsing { private set; get; }

    /// <summary>True while the socket is considered connected.</summary>
    public bool IsConnected { private set; get; }

    /// <summary>True while a send started by this channel has not yet completed.</summary>
    public bool IsSending { private set; get; }

    private SocketAsyncEventArgs innArgs = new SocketAsyncEventArgs(); // receive
    private SocketAsyncEventArgs outArgs = new SocketAsyncEventArgs(); // connect + send
    private ThreadSynchronizationContext threadAsyncContext;
    internal INetEventListener eventListener;
    private readonly PacketParser packetParser;
    private ConcurrentQueue<byte[]> sendBuffers = new ConcurrentQueue<byte[]>();

    internal byte[] temp_IV = new byte[AppConst.BlockSize >> 3]; // scratch IV buffer
    internal double lastHeartBeatTime;

    /// <summary>
    /// Wraps a socket accepted by <paramref name="service"/> and queues the first receive.
    /// </summary>
    /// <param name="channelId">Id assigned by the service.</param>
    /// <param name="socket">Accepted socket.</param>
    /// <param name="service">Owning service; supplies the context and the event listener.</param>
    internal TChannel(long channelId, Socket socket, TService service)
    {
        this.channelId = channelId;
        this.service = service;
        this.socket = socket;
        this.socket.NoDelay = true;
        this.remoteAddress = socket.RemoteEndPoint as IPEndPoint;
        this.channelType = ChannelType.Accept;
        this.innArgs.Completed += this.OnComplete;
        this.outArgs.Completed += this.OnComplete;
        this.threadAsyncContext = service.threadAsyncContext;
        this.eventListener = service.eventListener;
        this.packetParser = new PacketParser(this);
        this.IsConnected = true;
        this.IsSending = false;
        this.IsUsing = true;
        this.lastHeartBeatTime = Util.GetUtcNow();

        // Start receiving from the context's queue rather than inline.
        this.service.threadAsyncContext.PostNext(this.StartRecv);
    }

    /// <summary>
    /// Creates a client channel and queues a connect to <paramref name="ipEndPoint"/>.
    /// </summary>
    /// <param name="threadAsyncContext">Context on which socket completions are executed.</param>
    /// <param name="ipEndPoint">Remote endpoint to connect to.</param>
    /// <param name="eventListener">Receiver of network events.</param>
    internal TChannel(ThreadSynchronizationContext threadAsyncContext, IPEndPoint ipEndPoint, INetEventListener eventListener)
    {
        this.channelId = 1;
        this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        this.socket.NoDelay = true;
        this.remoteAddress = ipEndPoint;
        this.channelType = ChannelType.Connect;
        this.innArgs.Completed += this.OnComplete;
        this.outArgs.Completed += this.OnComplete;
        this.threadAsyncContext = threadAsyncContext;
        this.eventListener = eventListener;
        this.packetParser = new PacketParser(this);
        this.IsConnected = false;
        this.IsSending = false;
        this.IsUsing = true;
        this.threadAsyncContext.PostNext(this.ConnectAsync);
    }

    /// <summary>
    /// Closes the socket and releases the async args. A second call only logs a warning.
    /// </summary>
    internal void Dispose()
    {
        if (IsUsing == false)
        {
            Log.Warn("The channel has been dispose.");
            return;
        }

        // Flip the flags first so any callback that runs from here on sees a dead channel.
        this.IsUsing = false;
        this.IsConnected = false;
        this.IsSending = false;

        if (this.channelType == ChannelType.Accept)
        {
            this.service.RemoveChannel(this.channelId);
        }

        try
        {
            // Shutdown throws on a socket that never connected (e.g. after a failed connect);
            // that must not prevent Close and the args disposal below.
            this.socket.Shutdown(SocketShutdown.Both);
        }
        catch (Exception ex) when (ex is SocketException || ex is ObjectDisposedException)
        {
            Log.DebugFormat("socket shutdown skipped:>{0}", ex.Message);
        }
        finally
        {
            this.socket.Close();
        }

        this.innArgs.Dispose();
        this.outArgs.Dispose();
        this.innArgs = null;
        this.outArgs = null;
        this.socket = null;
        this.channelId = 0;
    }

    /// <summary>
    /// Notifies the listener that the peer is gone and disposes the channel.
    /// Does nothing when the channel has already been disposed.
    /// </summary>
    /// <param name="disconnectReason">Why the channel is being closed.</param>
    /// <param name="socketError">Socket error associated with the disconnect.</param>
    public void OnDisconnect(DisconnectReason disconnectReason, SocketError socketError)
    {
        if (!this.IsUsing)
        {
            // Several failure paths can race to report the same disconnect; report it once.
            return;
        }

        try
        {
            this.eventListener.OnPeerDisconnected(this, new DisconnectInfo(disconnectReason, socketError));
        }
        finally
        {
            // Release the socket even if the listener throws.
            this.Dispose();
        }
    }

    // Raised by the socket layer when an async operation finishes; hops to the context thread.
    private void OnComplete(object sender, SocketAsyncEventArgs e)
    {
        switch (e.LastOperation)
        {
            case SocketAsyncOperation.Connect:
                this.threadAsyncContext.Post(() => OnConnectComplete(e));
                break;
            case SocketAsyncOperation.Receive:
                this.threadAsyncContext.Post(() => OnRecvComplete(e));
                break;
            case SocketAsyncOperation.Send:
                this.threadAsyncContext.Post(() => OnSendComplete(e));
                break;
            case SocketAsyncOperation.Disconnect:
                this.threadAsyncContext.Post(() => OnDisconnectComplete(e));
                break;
            default:
                throw new InvalidOperationException($"socket error: {e.LastOperation}");
        }
    }

    /// <summary>Starts an asynchronous connect to <see cref="remoteAddress"/>.</summary>
    internal void ConnectAsync()
    {
        if (!this.IsUsing)
        {
            return; // disposed before the queued connect ran
        }

        this.outArgs.RemoteEndPoint = this.remoteAddress;
        if (this.socket.ConnectAsync(this.outArgs))
        {
            return;
        }
        OnConnectComplete(this.outArgs);
    }

    // Handles a finished connect: marks the channel connected, notifies the listener, starts receiving.
    private void OnConnectComplete(SocketAsyncEventArgs e)
    {
        if (!this.IsUsing)
        {
            return;
        }
        if (e.SocketError != SocketError.Success)
        {
            this.OnError(e);
            return;
        }

        e.RemoteEndPoint = null;
        // Mark connected before notifying so the listener can Send from OnPeerConnected,
        // matching the accepted-channel path where IsConnected is already true at that point.
        this.IsConnected = true;
        this.eventListener.OnPeerConnected(this);
        this.StartRecv();
    }

    private void OnDisconnectComplete(SocketAsyncEventArgs e)
    {
        this.OnDisconnect(DisconnectReason.Disconnect, e.SocketError);
    }

    // Posts an asynchronous receive into the parser's buffer.
    private void StartRecv()
    {
        if (!this.IsUsing)
        {
            // The listener may have disposed the channel while handling a packet.
            return;
        }

        try
        {
            this.innArgs.SetBuffer(this.packetParser.buffer, 0, this.packetParser.buffer.Length);
        }
        catch (Exception e)
        {
            Log.ErrorFormat("recv set buffer error:>{0}", e.Message);
            this.OnError(innArgs);
            return;
        }

        if (this.socket.ReceiveAsync(this.innArgs))
        {
            return;
        }
        this.OnRecvComplete(this.innArgs);
    }

    // Handles a finished receive: feeds the parser and posts the next receive.
    private void OnRecvComplete(SocketAsyncEventArgs e)
    {
        if (!this.IsUsing)
        {
            // Completion of a receive that was aborted by Dispose.
            return;
        }
        if (e.SocketError != SocketError.Success)
        {
            this.OnError(e);
            return;
        }

        int bytesTransferred = e.BytesTransferred;
        if (bytesTransferred == 0)
        {
            // A zero-byte receive is treated as the peer closing the connection.
            this.OnDisconnectComplete(e);
            return;
        }

        try
        {
            this.packetParser.Parse(bytesTransferred); // split the stream into packets
        }
        catch (Exception ee)
        {
            Log.Error($"ip: {this.remoteAddress} {ee}");
            this.OnError(e);
            return;
        }

        this.StartRecv();
    }

    // Sends the next queued buffer, or clears IsSending when the queue is empty.
    private void StartSend()
    {
        try
        {
            if (!this.IsConnected || !this.sendBuffers.TryDequeue(out byte[] sendBuffer))
            {
                // Nothing to send (or nothing to send it on).
                this.IsSending = false;
                return;
            }

            this.IsSending = true;
            this.outArgs.SetBuffer(sendBuffer, 0, sendBuffer.Length);
            if (this.socket.SendAsync(this.outArgs))
            {
                return;
            }
            OnSendComplete(this.outArgs);
        }
        catch (Exception e)
        {
            Log.ErrorFormat("start send exception:>{0}", e.Message);
            OnError(outArgs);
        }
    }

    // Handles a finished send and continues with the next queued buffer.
    private void OnSendComplete(SocketAsyncEventArgs e)
    {
        if (!this.IsUsing)
        {
            return;
        }
        if (e.SocketError != SocketError.Success)
        {
            this.OnError(e);
            return;
        }
        if (e.BytesTransferred == 0)
        {
            this.OnError(e);
            return;
        }
        this.StartSend();
    }

    // Reports a socket error to the listener, then disconnects.
    private void OnError(SocketAsyncEventArgs e)
    {
        if (!this.IsUsing)
        {
            // Already torn down; 'e' may be null or disposed at this point.
            return;
        }
        this.eventListener.OnNetworkError(this, e.SocketError, e.LastOperation);
        this.OnDisconnect(DisconnectReason.Exception, e.SocketError);
    }

    /// <summary>
    /// Builds a packet (head + body) and queues it; the bytes go out on a later <see cref="Update"/>.
    /// </summary>
    /// <param name="protocol">Protocol name written at the start of the body.</param>
    /// <param name="data">Optional payload appended after the protocol name; may be null.</param>
    public void Send(string protocol, byte[] data)
    {
        if (IsConnected == false)
        {
            Log.WarnFormat("The connection has been disconnected and cannot be sent");
            return;
        }

        NetDataWriter dataWriter = new NetDataWriter();
        dataWriter.Reset(NetConstants.TotalHeadSize); // write the body first
        dataWriter.Put(protocol);
        if (data != null)
        {
            dataWriter.Put(data);
        }

        if (AppConst.isEncrypt)
        {
            byte[] bodyData = Util.EncryptBytes(dataWriter.RawData, NetConstants.TotalHeadSize, dataWriter.WritePos - NetConstants.TotalHeadSize, NetConstants.Key, out temp_IV);
            dataWriter.Reset(NetConstants.TotalHeadSize); // rewrite the body with the encrypted bytes
            dataWriter.Append(bodyData, 0, bodyData.Length);
        }

        int totalSize = dataWriter.WritePos; // now write the head
        dataWriter.Reset(0);
        dataWriter.Put(totalSize); // total packet size
        if (AppConst.isEncrypt)
        {
            dataWriter.Append(temp_IV, 0, temp_IV.Length);
        }
        dataWriter.Reset(totalSize, false);
        sendBuffers.Enqueue(dataWriter.CopyData());
    }

    /// <summary>
    /// Starts sending queued packets unless a send is already in flight.
    /// </summary>
    public void Update()
    {
        // outArgs can carry only one operation at a time; while a send is pending,
        // OnSendComplete continues the queue on its own.
        if (!this.IsConnected || this.IsSending)
        {
            return;
        }
        this.StartSend();
    }
}
}
分包處理,一個數據包接收完畢后去掉head部分交由應用層處理
namespace SelfFramework.Network
{
/// <summary>
/// Which part of a packet the parser is currently collecting.
/// </summary>
public enum ParserState
{
    /// <summary>Collecting the packet head.</summary>
    PacketHead = 0,

    /// <summary>Collecting the packet body.</summary>
    PacketBody = 1
}
/// <summary>
/// Reassembles length-prefixed packets from a channel's receive buffer. Once a packet is
/// complete, the head is stripped and the body is handed to the channel's event listener.
/// </summary>
public class PacketParser
{
    private TChannel channel;
    internal byte[] buffer;            // receive buffer the socket writes into
    private ParserState state;
    internal NetDataWriter dataWriter; // accumulates the packet currently being assembled
    private int offset;                // read position inside buffer for the current Parse call
    private int totalPacketSize;       // packet size read from the head (head + body)

    /// <summary>Creates a parser for <paramref name="channel"/>.</summary>
    /// <param name="channel">Channel whose listener receives completed packets.</param>
    /// <param name="recvBufferSize">Size in bytes of the receive buffer.</param>
    internal PacketParser(TChannel channel, int recvBufferSize = 8192)
    {
        this.channel = channel;
        this.buffer = new byte[recvBufferSize];
        this.dataWriter = new NetDataWriter();
    }

    /// <summary>
    /// Consumes the first <paramref name="bytesTransferred"/> bytes of <see cref="buffer"/>,
    /// dispatching every packet they complete. Partial packets are kept for the next call.
    /// </summary>
    /// <param name="bytesTransferred">Number of valid bytes at the start of the buffer.</param>
    /// <exception cref="InvalidOperationException">The head announces a size smaller than the head itself.</exception>
    internal void Parse(int bytesTransferred)
    {
        this.offset = 0;
        while (true)
        {
            switch (this.state)
            {
                case ParserState.PacketHead:
                {
                    if (bytesTransferred <= 0)
                    {
                        return; // input exhausted
                    }

                    // Head bytes available in this receive.
                    int recvHeadSize = Math.Min(bytesTransferred, NetConstants.TotalHeadSize - this.dataWriter.WritePos);
                    this.dataWriter.Append(this.buffer, this.offset, recvHeadSize);
                    bytesTransferred -= recvHeadSize;
                    this.offset += recvHeadSize;
                    if (this.dataWriter.WritePos < NetConstants.TotalHeadSize) // head not complete yet
                    {
                        return;
                    }

                    this.totalPacketSize = BitConverter.ToInt32(this.dataWriter.RawData, 0); // total packet size
                    if (this.totalPacketSize < NetConstants.TotalHeadSize) // a packet is head + body
                    {
                        throw new InvalidOperationException($"recv packet size error, 可能是外網(wǎng)探測(cè)端口: {this.totalPacketSize}");
                    }
                    this.state = ParserState.PacketBody; // continue with the body
                    break;
                }
                case ParserState.PacketBody:
                {
                    // Deliberately entered even when no input is left: a packet with an empty
                    // body is already complete once its head has arrived and must be dispatched
                    // now rather than when the next receive happens to come in.
                    int recvBodySize = Math.Min(bytesTransferred, this.totalPacketSize - dataWriter.WritePos);
                    if (recvBodySize > 0)
                    {
                        this.dataWriter.Append(this.buffer, this.offset, recvBodySize);
                        bytesTransferred -= recvBodySize;
                        this.offset += recvBodySize;
                    }
                    if (this.dataWriter.WritePos < this.totalPacketSize) // more body bytes still to come
                    {
                        return;
                    }

                    this.Dispatcher();
                    this.dataWriter.Reset(0); // packet handled; start collecting the next one
                    this.state = ParserState.PacketHead;
                    break;
                }
                default:
                    throw new ArgumentOutOfRangeException();
            }
        }
    }

    // Strips the head, decrypts the body when encryption is enabled, and hands it to the listener.
    private void Dispatcher()
    {
        byte[] bodyData = dataWriter.CopyData(NetConstants.TotalHeadSize); // body without the head
        if (AppConst.isEncrypt) // body must be decrypted
        {
            // The IV follows the int size field in the head.
            Buffer.BlockCopy(dataWriter.RawData, sizeof(int), channel.temp_IV, 0, channel.temp_IV.Length);
            bodyData = Util.DecryptBytes(bodyData, 0, bodyData.Length, NetConstants.Key, channel.temp_IV);
        }
        this.channel.eventListener.OnNetworkReceive(this.channel, bodyData);
    }
}
}
線程同步,將IO完成的事件全部加入一個隊列,統一放在一個線程內執行
namespace SelfFramework.Network
{
/// <summary>
/// Funnels callbacks onto one thread: other threads enqueue actions, and the owning
/// thread runs them by calling <see cref="Update"/>.
/// </summary>
public class ThreadSynchronizationContext : SynchronizationContext
{
    /// <summary>Shared instance bound to the thread that first touches this type.</summary>
    public static ThreadSynchronizationContext Instance { get; } = new ThreadSynchronizationContext(Thread.CurrentThread.ManagedThreadId);

    private static readonly log4net.ILog Log = LogHelper.GetLogger();

    // Managed id of the thread that is allowed to run actions inline.
    private readonly int threadId;

    // Pending actions; socket send/receive callbacks are placed here and run by the polling thread.
    private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();

    /// <summary>Creates a context owned by the thread with the given managed id.</summary>
    /// <param name="threadId">Managed thread id of the owning thread.</param>
    public ThreadSynchronizationContext(int threadId)
    {
        this.threadId = threadId;
    }

    /// <summary>
    /// Runs at most one queued action. Exceptions thrown by the action are logged, not rethrown.
    /// </summary>
    public void Update()
    {
        // A local (not a field) so the executed delegate and whatever it captured
        // become collectable as soon as this call returns.
        // NOTE(review): only one action is drained per call — confirm this throttling is intended.
        if (!this.queue.TryDequeue(out Action next))
        {
            return;
        }

        try
        {
            next();
        }
        catch (Exception e)
        {
            Log.Error(e);
        }
    }

    /// <inheritdoc/>
    public override void Post(SendOrPostCallback callback, object state)
    {
        this.Post(() => callback(state));
    }

    /// <summary>
    /// Runs <paramref name="action"/> immediately when called on the owning thread
    /// (exceptions are logged); otherwise queues it for <see cref="Update"/>.
    /// </summary>
    /// <param name="action">Work to execute.</param>
    public void Post(Action action)
    {
        if (Thread.CurrentThread.ManagedThreadId != this.threadId)
        {
            this.queue.Enqueue(action);
            return;
        }

        try
        {
            action();
        }
        catch (Exception ex)
        {
            Log.Error(ex);
        }
    }

    /// <summary>Always queues <paramref name="action"/>, even when called on the owning thread.</summary>
    /// <param name="action">Work to execute on a later <see cref="Update"/>.</param>
    public void PostNext(Action action)
    {
        this.queue.Enqueue(action);
    }
}
}
C#數據交互服務器(一)
C#數據交互服務器(三)
Rijndael加密算法