Fix `call_soon_threadsafe` thread safety

Don't start the idle handler in other threads or signal handlers,
leaving the job to `_on_wake()`.

Co-authored-by: hexin02 <hexin02@megvii.com>
This commit is contained in:
Fantix King 2021-07-12 14:08:47 -04:00
parent b0526cd50b
commit 4b803b155e
3 changed files with 39 additions and 3 deletions

View File

@ -2,6 +2,7 @@ import asyncio
import fcntl
import logging
import os
import random
import sys
import threading
import time
@ -702,6 +703,33 @@ class _TestBase:
self.loop.run_until_complete(
self.loop.shutdown_default_executor())
def test_call_soon_threadsafe_safety(self):
ITERATIONS = 4096
counter = [0]
def cb():
counter[0] += 1
if counter[0] < ITERATIONS - 512:
h = self.loop.call_later(0.01, lambda: None)
self.loop.call_later(
0.0005 + random.random() * 0.0005, h.cancel
)
def scheduler():
loop = self.loop
for i in range(ITERATIONS):
if loop.is_running():
loop.call_soon_threadsafe(cb)
time.sleep(0.001)
loop.call_soon_threadsafe(loop.stop)
thread = threading.Thread(target=scheduler)
self.loop.call_soon(thread.start)
self.loop.run_forever()
thread.join()
self.assertEqual(counter[0], ITERATIONS)
class TestBaseUV(_TestBase, UVTestCase):

View File

@ -145,6 +145,7 @@ cdef class Loop:
cdef _exec_queued_writes(self)
cdef inline _call_soon(self, object callback, object args, object context)
cdef inline _append_ready_handle(self, Handle handle)
cdef inline _call_soon_handle(self, Handle handle)
cdef _call_later(self, uint64_t delay, object callback, object args,

View File

@ -427,7 +427,7 @@ cdef class Loop:
if handle._cancelled:
self.remove_signal_handler(sig) # Remove it properly.
else:
self._call_soon_handle(handle)
self._append_ready_handle(handle)
self.handler_async.send()
cdef _on_wake(self):
@ -667,10 +667,13 @@ cdef class Loop:
self._call_soon_handle(handle)
return handle
cdef inline _call_soon_handle(self, Handle handle):
cdef inline _append_ready_handle(self, Handle handle):
self._check_closed()
self._ready.append(handle)
self._ready_len += 1
cdef inline _call_soon_handle(self, Handle handle):
self._append_ready_handle(handle)
if not self.handler_idle.running:
self.handler_idle.start()
@ -1281,7 +1284,11 @@ cdef class Loop:
"""Like call_soon(), but thread-safe."""
if not args:
args = None
handle = self._call_soon(callback, args, context)
cdef Handle handle = new_Handle(self, callback, args, context)
self._append_ready_handle(handle) # deque append is atomic
# libuv async handler is thread-safe while the idle handler is not -
# we only set the async handler here, which will start the idle handler
# in _on_wake() from the loop and eventually call the callback.
self.handler_async.send()
return handle