Add fanout to filesystem (#1499)

* Create a folder for each queue when using filesystem transport and add fanout support

* clean up unused variables

* Add fanout support to filesystem transport

filesystem transport lacks of fanout support.

1. Add fanout support to filesystem transport.
2. Add a unit test for it.

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Remove the refactoring work and make the test passed

1. Remove all of refactoring work
2. make the test pass

* Use pathlib for some Path operation

* Some reviewed changes

Co-authored-by: Yuriy Halytskyy <y.halytskyy@auckland.ac.nz>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Gao 2022-03-15 22:21:07 +08:00 committed by GitHub
parent d57dde5631
commit 0282e1419f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 156 additions and 1 deletions

View File

@ -65,7 +65,7 @@ Features
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No
@ -86,12 +86,16 @@ Transport Options
* ``store_processed`` - if set to True, all processed messages are backed up to
``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
* ``control_folder`` - directory where are exchange-queue table stored.
"""
import os
import shutil
import tempfile
import uuid
from collections import namedtuple
from contextlib import contextmanager
from pathlib import Path
from queue import Empty
from time import monotonic
@ -128,6 +132,7 @@ if os.name == 'nt':
hfile = win32file._get_osfhandle(file.fileno())
win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)
elif os.name == 'posix':
import fcntl
@ -140,14 +145,61 @@ elif os.name == 'posix':
def unlock(file):
"""Remove file lock."""
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
else:
raise RuntimeError(
'Filesystem plugin only defined for NT and POSIX platforms')
exchange_queue_t = namedtuple("exchange_queue_t",
["routing_key", "pattern", "queue"])
class Channel(virtual.Channel):
"""Filesystem Channel."""
supports_fanout = True
@contextmanager
def _get_exchange_file_obj(self, exchange, mode="rb"):
file = self.control_folder / f"{exchange}.exchange"
if "w" in mode:
self.control_folder.mkdir(exist_ok=True)
f_obj = file.open(mode)
try:
if "w" in mode:
lock(f_obj, LOCK_EX)
yield f_obj
except OSError:
raise ChannelError(f"Cannot open {file}")
finally:
if "w" in mode:
unlock(f_obj)
f_obj.close()
def get_table(self, exchange):
try:
with self._get_exchange_file_obj(exchange) as f_obj:
exchange_table = loads(bytes_to_str(f_obj.read()))
return [exchange_queue_t(*q) for q in exchange_table]
except FileNotFoundError:
return []
def _queue_bind(self, exchange, routing_key, pattern, queue):
queues = self.get_table(exchange)
queue_val = exchange_queue_t(routing_key or "", pattern or "",
queue or "")
if queue_val not in queues:
queues.insert(0, queue_val)
with self._get_exchange_file_obj(exchange, "wb") as f_obj:
f_obj.write(str_to_bytes(dumps(queues)))
def _put_fanout(self, exchange, payload, routing_key, **kwargs):
for q in self.get_table(exchange):
self._put(q.queue, payload, **kwargs)
def _put(self, queue, payload, **kwargs):
"""Put `message` onto `queue`."""
filename = '{}_{}.{}.msg'.format(int(round(monotonic() * 1000)),
@ -266,10 +318,19 @@ class Channel(virtual.Channel):
def processed_folder(self):
return self.transport_options.get('processed_folder', 'processed')
@property
def control_folder(self):
return Path(self.transport_options.get('control_folder', 'control'))
class Transport(virtual.Transport):
"""Filesystem Transport."""
implements = virtual.Transport.implements.extend(
asynchronous=False,
exchange_type=frozenset(['direct', 'topic', 'fanout'])
)
Channel = Channel
# filesystem backend state is global.
global_state = virtual.BrokerState()

View File

@ -1,4 +1,5 @@
import tempfile
from queue import Empty
import pytest
@ -138,3 +139,96 @@ class test_FilesystemTransport:
assert self.q2(consumer_channel).get()
self.q2(consumer_channel).purge()
assert self.q2(consumer_channel).get() is None
@t.skip.if_win32
class test_FilesystemFanout:
def setup(self):
try:
data_folder_in = tempfile.mkdtemp()
data_folder_out = tempfile.mkdtemp()
control_folder = tempfile.mkdtemp()
except Exception:
pytest.skip("filesystem transport: cannot create tempfiles")
self.consumer_connection = Connection(
transport="filesystem",
transport_options={
"data_folder_in": data_folder_in,
"data_folder_out": data_folder_out,
"control_folder": control_folder,
},
)
self.consume_channel = self.consumer_connection.channel()
self.produce_connection = Connection(
transport="filesystem",
transport_options={
"data_folder_in": data_folder_out,
"data_folder_out": data_folder_in,
"control_folder": control_folder,
},
)
self.producer_channel = self.produce_connection.channel()
self.exchange = Exchange("filesystem_exchange_fanout", type="fanout")
self.q1 = Queue("queue1", exchange=self.exchange)
self.q2 = Queue("queue2", exchange=self.exchange)
def teardown(self):
# make sure we don't attempt to restore messages at shutdown.
for channel in [self.producer_channel, self.consumer_connection]:
try:
channel._qos._dirty.clear()
except AttributeError:
pass
try:
channel._qos._delivered.clear()
except AttributeError:
pass
def test_produce_consume(self):
producer = Producer(self.producer_channel, self.exchange)
consumer1 = Consumer(self.consume_channel, self.q1)
consumer2 = Consumer(self.consume_channel, self.q2)
self.q2(self.consume_channel).declare()
for i in range(10):
producer.publish({"foo": i})
_received1 = []
_received2 = []
def callback1(message_data, message):
_received1.append(message)
message.ack()
def callback2(message_data, message):
_received2.append(message)
message.ack()
consumer1.register_callback(callback1)
consumer2.register_callback(callback2)
consumer1.consume()
consumer2.consume()
while 1:
try:
self.consume_channel.drain_events()
except Empty:
break
assert len(_received1) + len(_received2) == 20
# queue.delete
for i in range(10):
producer.publish({"foo": i})
assert self.q1(self.consume_channel).get()
self.q1(self.consume_channel).delete()
self.q1(self.consume_channel).declare()
assert self.q1(self.consume_channel).get() is None
# queue.purge
assert self.q2(self.consume_channel).get()
self.q2(self.consume_channel).purge()
assert self.q2(self.consume_channel).get() is None