From f7270060c90c2a268ad1a091dc9066ac50959c7e Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 20 Aug 2013 19:29:08 +0530 Subject: [PATCH] - Server and Client class now extends base Connection class - Proxy.buffer dict is now part of indivisual server/client connection classes - Cleaner API and code implementation (documentation) --- proxy.py | 158 +++++++++++++++++++++++++++---------------------------- 1 file changed, 78 insertions(+), 80 deletions(-) diff --git a/proxy.py b/proxy.py index 32eacea9..3634028e 100644 --- a/proxy.py +++ b/proxy.py @@ -208,42 +208,63 @@ class HttpParser(object): data = data[pos+len(CRLF):] return line, data -class Server(object): - """Established connection to destination server for proxying.""" +class Connection(object): + """TCP connection abstraction""" + + def __init__(self, what): + self.buffer = '' + self.closed = False + self.what = what + + def send(self, data): + return self.conn.send(data) + + def recv(self, bytes=8192): + try: + data = self.conn.recv(bytes) + if len(data) == 0: + logger.debug('recvd 0 bytes from %s' % self.what) + return None + logger.debug('rcvd %d bytes from %s' % (len(data), self.what)) + return data + except Exception as e: + logger.exception('Exception while receiving from connection %s %r with reason %r' % (self.what, self.conn, e)) + return None + + def close(self): + self.conn.close() + self.closed = True + + def buffer_size(self): + return len(self.buffer) + + def has_buffer(self): + return self.buffer_size() > 0 + + def queue(self, data): + self.buffer += data + + def flush(self): + sent = self.send(self.buffer) + self.buffer = self.buffer[sent:] + logger.debug('flushed %d bytes to %s' % (sent, self.what)) + +class Server(Connection): + """Established connection to destination server.""" def __init__(self, host, port): + super(Server, self).__init__('server') self.addr = (host, int(port)) self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.conn.connect((self.addr[0], self.addr[1])) - self.closed = False - - def send(self, data): - return self.conn.send(data) - - def recv(self, bytes): - return self.conn.recv(bytes) - - def close(self): - self.conn.close() - self.closed = True -class Client(object): +class Client(Connection): """Accepted client connection.""" def __init__(self, conn, addr): + super(Client, self).__init__('client') self.conn = conn self.addr = addr - self.closed = False - - def send(self, data): - return self.conn.send(data) - - def recv(self, bytes): - return self.conn.recv(bytes) - - def close(self): - self.conn.close() - self.closed = True class ProxyError(Exception): pass @@ -268,10 +289,6 @@ class Proxy(multiprocessing.Process): self.request = HttpParser() self.response = HttpParser(HTTP_RESPONSE_PARSER) - - self.buffer = dict() - self.buffer['client'] = '' - self.buffer['server'] = '' def _now(self): return datetime.datetime.utcnow() @@ -282,52 +299,20 @@ class Proxy(multiprocessing.Process): def _is_inactive(self): return self._inactive_for() > 30 - def _recv(self, what): - try: - data = getattr(self, what).recv(8192) - self.last_activity = self._now() - if len(data) == 0: - logger.debug('recvd 0 bytes from %s' % what) - return None - logger.debug('rcvd %d bytes from %s' % (len(data), what)) - return data - except Exception as e: - logger.exception('Exception while receiving from connection %r with reason %r' % (getattr(self, what), e)) - return None - - def _recv_from_client(self): - return self._recv('client') - - def _recv_from_server(self): - return self._recv('server') - - def _send(self, what, data): - self.buffer[what] += data - - def _send_to_client(self, data): - self._send('client', data) - - def _send_to_server(self, data): - self._send('server', data) - - def _flush(self, what): - sent = getattr(self, what).send(self.buffer[what]) - logger.debug('flushed %d bytes to %s' % (sent, what)) - self.buffer[what] = self.buffer[what][sent:] - - def _flush_client_buffer(self): - self._flush('client') - - def _flush_server_buffer(self): - self._flush('server') - def _process_request(self, data): + # once we have connection to the server + # we don't parse the http request packets + # any further, instead just pipe incoming + # data from client to server if self.server and not self.server.closed: - self._send_to_server(data) + self.server.queue(data) return + # parse http request self.request.parse(data) + # once http request parser has reached the state complete + # we attempt to establish connection to destination server if self.request.state == HTTP_PARSER_STATE_COMPLETE: logger.debug('request parser is in state complete') @@ -343,22 +328,31 @@ class Proxy(multiprocessing.Process): except Exception as e: raise ProxyConnectionFailed("%r" % e) + # for http connect methods (https requests) + # queue appropriate response for client + # notifying about established connection if self.request.method == "CONNECT": - self._send_to_client(CRLF.join([ + self.client.queue(CRLF.join([ 'HTTP/1.1 200 Connection established', 'Proxy-agent: proxy.py v%s' % __version__, CRLF ])) + # for usual http requests, re-build request packet + # and queue for the server with appropriate headers else: - self._send_to_server(self.request.build( + self.server.queue(self.request.build( del_headers=['proxy-connection', 'connection', 'keep-alive'], add_headers=[('Connection', 'Close')] )) def _process_response(self, data): + # parse incoming response packet + # only for non-https requests if not self.request.method == "CONNECT": self.response.parse(data) - self._send_to_client(data) + + # queue data for client + self.client.queue(data) def _access_log(self): host, port = self.server.addr if self.server else (None, None) @@ -374,7 +368,7 @@ class Proxy(multiprocessing.Process): rlist, wlist, xlist = [self.client.conn], [], [] logger.debug('*** watching client for read ready') - if len(self.buffer['client']) > 0: + if self.client.has_buffer(): logger.debug('pending client buffer found, watching client for write ready') wlist.append(self.client.conn) @@ -382,7 +376,7 @@ class Proxy(multiprocessing.Process): logger.debug('connection to server exists, watching server for read ready') rlist.append(self.server.conn) - if self.server and not self.server.closed and len(self.buffer['server']) > 0: + if self.server and not self.server.closed and self.server.has_buffer(): logger.debug('connection to server exists and pending server buffer found, watching server for write ready') wlist.append(self.server.conn) @@ -390,15 +384,16 @@ class Proxy(multiprocessing.Process): if self.client.conn in w: logger.debug('client is ready for writes, flushing client buffer') - self._flush_client_buffer() + self.client.flush() if self.server and not self.server.closed and self.server.conn in w: logger.debug('server is ready for writes, flushing server buffer') - self._flush_server_buffer() + self.server.flush() if self.client.conn in r: logger.debug('client is ready for reads, reading') - data = self._recv_from_client() + data = self.client.recv() + self.last_activity = self._now() if not data: logger.debug('client closed connection, breaking') break @@ -406,14 +401,15 @@ class Proxy(multiprocessing.Process): if self.server and not self.server.closed and self.server.conn in r: logger.debug('server is ready for reads, reading') - data = self._recv_from_server() + data = self.server.recv() + self.last_activity = self._now() if not data: logger.debug('server closed connection') self.server.close() else: self._process_response(data) - if len(self.buffer['client']) == 0: + if self.client.buffer_size() == 0: if self.response.state == HTTP_PARSER_STATE_COMPLETE: logger.debug('client buffer is empty and response state is complete, breaking') break @@ -426,8 +422,10 @@ class Proxy(multiprocessing.Process): except Exception as e: logger.exception('Exception while handling connection %r with reason %r' % (self.client.conn, e)) finally: - logger.debug("closing client connection with client pending buffer size %d bytes, server pending buffer size %d bytes" % (len(self.buffer['client']), len(self.buffer['server']))) + logger.debug("closing client connection with pending client buffer size %d bytes" % self.client.buffer_size()) self.client.close() + if self.server: + logger.debug("closed client connection with pending server buffer size %d bytes" % self.server.buffer_size()) self._access_log() logger.debug('Closing proxy for connection %r at address %r' % (self.client.conn, self.client.addr))