# -*- Mode: Python; tab-width: 4 -*- # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp # Author: Sam Rushing # ====================================================================== # Copyright 1996 by Sam Rushing # # All Rights Reserved # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby # granted, provided that the above copyright notice appear in all # copies and that both that copyright notice and this permission # notice appear in supporting documentation, and that the name of Sam # Rushing not be used in advertising or publicity pertaining to # distribution of the software without specific, written prior # permission. # # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # ====================================================================== """A class supporting chat-style (command/response) protocols. This class adds support for 'chat' style protocols - where one side sends a 'command', and the other sends a response (examples would be the common internet protocols - smtp, nntp, ftp, etc..). The handle_read() method looks at the input stream for the current 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' for multi-line output), calling self.found_terminator() on its receipt. for example: Say you build an async nntp client using this class. At the start of the connection, you'll have self.terminator set to '\r\n', in order to process the single-line greeting. Just before issuing a 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST command will be accumulated (using your own 'collect_incoming_data' method) up to the terminator, and then control will be returned to you - by calling your self.found_terminator() method. """ import socket import asyncore class async_chat (asyncore.dispatcher): """This is an abstract class. You must derive from this class, and add the two methods collect_incoming_data() and found_terminator()""" # these are overridable defaults ac_in_buffer_size = 4096 ac_out_buffer_size = 4096 def __init__ (self, conn=None): self.ac_in_buffer = '' self.ac_out_buffer = '' self.producer_fifo = fifo() asyncore.dispatcher.__init__ (self, conn) def set_terminator (self, term): "Set the input delimiter. Can be a fixed string of any length, an integer, or None" self.terminator = term def get_terminator (self): return self.terminator # grab some more data from the socket, # throw it to the collector method, # check for the terminator, # if found, transition to the next state. def handle_read (self): try: data = self.recv (self.ac_in_buffer_size) except socket.error, why: self.handle_error() return self.ac_in_buffer = self.ac_in_buffer + data # Continue to search for self.terminator in self.ac_in_buffer, # while calling self.collect_incoming_data. The while loop # is necessary because we might read several data+terminator # combos with a single recv(1024). while self.ac_in_buffer: lb = len(self.ac_in_buffer) terminator = self.get_terminator() if terminator is None: # no terminator, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = '' elif type(terminator) == type(0): # numeric terminator n = terminator if lb < n: self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = '' self.terminator = self.terminator - lb else: self.collect_incoming_data (self.ac_in_buffer[:n]) self.ac_in_buffer = self.ac_in_buffer[n:] self.terminator = 0 self.found_terminator() else: # 3 cases: # 1) end of buffer matches terminator exactly: # collect data, transition # 2) end of buffer matches some prefix: # collect data to the prefix # 3) end of buffer does not match any prefix: # collect data terminator_len = len(terminator) index = self.ac_in_buffer.find(terminator) if index != -1: # we found the terminator if index > 0: # don't bother reporting the empty string (source of subtle bugs) self.collect_incoming_data (self.ac_in_buffer[:index]) self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] # This does the Right Thing if the terminator is changed here. self.found_terminator() else: # check for a prefix of the terminator index = find_prefix_at_end (self.ac_in_buffer, terminator) if index: if index != lb: # we found a prefix, collect up to the prefix self.collect_incoming_data (self.ac_in_buffer[:-index]) self.ac_in_buffer = self.ac_in_buffer[-index:] break else: # no prefix, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = '' def handle_write (self): self.initiate_send () def handle_close (self): self.close() def push (self, data): self.producer_fifo.push (simple_producer (data)) self.initiate_send() def push_with_producer (self, producer): self.producer_fifo.push (producer) self.initiate_send() def readable (self): "predicate for inclusion in the readable for select()" return (len(self.ac_in_buffer) <= self.ac_in_buffer_size) def writable (self): "predicate for inclusion in the writable for select()" # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) # this is about twice as fast, though not as clear. return not ( (self.ac_out_buffer is '') and self.producer_fifo.is_empty() and self.connected ) def close_when_done (self): "automatically close this channel once the outgoing queue is empty" self.producer_fifo.push (None) # refill the outgoing buffer by calling the more() method # of the first producer in the queue def refill_buffer (self): _string_type = type('') while 1: if len(self.producer_fifo): p = self.producer_fifo.first() # a 'None' in the producer fifo is a sentinel, # telling us to close the channel. if p is None: if not self.ac_out_buffer: self.producer_fifo.pop() self.close() return elif type(p) is _string_type: self.producer_fifo.pop() self.ac_out_buffer = self.ac_out_buffer + p return data = p.more() if data: self.ac_out_buffer = self.ac_out_buffer + data return else: self.producer_fifo.pop() else: return def initiate_send (self): obs = self.ac_out_buffer_size # try to refill the buffer if (len (self.ac_out_buffer) < obs): self.refill_buffer() if self.ac_out_buffer and self.connected: # try to send the buffer try: num_sent = self.send (self.ac_out_buffer[:obs]) if num_sent: self.ac_out_buffer = self.ac_out_buffer[num_sent:] except socket.error, why: self.handle_error() return def discard_buffers (self): # Emergencies only! self.ac_in_buffer = '' self.ac_out_buffer = '' while self.producer_fifo: self.producer_fifo.pop() class simple_producer: def __init__ (self, data, buffer_size=512): self.data = data self.buffer_size = buffer_size def more (self): if len (self.data) > self.buffer_size: result = self.data[:self.buffer_size] self.data = self.data[self.buffer_size:] return result else: result = self.data self.data = '' return result class fifo: def __init__ (self, list=None): if not list: self.list = [] else: self.list = list def __len__ (self): return len(self.list) def is_empty (self): return self.list == [] def first (self): return self.list[0] def push (self, data): self.list.append (data) def pop (self): if self.list: result = self.list[0] del self.list[0] return (1, result) else: return (0, None) # Given 'haystack', see if any prefix of 'needle' is at its end. This # assumes an exact match has already been checked. Return the number of # characters matched. # for example: # f_p_a_e ("qwerty\r", "\r\n") => 1 # f_p_a_e ("qwerty\r\n", "\r\n") => 2 # f_p_a_e ("qwertydkjf", "\r\n") => 0 # this could maybe be made faster with a computed regex? # [answer: no; circa Python-2.0, Jan 2001] # python: 18307/s # re: 12820/s # regex: 14035/s def find_prefix_at_end (haystack, needle): nl = len(needle) result = 0 for i in range (1,nl): if haystack[-(nl-i):] == needle[:(nl-i)]: result = nl-i break return result