- Server and Client class now extends base Connection class

- Proxy.buffer dict is now part of indivisual server/client connection classes
- Cleaner API and code implementation (documentation)
This commit is contained in:
Abhinav Singh 2013-08-20 19:29:08 +05:30
parent 77a093dfcb
commit f7270060c9
1 changed files with 78 additions and 80 deletions

158
proxy.py
View File

@ -208,42 +208,63 @@ class HttpParser(object):
data = data[pos+len(CRLF):]
return line, data
class Server(object):
"""Established connection to destination server for proxying."""
class Connection(object):
"""TCP connection abstraction"""
def __init__(self, what):
self.buffer = ''
self.closed = False
self.what = what
def send(self, data):
return self.conn.send(data)
def recv(self, bytes=8192):
try:
data = self.conn.recv(bytes)
if len(data) == 0:
logger.debug('recvd 0 bytes from %s' % self.what)
return None
logger.debug('rcvd %d bytes from %s' % (len(data), self.what))
return data
except Exception as e:
logger.exception('Exception while receiving from connection %s %r with reason %r' % (self.what, self.conn, e))
return None
def close(self):
self.conn.close()
self.closed = True
def buffer_size(self):
return len(self.buffer)
def has_buffer(self):
return self.buffer_size() > 0
def queue(self, data):
self.buffer += data
def flush(self):
sent = self.send(self.buffer)
self.buffer = self.buffer[sent:]
logger.debug('flushed %d bytes to %s' % (sent, self.what))
class Server(Connection):
"""Established connection to destination server."""
def __init__(self, host, port):
super(Server, self).__init__('server')
self.addr = (host, int(port))
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.connect((self.addr[0], self.addr[1]))
self.closed = False
def send(self, data):
return self.conn.send(data)
def recv(self, bytes):
return self.conn.recv(bytes)
def close(self):
self.conn.close()
self.closed = True
class Client(object):
class Client(Connection):
"""Accepted client connection."""
def __init__(self, conn, addr):
super(Client, self).__init__('client')
self.conn = conn
self.addr = addr
self.closed = False
def send(self, data):
return self.conn.send(data)
def recv(self, bytes):
return self.conn.recv(bytes)
def close(self):
self.conn.close()
self.closed = True
class ProxyError(Exception):
pass
@ -268,10 +289,6 @@ class Proxy(multiprocessing.Process):
self.request = HttpParser()
self.response = HttpParser(HTTP_RESPONSE_PARSER)
self.buffer = dict()
self.buffer['client'] = ''
self.buffer['server'] = ''
def _now(self):
return datetime.datetime.utcnow()
@ -282,52 +299,20 @@ class Proxy(multiprocessing.Process):
def _is_inactive(self):
return self._inactive_for() > 30
def _recv(self, what):
try:
data = getattr(self, what).recv(8192)
self.last_activity = self._now()
if len(data) == 0:
logger.debug('recvd 0 bytes from %s' % what)
return None
logger.debug('rcvd %d bytes from %s' % (len(data), what))
return data
except Exception as e:
logger.exception('Exception while receiving from connection %r with reason %r' % (getattr(self, what), e))
return None
def _recv_from_client(self):
return self._recv('client')
def _recv_from_server(self):
return self._recv('server')
def _send(self, what, data):
self.buffer[what] += data
def _send_to_client(self, data):
self._send('client', data)
def _send_to_server(self, data):
self._send('server', data)
def _flush(self, what):
sent = getattr(self, what).send(self.buffer[what])
logger.debug('flushed %d bytes to %s' % (sent, what))
self.buffer[what] = self.buffer[what][sent:]
def _flush_client_buffer(self):
self._flush('client')
def _flush_server_buffer(self):
self._flush('server')
def _process_request(self, data):
# once we have connection to the server
# we don't parse the http request packets
# any further, instead just pipe incoming
# data from client to server
if self.server and not self.server.closed:
self._send_to_server(data)
self.server.queue(data)
return
# parse http request
self.request.parse(data)
# once http request parser has reached the state complete
# we attempt to establish connection to destination server
if self.request.state == HTTP_PARSER_STATE_COMPLETE:
logger.debug('request parser is in state complete')
@ -343,22 +328,31 @@ class Proxy(multiprocessing.Process):
except Exception as e:
raise ProxyConnectionFailed("%r" % e)
# for http connect methods (https requests)
# queue appropriate response for client
# notifying about established connection
if self.request.method == "CONNECT":
self._send_to_client(CRLF.join([
self.client.queue(CRLF.join([
'HTTP/1.1 200 Connection established',
'Proxy-agent: proxy.py v%s' % __version__,
CRLF
]))
# for usual http requests, re-build request packet
# and queue for the server with appropriate headers
else:
self._send_to_server(self.request.build(
self.server.queue(self.request.build(
del_headers=['proxy-connection', 'connection', 'keep-alive'],
add_headers=[('Connection', 'Close')]
))
def _process_response(self, data):
# parse incoming response packet
# only for non-https requests
if not self.request.method == "CONNECT":
self.response.parse(data)
self._send_to_client(data)
# queue data for client
self.client.queue(data)
def _access_log(self):
host, port = self.server.addr if self.server else (None, None)
@ -374,7 +368,7 @@ class Proxy(multiprocessing.Process):
rlist, wlist, xlist = [self.client.conn], [], []
logger.debug('*** watching client for read ready')
if len(self.buffer['client']) > 0:
if self.client.has_buffer():
logger.debug('pending client buffer found, watching client for write ready')
wlist.append(self.client.conn)
@ -382,7 +376,7 @@ class Proxy(multiprocessing.Process):
logger.debug('connection to server exists, watching server for read ready')
rlist.append(self.server.conn)
if self.server and not self.server.closed and len(self.buffer['server']) > 0:
if self.server and not self.server.closed and self.server.has_buffer():
logger.debug('connection to server exists and pending server buffer found, watching server for write ready')
wlist.append(self.server.conn)
@ -390,15 +384,16 @@ class Proxy(multiprocessing.Process):
if self.client.conn in w:
logger.debug('client is ready for writes, flushing client buffer')
self._flush_client_buffer()
self.client.flush()
if self.server and not self.server.closed and self.server.conn in w:
logger.debug('server is ready for writes, flushing server buffer')
self._flush_server_buffer()
self.server.flush()
if self.client.conn in r:
logger.debug('client is ready for reads, reading')
data = self._recv_from_client()
data = self.client.recv()
self.last_activity = self._now()
if not data:
logger.debug('client closed connection, breaking')
break
@ -406,14 +401,15 @@ class Proxy(multiprocessing.Process):
if self.server and not self.server.closed and self.server.conn in r:
logger.debug('server is ready for reads, reading')
data = self._recv_from_server()
data = self.server.recv()
self.last_activity = self._now()
if not data:
logger.debug('server closed connection')
self.server.close()
else:
self._process_response(data)
if len(self.buffer['client']) == 0:
if self.client.buffer_size() == 0:
if self.response.state == HTTP_PARSER_STATE_COMPLETE:
logger.debug('client buffer is empty and response state is complete, breaking')
break
@ -426,8 +422,10 @@ class Proxy(multiprocessing.Process):
except Exception as e:
logger.exception('Exception while handling connection %r with reason %r' % (self.client.conn, e))
finally:
logger.debug("closing client connection with client pending buffer size %d bytes, server pending buffer size %d bytes" % (len(self.buffer['client']), len(self.buffer['server'])))
logger.debug("closing client connection with pending client buffer size %d bytes" % self.client.buffer_size())
self.client.close()
if self.server:
logger.debug("closed client connection with pending server buffer size %d bytes" % self.server.buffer_size())
self._access_log()
logger.debug('Closing proxy for connection %r at address %r' % (self.client.conn, self.client.addr))