diff --git a/tests/test_base.py b/tests/test_base.py index 6d87f54..b1a7936 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -2,6 +2,7 @@ import asyncio import fcntl import logging import os +import random import sys import threading import time @@ -702,6 +703,33 @@ class _TestBase: self.loop.run_until_complete( self.loop.shutdown_default_executor()) + def test_call_soon_threadsafe_safety(self): + ITERATIONS = 4096 + counter = [0] + + def cb(): + counter[0] += 1 + if counter[0] < ITERATIONS - 512: + h = self.loop.call_later(0.01, lambda: None) + self.loop.call_later( + 0.0005 + random.random() * 0.0005, h.cancel + ) + + def scheduler(): + loop = self.loop + for i in range(ITERATIONS): + if loop.is_running(): + loop.call_soon_threadsafe(cb) + time.sleep(0.001) + loop.call_soon_threadsafe(loop.stop) + + thread = threading.Thread(target=scheduler) + + self.loop.call_soon(thread.start) + self.loop.run_forever() + thread.join() + self.assertEqual(counter[0], ITERATIONS) + class TestBaseUV(_TestBase, UVTestCase): diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index 86b9e5d..06bee85 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -145,6 +145,7 @@ cdef class Loop: cdef _exec_queued_writes(self) cdef inline _call_soon(self, object callback, object args, object context) + cdef inline _append_ready_handle(self, Handle handle) cdef inline _call_soon_handle(self, Handle handle) cdef _call_later(self, uint64_t delay, object callback, object args, diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index 4d96ffa..d9b5aaa 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -427,7 +427,7 @@ cdef class Loop: if handle._cancelled: self.remove_signal_handler(sig) # Remove it properly. else: - self._call_soon_handle(handle) + self._append_ready_handle(handle) self.handler_async.send() cdef _on_wake(self): @@ -667,10 +667,13 @@ cdef class Loop: self._call_soon_handle(handle) return handle - cdef inline _call_soon_handle(self, Handle handle): + cdef inline _append_ready_handle(self, Handle handle): self._check_closed() self._ready.append(handle) self._ready_len += 1 + + cdef inline _call_soon_handle(self, Handle handle): + self._append_ready_handle(handle) if not self.handler_idle.running: self.handler_idle.start() @@ -1281,7 +1284,11 @@ cdef class Loop: """Like call_soon(), but thread-safe.""" if not args: args = None - handle = self._call_soon(callback, args, context) + cdef Handle handle = new_Handle(self, callback, args, context) + self._append_ready_handle(handle) # deque append is atomic + # libuv async handler is thread-safe while the idle handler is not - + # we only set the async handler here, which will start the idle handler + # in _on_wake() from the loop and eventually call the callback. self.handler_async.send() return handle