mirror of https://github.com/celery/kombu.git
Adds Bobby Beever to AUTHORS
This commit is contained in:
parent
25d22f3a00
commit
d25a12868c
1
AUTHORS
1
AUTHORS
|
@ -11,6 +11,7 @@ Andrii Kostenko <andrey@kostenko.name>
|
|||
Andy McCurdy <andy@andymccurdy.com>
|
||||
Anton Gyllenberg <anton@iki.fi>
|
||||
Ask Solem <ask@celeryproject.org>
|
||||
Bobby Beever <bobby.beever@yahoo.com>
|
||||
Brian Bernstein
|
||||
Christophe Chauvet <christophe.chauvet@gmail.com>
|
||||
Christopher Grebs <cg@webshox.org>
|
||||
|
|
|
@ -36,6 +36,7 @@ class test_maybe_declare(TestCase):
|
|||
client.declared_entities = set()
|
||||
entity = Mock()
|
||||
entity.can_cache_declaration = True
|
||||
entity.auto_delete = False
|
||||
entity.is_bound = True
|
||||
entity.channel = channel
|
||||
|
||||
|
|
|
@ -3,6 +3,8 @@ from __future__ import with_statement
|
|||
|
||||
import tempfile
|
||||
|
||||
from nose import SkipTest
|
||||
|
||||
from kombu.connection import Connection
|
||||
from kombu.entity import Exchange, Queue
|
||||
from kombu.messaging import Consumer, Producer
|
||||
|
@ -13,10 +15,21 @@ from kombu.tests.utils import TestCase
|
|||
class test_FilesystemTransport(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
data_folder_in = tempfile.mkdtemp()
|
||||
data_folder_out = tempfile.mkdtemp()
|
||||
self.c = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_in, 'data_folder_out': data_folder_out})
|
||||
self.p = Connection(transport='filesystem', transport_options={'data_folder_in': data_folder_out, 'data_folder_out': data_folder_in})
|
||||
try:
|
||||
data_folder_in = tempfile.mkdtemp()
|
||||
data_folder_out = tempfile.mkdtemp()
|
||||
except Exception:
|
||||
raise SkipTest('filesystem transport: cannot create tempfiles')
|
||||
self.c = Connection(transport='filesystem',
|
||||
transport_options={
|
||||
'data_folder_in': data_folder_in,
|
||||
'data_folder_out': data_folder_out,
|
||||
})
|
||||
self.p = Connection(transport='filesystem',
|
||||
transport_options={
|
||||
'data_folder_in': data_folder_out,
|
||||
'data_folder_out': data_folder_in,
|
||||
})
|
||||
self.e = Exchange('test_transport_filesystem')
|
||||
self.q = Queue('test_transport_filesystem',
|
||||
exchange=self.e,
|
||||
|
@ -30,7 +43,8 @@ class test_FilesystemTransport(TestCase):
|
|||
consumer = Consumer(self.c.channel(), self.q, no_ack=True)
|
||||
|
||||
for i in range(10):
|
||||
producer.publish({'foo': i}, routing_key='test_transport_filesystem')
|
||||
producer.publish({'foo': i},
|
||||
routing_key='test_transport_filesystem')
|
||||
|
||||
_received = []
|
||||
|
||||
|
@ -56,9 +70,11 @@ class test_FilesystemTransport(TestCase):
|
|||
self.q2(consumer_channel).declare()
|
||||
|
||||
for i in range(10):
|
||||
producer.publish({'foo': i}, routing_key='test_transport_filesystem')
|
||||
producer.publish({'foo': i},
|
||||
routing_key='test_transport_filesystem')
|
||||
for i in range(10):
|
||||
producer.publish({'foo': i}, routing_key='test_transport_filesystem2')
|
||||
producer.publish({'foo': i},
|
||||
routing_key='test_transport_filesystem2')
|
||||
|
||||
_received1 = []
|
||||
_received2 = []
|
||||
|
@ -93,7 +109,8 @@ class test_FilesystemTransport(TestCase):
|
|||
|
||||
# queue.delete
|
||||
for i in range(10):
|
||||
producer.publish({'foo': i}, routing_key='test_transport_filesystem')
|
||||
producer.publish({'foo': i},
|
||||
routing_key='test_transport_filesystem')
|
||||
self.assertTrue(self.q(consumer_channel).get())
|
||||
self.q(consumer_channel).delete()
|
||||
self.q(consumer_channel).declare()
|
||||
|
@ -101,8 +118,8 @@ class test_FilesystemTransport(TestCase):
|
|||
|
||||
# queue.purge
|
||||
for i in range(10):
|
||||
producer.publish({'foo': i}, routing_key='test_transport_filesystem2')
|
||||
producer.publish({'foo': i},
|
||||
routing_key='test_transport_filesystem2')
|
||||
self.assertTrue(self.q2(consumer_channel).get())
|
||||
self.q2(consumer_channel).purge()
|
||||
self.assertIsNone(self.q2(consumer_channel).get())
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
"""Kombu transport using a filesystem as the message store."""
|
||||
from __future__ import absolute_import
|
||||
|
||||
from Queue import Empty
|
||||
|
||||
|
@ -20,11 +21,14 @@ __version__ = ".".join(map(str, VERSION))
|
|||
# needs win32all to work on Windows
|
||||
if os.name == 'nt':
|
||||
|
||||
import win32con, win32file, pywintypes
|
||||
import win32con
|
||||
import win32file
|
||||
import pywintypes
|
||||
|
||||
LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
|
||||
LOCK_SH = 0 # the default
|
||||
LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
|
||||
# 0 is the default
|
||||
LOCK_SH = 0 # noqa
|
||||
LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY # noqa
|
||||
__overlapped = pywintypes.OVERLAPPED()
|
||||
|
||||
def lock(file, flags):
|
||||
|
@ -38,30 +42,34 @@ if os.name == 'nt':
|
|||
elif os.name == 'posix':
|
||||
|
||||
import fcntl
|
||||
from fcntl import LOCK_EX, LOCK_SH, LOCK_NB
|
||||
from fcntl import LOCK_EX, LOCK_SH, LOCK_NB # noqa
|
||||
|
||||
def lock(file, flags):
|
||||
def lock(file, flags): # noqa
|
||||
fcntl.flock(file.fileno(), flags)
|
||||
|
||||
def unlock(file):
|
||||
def unlock(file): # noqa
|
||||
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
|
||||
else:
|
||||
raise RuntimeError('Filesystem plugin only defined for nt and posix platforms')
|
||||
raise RuntimeError(
|
||||
'Filesystem plugin only defined for NT and POSIX platforms')
|
||||
|
||||
|
||||
class Channel(virtual.Channel):
|
||||
|
||||
def _put(self, queue, payload, **kwargs):
|
||||
"""Put `message` onto `queue`."""
|
||||
|
||||
filename = '%s_%s.%s.msg' % (int(round(time.time()*1000)), uuid.uuid4(), queue)
|
||||
filename = '%s_%s.%s.msg' % (int(round(time.time() * 1000)),
|
||||
uuid.uuid4(), queue)
|
||||
filename = os.path.join(self.data_folder_out, filename)
|
||||
|
||||
try:
|
||||
f = open(filename, 'wb')
|
||||
lock(f, LOCK_EX)
|
||||
f.write(dumps(payload))
|
||||
except IOError, OSError:
|
||||
raise Exception('Filename [%s] could not be placed into folder.' % filename)
|
||||
except (IOError, OSError):
|
||||
raise StdChannelError(
|
||||
'Filename [%s] could not be placed into folder.' % filename)
|
||||
finally:
|
||||
unlock(f)
|
||||
f.close()
|
||||
|
@ -86,9 +94,10 @@ class Channel(virtual.Channel):
|
|||
|
||||
try:
|
||||
# move the file to the tmp/processed folder
|
||||
shutil.move(os.path.join(self.data_folder_in, filename), processed_folder)
|
||||
shutil.move(os.path.join(self.data_folder_in, filename),
|
||||
processed_folder)
|
||||
except IOError:
|
||||
pass # file could be locked, or removed in meantime so ignore
|
||||
pass # file could be locked, or removed in meantime so ignore
|
||||
|
||||
filename = os.path.join(processed_folder, filename)
|
||||
try:
|
||||
|
@ -97,8 +106,9 @@ class Channel(virtual.Channel):
|
|||
f.close()
|
||||
if not self.store_processed:
|
||||
os.remove(filename)
|
||||
except IOError, OSError:
|
||||
raise Exception('Filename [%s] could not be read from queue.' % filename)
|
||||
except (IOError, OSError):
|
||||
raise StdChannelError(
|
||||
'Filename [%s] could not be read from queue.' % filename)
|
||||
|
||||
return loads(payload)
|
||||
|
||||
|
@ -123,9 +133,9 @@ class Channel(virtual.Channel):
|
|||
count += 1
|
||||
|
||||
except OSError:
|
||||
# we simply ignore its existence, as it was probably
|
||||
# processed by another worker
|
||||
pass
|
||||
# we simply ignore its existence, as it was probably
|
||||
# processed by another worker
|
||||
pass
|
||||
|
||||
return count
|
||||
|
||||
|
@ -166,6 +176,7 @@ class Channel(virtual.Channel):
|
|||
def processed_folder(self):
|
||||
return self.transport_options.get('processed_folder', 'processed')
|
||||
|
||||
|
||||
class Transport(virtual.Transport):
|
||||
Channel = Channel
|
||||
|
||||
|
|
Loading…
Reference in New Issue