mirror of https://github.com/celery/kombu.git
[Py3] Redis: Fixes fanout with pmessage on python 3. Closes #324
This commit is contained in:
parent
0136555208
commit
8938a5edc6
|
@ -555,9 +555,9 @@ class Channel(virtual.Channel):
|
||||||
c.connection.disconnect()
|
c.connection.disconnect()
|
||||||
|
|
||||||
def _handle_message(self, client, r):
|
def _handle_message(self, client, r):
|
||||||
if r[0] == 'unsubscribe' and r[2] == 0:
|
if bytes_to_str(r[0]) == 'unsubscribe' and r[2] == 0:
|
||||||
client.subscribed = False
|
client.subscribed = False
|
||||||
elif r[0] == 'pmessage':
|
elif bytes_to_str(r[0]) == 'pmessage':
|
||||||
return {'type': r[0], 'pattern': r[1],
|
return {'type': r[0], 'pattern': r[1],
|
||||||
'channel': r[2], 'data': r[3]}
|
'channel': r[2], 'data': r[3]}
|
||||||
else:
|
else:
|
||||||
|
@ -582,8 +582,8 @@ class Channel(virtual.Channel):
|
||||||
try:
|
try:
|
||||||
message = loads(bytes_to_str(payload['data']))
|
message = loads(bytes_to_str(payload['data']))
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
warn('Cannot process event on channel %r: %r',
|
warn('Cannot process event on channel %r: %s',
|
||||||
channel, payload, exc_info=1)
|
channel, repr(payload)[:4096], exc_info=1)
|
||||||
raise Empty()
|
raise Empty()
|
||||||
exchange = channel.split('/', 1)[0]
|
exchange = channel.split('/', 1)[0]
|
||||||
return message, self._fanout_to_queue[exchange]
|
return message, self._fanout_to_queue[exchange]
|
||||||
|
|
Loading…
Reference in New Issue