diff --git a/AUTHORS b/AUTHORS index 12d57966..8aedb36d 100644 --- a/AUTHORS +++ b/AUTHORS @@ -11,6 +11,7 @@ Andrii Kostenko Andy McCurdy Anton Gyllenberg Ask Solem +Bobby Beever Brian Bernstein Christophe Chauvet Christopher Grebs diff --git a/kombu/tests/test_common.py b/kombu/tests/test_common.py index 10f63cb4..a66ecf39 100644 --- a/kombu/tests/test_common.py +++ b/kombu/tests/test_common.py @@ -36,6 +36,7 @@ class test_maybe_declare(TestCase): client.declared_entities = set() entity = Mock() entity.can_cache_declaration = True + entity.auto_delete = False entity.is_bound = True entity.channel = channel diff --git a/kombu/tests/transport/test_filesystem.py b/kombu/tests/transport/test_filesystem.py index 547f5a70..7d004b2a 100644 --- a/kombu/tests/transport/test_filesystem.py +++ b/kombu/tests/transport/test_filesystem.py @@ -3,6 +3,8 @@ from __future__ import with_statement import tempfile +from nose import SkipTest + from kombu.connection import Connection from kombu.entity import Exchange, Queue from kombu.messaging import Consumer, Producer @@ -13,10 +15,21 @@ from kombu.tests.utils import TestCase class test_FilesystemTransport(TestCase): def setUp(self): - data_folder_in = tempfile.mkdtemp() - data_folder_out = tempfile.mkdtemp() - self.c = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_in, 'data_folder_out': data_folder_out}) - self.p = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_out, 'data_folder_out': data_folder_in}) + try: + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + except Exception: + raise SkipTest('filesystem transport: cannot create tempfiles') + self.c = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_in, + 'data_folder_out': data_folder_out, + }) + self.p = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_out, + 'data_folder_out': data_folder_in, + }) self.e = Exchange('test_transport_filesystem') self.q = Queue('test_transport_filesystem', exchange=self.e, @@ -30,7 +43,8 @@ class test_FilesystemTransport(TestCase): consumer = Consumer(self.c.channel(), self.q, no_ack=True) for i in range(10): - producer.publish({'foo': i}, routing_key='test_transport_filesystem') + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') _received = [] @@ -56,9 +70,11 @@ class test_FilesystemTransport(TestCase): self.q2(consumer_channel).declare() for i in range(10): - producer.publish({'foo': i}, routing_key='test_transport_filesystem') + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') for i in range(10): - producer.publish({'foo': i}, routing_key='test_transport_filesystem2') + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') _received1 = [] _received2 = [] @@ -93,7 +109,8 @@ class test_FilesystemTransport(TestCase): # queue.delete for i in range(10): - producer.publish({'foo': i}, routing_key='test_transport_filesystem') + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') self.assertTrue(self.q(consumer_channel).get()) self.q(consumer_channel).delete() self.q(consumer_channel).declare() @@ -101,8 +118,8 @@ class test_FilesystemTransport(TestCase): # queue.purge for i in range(10): - producer.publish({'foo': i}, routing_key='test_transport_filesystem2') + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') self.assertTrue(self.q2(consumer_channel).get()) self.q2(consumer_channel).purge() self.assertIsNone(self.q2(consumer_channel).get()) - diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index bd8db8f6..193126da 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -1,4 +1,5 @@ """Kombu transport using a filesystem as the message store.""" +from __future__ import absolute_import from Queue import Empty @@ -20,11 +21,14 @@ __version__ = ".".join(map(str, VERSION)) # needs win32all to work on Windows if os.name == 'nt': - import win32con, win32file, pywintypes + import win32con + import win32file + import pywintypes LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK - LOCK_SH = 0 # the default - LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY + # 0 is the default + LOCK_SH = 0 # noqa + LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY # noqa __overlapped = pywintypes.OVERLAPPED() def lock(file, flags): @@ -38,30 +42,34 @@ if os.name == 'nt': elif os.name == 'posix': import fcntl - from fcntl import LOCK_EX, LOCK_SH, LOCK_NB + from fcntl import LOCK_EX, LOCK_SH, LOCK_NB # noqa - def lock(file, flags): + def lock(file, flags): # noqa fcntl.flock(file.fileno(), flags) - def unlock(file): + def unlock(file): # noqa fcntl.flock(file.fileno(), fcntl.LOCK_UN) else: - raise RuntimeError('Filesystem plugin only defined for nt and posix platforms') + raise RuntimeError( + 'Filesystem plugin only defined for NT and POSIX platforms') + class Channel(virtual.Channel): def _put(self, queue, payload, **kwargs): """Put `message` onto `queue`.""" - filename = '%s_%s.%s.msg' % (int(round(time.time()*1000)), uuid.uuid4(), queue) + filename = '%s_%s.%s.msg' % (int(round(time.time() * 1000)), + uuid.uuid4(), queue) filename = os.path.join(self.data_folder_out, filename) try: f = open(filename, 'wb') lock(f, LOCK_EX) f.write(dumps(payload)) - except IOError, OSError: - raise Exception('Filename [%s] could not be placed into folder.' % filename) + except (IOError, OSError): + raise StdChannelError( + 'Filename [%s] could not be placed into folder.' % filename) finally: unlock(f) f.close() @@ -86,9 +94,10 @@ class Channel(virtual.Channel): try: # move the file to the tmp/processed folder - shutil.move(os.path.join(self.data_folder_in, filename), processed_folder) + shutil.move(os.path.join(self.data_folder_in, filename), + processed_folder) except IOError: - pass # file could be locked, or removed in meantime so ignore + pass # file could be locked, or removed in meantime so ignore filename = os.path.join(processed_folder, filename) try: @@ -97,8 +106,9 @@ class Channel(virtual.Channel): f.close() if not self.store_processed: os.remove(filename) - except IOError, OSError: - raise Exception('Filename [%s] could not be read from queue.' % filename) + except (IOError, OSError): + raise StdChannelError( + 'Filename [%s] could not be read from queue.' % filename) return loads(payload) @@ -123,9 +133,9 @@ class Channel(virtual.Channel): count += 1 except OSError: - # we simply ignore its existence, as it was probably - # processed by another worker - pass + # we simply ignore its existence, as it was probably + # processed by another worker + pass return count @@ -166,6 +176,7 @@ class Channel(virtual.Channel): def processed_folder(self): return self.transport_options.get('processed_folder', 'processed') + class Transport(virtual.Transport): Channel = Channel