From 75ea14dfd427fb463a6d45891651bf0f3a20945d Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Fri, 9 Mar 2018 18:43:48 +0200 Subject: [PATCH] KCP shared buffer usage workarounds --- pupy/network/lib/streams/PupySocketStream.py | 44 +++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/pupy/network/lib/streams/PupySocketStream.py b/pupy/network/lib/streams/PupySocketStream.py index a4195594..714473b4 100644 --- a/pupy/network/lib/streams/PupySocketStream.py +++ b/pupy/network/lib/streams/PupySocketStream.py @@ -445,7 +445,8 @@ class PupyUDPSocketStream(object): while buf is not None: if buf: if self.INITIALIZED: - self.buf_in.write(buf, notify=False) + with self.buf_in: + self.buf_in.write(buf, notify=False) have_data = True elif buf == self.MAGIC: self.INITIALIZED = True @@ -467,12 +468,12 @@ class PupyUDPSocketStream(object): return self.upstream.read(count) try: - with self.downstream_lock: - while len(self.upstream) < count: - if self.buf_in or self._poll_read(10): + while len(self.upstream) < count: + if self.buf_in or self._poll_read(10): + with self.buf_in: self.transport.downstream_recv(self.buf_in) - else: - break + else: + break return self.upstream.read(count) @@ -502,21 +503,23 @@ class PupyUDPSocketStream(object): def consume(self): data = False - while True: - kcpdata = self.kcp.recv() - if kcpdata: - if self.INITIALIZED: - self.buf_in.write(kcpdata, notify=False) - data = True - elif kcpdata == self.MAGIC: - self.INITIALIZED = True + with self.downstream_lock: + while True: + kcpdata = self.kcp.recv() + if kcpdata: + if self.INITIALIZED: + with self.buf_in: + self.buf_in.write(kcpdata, notify=False) + data = True + elif kcpdata == self.MAGIC: + self.INITIALIZED = True + else: + return False else: - return False - else: - break + break - if not data: - return True + if not data: + return True if data: self.buf_in.flush() @@ -526,5 +529,6 @@ class PupyUDPSocketStream(object): def wake(self): now = time.time() if not self._wake_after or ( now >= self._wake_after ): - self.buf_in.wake() + with self.downstream_lock: + self.buf_in.wake() self._wake_after = now + self.LONG_SLEEP_INTERRUPT_TIMEOUT