core: support deleting handlers, make Receiver.close() unregister
This commit is contained in:
parent
58eb9828b0
commit
f5d22a3ca1
|
@ -105,7 +105,7 @@ def _get_file(context, path, out_fp):
|
|||
kwargs={
|
||||
'size': len(s),
|
||||
}
|
||||
)
|
||||
).close()
|
||||
out_fp.write(s)
|
||||
|
||||
ok = out_fp.tell() == metadata['size']
|
||||
|
|
|
@ -470,6 +470,13 @@ Router Class
|
|||
:return:
|
||||
`handle`, or if `handle` was ``None``, the newly allocated handle.
|
||||
|
||||
.. method:: del_handler (handle)
|
||||
|
||||
Remove the handle registered for `handle`
|
||||
|
||||
:raises KeyError:
|
||||
The handle wasn't registered.
|
||||
|
||||
.. method:: _async_route(msg, stream=None)
|
||||
|
||||
Arrange for `msg` to be forwarded towards its destination. If its
|
||||
|
|
|
@ -436,6 +436,9 @@ class Receiver(object):
|
|||
self.notify(self)
|
||||
|
||||
def close(self):
|
||||
if self.handle:
|
||||
self.router.del_handler(self.handle)
|
||||
self.handle = None
|
||||
self._latch.put(Message.dead())
|
||||
|
||||
def empty(self):
|
||||
|
@ -1273,6 +1276,9 @@ class Router(object):
|
|||
self.broker.start_receive(stream)
|
||||
listen(stream, 'disconnect', lambda: self.on_stream_disconnect(stream))
|
||||
|
||||
def del_handler(self, handle):
|
||||
del self._handle_map[handle]
|
||||
|
||||
def add_handler(self, fn, handle=None, persist=True,
|
||||
policy=None, respondent=None):
|
||||
handle = handle or self._last_handle.next()
|
||||
|
|
Loading…
Reference in New Issue