Simplify networking code and update dependencies

This commit is contained in:
MaxXor 2019-04-23 21:05:07 +02:00
parent 1249834f84
commit 4abbbdc853
8 changed files with 343 additions and 362 deletions

View File

@ -1,13 +1,10 @@
using ProtoBuf;
using Quasar.Client.ReverseProxy;
using Quasar.Client.ReverseProxy;
using Quasar.Common.Extensions;
using Quasar.Common.IO.Compression;
using Quasar.Common.Messages;
using Quasar.Common.Messages.ReverseProxy;
using Quasar.Common.Networking;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
@ -39,10 +36,7 @@ public class Client : ISender
private void OnClientFail(Exception ex)
{
var handler = ClientFail;
if (handler != null)
{
handler(this, ex);
}
handler?.Invoke(this, ex);
}
/// <summary>
@ -68,10 +62,7 @@ private void OnClientState(bool connected)
Connected = connected;
var handler = ClientState;
if (handler != null)
{
handler(this, connected);
}
handler?.Invoke(this, connected);
}
/// <summary>
@ -84,19 +75,18 @@ private void OnClientState(bool connected)
/// </summary>
/// <param name="s">The client that has received the message.</param>
/// <param name="message">The message that has been received by the server.</param>
public delegate void ClientReadEventHandler(Client s, IMessage message);
/// <param name="messageLength">The length of the message.</param>
public delegate void ClientReadEventHandler(Client s, IMessage message, int messageLength);
/// <summary>
/// Fires an event that informs subscribers that a message has been received by the server.
/// </summary>
/// <param name="message">The message that has been received by the server.</param>
private void OnClientRead(IMessage message)
/// <param name="messageLength">The length of the message.</param>
private void OnClientRead(IMessage message, int messageLength)
{
var handler = ClientRead;
if (handler != null)
{
handler(this, message);
}
handler?.Invoke(this, message, messageLength);
}
/// <summary>
@ -109,23 +99,18 @@ private void OnClientRead(IMessage message)
/// </summary>
/// <param name="s">The client that has sent the message.</param>
/// <param name="message">The message that has been sent by the client.</param>
/// <param name="length">The length of the message.</param>
/// <param name="rawData">The message in raw bytes.</param>
public delegate void ClientWriteEventHandler(Client s, IMessage message, long length, byte[] rawData);
/// <param name="messageLength">The length of the message.</param>
public delegate void ClientWriteEventHandler(Client s, IMessage message, int messageLength);
/// <summary>
/// Fires an event that informs subscribers that the client has sent a message.
/// </summary>
/// <param name="message">The message that has been sent by the client.</param>
/// <param name="length">The length of the message.</param>
/// <param name="rawData">The message in raw bytes.</param>
private void OnClientWrite(IMessage message, long length, byte[] rawData)
/// <param name="messageLength">The length of the message.</param>
private void OnClientWrite(IMessage message, int messageLength)
{
var handler = ClientWrite;
if (handler != null)
{
handler(this, message, length, rawData);
}
handler?.Invoke(this, message, messageLength);
}
/// <summary>
@ -176,6 +161,11 @@ public ReverseProxyClient[] ProxyClients
}
}
/// <summary>
/// Gets if the client is currently connected to a server.
/// </summary>
public bool Connected { get; private set; }
/// <summary>
/// The stream used for communication.
/// </summary>
@ -212,9 +202,9 @@ public ReverseProxyClient[] ProxyClients
private byte[] _payloadBuffer;
/// <summary>
/// The Queue which holds buffers to send.
/// The queue which holds messages to send.
/// </summary>
private readonly Queue<byte[]> _sendBuffers = new Queue<byte[]>();
private readonly Queue<IMessage> _sendBuffers = new Queue<IMessage>();
/// <summary>
/// Determines if the client is currently sending messages.
@ -227,7 +217,7 @@ public ReverseProxyClient[] ProxyClients
private readonly object _sendingMessagesLock = new object();
/// <summary>
/// The Queue which holds buffers to read.
/// The queue which holds buffers to read.
/// </summary>
private readonly Queue<byte[]> _readBuffers = new Queue<byte[]>();
@ -241,35 +231,17 @@ public ReverseProxyClient[] ProxyClients
/// </summary>
private readonly object _readingMessagesLock = new object();
/// <summary>
/// The temporary header to store parts of the header.
/// </summary>
/// <remarks>
/// This temporary header is used when we have i.e.
/// only 2 bytes left to read from the buffer but need more
/// which can only be read in the next Receive callback
/// </remarks>
private byte[] _tempHeader;
/// <summary>
/// Decides if we need to append bytes to the header.
/// </summary>
private bool _appendHeader;
// Receive info
private int _readOffset;
private int _writeOffset;
private int _tempHeaderOffset;
private int _readableDataLen;
private int _payloadLen;
private ReceiveType _receiveState = ReceiveType.Header;
/// <summary>
/// Gets if the client is currently connected to a server.
/// The mutex prevents multiple simultaneous write operations on the <see cref="_stream"/>.
/// </summary>
public bool Connected { get; private set; }
private const bool compressionEnabled = true;
private readonly Mutex _singleWriteMutex = new Mutex();
/// <summary>
/// Constructor of the client, initializes serializer types.
@ -279,7 +251,6 @@ protected Client(X509Certificate2 serverCertificate)
{
_serverCertificate = serverCertificate;
_readBuffer = new byte[BUFFER_SIZE];
_tempHeader = new byte[HEADER_SIZE];
TypeRegistry.AddTypesToSerializer(typeof(IMessage), TypeRegistry.GetPacketTypes(typeof(IMessage)).ToArray());
}
@ -428,38 +399,26 @@ private void AsyncReceive(object state)
{
case ReceiveType.Header:
{
if (_readableDataLen + _tempHeaderOffset >= HEADER_SIZE)
{ // we can read the header
int headerLength = (_appendHeader)
? HEADER_SIZE - _tempHeaderOffset
: HEADER_SIZE;
if (_payloadBuffer == null)
_payloadBuffer = new byte[HEADER_SIZE];
if (_readableDataLen + _writeOffset >= HEADER_SIZE)
{
// completely received header
int headerLength = HEADER_SIZE - _writeOffset;
try
{
if (_appendHeader)
{
try
{
Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset,
headerLength);
}
catch (Exception ex)
{
process = false;
OnClientFail(ex);
break;
}
_payloadLen = BitConverter.ToInt32(_tempHeader, 0);
_tempHeaderOffset = 0;
_appendHeader = false;
}
else
{
_payloadLen = BitConverter.ToInt32(readBuffer, _readOffset);
}
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, headerLength);
_payloadLen = BitConverter.ToInt32(_payloadBuffer, _readOffset);
if (_payloadLen <= 0 || _payloadLen > MAX_MESSAGE_SIZE)
throw new Exception("invalid header");
// try to re-use old payload buffers which fit
if (_payloadBuffer.Length <= _payloadLen + HEADER_SIZE)
Array.Resize(ref _payloadBuffer, _payloadLen + HEADER_SIZE);
}
catch (Exception)
{
@ -469,44 +428,44 @@ private void AsyncReceive(object state)
}
_readableDataLen -= headerLength;
_writeOffset += headerLength;
_readOffset += headerLength;
_receiveState = ReceiveType.Payload;
}
else // _readableDataLen < HEADER_SIZE
else // _readableDataLen + _writeOffset < HeaderSize
{
// received only a part of the header
try
{
Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset, _readableDataLen);
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, _readableDataLen);
}
catch (Exception ex)
catch (Exception)
{
process = false;
OnClientFail(ex);
Disconnect();
break;
}
_tempHeaderOffset += _readableDataLen;
_appendHeader = true;
_readOffset += _readableDataLen;
_writeOffset += _readableDataLen;
process = false;
// nothing left to process
}
break;
}
case ReceiveType.Payload:
{
if (_payloadBuffer == null || _payloadBuffer.Length != _payloadLen)
_payloadBuffer = new byte[_payloadLen];
int length = (_writeOffset + _readableDataLen >= _payloadLen)
? _payloadLen - _writeOffset
int length = (_writeOffset - HEADER_SIZE + _readableDataLen) >= _payloadLen
? _payloadLen - (_writeOffset - HEADER_SIZE)
: _readableDataLen;
try
{
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, length);
}
catch (Exception ex)
catch (Exception)
{
process = false;
OnClientFail(ex);
Disconnect();
break;
}
@ -514,54 +473,26 @@ private void AsyncReceive(object state)
_readOffset += length;
_readableDataLen -= length;
if (_writeOffset == _payloadLen)
if (_writeOffset - HEADER_SIZE == _payloadLen)
{
bool isError = _payloadBuffer.Length == 0;
if (!isError)
// completely received payload
try
{
if (compressionEnabled)
using (PayloadReader pr = new PayloadReader(_payloadBuffer, _payloadLen + HEADER_SIZE, false))
{
try
{
_payloadBuffer = SafeQuickLZ.Decompress(_payloadBuffer);
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
IMessage message = pr.ReadMessage();
OnClientRead(message, _payloadBuffer.Length);
}
isError = _payloadBuffer.Length == 0; // check if payload decompression failed
}
if (isError)
catch (Exception)
{
process = false;
Disconnect();
break;
}
using (MemoryStream deserialized = new MemoryStream(_payloadBuffer))
{
try
{
IMessage message = Serializer.Deserialize<IMessage>(deserialized);
OnClientRead(message);
}
catch (Exception ex)
{
process = false;
OnClientFail(ex);
break;
}
}
_receiveState = ReceiveType.Header;
_payloadBuffer = null;
_payloadLen = 0;
_writeOffset = 0;
}
@ -574,10 +505,6 @@ private void AsyncReceive(object state)
}
}
if (_receiveState == ReceiveType.Header)
{
_writeOffset = 0; // prepare for next message
}
_readOffset = 0;
_readableDataLen = 0;
}
@ -587,58 +514,65 @@ private void AsyncReceive(object state)
/// Sends a message to the connected server.
/// </summary>
/// <typeparam name="T">The type of the message.</typeparam>
/// <param name="message">The message to be send.</param>
/// <param name="message">The message to be sent.</param>
public void Send<T>(T message) where T : IMessage
{
if (!Connected || message == null) return;
lock (_sendBuffers)
{
using (MemoryStream ms = new MemoryStream())
_sendBuffers.Enqueue(message);
lock (_sendingMessagesLock)
{
try
{
Serializer.Serialize(ms, message);
}
catch (Exception ex)
{
OnClientFail(ex);
return;
}
if (_sendingMessages) return;
byte[] payload = ms.ToArray();
_sendBuffers.Enqueue(payload);
OnClientWrite(message, payload.LongLength, payload);
lock (_sendingMessagesLock)
{
if (_sendingMessages) return;
_sendingMessages = true;
}
ThreadPool.QueueUserWorkItem(Send);
_sendingMessages = true;
ThreadPool.QueueUserWorkItem(ProcessSendBuffers);
}
}
}
/// <summary>
/// Sends a message to the connected server.
/// Blocks the thread until all messages have been sent.
/// Blocks the thread until the message has been sent.
/// </summary>
/// <typeparam name="T">The type of the message.</typeparam>
/// <param name="message">The message to be send.</param>
/// <param name="message">The message to be sent.</param>
public void SendBlocking<T>(T message) where T : IMessage
{
Send(message);
while (_sendingMessages)
if (!Connected || message == null) return;
SafeSendMessage(message);
}
/// <summary>
/// Safely sends a message and prevents multiple simultaneous
/// write operations on the <see cref="_stream"/>.
/// </summary>
/// <param name="message">The message to send.</param>
private void SafeSendMessage(IMessage message)
{
try
{
Thread.Sleep(10);
_singleWriteMutex.WaitOne();
using (PayloadWriter pw = new PayloadWriter(_stream, true))
{
OnClientWrite(message, pw.WriteMessage(message));
}
}
catch (Exception)
{
Disconnect();
SendCleanup(true);
}
finally
{
_singleWriteMutex.ReleaseMutex();
}
}
private void Send(object state)
private void ProcessSendBuffers(object state)
{
while (true)
{
@ -648,7 +582,7 @@ private void Send(object state)
return;
}
byte[] payload;
IMessage message;
lock (_sendBuffers)
{
if (_sendBuffers.Count == 0)
@ -657,33 +591,13 @@ private void Send(object state)
return;
}
payload = _sendBuffers.Dequeue();
message = _sendBuffers.Dequeue();
}
try
{
_stream.Write(BuildMessage(payload));
}
catch (Exception ex)
{
OnClientFail(ex);
SendCleanup(true);
return;
}
SafeSendMessage(message);
}
}
private byte[] BuildMessage(byte[] payload)
{
if (compressionEnabled)
payload = SafeQuickLZ.Compress(payload);
byte[] message = new byte[payload.Length + HEADER_SIZE];
Array.Copy(BitConverter.GetBytes(payload.Length), message, HEADER_SIZE);
Array.Copy(payload, 0, message, HEADER_SIZE, payload.Length);
return message;
}
private void SendCleanup(bool clear = false)
{
lock (_sendingMessagesLock)
@ -711,11 +625,11 @@ public void Disconnect()
_stream.Close();
_readOffset = 0;
_writeOffset = 0;
_tempHeaderOffset = 0;
_readableDataLen = 0;
_payloadLen = 0;
_payloadBuffer = null;
_receiveState = ReceiveType.Header;
_singleWriteMutex.Dispose();
if (_proxyClients != null)
{

View File

@ -67,7 +67,7 @@ public void Connect()
}
}
private void OnClientRead(Client client, IMessage message)
private void OnClientRead(Client client, IMessage message, int messageLength)
{
if (!Identified)
{

View File

@ -0,0 +1,75 @@
using ProtoBuf;
using Quasar.Common.Messages;
using System;
using System.IO;
namespace Quasar.Common.Networking
{
public class PayloadReader : MemoryStream
{
private readonly Stream _innerStream;
public bool LeaveInnerStreamOpen { get; }
public PayloadReader(byte[] payload, int length, bool leaveInnerStreamOpen)
{
_innerStream = new MemoryStream(payload, 0, length, false, true);
LeaveInnerStreamOpen = leaveInnerStreamOpen;
}
public PayloadReader(Stream stream, bool leaveInnerStreamOpen)
{
_innerStream = stream;
LeaveInnerStreamOpen = leaveInnerStreamOpen;
}
public int ReadInteger()
{
return BitConverter.ToInt32(ReadBytes(4), 0);
}
public byte[] ReadBytes(int length)
{
if (_innerStream.Position + length <= _innerStream.Length)
{
byte[] result = new byte[length];
_innerStream.Read(result, 0, result.Length);
return result;
}
throw new OverflowException($"Unable to read {length} bytes from stream");
}
/// <summary>
/// Reads the serialized message of the payload and deserializes it.
/// </summary>
/// <returns>The deserialized message of the payload.</returns>
public IMessage ReadMessage()
{
ReadInteger();
/* Length prefix is ignored here and already handled in Client class,
* it would cause to much trouble to check here for split or not fully
* received packets.
*/
IMessage message = Serializer.Deserialize<IMessage>(_innerStream);
return message;
}
protected override void Dispose(bool disposing)
{
try
{
if (LeaveInnerStreamOpen)
{
_innerStream.Flush();
}
else
{
_innerStream.Close();
}
}
finally
{
base.Dispose(disposing);
}
}
}
}

View File

@ -0,0 +1,65 @@
using ProtoBuf;
using Quasar.Common.Messages;
using System;
using System.IO;
namespace Quasar.Common.Networking
{
public class PayloadWriter : MemoryStream
{
private readonly Stream _innerStream;
public bool LeaveInnerStreamOpen { get; }
public PayloadWriter(Stream stream, bool leaveInnerStreamOpen)
{
_innerStream = stream;
LeaveInnerStreamOpen = leaveInnerStreamOpen;
}
public void WriteBytes(byte[] value)
{
_innerStream.Write(value, 0, value.Length);
}
public void WriteInteger(int value)
{
WriteBytes(BitConverter.GetBytes(value));
}
/// <summary>
/// Writes a serialized message as payload to the stream.
/// </summary>
/// <param name="message">The message to write.</param>
/// <returns>The amount of written bytes to the stream.</returns>
public int WriteMessage(IMessage message)
{
using (MemoryStream ms = new MemoryStream())
{
Serializer.Serialize(ms, message);
byte[] payload = ms.ToArray();
WriteInteger(payload.Length);
WriteBytes(payload);
return sizeof(int) + payload.Length;
}
}
protected override void Dispose(bool disposing)
{
try
{
if (LeaveInnerStreamOpen)
{
_innerStream.Flush();
}
else
{
_innerStream.Close();
}
}
finally
{
base.Dispose(disposing);
}
}
}
}

View File

@ -46,6 +46,7 @@
<Reference Include="System.Management" />
<Reference Include="System.Runtime.Serialization" />
<Reference Include="System.ServiceModel" />
<Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
<Compile Include="Cryptography\Aes256.cs" />
@ -150,6 +151,8 @@
<Compile Include="Messages\GetStartupItems.cs" />
<Compile Include="Messages\GetSystemInfo.cs" />
<Compile Include="Networking\ISender.cs" />
<Compile Include="Networking\PayloadReader.cs" />
<Compile Include="Networking\PayloadWriter.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Models\RegSeekerMatch.cs" />
<Compile Include="Models\RegValueData.cs" />

View File

@ -1,10 +1,7 @@
using ProtoBuf;
using Quasar.Common.IO.Compression;
using Quasar.Common.Messages;
using Quasar.Common.Messages;
using Quasar.Common.Networking;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Security;
using System.Threading;
@ -156,9 +153,9 @@ public enum ReceiveType
private readonly BufferPool _bufferPool;
/// <summary>
/// The Queue which holds buffers to send.
/// The queue which holds messages to send.
/// </summary>
private readonly Queue<byte[]> _sendBuffers = new Queue<byte[]>();
private readonly Queue<IMessage> _sendBuffers = new Queue<IMessage>();
/// <summary>
/// Determines if the client is currently sending messages.
@ -171,7 +168,7 @@ public enum ReceiveType
private readonly object _sendingMessagesLock = new object();
/// <summary>
/// The Queue which holds buffers to read.
/// The queue which holds buffers to read.
/// </summary>
private readonly Queue<byte[]> _readBuffers = new Queue<byte[]>();
@ -188,7 +185,6 @@ public enum ReceiveType
// receive info
private int _readOffset;
private int _writeOffset;
private int _tempHeaderOffset;
private int _readableDataLen;
private int _payloadLen;
private ReceiveType _receiveState = ReceiveType.Header;
@ -196,7 +192,7 @@ public enum ReceiveType
/// <summary>
/// The time when the client connected.
/// </summary>
public DateTime ConnectedTime { get; private set; }
public DateTime ConnectedTime { get; }
/// <summary>
/// The connection state of the client.
@ -239,21 +235,9 @@ public enum ReceiveType
private const int MaxMessageSize = (1024 * 1024) * 5; // 5 MB
/// <summary>
/// The temporary header to store parts of the header.
/// The mutex prevents multiple simultaneous write operations on the <see cref="_stream"/>.
/// </summary>
/// <remarks>
/// This temporary header is used when we have i.e.
/// only 2 bytes left to read from the buffer but need more
/// which can only be read in the next Receive callback
/// </remarks>
private readonly byte[] _tempHeader;
/// <summary>
/// Decides if we need to append bytes to the header.
/// </summary>
private bool _appendHeader;
private const bool compressionEnabled = true;
private readonly Mutex _singleWriteMutex = new Mutex();
public Client(BufferPool bufferPool, SslStream stream, IPEndPoint endPoint)
{
@ -266,8 +250,6 @@ public Client(BufferPool bufferPool, SslStream stream, IPEndPoint endPoint)
_stream = stream;
_bufferPool = bufferPool;
_readBuffer = _bufferPool.GetBuffer();
_tempHeader = new byte[HeaderSize];
_stream.BeginRead(_readBuffer, 0, _readBuffer.Length, AsyncReceive, null);
OnClientState(true);
}
@ -368,38 +350,26 @@ private void AsyncReceive(object state)
{
case ReceiveType.Header:
{
if (_readableDataLen + _tempHeaderOffset >= HeaderSize)
{ // we can read the header
int headerLength = (_appendHeader)
? HeaderSize - _tempHeaderOffset
: HeaderSize;
if (_payloadBuffer == null)
_payloadBuffer = new byte[HeaderSize];
if (_readableDataLen + _writeOffset >= HeaderSize)
{
// completely received header
int headerLength = HeaderSize - _writeOffset;
try
{
if (_appendHeader)
{
try
{
Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset,
headerLength);
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
_payloadLen = BitConverter.ToInt32(_tempHeader, 0);
_tempHeaderOffset = 0;
_appendHeader = false;
}
else
{
_payloadLen = BitConverter.ToInt32(readBuffer, _readOffset);
}
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, headerLength);
_payloadLen = BitConverter.ToInt32(_payloadBuffer, _readOffset);
if (_payloadLen <= 0 || _payloadLen > MaxMessageSize)
throw new Exception("invalid header");
// try to re-use old payload buffers which fit
if (_payloadBuffer.Length <= _payloadLen + HeaderSize)
Array.Resize(ref _payloadBuffer, _payloadLen + HeaderSize);
}
catch (Exception)
{
@ -409,14 +379,16 @@ private void AsyncReceive(object state)
}
_readableDataLen -= headerLength;
_writeOffset += headerLength;
_readOffset += headerLength;
_receiveState = ReceiveType.Payload;
}
else // _readableDataLen < _parentServer.HEADER_SIZE
else // _readableDataLen + _writeOffset < HeaderSize
{
// received only a part of the header
try
{
Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset, _readableDataLen);
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, _readableDataLen);
}
catch (Exception)
{
@ -424,21 +396,19 @@ private void AsyncReceive(object state)
Disconnect();
break;
}
_tempHeaderOffset += _readableDataLen;
_appendHeader = true;
_readOffset += _readableDataLen;
_writeOffset += _readableDataLen;
process = false;
// nothing left to process
}
break;
}
case ReceiveType.Payload:
{
if (_payloadBuffer == null || _payloadBuffer.Length != _payloadLen)
_payloadBuffer = new byte[_payloadLen];
int length = (_writeOffset + _readableDataLen >= _payloadLen)
? _payloadLen - _writeOffset
int length = (_writeOffset - HeaderSize + _readableDataLen) >= _payloadLen
? _payloadLen - (_writeOffset - HeaderSize)
: _readableDataLen;
try
{
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, length);
@ -449,59 +419,31 @@ private void AsyncReceive(object state)
Disconnect();
break;
}
_writeOffset += length;
_readOffset += length;
_readableDataLen -= length;
if (_writeOffset == _payloadLen)
if (_writeOffset - HeaderSize == _payloadLen)
{
bool isError = _payloadBuffer.Length == 0;
if (!isError)
// completely received payload
try
{
if (compressionEnabled)
using (PayloadReader pr = new PayloadReader(_payloadBuffer, _payloadLen + HeaderSize, false))
{
try
{
_payloadBuffer = SafeQuickLZ.Decompress(_payloadBuffer);
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
IMessage message = pr.ReadMessage();
OnClientRead(message, _payloadBuffer.Length);
}
isError = _payloadBuffer.Length == 0; // check if payload decompression failed
}
if (isError)
catch (Exception)
{
process = false;
Disconnect();
break;
}
using (MemoryStream deserialized = new MemoryStream(_payloadBuffer))
{
try
{
IMessage message = Serializer.Deserialize<IMessage>(deserialized);
OnClientRead(message, _payloadBuffer.Length);
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
}
_receiveState = ReceiveType.Header;
_payloadBuffer = null;
_payloadLen = 0;
_writeOffset = 0;
}
@ -514,10 +456,6 @@ private void AsyncReceive(object state)
}
}
if (_receiveState == ReceiveType.Header)
{
_writeOffset = 0; // prepare for next message
}
_readOffset = 0;
_readableDataLen = 0;
}
@ -527,58 +465,65 @@ private void AsyncReceive(object state)
/// Sends a message to the connected client.
/// </summary>
/// <typeparam name="T">The type of the message.</typeparam>
/// <param name="message">The message to be send.</param>
/// <param name="message">The message to be sent.</param>
public void Send<T>(T message) where T : IMessage
{
if (!Connected || message == null) return;
lock (_sendBuffers)
{
using (MemoryStream ms = new MemoryStream())
_sendBuffers.Enqueue(message);
lock (_sendingMessagesLock)
{
try
{
Serializer.Serialize(ms, message);
}
catch (Exception)
{
Disconnect();
return;
}
if (_sendingMessages) return;
byte[] payload = ms.ToArray();
_sendBuffers.Enqueue(payload);
OnClientWrite(message, payload.Length);
lock (_sendingMessagesLock)
{
if (_sendingMessages) return;
_sendingMessages = true;
}
ThreadPool.QueueUserWorkItem(Send);
_sendingMessages = true;
ThreadPool.QueueUserWorkItem(ProcessSendBuffers);
}
}
}
/// <summary>
/// Sends a message to the connected client.
/// Blocks the thread until all messages have been sent.
/// Blocks the thread until the message has been sent.
/// </summary>
/// <typeparam name="T">The type of the message.</typeparam>
/// <param name="message">The message to be send.</param>
/// <param name="message">The message to be sent.</param>
public void SendBlocking<T>(T message) where T : IMessage
{
Send(message);
while (_sendingMessages)
if (!Connected || message == null) return;
SafeSendMessage(message);
}
/// <summary>
/// Safely sends a message and prevents multiple simultaneous
/// write operations on the <see cref="_stream"/>.
/// </summary>
/// <param name="message">The message to send.</param>
private void SafeSendMessage(IMessage message)
{
try
{
Thread.Sleep(10);
_singleWriteMutex.WaitOne();
using (PayloadWriter pw = new PayloadWriter(_stream, true))
{
OnClientWrite(message, pw.WriteMessage(message));
}
}
catch (Exception)
{
Disconnect();
SendCleanup(true);
}
finally
{
_singleWriteMutex.ReleaseMutex();
}
}
private void Send(object state)
private void ProcessSendBuffers(object state)
{
while (true)
{
@ -588,7 +533,7 @@ private void Send(object state)
return;
}
byte[] payload;
IMessage message;
lock (_sendBuffers)
{
if (_sendBuffers.Count == 0)
@ -597,34 +542,13 @@ private void Send(object state)
return;
}
payload = _sendBuffers.Dequeue();
message = _sendBuffers.Dequeue();
}
try
{
var message = BuildMessage(payload);
_stream.Write(message);
}
catch (Exception)
{
Disconnect();
SendCleanup(true);
return;
}
SafeSendMessage(message);
}
}
private byte[] BuildMessage(byte[] payload)
{
if (compressionEnabled)
payload = SafeQuickLZ.Compress(payload);
byte[] message = new byte[payload.Length + HeaderSize];
Array.Copy(BitConverter.GetBytes(payload.Length), message, HeaderSize);
Array.Copy(payload, 0, message, HeaderSize, payload.Length);
return message;
}
private void SendCleanup(bool clear = false)
{
lock (_sendingMessagesLock)
@ -651,16 +575,16 @@ public void Disconnect()
_stream.Close();
_readOffset = 0;
_writeOffset = 0;
_tempHeaderOffset = 0;
_readableDataLen = 0;
_payloadLen = 0;
_payloadBuffer = null;
_receiveState = ReceiveType.Header;
_singleWriteMutex.Dispose();
_bufferPool.ReturnBuffer(_readBuffer);
}
OnClientState(false);
}
}
}
}

View File

@ -48,23 +48,23 @@
<DebugSymbols>false</DebugSymbols>
</PropertyGroup>
<ItemGroup>
<Reference Include="BouncyCastle.Crypto, Version=1.8.4.0, Culture=neutral, PublicKeyToken=0e99375e54769942">
<HintPath>..\packages\BouncyCastle.1.8.4\lib\BouncyCastle.Crypto.dll</HintPath>
<Reference Include="BouncyCastle.Crypto, Version=1.8.5.0, Culture=neutral, PublicKeyToken=0e99375e54769942">
<HintPath>..\packages\BouncyCastle.1.8.5\lib\BouncyCastle.Crypto.dll</HintPath>
</Reference>
<Reference Include="Gma.System.MouseKeyHook, Version=5.6.130.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\MouseKeyHook.5.6.0\lib\net40\Gma.System.MouseKeyHook.dll</HintPath>
</Reference>
<Reference Include="Mono.Cecil, Version=0.10.1.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.dll</HintPath>
<Reference Include="Mono.Cecil, Version=0.10.3.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.dll</HintPath>
</Reference>
<Reference Include="Mono.Cecil.Mdb, Version=0.10.1.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.Mdb.dll</HintPath>
<Reference Include="Mono.Cecil.Mdb, Version=0.10.3.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.Mdb.dll</HintPath>
</Reference>
<Reference Include="Mono.Cecil.Pdb, Version=0.10.1.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.Pdb.dll</HintPath>
<Reference Include="Mono.Cecil.Pdb, Version=0.10.3.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.Pdb.dll</HintPath>
</Reference>
<Reference Include="Mono.Cecil.Rocks, Version=0.10.1.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.Rocks.dll</HintPath>
<Reference Include="Mono.Cecil.Rocks, Version=0.10.3.0, Culture=neutral, PublicKeyToken=50cebf1cceb9d05e, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.Rocks.dll</HintPath>
</Reference>
<Reference Include="Mono.Nat, Version=1.2.24.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Mono.Nat.1.2.24.0\lib\net40\Mono.Nat.dll</HintPath>
@ -72,8 +72,8 @@
<Reference Include="protobuf-net, Version=2.4.0.0, Culture=neutral, PublicKeyToken=257b51d87d2e4d67, processorArchitecture=MSIL">
<HintPath>..\packages\protobuf-net.2.4.0\lib\net40\protobuf-net.dll</HintPath>
</Reference>
<Reference Include="ResourceLib, Version=2.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Vestris.ResourceLib.2.0.0\lib\net40\ResourceLib.dll</HintPath>
<Reference Include="Vestris.ResourceLib, Version=2.1.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\Vestris.ResourceLib.2.1.0\lib\net40\Vestris.ResourceLib.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Configuration" />

View File

@ -1,10 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="BouncyCastle" version="1.8.4" targetFramework="net40-client" />
<package id="BouncyCastle" version="1.8.5" targetFramework="net40-client" />
<package id="ILMerge" version="2.14.1208" targetFramework="net40-client" />
<package id="Mono.Cecil" version="0.10.1" targetFramework="net40-client" />
<package id="Mono.Cecil" version="0.10.3" targetFramework="net40-client" />
<package id="Mono.Nat" version="1.2.24.0" targetFramework="net40-client" />
<package id="MouseKeyHook" version="5.6.0" targetFramework="net40-client" />
<package id="protobuf-net" version="2.4.0" targetFramework="net40-client" />
<package id="Vestris.ResourceLib" version="2.0.0" targetFramework="net40-client" />
<package id="Vestris.ResourceLib" version="2.1.0" targetFramework="net40-client" />
</packages>