From 9169fa350b71d919d2e2c78d3d39086f05d984ae Mon Sep 17 00:00:00 2001 From: Oleksii Shevchuk Date: Fri, 26 Aug 2016 09:12:48 +0300 Subject: [PATCH] Try to use multiprocessing instead of threading --- pupy/network/lib/servers.py | 33 ++++++++++----------- pupy/network/lib/streams/PupyAsyncStream.py | 18 +++++------ 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/pupy/network/lib/servers.py b/pupy/network/lib/servers.py index 0ed85d94..2a00a570 100644 --- a/pupy/network/lib/servers.py +++ b/pupy/network/lib/servers.py @@ -8,10 +8,9 @@ from rpyc.utils.authenticators import AuthenticationError from rpyc.utils.registry import UDPRegistryClient from rpyc.core.stream import Stream from buffer import Buffer -import threading, socket, time +import multiprocessing, socket, time from streams.PupySocketStream import addGetPeer - class PseudoStreamDecoder(Stream): def __init__(self, transport_class, transport_kwargs): self.bufin=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) @@ -19,8 +18,8 @@ class PseudoStreamDecoder(Stream): self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) self.transport=transport_class(self, **transport_kwargs) - self.lockin=threading.Lock() - self.lockout=threading.Lock() + self.lockin=multiprocessing.Lock() + self.lockout=multiprocessing.Lock() def decode_data(self, data): with self.lockin: @@ -83,9 +82,9 @@ class PupyAsyncServer(object): self.clients[cookie].buf_in.cookie=cookie self.clients[cookie].buf_out.cookie=cookie conn=Connection(self.service, Channel(self.clients[cookie]), config=config, _lazy=True) - t = threading.Thread(target = self.handle_new_conn, args=(conn,)) - t.daemon=True - t.start() + p = multiprocessing.Process(target=self.handle_new_conn, args=(conn,)) + p.daemon=True + p.start() resp=None with self.clients[cookie].upstream_lock: self.clients[cookie].upstream.write(decoded) @@ -99,9 +98,9 @@ class PupyAsyncServer(object): def handle_new_conn(self, conn): try: conn._init_service() - #conn.serve_all() - while True: - conn.serve(0.01) + conn.serve_all() + # while True: + # conn.serve(0.01) except Exception as e: logging.error(e) @@ -153,7 +152,7 @@ class PupyAsyncTCPServer(PupyAsyncServer): def accept(self): try: s, addr = self.sock.accept() - t=threading.Thread(target=self.serve_request, args=(s, addr,)) + t=multiprocessing.Process(target=self.serve_request, args=(s, addr,)) t.daemon=True t.start() #TODO : make a pool of threads @@ -248,8 +247,8 @@ class PupyTCPServer(ThreadPoolServer): time.sleep(0.5) stream=self.stream_class(sock, self.transport_class, self.transport_kwargs) - event=threading.Event() - t=threading.Thread(target=check_timeout, args=(event, stream.close)) + event=multiprocessing.Event() + t=multiprocessing.Process(target=check_timeout, args=(event, stream.close)) t.daemon=True t.start() try: @@ -334,7 +333,7 @@ class PupyUDPServer(object): raise self.clients[addr]=self.stream_class((self.sock, addr), self.transport_class, self.transport_kwargs, client_side=False) conn=Connection(self.service, Channel(self.clients[addr]), config=config, _lazy=True) - t = threading.Thread(target = self.handle_new_conn, args=(conn,)) + t = multiprocessing.Process(target = self.handle_new_conn, args=(conn,)) t.daemon=True t.start() with self.clients[addr].downstream_lock: @@ -344,9 +343,9 @@ class PupyUDPServer(object): def handle_new_conn(self, conn): try: conn._init_service() - #conn.serve_all() - while True: - conn.serve(0.01) + conn.serve_all() + # while True: + # conn.serve(0.01) except Exception as e: logging.error(e) diff --git a/pupy/network/lib/streams/PupyAsyncStream.py b/pupy/network/lib/streams/PupyAsyncStream.py index e94d312f..fa156b7a 100644 --- a/pupy/network/lib/streams/PupyAsyncStream.py +++ b/pupy/network/lib/streams/PupyAsyncStream.py @@ -1,4 +1,4 @@ -# -*- coding: UTF8 -*- +# -*- coding: utf-8 -*- # Copyright (c) 2015, Nicolas VERDIER (contact@n1nj4.eu) # Pupy is under the BSD 3-Clause license. see the LICENSE file at the root of the project for the detailed licence terms """ abstraction layer over rpyc streams to handle different transports and integrate obfsproxy pluggable transports """ @@ -7,7 +7,7 @@ __all__=["PupyAsyncTCPStream", "PupyAsyncUDPStream"] from rpyc.core.stream import Stream from ..buffer import Buffer -import sys, socket, time, errno, logging, traceback, string, random, threading +import sys, socket, time, errno, logging, traceback, string, random, multiprocessing from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint from PupySocketStream import addGetPeer @@ -43,26 +43,25 @@ class PupyAsyncStream(Stream): #buffers for transport self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) - self.upstream_lock=threading.Lock() - self.downstream_lock=threading.Lock() + self.upstream_lock=multiprocessing.Lock() + self.downstream_lock=multiprocessing.Lock() self.transport=transport_class(self, **transport_kwargs) self.max_pull_interval=2 self.pull_interval=0 - self.pull_event=threading.Event() + self.pull_event=multiprocessing.Event() self.MAX_IO_CHUNK=32000*100 #3Mo because it is a async transport - #threading.Thread(target=monitor, args=(self,)).start() self.client_side=self.transport.client if self.client_side: - self.poller_thread=threading.Thread(target=self.poller_loop) + self.poller_thread=multiprocessing.Process(target=self.poller_loop) self.poller_thread.daemon=True self.poller_thread.start() self.on_connect() def on_connect(self): self.transport.on_connect() - + def close(self): """closes the stream, releasing any system resources associated with it""" print "closing stream !" @@ -80,7 +79,7 @@ class PupyAsyncStream(Stream): raise NotImplementedError() def poll(self, timeout): - """indicates whether the stream has data to read (within *timeout* + """indicates whether the stream has data to read (within *timeout* seconds)""" return (len(self.upstream) > 0) or self.closed @@ -254,4 +253,3 @@ class PupyAsyncUDPStream(PupyAsyncStream): #print "received: %s"%repr(total_received) s.close() return total_received -