Try to use multiprocessing instead of threading

This commit is contained in:
Oleksii Shevchuk 2016-08-26 09:12:48 +03:00
parent 425c54e90b
commit 9169fa350b
2 changed files with 24 additions and 27 deletions

View File

@ -8,10 +8,9 @@ from rpyc.utils.authenticators import AuthenticationError
from rpyc.utils.registry import UDPRegistryClient
from rpyc.core.stream import Stream
from buffer import Buffer
import threading, socket, time
import multiprocessing, socket, time
from streams.PupySocketStream import addGetPeer
class PseudoStreamDecoder(Stream):
def __init__(self, transport_class, transport_kwargs):
self.bufin=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
@ -19,8 +18,8 @@ class PseudoStreamDecoder(Stream):
self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
self.transport=transport_class(self, **transport_kwargs)
self.lockin=threading.Lock()
self.lockout=threading.Lock()
self.lockin=multiprocessing.Lock()
self.lockout=multiprocessing.Lock()
def decode_data(self, data):
with self.lockin:
@ -83,9 +82,9 @@ class PupyAsyncServer(object):
self.clients[cookie].buf_in.cookie=cookie
self.clients[cookie].buf_out.cookie=cookie
conn=Connection(self.service, Channel(self.clients[cookie]), config=config, _lazy=True)
t = threading.Thread(target = self.handle_new_conn, args=(conn,))
t.daemon=True
t.start()
p = multiprocessing.Process(target=self.handle_new_conn, args=(conn,))
p.daemon=True
p.start()
resp=None
with self.clients[cookie].upstream_lock:
self.clients[cookie].upstream.write(decoded)
@ -99,9 +98,9 @@ class PupyAsyncServer(object):
def handle_new_conn(self, conn):
try:
conn._init_service()
#conn.serve_all()
while True:
conn.serve(0.01)
conn.serve_all()
# while True:
# conn.serve(0.01)
except Exception as e:
logging.error(e)
@ -153,7 +152,7 @@ class PupyAsyncTCPServer(PupyAsyncServer):
def accept(self):
try:
s, addr = self.sock.accept()
t=threading.Thread(target=self.serve_request, args=(s, addr,))
t=multiprocessing.Process(target=self.serve_request, args=(s, addr,))
t.daemon=True
t.start()
#TODO : make a pool of threads
@ -248,8 +247,8 @@ class PupyTCPServer(ThreadPoolServer):
time.sleep(0.5)
stream=self.stream_class(sock, self.transport_class, self.transport_kwargs)
event=threading.Event()
t=threading.Thread(target=check_timeout, args=(event, stream.close))
event=multiprocessing.Event()
t=multiprocessing.Process(target=check_timeout, args=(event, stream.close))
t.daemon=True
t.start()
try:
@ -334,7 +333,7 @@ class PupyUDPServer(object):
raise
self.clients[addr]=self.stream_class((self.sock, addr), self.transport_class, self.transport_kwargs, client_side=False)
conn=Connection(self.service, Channel(self.clients[addr]), config=config, _lazy=True)
t = threading.Thread(target = self.handle_new_conn, args=(conn,))
t = multiprocessing.Process(target = self.handle_new_conn, args=(conn,))
t.daemon=True
t.start()
with self.clients[addr].downstream_lock:
@ -344,9 +343,9 @@ class PupyUDPServer(object):
def handle_new_conn(self, conn):
try:
conn._init_service()
#conn.serve_all()
while True:
conn.serve(0.01)
conn.serve_all()
# while True:
# conn.serve(0.01)
except Exception as e:
logging.error(e)

View File

@ -1,4 +1,4 @@
# -*- coding: UTF8 -*-
# -*- coding: utf-8 -*-
# Copyright (c) 2015, Nicolas VERDIER (contact@n1nj4.eu)
# Pupy is under the BSD 3-Clause license. see the LICENSE file at the root of the project for the detailed licence terms
""" abstraction layer over rpyc streams to handle different transports and integrate obfsproxy pluggable transports """
@ -7,7 +7,7 @@ __all__=["PupyAsyncTCPStream", "PupyAsyncUDPStream"]
from rpyc.core.stream import Stream
from ..buffer import Buffer
import sys, socket, time, errno, logging, traceback, string, random, threading
import sys, socket, time, errno, logging, traceback, string, random, multiprocessing
from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint
from PupySocketStream import addGetPeer
@ -43,26 +43,25 @@ class PupyAsyncStream(Stream):
#buffers for transport
self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
self.upstream_lock=threading.Lock()
self.downstream_lock=threading.Lock()
self.upstream_lock=multiprocessing.Lock()
self.downstream_lock=multiprocessing.Lock()
self.transport=transport_class(self, **transport_kwargs)
self.max_pull_interval=2
self.pull_interval=0
self.pull_event=threading.Event()
self.pull_event=multiprocessing.Event()
self.MAX_IO_CHUNK=32000*100 #3Mo because it is a async transport
#threading.Thread(target=monitor, args=(self,)).start()
self.client_side=self.transport.client
if self.client_side:
self.poller_thread=threading.Thread(target=self.poller_loop)
self.poller_thread=multiprocessing.Process(target=self.poller_loop)
self.poller_thread.daemon=True
self.poller_thread.start()
self.on_connect()
def on_connect(self):
self.transport.on_connect()
def close(self):
"""closes the stream, releasing any system resources associated with it"""
print "closing stream !"
@ -80,7 +79,7 @@ class PupyAsyncStream(Stream):
raise NotImplementedError()
def poll(self, timeout):
"""indicates whether the stream has data to read (within *timeout*
"""indicates whether the stream has data to read (within *timeout*
seconds)"""
return (len(self.upstream) > 0) or self.closed
@ -254,4 +253,3 @@ class PupyAsyncUDPStream(PupyAsyncStream):
#print "received: %s"%repr(total_received)
s.close()
return total_received