diff --git a/pupy/network/lib/servers.py b/pupy/network/lib/servers.py index dd3a8063..6a8d2943 100644 --- a/pupy/network/lib/servers.py +++ b/pupy/network/lib/servers.py @@ -8,7 +8,7 @@ from rpyc.utils.authenticators import AuthenticationError from rpyc.utils.registry import UDPRegistryClient from rpyc.core.stream import Stream from buffer import Buffer -import threading, socket +import threading, socket, time from streams.PupySocketStream import addGetPeer @@ -235,7 +235,29 @@ class PupyTCPServer(ThreadPoolServer): h=addrinfo[0] p=addrinfo[1] config = dict(self.protocol_config, credentials=credentials, connid="%s:%d"%(h, p)) - return Connection(self.service, Channel(self.stream_class(sock, self.transport_class, self.transport_kwargs)), config=config) + def check_timeout(event, cb, timeout=10): + start_time=time.time() + while True: + if time.time()-start_time>timeout: + if not event.is_set(): + logging.error("timeout occured !") + cb() + break + elif event.is_set(): + break + time.sleep(0.5) + stream=self.stream_class(sock, self.transport_class, self.transport_kwargs) + + event=threading.Event() + t=threading.Thread(target=check_timeout, args=(event, stream.close)) + t.daemon=True + t.start() + try: + c=Connection(self.service, Channel(stream), config=config) + finally: + event.set() + return c + class PupyUDPServer(object): diff --git a/pupy/network/lib/streams/PupySocketStream.py b/pupy/network/lib/streams/PupySocketStream.py index 628b84f2..9a99898d 100644 --- a/pupy/network/lib/streams/PupySocketStream.py +++ b/pupy/network/lib/streams/PupySocketStream.py @@ -51,7 +51,9 @@ class PupySocketStream(SocketStream): def on_connect(self): self.transport.on_connect() - super(PupySocketStream, self).write(self.downstream.read()) + d=self.downstream.read() + if d: + super(PupySocketStream, self).write(d) def _read(self): try: diff --git a/pupy/pp.py b/pupy/pp.py index 960baa54..d42d6664 100755 --- a/pupy/pp.py +++ b/pupy/pp.py @@ -74,6 +74,10 @@ class ReverseSlaveService(Service): self._conn.root.set_modules(ModuleNamespace(self.exposed_getmodule)) def on_disconnect(self): print "disconnecting !" + try: + self._conn.close() + except: + pass raise KeyboardInterrupt def exposed_exit(self): raise SystemExit