diff --git a/Quasar.Client/Networking/Client.cs b/Quasar.Client/Networking/Client.cs index 32412c1f..a4091b22 100644 --- a/Quasar.Client/Networking/Client.cs +++ b/Quasar.Client/Networking/Client.cs @@ -1,13 +1,10 @@ -using ProtoBuf; -using Quasar.Client.ReverseProxy; +using Quasar.Client.ReverseProxy; using Quasar.Common.Extensions; -using Quasar.Common.IO.Compression; using Quasar.Common.Messages; using Quasar.Common.Messages.ReverseProxy; using Quasar.Common.Networking; using System; using System.Collections.Generic; -using System.IO; using System.Linq; using System.Net; using System.Net.Security; @@ -39,10 +36,7 @@ public class Client : ISender private void OnClientFail(Exception ex) { var handler = ClientFail; - if (handler != null) - { - handler(this, ex); - } + handler?.Invoke(this, ex); } /// @@ -68,10 +62,7 @@ private void OnClientState(bool connected) Connected = connected; var handler = ClientState; - if (handler != null) - { - handler(this, connected); - } + handler?.Invoke(this, connected); } /// @@ -84,19 +75,18 @@ private void OnClientState(bool connected) /// /// The client that has received the message. /// The message that has been received by the server. - public delegate void ClientReadEventHandler(Client s, IMessage message); + /// The length of the message. + public delegate void ClientReadEventHandler(Client s, IMessage message, int messageLength); /// /// Fires an event that informs subscribers that a message has been received by the server. /// /// The message that has been received by the server. - private void OnClientRead(IMessage message) + /// The length of the message. + private void OnClientRead(IMessage message, int messageLength) { var handler = ClientRead; - if (handler != null) - { - handler(this, message); - } + handler?.Invoke(this, message, messageLength); } /// @@ -109,23 +99,18 @@ private void OnClientRead(IMessage message) /// /// The client that has sent the message. /// The message that has been sent by the client. - /// The length of the message. - /// The message in raw bytes. - public delegate void ClientWriteEventHandler(Client s, IMessage message, long length, byte[] rawData); + /// The length of the message. + public delegate void ClientWriteEventHandler(Client s, IMessage message, int messageLength); /// /// Fires an event that informs subscribers that the client has sent a message. /// /// The message that has been sent by the client. - /// The length of the message. - /// The message in raw bytes. - private void OnClientWrite(IMessage message, long length, byte[] rawData) + /// The length of the message. + private void OnClientWrite(IMessage message, int messageLength) { var handler = ClientWrite; - if (handler != null) - { - handler(this, message, length, rawData); - } + handler?.Invoke(this, message, messageLength); } /// @@ -176,6 +161,11 @@ public ReverseProxyClient[] ProxyClients } } + /// + /// Gets if the client is currently connected to a server. + /// + public bool Connected { get; private set; } + /// /// The stream used for communication. /// @@ -212,9 +202,9 @@ public ReverseProxyClient[] ProxyClients private byte[] _payloadBuffer; /// - /// The Queue which holds buffers to send. + /// The queue which holds messages to send. /// - private readonly Queue _sendBuffers = new Queue(); + private readonly Queue _sendBuffers = new Queue(); /// /// Determines if the client is currently sending messages. @@ -227,7 +217,7 @@ public ReverseProxyClient[] ProxyClients private readonly object _sendingMessagesLock = new object(); /// - /// The Queue which holds buffers to read. + /// The queue which holds buffers to read. /// private readonly Queue _readBuffers = new Queue(); @@ -241,35 +231,17 @@ public ReverseProxyClient[] ProxyClients /// private readonly object _readingMessagesLock = new object(); - /// - /// The temporary header to store parts of the header. - /// - /// - /// This temporary header is used when we have i.e. - /// only 2 bytes left to read from the buffer but need more - /// which can only be read in the next Receive callback - /// - private byte[] _tempHeader; - - /// - /// Decides if we need to append bytes to the header. - /// - private bool _appendHeader; - // Receive info private int _readOffset; private int _writeOffset; - private int _tempHeaderOffset; private int _readableDataLen; private int _payloadLen; private ReceiveType _receiveState = ReceiveType.Header; /// - /// Gets if the client is currently connected to a server. + /// The mutex prevents multiple simultaneous write operations on the . /// - public bool Connected { get; private set; } - - private const bool compressionEnabled = true; + private readonly Mutex _singleWriteMutex = new Mutex(); /// /// Constructor of the client, initializes serializer types. @@ -279,7 +251,6 @@ protected Client(X509Certificate2 serverCertificate) { _serverCertificate = serverCertificate; _readBuffer = new byte[BUFFER_SIZE]; - _tempHeader = new byte[HEADER_SIZE]; TypeRegistry.AddTypesToSerializer(typeof(IMessage), TypeRegistry.GetPacketTypes(typeof(IMessage)).ToArray()); } @@ -428,38 +399,26 @@ private void AsyncReceive(object state) { case ReceiveType.Header: { - if (_readableDataLen + _tempHeaderOffset >= HEADER_SIZE) - { // we can read the header - int headerLength = (_appendHeader) - ? HEADER_SIZE - _tempHeaderOffset - : HEADER_SIZE; + if (_payloadBuffer == null) + _payloadBuffer = new byte[HEADER_SIZE]; + + if (_readableDataLen + _writeOffset >= HEADER_SIZE) + { + // completely received header + int headerLength = HEADER_SIZE - _writeOffset; try { - if (_appendHeader) - { - try - { - Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset, - headerLength); - } - catch (Exception ex) - { - process = false; - OnClientFail(ex); - break; - } - _payloadLen = BitConverter.ToInt32(_tempHeader, 0); - _tempHeaderOffset = 0; - _appendHeader = false; - } - else - { - _payloadLen = BitConverter.ToInt32(readBuffer, _readOffset); - } + Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, headerLength); + + _payloadLen = BitConverter.ToInt32(_payloadBuffer, _readOffset); if (_payloadLen <= 0 || _payloadLen > MAX_MESSAGE_SIZE) throw new Exception("invalid header"); + + // try to re-use old payload buffers which fit + if (_payloadBuffer.Length <= _payloadLen + HEADER_SIZE) + Array.Resize(ref _payloadBuffer, _payloadLen + HEADER_SIZE); } catch (Exception) { @@ -469,44 +428,44 @@ private void AsyncReceive(object state) } _readableDataLen -= headerLength; + _writeOffset += headerLength; _readOffset += headerLength; _receiveState = ReceiveType.Payload; } - else // _readableDataLen < HEADER_SIZE + else // _readableDataLen + _writeOffset < HeaderSize { + // received only a part of the header try { - Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset, _readableDataLen); + Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, _readableDataLen); } - catch (Exception ex) + catch (Exception) { process = false; - OnClientFail(ex); + Disconnect(); break; } - _tempHeaderOffset += _readableDataLen; - _appendHeader = true; + _readOffset += _readableDataLen; + _writeOffset += _readableDataLen; process = false; + // nothing left to process } break; } case ReceiveType.Payload: { - if (_payloadBuffer == null || _payloadBuffer.Length != _payloadLen) - _payloadBuffer = new byte[_payloadLen]; - - int length = (_writeOffset + _readableDataLen >= _payloadLen) - ? _payloadLen - _writeOffset + int length = (_writeOffset - HEADER_SIZE + _readableDataLen) >= _payloadLen + ? _payloadLen - (_writeOffset - HEADER_SIZE) : _readableDataLen; try { Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, length); } - catch (Exception ex) + catch (Exception) { process = false; - OnClientFail(ex); + Disconnect(); break; } @@ -514,54 +473,26 @@ private void AsyncReceive(object state) _readOffset += length; _readableDataLen -= length; - if (_writeOffset == _payloadLen) + if (_writeOffset - HEADER_SIZE == _payloadLen) { - bool isError = _payloadBuffer.Length == 0; - - if (!isError) + // completely received payload + try { - if (compressionEnabled) + using (PayloadReader pr = new PayloadReader(_payloadBuffer, _payloadLen + HEADER_SIZE, false)) { - try - { - _payloadBuffer = SafeQuickLZ.Decompress(_payloadBuffer); - } - catch (Exception) - { - process = false; - Disconnect(); - break; - } + IMessage message = pr.ReadMessage(); + + OnClientRead(message, _payloadBuffer.Length); } - - isError = _payloadBuffer.Length == 0; // check if payload decompression failed } - - if (isError) + catch (Exception) { process = false; Disconnect(); break; } - using (MemoryStream deserialized = new MemoryStream(_payloadBuffer)) - { - try - { - IMessage message = Serializer.Deserialize(deserialized); - - OnClientRead(message); - } - catch (Exception ex) - { - process = false; - OnClientFail(ex); - break; - } - } - _receiveState = ReceiveType.Header; - _payloadBuffer = null; _payloadLen = 0; _writeOffset = 0; } @@ -574,10 +505,6 @@ private void AsyncReceive(object state) } } - if (_receiveState == ReceiveType.Header) - { - _writeOffset = 0; // prepare for next message - } _readOffset = 0; _readableDataLen = 0; } @@ -587,58 +514,65 @@ private void AsyncReceive(object state) /// Sends a message to the connected server. /// /// The type of the message. - /// The message to be send. + /// The message to be sent. public void Send(T message) where T : IMessage { if (!Connected || message == null) return; lock (_sendBuffers) { - using (MemoryStream ms = new MemoryStream()) + _sendBuffers.Enqueue(message); + + lock (_sendingMessagesLock) { - try - { - Serializer.Serialize(ms, message); - } - catch (Exception ex) - { - OnClientFail(ex); - return; - } + if (_sendingMessages) return; - byte[] payload = ms.ToArray(); - - _sendBuffers.Enqueue(payload); - - OnClientWrite(message, payload.LongLength, payload); - - lock (_sendingMessagesLock) - { - if (_sendingMessages) return; - - _sendingMessages = true; - } - ThreadPool.QueueUserWorkItem(Send); + _sendingMessages = true; + ThreadPool.QueueUserWorkItem(ProcessSendBuffers); } } } /// /// Sends a message to the connected server. - /// Blocks the thread until all messages have been sent. + /// Blocks the thread until the message has been sent. /// /// The type of the message. - /// The message to be send. + /// The message to be sent. public void SendBlocking(T message) where T : IMessage { - Send(message); - while (_sendingMessages) + if (!Connected || message == null) return; + + SafeSendMessage(message); + } + + /// + /// Safely sends a message and prevents multiple simultaneous + /// write operations on the . + /// + /// The message to send. + private void SafeSendMessage(IMessage message) + { + try { - Thread.Sleep(10); + _singleWriteMutex.WaitOne(); + using (PayloadWriter pw = new PayloadWriter(_stream, true)) + { + OnClientWrite(message, pw.WriteMessage(message)); + } + } + catch (Exception) + { + Disconnect(); + SendCleanup(true); + } + finally + { + _singleWriteMutex.ReleaseMutex(); } } - private void Send(object state) + private void ProcessSendBuffers(object state) { while (true) { @@ -648,7 +582,7 @@ private void Send(object state) return; } - byte[] payload; + IMessage message; lock (_sendBuffers) { if (_sendBuffers.Count == 0) @@ -657,33 +591,13 @@ private void Send(object state) return; } - payload = _sendBuffers.Dequeue(); + message = _sendBuffers.Dequeue(); } - try - { - _stream.Write(BuildMessage(payload)); - } - catch (Exception ex) - { - OnClientFail(ex); - SendCleanup(true); - return; - } + SafeSendMessage(message); } } - private byte[] BuildMessage(byte[] payload) - { - if (compressionEnabled) - payload = SafeQuickLZ.Compress(payload); - - byte[] message = new byte[payload.Length + HEADER_SIZE]; - Array.Copy(BitConverter.GetBytes(payload.Length), message, HEADER_SIZE); - Array.Copy(payload, 0, message, HEADER_SIZE, payload.Length); - return message; - } - private void SendCleanup(bool clear = false) { lock (_sendingMessagesLock) @@ -711,11 +625,11 @@ public void Disconnect() _stream.Close(); _readOffset = 0; _writeOffset = 0; - _tempHeaderOffset = 0; _readableDataLen = 0; _payloadLen = 0; _payloadBuffer = null; _receiveState = ReceiveType.Header; + _singleWriteMutex.Dispose(); if (_proxyClients != null) { diff --git a/Quasar.Client/Networking/QuasarClient.cs b/Quasar.Client/Networking/QuasarClient.cs index a3db3fdc..77c3fb70 100644 --- a/Quasar.Client/Networking/QuasarClient.cs +++ b/Quasar.Client/Networking/QuasarClient.cs @@ -67,7 +67,7 @@ public void Connect() } } - private void OnClientRead(Client client, IMessage message) + private void OnClientRead(Client client, IMessage message, int messageLength) { if (!Identified) { diff --git a/Quasar.Common/Networking/PayloadReader.cs b/Quasar.Common/Networking/PayloadReader.cs new file mode 100644 index 00000000..c1ad7fc6 --- /dev/null +++ b/Quasar.Common/Networking/PayloadReader.cs @@ -0,0 +1,75 @@ +using ProtoBuf; +using Quasar.Common.Messages; +using System; +using System.IO; + +namespace Quasar.Common.Networking +{ + public class PayloadReader : MemoryStream + { + private readonly Stream _innerStream; + public bool LeaveInnerStreamOpen { get; } + + public PayloadReader(byte[] payload, int length, bool leaveInnerStreamOpen) + { + _innerStream = new MemoryStream(payload, 0, length, false, true); + LeaveInnerStreamOpen = leaveInnerStreamOpen; + } + + public PayloadReader(Stream stream, bool leaveInnerStreamOpen) + { + _innerStream = stream; + LeaveInnerStreamOpen = leaveInnerStreamOpen; + } + + public int ReadInteger() + { + return BitConverter.ToInt32(ReadBytes(4), 0); + } + + public byte[] ReadBytes(int length) + { + if (_innerStream.Position + length <= _innerStream.Length) + { + byte[] result = new byte[length]; + _innerStream.Read(result, 0, result.Length); + return result; + } + throw new OverflowException($"Unable to read {length} bytes from stream"); + } + + /// + /// Reads the serialized message of the payload and deserializes it. + /// + /// The deserialized message of the payload. + public IMessage ReadMessage() + { + ReadInteger(); + /* Length prefix is ignored here and already handled in Client class, + * it would cause to much trouble to check here for split or not fully + * received packets. + */ + IMessage message = Serializer.Deserialize(_innerStream); + return message; + } + + protected override void Dispose(bool disposing) + { + try + { + if (LeaveInnerStreamOpen) + { + _innerStream.Flush(); + } + else + { + _innerStream.Close(); + } + } + finally + { + base.Dispose(disposing); + } + } + } +} diff --git a/Quasar.Common/Networking/PayloadWriter.cs b/Quasar.Common/Networking/PayloadWriter.cs new file mode 100644 index 00000000..000d808e --- /dev/null +++ b/Quasar.Common/Networking/PayloadWriter.cs @@ -0,0 +1,65 @@ +using ProtoBuf; +using Quasar.Common.Messages; +using System; +using System.IO; + +namespace Quasar.Common.Networking +{ + public class PayloadWriter : MemoryStream + { + private readonly Stream _innerStream; + public bool LeaveInnerStreamOpen { get; } + + public PayloadWriter(Stream stream, bool leaveInnerStreamOpen) + { + _innerStream = stream; + LeaveInnerStreamOpen = leaveInnerStreamOpen; + } + + public void WriteBytes(byte[] value) + { + _innerStream.Write(value, 0, value.Length); + } + + public void WriteInteger(int value) + { + WriteBytes(BitConverter.GetBytes(value)); + } + + /// + /// Writes a serialized message as payload to the stream. + /// + /// The message to write. + /// The amount of written bytes to the stream. + public int WriteMessage(IMessage message) + { + using (MemoryStream ms = new MemoryStream()) + { + Serializer.Serialize(ms, message); + byte[] payload = ms.ToArray(); + WriteInteger(payload.Length); + WriteBytes(payload); + return sizeof(int) + payload.Length; + } + } + + protected override void Dispose(bool disposing) + { + try + { + if (LeaveInnerStreamOpen) + { + _innerStream.Flush(); + } + else + { + _innerStream.Close(); + } + } + finally + { + base.Dispose(disposing); + } + } + } +} diff --git a/Quasar.Common/Quasar.Common.csproj b/Quasar.Common/Quasar.Common.csproj index 8f85b71b..e23cd045 100644 --- a/Quasar.Common/Quasar.Common.csproj +++ b/Quasar.Common/Quasar.Common.csproj @@ -46,6 +46,7 @@ + @@ -150,6 +151,8 @@ + + diff --git a/Quasar.Server/Networking/Client.cs b/Quasar.Server/Networking/Client.cs index aa019bb7..d913b7e3 100644 --- a/Quasar.Server/Networking/Client.cs +++ b/Quasar.Server/Networking/Client.cs @@ -1,10 +1,7 @@ -using ProtoBuf; -using Quasar.Common.IO.Compression; -using Quasar.Common.Messages; +using Quasar.Common.Messages; using Quasar.Common.Networking; using System; using System.Collections.Generic; -using System.IO; using System.Net; using System.Net.Security; using System.Threading; @@ -156,9 +153,9 @@ public enum ReceiveType private readonly BufferPool _bufferPool; /// - /// The Queue which holds buffers to send. + /// The queue which holds messages to send. /// - private readonly Queue _sendBuffers = new Queue(); + private readonly Queue _sendBuffers = new Queue(); /// /// Determines if the client is currently sending messages. @@ -171,7 +168,7 @@ public enum ReceiveType private readonly object _sendingMessagesLock = new object(); /// - /// The Queue which holds buffers to read. + /// The queue which holds buffers to read. /// private readonly Queue _readBuffers = new Queue(); @@ -188,7 +185,6 @@ public enum ReceiveType // receive info private int _readOffset; private int _writeOffset; - private int _tempHeaderOffset; private int _readableDataLen; private int _payloadLen; private ReceiveType _receiveState = ReceiveType.Header; @@ -196,7 +192,7 @@ public enum ReceiveType /// /// The time when the client connected. /// - public DateTime ConnectedTime { get; private set; } + public DateTime ConnectedTime { get; } /// /// The connection state of the client. @@ -239,21 +235,9 @@ public enum ReceiveType private const int MaxMessageSize = (1024 * 1024) * 5; // 5 MB /// - /// The temporary header to store parts of the header. + /// The mutex prevents multiple simultaneous write operations on the . /// - /// - /// This temporary header is used when we have i.e. - /// only 2 bytes left to read from the buffer but need more - /// which can only be read in the next Receive callback - /// - private readonly byte[] _tempHeader; - - /// - /// Decides if we need to append bytes to the header. - /// - private bool _appendHeader; - - private const bool compressionEnabled = true; + private readonly Mutex _singleWriteMutex = new Mutex(); public Client(BufferPool bufferPool, SslStream stream, IPEndPoint endPoint) { @@ -266,8 +250,6 @@ public Client(BufferPool bufferPool, SslStream stream, IPEndPoint endPoint) _stream = stream; _bufferPool = bufferPool; _readBuffer = _bufferPool.GetBuffer(); - _tempHeader = new byte[HeaderSize]; - _stream.BeginRead(_readBuffer, 0, _readBuffer.Length, AsyncReceive, null); OnClientState(true); } @@ -368,38 +350,26 @@ private void AsyncReceive(object state) { case ReceiveType.Header: { - if (_readableDataLen + _tempHeaderOffset >= HeaderSize) - { // we can read the header - int headerLength = (_appendHeader) - ? HeaderSize - _tempHeaderOffset - : HeaderSize; + if (_payloadBuffer == null) + _payloadBuffer = new byte[HeaderSize]; + + if (_readableDataLen + _writeOffset >= HeaderSize) + { + // completely received header + int headerLength = HeaderSize - _writeOffset; try { - if (_appendHeader) - { - try - { - Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset, - headerLength); - } - catch (Exception) - { - process = false; - Disconnect(); - break; - } - _payloadLen = BitConverter.ToInt32(_tempHeader, 0); - _tempHeaderOffset = 0; - _appendHeader = false; - } - else - { - _payloadLen = BitConverter.ToInt32(readBuffer, _readOffset); - } + Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, headerLength); + + _payloadLen = BitConverter.ToInt32(_payloadBuffer, _readOffset); if (_payloadLen <= 0 || _payloadLen > MaxMessageSize) throw new Exception("invalid header"); + + // try to re-use old payload buffers which fit + if (_payloadBuffer.Length <= _payloadLen + HeaderSize) + Array.Resize(ref _payloadBuffer, _payloadLen + HeaderSize); } catch (Exception) { @@ -409,14 +379,16 @@ private void AsyncReceive(object state) } _readableDataLen -= headerLength; + _writeOffset += headerLength; _readOffset += headerLength; _receiveState = ReceiveType.Payload; } - else // _readableDataLen < _parentServer.HEADER_SIZE + else // _readableDataLen + _writeOffset < HeaderSize { + // received only a part of the header try { - Array.Copy(readBuffer, _readOffset, _tempHeader, _tempHeaderOffset, _readableDataLen); + Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, _readableDataLen); } catch (Exception) { @@ -424,21 +396,19 @@ private void AsyncReceive(object state) Disconnect(); break; } - _tempHeaderOffset += _readableDataLen; - _appendHeader = true; + _readOffset += _readableDataLen; + _writeOffset += _readableDataLen; process = false; + // nothing left to process } break; } case ReceiveType.Payload: { - if (_payloadBuffer == null || _payloadBuffer.Length != _payloadLen) - _payloadBuffer = new byte[_payloadLen]; - - int length = (_writeOffset + _readableDataLen >= _payloadLen) - ? _payloadLen - _writeOffset + int length = (_writeOffset - HeaderSize + _readableDataLen) >= _payloadLen + ? _payloadLen - (_writeOffset - HeaderSize) : _readableDataLen; - + try { Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, length); @@ -449,59 +419,31 @@ private void AsyncReceive(object state) Disconnect(); break; } - + _writeOffset += length; _readOffset += length; _readableDataLen -= length; - - if (_writeOffset == _payloadLen) + + if (_writeOffset - HeaderSize == _payloadLen) { - bool isError = _payloadBuffer.Length == 0; - - if (!isError) + // completely received payload + try { - if (compressionEnabled) + using (PayloadReader pr = new PayloadReader(_payloadBuffer, _payloadLen + HeaderSize, false)) { - try - { - _payloadBuffer = SafeQuickLZ.Decompress(_payloadBuffer); - } - catch (Exception) - { - process = false; - Disconnect(); - break; - } + IMessage message = pr.ReadMessage(); + + OnClientRead(message, _payloadBuffer.Length); } - - isError = _payloadBuffer.Length == 0; // check if payload decompression failed } - - if (isError) + catch (Exception) { process = false; Disconnect(); break; } - - using (MemoryStream deserialized = new MemoryStream(_payloadBuffer)) - { - try - { - IMessage message = Serializer.Deserialize(deserialized); - - OnClientRead(message, _payloadBuffer.Length); - } - catch (Exception) - { - process = false; - Disconnect(); - break; - } - } - + _receiveState = ReceiveType.Header; - _payloadBuffer = null; _payloadLen = 0; _writeOffset = 0; } @@ -514,10 +456,6 @@ private void AsyncReceive(object state) } } - if (_receiveState == ReceiveType.Header) - { - _writeOffset = 0; // prepare for next message - } _readOffset = 0; _readableDataLen = 0; } @@ -527,58 +465,65 @@ private void AsyncReceive(object state) /// Sends a message to the connected client. /// /// The type of the message. - /// The message to be send. + /// The message to be sent. public void Send(T message) where T : IMessage { if (!Connected || message == null) return; lock (_sendBuffers) { - using (MemoryStream ms = new MemoryStream()) + _sendBuffers.Enqueue(message); + + lock (_sendingMessagesLock) { - try - { - Serializer.Serialize(ms, message); - } - catch (Exception) - { - Disconnect(); - return; - } + if (_sendingMessages) return; - byte[] payload = ms.ToArray(); - - _sendBuffers.Enqueue(payload); - - OnClientWrite(message, payload.Length); - - lock (_sendingMessagesLock) - { - if (_sendingMessages) return; - - _sendingMessages = true; - } - ThreadPool.QueueUserWorkItem(Send); + _sendingMessages = true; + ThreadPool.QueueUserWorkItem(ProcessSendBuffers); } } } /// /// Sends a message to the connected client. - /// Blocks the thread until all messages have been sent. + /// Blocks the thread until the message has been sent. /// /// The type of the message. - /// The message to be send. + /// The message to be sent. public void SendBlocking(T message) where T : IMessage { - Send(message); - while (_sendingMessages) + if (!Connected || message == null) return; + + SafeSendMessage(message); + } + + /// + /// Safely sends a message and prevents multiple simultaneous + /// write operations on the . + /// + /// The message to send. + private void SafeSendMessage(IMessage message) + { + try { - Thread.Sleep(10); + _singleWriteMutex.WaitOne(); + using (PayloadWriter pw = new PayloadWriter(_stream, true)) + { + OnClientWrite(message, pw.WriteMessage(message)); + } + } + catch (Exception) + { + Disconnect(); + SendCleanup(true); + } + finally + { + _singleWriteMutex.ReleaseMutex(); } } - private void Send(object state) + private void ProcessSendBuffers(object state) { while (true) { @@ -588,7 +533,7 @@ private void Send(object state) return; } - byte[] payload; + IMessage message; lock (_sendBuffers) { if (_sendBuffers.Count == 0) @@ -597,34 +542,13 @@ private void Send(object state) return; } - payload = _sendBuffers.Dequeue(); + message = _sendBuffers.Dequeue(); } - try - { - var message = BuildMessage(payload); - _stream.Write(message); - } - catch (Exception) - { - Disconnect(); - SendCleanup(true); - return; - } + SafeSendMessage(message); } } - private byte[] BuildMessage(byte[] payload) - { - if (compressionEnabled) - payload = SafeQuickLZ.Compress(payload); - - byte[] message = new byte[payload.Length + HeaderSize]; - Array.Copy(BitConverter.GetBytes(payload.Length), message, HeaderSize); - Array.Copy(payload, 0, message, HeaderSize, payload.Length); - return message; - } - private void SendCleanup(bool clear = false) { lock (_sendingMessagesLock) @@ -651,16 +575,16 @@ public void Disconnect() _stream.Close(); _readOffset = 0; _writeOffset = 0; - _tempHeaderOffset = 0; _readableDataLen = 0; _payloadLen = 0; _payloadBuffer = null; _receiveState = ReceiveType.Header; - + _singleWriteMutex.Dispose(); + _bufferPool.ReturnBuffer(_readBuffer); } OnClientState(false); } } -} \ No newline at end of file +} diff --git a/Quasar.Server/Quasar.Server.csproj b/Quasar.Server/Quasar.Server.csproj index a46ccf0e..be97105f 100644 --- a/Quasar.Server/Quasar.Server.csproj +++ b/Quasar.Server/Quasar.Server.csproj @@ -48,23 +48,23 @@ false - - ..\packages\BouncyCastle.1.8.4\lib\BouncyCastle.Crypto.dll + + ..\packages\BouncyCastle.1.8.5\lib\BouncyCastle.Crypto.dll ..\packages\MouseKeyHook.5.6.0\lib\net40\Gma.System.MouseKeyHook.dll - - ..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.dll + + ..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.dll - - ..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.Mdb.dll + + ..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.Mdb.dll - - ..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.Pdb.dll + + ..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.Pdb.dll - - ..\packages\Mono.Cecil.0.10.1\lib\net40\Mono.Cecil.Rocks.dll + + ..\packages\Mono.Cecil.0.10.3\lib\net40\Mono.Cecil.Rocks.dll ..\packages\Mono.Nat.1.2.24.0\lib\net40\Mono.Nat.dll @@ -72,8 +72,8 @@ ..\packages\protobuf-net.2.4.0\lib\net40\protobuf-net.dll - - ..\packages\Vestris.ResourceLib.2.0.0\lib\net40\ResourceLib.dll + + ..\packages\Vestris.ResourceLib.2.1.0\lib\net40\Vestris.ResourceLib.dll diff --git a/Quasar.Server/packages.config b/Quasar.Server/packages.config index 242a03af..caff8975 100644 --- a/Quasar.Server/packages.config +++ b/Quasar.Server/packages.config @@ -1,10 +1,10 @@  - + - + - + \ No newline at end of file