Initial KCP support

This commit is contained in:
Oleksii Shevchuk 2017-12-30 15:57:15 +02:00
parent 119bf72c89
commit 7413e4fba6
9 changed files with 193 additions and 82 deletions

View File

@ -4,3 +4,4 @@ psutil
rsa
netaddr
tinyec
https://github.com/alxchk/pykcp

@ -1 +1 @@
Subproject commit 2b7a8236a1963d8fe541370fe7ff38df2c0deab1
Subproject commit a1b0ff23e330d1ea598ccf8ccc2429c9f689f6ba

View File

@ -30,7 +30,12 @@ class Buffer(object):
return True
else:
self.waiting.clear()
return self.waiting.wait(timeout)
self.waiting.wait(timeout)
return len(self.buffer)>0
def wake(self):
self.waiting.set()
def read(self, n=-1):
"""

View File

@ -185,16 +185,21 @@ class PupyUDPClient(PupyClient):
def __init__(self, family = socket.AF_UNSPEC, socktype = socket.SOCK_DGRAM, timeout=3):
self.sock=None
super(PupyUDPClient, self).__init__()
self.family=family
self.socktype=socktype
self.timeout=timeout
self.family = family
self.socktype = socktype
self.timeout = timeout
def connect(self, host, port):
self.host=host
self.port=port
family, socktype, proto, _, sockaddr = socket.getaddrinfo(host, port, self.family, self.socktype)[0]
self.host = host
self.port = port
family, socktype, proto, _, sockaddr = socket.getaddrinfo(
host, port, self.family, self.socktype
)[0]
s = socket.socket(family, socktype, proto)
s.settimeout(self.timeout)
s.connect(sockaddr)
self.sock=s
s.setblocking(0)
self.sock = s
return s, (host, port)

View File

@ -98,6 +98,7 @@ class PupyConnection(Connection):
if DEBUG_NETWORK:
logging.debug('Sync request handled: {}'.format(seq))
if seq in self._sync_events:
del self._sync_events[seq]
@ -122,12 +123,19 @@ class PupyConnection(Connection):
self._sync_events[seq] = Event()
self._send(consts.MSG_REQUEST, seq, (handler, self._box(args)))
if DEBUG_NETWORK:
logging.debug('Request submitted: {}'.format(seq))
return seq
def _async_request(self, handler, args = (), callback = (lambda a, b: None)):
self._send_request(handler, args, async=callback)
def _dispatch_reply(self, seq, raw):
if DEBUG_NETWORK:
logging.debug('Dispatch reply: {}'.format(seq))
self._last_recv = time.time()
sync = seq not in self._async_callbacks
Connection._dispatch_reply(self, seq, raw)
@ -136,6 +144,9 @@ class PupyConnection(Connection):
self._sync_events[seq].set()
def _dispatch_exception(self, seq, raw):
if DEBUG_NETWORK:
logging.debug('Dispatch exception: {}'.format(seq))
self._last_recv = time.time()
sync = seq not in self._async_callbacks
Connection._dispatch_exception(self, seq, raw)
@ -161,14 +172,34 @@ class PupyConnectionThread(Thread):
else:
self.lock = RLock()
if DEBUG_NETWORK:
logging.debug('Create connection thread')
self.connection = PupyConnection(self.lock, *args, **kwargs)
Thread.__init__(self)
self.daemon = True
if DEBUG_NETWORK:
logging.debug('Create connection thread completed')
def run(self):
if DEBUG_NETWORK:
logging.debug('Run connection thread')
try:
if DEBUG_NETWORK:
logging.debug('Init connection')
self.connection._init_service()
if DEBUG_NETWORK:
logging.debug('Init connection complete. Acquire lock')
with self.lock:
if DEBUG_NETWORK:
logging.debug('Start serve loop')
while not self.connection.closed:
self.connection.serve(10)

View File

@ -11,6 +11,7 @@ from rpyc.core.stream import Stream
from buffer import Buffer
import socket
import select
import errno
import random
@ -221,7 +222,7 @@ class PupyUDPServer(object):
self.service=service
self.active=False
self.clients={}
self.clients = {}
self.sock=None
self.hostname=kwargs['hostname']
self.port=kwargs['port']
@ -245,22 +246,53 @@ class PupyUDPServer(object):
except socket.error as msg:
s.close()
s = None
last_exc=msg
last_exc = msg
continue
break
self.sock=s
self.sock = s
if self.sock is None:
raise last_exc
def accept(self):
try:
data, addr = self.sock.recvfrom(40960)
if data:
self.dispatch_data(data, addr)
else:
self.clients[addr].close()
minwait = None
now = None
unsent = False
for addr, client in self.clients.iteritems():
if not minwait:
now = client.clock
minwait = client.update_in(now)
else:
cwait = client.update_in(now)
if cwait < minwait:
minwait = cwait
if client.unsent > 0:
unsent = True
if minwait is not None:
minwait = minwait / 1000.0
if not unsent:
minwait = 1
r, _, _ = select.select([self.sock], [], [], minwait)
if r:
data, addr = self.sock.recvfrom(40960)
if data:
self.dispatch_data(data, addr)
else:
self.clients[addr].close()
del self.clients[addr]
except Exception as e:
logging.error(e)
raise
def on_close(self, addr):
self.clients[addr].wake()
del self.clients[addr]
def dispatch_data(self, data_received, addr):
host, port=addr[0], addr[1]
@ -274,8 +306,10 @@ class PupyUDPServer(object):
except AuthenticationError:
logging.info("failed to authenticate, rejecting data")
raise
self.clients[addr] = self.stream_class(
(self.sock, addr), self.transport_class, self.transport_kwargs, client_side=False
(self.sock, addr), self.transport_class, self.transport_kwargs,
client_side=False, close_cb=self.on_close
)
t = PupyConnectionThread(
@ -288,9 +322,8 @@ class PupyUDPServer(object):
)
t.daemon=True
t.start()
with self.clients[addr].downstream_lock:
self.clients[addr].buf_in.write(data_received)
self.clients[addr].transport.downstream_recv(self.clients[addr].buf_in)
self.clients[addr].submit(data_received)
def start(self):
self.listen()
@ -308,6 +341,6 @@ class PupyUDPServer(object):
self.close()
def close(self):
self.active=False
self.active = False
if self.sock:
self.sock.close()

View File

@ -14,6 +14,7 @@ import errno
import logging
import traceback
import zlib
import kcp
from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint
import threading
@ -84,12 +85,12 @@ class PupySocketStream(SocketStream):
self.downstream_lock=threading.Lock()
self.transport=transport_class(self, **transport_kwargs)
self.on_connect()
self.MAX_IO_CHUNK=32000
self.compress = True
self.on_connect()
def on_connect(self):
self.transport.on_connect()
self._upstream_recv()
@ -152,8 +153,6 @@ class PupySocketStream(SocketStream):
def read(self, count):
try:
if len(self.upstream)>=count:
return self.upstream.read(count)
while len(self.upstream)<count:
if not self.sock_poll(None) and self.closed:
return None
@ -175,96 +174,128 @@ class PupySocketStream(SocketStream):
self.close()
class PupyUDPSocketStream(object):
def __init__(self, sock, transport_class, transport_kwargs={}, client_side=True):
def __init__(self, sock, transport_class, transport_kwargs={}, client_side=True, close_cb=None):
if not (type(sock) is tuple and len(sock)==2):
raise Exception("dst_addr is not supplied for UDP stream, PupyUDPSocketStream needs a reply address/port")
self.client_side=client_side
self.MAX_IO_CHUNK=40960
self.client_side = client_side
self.sock, self.dst_addr = sock[0], sock[1]
if client_side:
self.sock.connect(self.dst_addr)
dst = self.sock.fileno()
else:
dst = lambda data: self.sock.sendto(data, self.dst_addr)
self.kcp = kcp.KCP(dst, 0, interval=32, nodelay=kcp.ENABLE_NODELAY)
self.sock, self.dst_addr=sock[0], sock[1]
self.buf_in=Buffer()
self.buf_out=Buffer()
#buffers for transport
self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
self.upstream = Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
self.downstream=Buffer(on_write=self._upstream_recv, transport_func=addGetPeer(self.dst_addr))
self.downstream = Buffer(on_write=self._upstream_recv, transport_func=addGetPeer(self.dst_addr))
self.upstream_lock=threading.Lock()
self.downstream_lock=threading.Lock()
self.upstream_lock = threading.Lock()
self.downstream_lock = threading.Lock()
self.transport = transport_class(self, **transport_kwargs)
self.total_timeout = 0
self.MAX_IO_CHUNK = ( self.kcp.mtu + 24 + 4 )
self.transport.mtu = self.MAX_IO_CHUNK
self.compress = True
self.close_callback = close_cb
self.transport=transport_class(self, **transport_kwargs)
self.on_connect()
self.total_timeout=0
def update_in(self, last):
return self.kcp.update_in(last)
def on_connect(self):
self.transport.on_connect()
self.transport.on_connect()
self._upstream_recv()
def poll(self, timeout):
return len(self.upstream)>0 or self._poll_read(timeout=timeout)
return len(self.upstream)>0 or self._poll_read(timeout)
def close(self):
pass
if self.close_callback:
self.close_callback(self.dst_addr)
@property
def closed(self):
return self.close()
self.closed = True
def _upstream_recv(self):
""" called as a callback on the downstream.write """
if len(self.downstream)>0:
tosend=self.downstream.read()
sent=self.sock.sendto(tosend, self.dst_addr)
if sent!=len(tosend):
print "TODO: error: all was not sent ! tosend: %s sent: %s"%(len(tosend), sent)
data = self.downstream.read()
self.kcp.send(data)
def _poll_read(self, timeout=None):
if not self.client_side:
return self.upstream.wait(timeout)
self.sock.settimeout(timeout)
try:
buf, addr=self.sock.recvfrom(self.MAX_IO_CHUNK)
except socket.timeout:
self.total_timeout+=timeout
if self.total_timeout>300:
self.sock.close() # too much inactivity, disconnect to let it reconnect
return False
except socket.error:
ex = sys.exc_info()[1]
if get_exc_errno(ex) in (errno.EAGAIN, errno.EWOULDBLOCK):
# windows just has to be a b**ch
return True
self.close()
raise EOFError(ex)
if not buf:
self.close()
raise EOFError("connection closed by peer")
self.buf_in.write(BYTES_LITERAL(buf))
self.total_timeout=0
return True
# In case of strage hangups change None to timeout
return self.upstream.wait(None)
buf = self.kcp.recv()
if buf is None:
if timeout is not None:
timeout = int(timeout) * 1000
buf = self.kcp.pollread(timeout)
if buf:
self.buf_in.write(buf)
self.total_timeout = 0
return True
return False
def read(self, count):
try:
if len(self.upstream)>=count:
return self.upstream.read(count)
while len(self.upstream)<count:
if self.client_side:
with self.downstream_lock:
if self._poll_read(0):
self.transport.downstream_recv(self.buf_in)
else:
time.sleep(0.0001)
while len(self.upstream) < count:
if not self.client_side:
raise ValueError('Method should never be used on server side')
with self.downstream_lock:
if self.buf_in or self._poll_read(10):
self.transport.downstream_recv(self.buf_in)
return self.upstream.read(count)
except Exception as e:
logging.debug(traceback.format_exc())
def write(self, data):
# The write will be done by the _upstream_recv
# callback on the downstream buffer
try:
with self.upstream_lock:
self.buf_out.write(data)
self.transport.upstream_recv(self.buf_out)
#The write will be done by the _upstream_recv callback on the downstream buffer
except Exception as e:
logging.debug(traceback.format_exc())
@property
def clock(self):
return self.kcp.clock
def submit(self, data):
if data:
with self.downstream_lock:
self.kcp.submit(data)
while True:
kcpdata = self.kcp.recv()
if kcpdata:
self.buf_in.write(kcpdata)
self.transport.downstream_recv(self.buf_in)
else:
break
@property
def unsent(self):
return self.kcp.unsent
def wake(self):
self.upstream.wake()

View File

@ -1,4 +1,4 @@
# -*- coding: UTF8 -*-
# -*- coding: utf-8 -*-
from ..base import BasePupyTransport
class DummyPupyTransport(BasePupyTransport):
@ -6,10 +6,14 @@ class DummyPupyTransport(BasePupyTransport):
"""
receiving obfuscated data from the remote client and writing deobfuscated data to downstream
"""
self.upstream.write(data.read())
d = data.read()
print "RECV: ", len(d)
self.upstream.write(d)
def upstream_recv(self, data):
"""
receiving clear-text data from local rpyc Stream and writing obfuscated data to upstream
"""
self.downstream.write(data.read())
d = data.read()
print "SEND: ", len(d)
self.downstream.write(d)

View File

@ -24,3 +24,4 @@ logutils
secretstorage
pygments
requests
https://github.com/alxchk/pykcp