New version from Piers Lauder, who writes:

Added a debug function to replace 'print' statements.
Ensured that response attached to 'NO' replies is passed back.
added readonly exception.
Rearranged method order into types.
Ensure select returns a meaningful error on 'NO'.
'NO' returns from authenticate and login raise error with last message,
not list.
This commit is contained in:
Guido van Rossum 1998-09-28 15:34:46 +00:00
parent 75bb54c3d8
commit 26367a001d
1 changed files with 115 additions and 88 deletions

View File

@ -15,9 +15,9 @@
Time2Internaldate
"""
__version__ = "2.11"
__version__ = "2.15"
import binascii, re, socket, string, time, random
import binascii, re, socket, string, time, random, sys
# Globals
@ -97,7 +97,9 @@ class IMAP4:
Errors raise the exception class <instance>.error("<reason>").
IMAP4 server errors raise <instance>.abort("<reason>"),
which is a sub-class of 'error'.
which is a sub-class of 'error'. Mailbox status changes
from READ-WRITE to READ-ONLY raise the exception class
<instance>.readonly("<reason>"), which is a sub-class of 'abort'.
Note: to use this module, you must read the RFCs pertaining
to the IMAP4 protocol, as the semantics of the arguments to
@ -107,6 +109,7 @@ class IMAP4:
class error(Exception): pass # Logical errors - debug required
class abort(error): pass # Service errors - close and retry
class readonly(abort): pass # Mailbox status changed to READ-ONLY
def __init__(self, host = '', port = IMAP4_PORT):
@ -136,7 +139,7 @@ def __init__(self, host = '', port = IMAP4_PORT):
# request and store CAPABILITY response.
if __debug__ and self.debug >= 1:
print '\tnew IMAP4 connection, tag=%s' % self.tagpre
_mesg('new IMAP4 connection, tag=%s' % self.tagpre)
self.welcome = self._get_response()
if self.untagged_responses.has_key('PREAUTH'):
@ -154,7 +157,7 @@ def __init__(self, host = '', port = IMAP4_PORT):
self.capabilities = tuple(string.split(self.untagged_responses[cap][-1]))
if __debug__ and self.debug >= 3:
print '\tCAPABILITIES: %s' % `self.capabilities`
_mesg('CAPABILITIES: %s' % `self.capabilities`)
for version in AllowedVersions:
if not version in self.capabilities:
@ -165,6 +168,17 @@ def __init__(self, host = '', port = IMAP4_PORT):
raise self.error('server not IMAP4 compliant')
def __getattr__(self, attr):
# Allow UPPERCASE variants of IMAP4 command methods.
if Commands.has_key(attr):
return eval("self.%s" % string.lower(attr))
raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
# Public methods
def open(self, host, port):
"""Setup 'self.sock' and 'self.file'."""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@ -172,14 +186,43 @@ def open(self, host, port):
self.file = self.sock.makefile('r')
def __getattr__(self, attr):
"""Allow UPPERCASE variants of all following IMAP4 commands."""
if Commands.has_key(attr):
return eval("self.%s" % string.lower(attr))
raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
def recent(self):
"""Return most recent 'RECENT' responses if any exist,
else prompt server for an update using the 'NOOP' command.
(typ, [data]) = <instance>.recent()
'data' is None if no new messages,
else list of RECENT responses, most recent last.
"""
name = 'RECENT'
typ, dat = self._untagged_response('OK', [None], name)
if dat[-1]:
return typ, dat
typ, dat = self.noop() # Prod server for response
return self._untagged_response(typ, dat, name)
# Public methods
def response(self, code):
"""Return data for response 'code' if received, or None.
Old value for response 'code' is cleared.
(code, [data]) = <instance>.response(code)
"""
return self._untagged_response(code, [None], string.upper(code))
def socket(self):
"""Return socket instance used to connect to IMAP4 server.
socket = <instance>.socket()
"""
return self.sock
# IMAP4 commands
def append(self, mailbox, flags, date_time, message):
@ -224,7 +267,7 @@ def authenticate(self, mechanism, authobject):
self.literal = _Authenticator(authobject).process
typ, dat = self._simple_command('AUTHENTICATE', mech)
if typ != 'OK':
raise self.error(dat)
raise self.error(dat[-1])
self.state = 'AUTH'
return typ, dat
@ -246,8 +289,7 @@ def close(self):
(typ, [data]) = <instance>.close()
"""
try:
try: typ, dat = self._simple_command('CLOSE')
except EOFError: typ, dat = None, [None]
typ, dat = self._simple_command('CLOSE')
finally:
self.state = 'AUTH'
return typ, dat
@ -288,7 +330,7 @@ def expunge(self):
"""
name = 'EXPUNGE'
typ, dat = self._simple_command(name)
return self._untagged_response(typ, name)
return self._untagged_response(typ, dat, name)
def fetch(self, message_set, message_parts):
@ -300,7 +342,7 @@ def fetch(self, message_set, message_parts):
"""
name = 'FETCH'
typ, dat = self._simple_command(name, message_set, message_parts)
return self._untagged_response(typ, name)
return self._untagged_response(typ, dat, name)
def list(self, directory='""', pattern='*'):
@ -312,7 +354,7 @@ def list(self, directory='""', pattern='*'):
"""
name = 'LIST'
typ, dat = self._simple_command(name, directory, pattern)
return self._untagged_response(typ, name)
return self._untagged_response(typ, dat, name)
def login(self, user, password):
@ -322,7 +364,7 @@ def login(self, user, password):
"""
typ, dat = self._simple_command('LOGIN', user, password)
if typ != 'OK':
raise self.error(dat)
raise self.error(dat[-1])
self.state = 'AUTH'
return typ, dat
@ -336,7 +378,7 @@ def logout(self):
"""
self.state = 'LOGOUT'
try: typ, dat = self._simple_command('LOGOUT')
except EOFError: typ, dat = None, [None]
except: typ, dat = 'NO', ['%s: %s' % sys.exc_info()[:2]]
self.file.close()
self.sock.close()
if self.untagged_responses.has_key('BYE'):
@ -353,7 +395,7 @@ def lsub(self, directory='""', pattern='*'):
"""
name = 'LSUB'
typ, dat = self._simple_command(name, directory, pattern)
return self._untagged_response(typ, name)
return self._untagged_response(typ, dat, name)
def noop(self):
@ -362,7 +404,7 @@ def noop(self):
(typ, data) = <instance>.noop()
"""
if __debug__ and self.debug >= 3:
print '\tuntagged responses: %s' % `self.untagged_responses`
_dump_ur(self.untagged_responses)
return self._simple_command('NOOP')
@ -375,26 +417,7 @@ def partial(self, message_num, message_part, start, length):
"""
name = 'PARTIAL'
typ, dat = self._simple_command(name, message_num, message_part, start, length)
return self._untagged_response(typ, 'FETCH')
def recent(self):
"""Return most recent 'RECENT' responses if any exist,
else prompt server for an update using the 'NOOP' command,
and flush all untagged responses.
(typ, [data]) = <instance>.recent()
'data' is None if no new messages,
else list of RECENT responses, most recent last.
"""
name = 'RECENT'
typ, dat = self._untagged_response('OK', name)
if dat[-1]:
return typ, dat
self.untagged_responses = {}
typ, dat = self._simple_command('NOOP')
return self._untagged_response(typ, name)
return self._untagged_response(typ, dat, 'FETCH')
def rename(self, oldmailbox, newmailbox):
@ -405,16 +428,6 @@ def rename(self, oldmailbox, newmailbox):
return self._simple_command('RENAME', oldmailbox, newmailbox)
def response(self, code):
"""Return data for response 'code' if received, or None.
Old value for response 'code' is cleared.
(code, [data]) = <instance>.response(code)
"""
return self._untagged_response(code, string.upper(code))
def search(self, charset, criteria):
"""Search mailbox for matching messages.
@ -426,7 +439,7 @@ def search(self, charset, criteria):
if charset:
charset = 'CHARSET ' + charset
typ, dat = self._simple_command(name, charset, criteria)
return self._untagged_response(typ, name)
return self._untagged_response(typ, dat, name)
def select(self, mailbox='INBOX', readonly=None):
@ -445,23 +458,17 @@ def select(self, mailbox='INBOX', readonly=None):
else:
name = 'SELECT'
typ, dat = self._simple_command(name, mailbox)
if typ == 'OK':
self.state = 'SELECTED'
elif typ == 'NO':
self.state = 'AUTH'
if not readonly and not self.untagged_responses.has_key('READ-WRITE'):
raise self.error('%s is not writable' % mailbox)
if typ != 'OK':
self.state = 'AUTH' # Might have been 'SELECTED'
return typ, dat
self.state = 'SELECTED'
if not self.untagged_responses.has_key('READ-WRITE') \
and not readonly:
if __debug__ and self.debug >= 1: _dump_ur(self.untagged_responses)
raise self.readonly('%s is not writable' % mailbox)
return typ, self.untagged_responses.get('EXISTS', [None])
def socket(self):
"""Return socket instance used to connect to IMAP4 server.
socket = <instance>.socket()
"""
return self.sock
def status(self, mailbox, names):
"""Request named status conditions for mailbox.
@ -471,7 +478,7 @@ def status(self, mailbox, names):
if self.PROTOCOL_VERSION == 'IMAP4':
raise self.error('%s unimplemented in IMAP4 (obtain IMAP4rev1 server, or re-code)' % name)
typ, dat = self._simple_command(name, mailbox, names)
return self._untagged_response(typ, name)
return self._untagged_response(typ, dat, name)
def store(self, message_set, command, flag_list):
@ -480,7 +487,7 @@ def store(self, message_set, command, flag_list):
(typ, [data]) = <instance>.store(message_set, command, flag_list)
"""
typ, dat = self._simple_command('STORE', message_set, command, flag_list)
return self._untagged_response(typ, 'FETCH')
return self._untagged_response(typ, dat, 'FETCH')
def subscribe(self, mailbox):
@ -511,9 +518,7 @@ def uid(self, command, *args):
name = 'SEARCH'
else:
name = 'FETCH'
typ, dat2 = self._untagged_response(typ, name)
if dat2[-1]: dat = dat2
return typ, dat
return self._untagged_response(typ, dat, name)
def unsubscribe(self, mailbox):
@ -542,12 +547,13 @@ def xatom(self, name, *args):
def _append_untagged(self, typ, dat):
ur = self.untagged_responses
if __debug__ and self.debug >= 5:
_mesg('untagged_responses[%s] %s += %s' %
(typ, len(ur.get(typ,'')), dat))
if ur.has_key(typ):
ur[typ].append(dat)
else:
ur[typ] = [dat]
if __debug__ and self.debug >= 5:
print '\tuntagged_responses[%s] %s += %s' % (typ, len(`ur[typ]`), _trunc(20, `dat`))
def _command(self, name, *args):
@ -557,8 +563,14 @@ def _command(self, name, *args):
raise self.error(
'command %s illegal in state %s' % (name, self.state))
if self.untagged_responses.has_key('OK'):
del self.untagged_responses['OK']
for typ in ('OK', 'NO', 'BAD'):
if self.untagged_responses.has_key(typ):
del self.untagged_responses[typ]
if self.untagged_responses.has_key('READ-WRITE') \
and self.untagged_responses.has_key('READ-ONLY'):
del self.untagged_responses['READ-WRITE']
raise self.readonly('mailbox status changed to READ-ONLY')
tag = self._new_tag()
data = '%s %s' % (tag, name)
@ -588,7 +600,7 @@ def _command(self, name, *args):
raise self.abort('socket error: %s' % val)
if __debug__ and self.debug >= 4:
print '\t> %s' % data
_mesg('> %s' % data)
if literal is None:
return tag
@ -606,7 +618,7 @@ def _command(self, name, *args):
literal = literator(self.continuation_response)
if __debug__ and self.debug >= 4:
print '\twrite literal size %s' % len(literal)
_mesg('write literal size %s' % len(literal))
try:
self.sock.send(literal)
@ -684,7 +696,7 @@ def _get_response(self):
size = string.atoi(self.mo.group('size'))
if __debug__ and self.debug >= 4:
print '\tread literal size %s' % size
_mesg('read literal size %s' % size)
data = self.file.read(size)
# Store response with literal as tuple
@ -702,6 +714,9 @@ def _get_response(self):
if typ in ('OK', 'NO', 'BAD') and self._match(Response_code, dat):
self._append_untagged(self.mo.group('type'), self.mo.group('data'))
if __debug__ and self.debug >= 1 and typ in ('NO', 'BAD'):
_mesg('%s response: %s' % (typ, dat))
return resp
@ -719,13 +734,13 @@ def _get_line(self):
line = self.file.readline()
if not line:
raise EOFError
raise self.abort('socket error: EOF')
# Protocol mandates all lines terminated by CRLF
line = line[:-2]
if __debug__ and self.debug >= 4:
print '\t< %s' % line
_mesg('< %s' % line)
return line
@ -736,7 +751,7 @@ def _match(self, cre, s):
self.mo = cre.match(s)
if __debug__ and self.mo is not None and self.debug >= 5:
print "\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`)
_mesg("\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`))
return self.mo is not None
@ -753,13 +768,15 @@ def _simple_command(self, name, *args):
return self._command_complete(name, apply(self._command, (name,) + args))
def _untagged_response(self, typ, name):
def _untagged_response(self, typ, dat, name):
if typ == 'NO':
return typ, dat
if not self.untagged_responses.has_key(name):
return typ, [None]
data = self.untagged_responses[name]
if __debug__ and self.debug >= 5:
print '\tuntagged_responses[%s] => %s' % (name, _trunc(20, `data`))
_mesg('untagged_responses[%s] => %s' % (name, data))
del self.untagged_responses[name]
return typ, data
@ -905,9 +922,19 @@ def Time2Internaldate(date_time):
if __debug__:
def _trunc(m, s):
if len(s) <= m: return s
return '%.*s..' % (m, s)
def _mesg(s):
# if len(s) > 70: s = '%.70s..' % s
sys.stderr.write('\t'+s+'\n')
sys.stderr.flush()
def _dump_ur(dict):
# Dump untagged responses (in `dict').
l = dict.items()
if not l: return
t = '\n\t\t'
j = string.join
l = map(lambda x,j=j:'%s: "%s"' % (x[0], x[1][0] and j(x[1], '" "') or ''), l)
_mesg('untagged responses dump:%s%s' % (t, j(l, t)))
@ -947,12 +974,12 @@ def _trunc(m, s):
def run(cmd, args):
typ, dat = apply(eval('M.%s' % cmd), args)
print ' %s %s\n => %s %s' % (cmd, args, typ, dat)
_mesg(' %s %s\n => %s %s' % (cmd, args, typ, dat))
return dat
Debug = 5
M = IMAP4(host)
print 'PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION
_mesg('PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
for cmd,args in test_seq1:
run(cmd, args)