from __future__ import absolute_import, unicode_literals import pickle import pytest from case import Mock, call from kombu import Connection, Exchange, Producer, Queue, binding from kombu.abstract import MaybeChannelBound from kombu.exceptions import NotBoundError from kombu.serialization import registry from t.mocks import Transport def get_conn(): return Connection(transport=Transport) class test_binding: def test_constructor(self): x = binding( Exchange('foo'), 'rkey', arguments={'barg': 'bval'}, unbind_arguments={'uarg': 'uval'}, ) assert x.exchange == Exchange('foo') assert x.routing_key == 'rkey' assert x.arguments == {'barg': 'bval'} assert x.unbind_arguments == {'uarg': 'uval'} def test_declare(self): chan = get_conn().channel() x = binding(Exchange('foo'), 'rkey') x.declare(chan) assert 'exchange_declare' in chan def test_declare_no_exchange(self): chan = get_conn().channel() x = binding() x.declare(chan) assert 'exchange_declare' not in chan def test_bind(self): chan = get_conn().channel() x = binding(Exchange('foo')) x.bind(Exchange('bar')(chan)) assert 'exchange_bind' in chan def test_unbind(self): chan = get_conn().channel() x = binding(Exchange('foo')) x.unbind(Exchange('bar')(chan)) assert 'exchange_unbind' in chan def test_repr(self): b = binding(Exchange('foo'), 'rkey') assert 'foo' in repr(b) assert 'rkey' in repr(b) class test_Exchange: def test_bound(self): exchange = Exchange('foo', 'direct') assert not exchange.is_bound assert '