mirror of https://github.com/MagicStack/uvloop.git
Fix `call_soon_threadsafe` thread safety
Don't start the idle handler in other threads or signal handlers, leaving the job to `_on_wake()`. Co-authored-by: hexin02 <hexin02@megvii.com>
This commit is contained in:
parent
2e71c4c257
commit
6387a4e47a
|
@ -2,6 +2,7 @@ import asyncio
|
|||
import fcntl
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
@ -702,6 +703,33 @@ class _TestBase:
|
|||
self.loop.run_until_complete(
|
||||
self.loop.shutdown_default_executor())
|
||||
|
||||
def test_call_soon_threadsafe_safety(self):
|
||||
ITERATIONS = 4096
|
||||
counter = [0]
|
||||
|
||||
def cb():
|
||||
counter[0] += 1
|
||||
if counter[0] < ITERATIONS - 512:
|
||||
h = self.loop.call_later(0.01, lambda: None)
|
||||
self.loop.call_later(
|
||||
0.0005 + random.random() * 0.0005, h.cancel
|
||||
)
|
||||
|
||||
def scheduler():
|
||||
loop = self.loop
|
||||
for i in range(ITERATIONS):
|
||||
if loop.is_running():
|
||||
loop.call_soon_threadsafe(cb)
|
||||
time.sleep(0.001)
|
||||
loop.call_soon_threadsafe(loop.stop)
|
||||
|
||||
thread = threading.Thread(target=scheduler)
|
||||
|
||||
self.loop.call_soon(thread.start)
|
||||
self.loop.run_forever()
|
||||
thread.join()
|
||||
self.assertEqual(counter[0], ITERATIONS)
|
||||
|
||||
|
||||
class TestBaseUV(_TestBase, UVTestCase):
|
||||
|
||||
|
|
|
@ -145,6 +145,7 @@ cdef class Loop:
|
|||
cdef _exec_queued_writes(self)
|
||||
|
||||
cdef inline _call_soon(self, object callback, object args, object context)
|
||||
cdef inline _append_ready_handle(self, Handle handle)
|
||||
cdef inline _call_soon_handle(self, Handle handle)
|
||||
|
||||
cdef _call_later(self, uint64_t delay, object callback, object args,
|
||||
|
|
|
@ -427,7 +427,7 @@ cdef class Loop:
|
|||
if handle._cancelled:
|
||||
self.remove_signal_handler(sig) # Remove it properly.
|
||||
else:
|
||||
self._call_soon_handle(handle)
|
||||
self._append_ready_handle(handle)
|
||||
self.handler_async.send()
|
||||
|
||||
cdef _on_wake(self):
|
||||
|
@ -667,10 +667,13 @@ cdef class Loop:
|
|||
self._call_soon_handle(handle)
|
||||
return handle
|
||||
|
||||
cdef inline _call_soon_handle(self, Handle handle):
|
||||
cdef inline _append_ready_handle(self, Handle handle):
|
||||
self._check_closed()
|
||||
self._ready.append(handle)
|
||||
self._ready_len += 1
|
||||
|
||||
cdef inline _call_soon_handle(self, Handle handle):
|
||||
self._append_ready_handle(handle)
|
||||
if not self.handler_idle.running:
|
||||
self.handler_idle.start()
|
||||
|
||||
|
@ -1281,7 +1284,11 @@ cdef class Loop:
|
|||
"""Like call_soon(), but thread-safe."""
|
||||
if not args:
|
||||
args = None
|
||||
handle = self._call_soon(callback, args, context)
|
||||
cdef Handle handle = new_Handle(self, callback, args, context)
|
||||
self._append_ready_handle(handle) # deque append is atomic
|
||||
# libuv async handler is thread-safe while the idle handler is not -
|
||||
# we only set the async handler here, which will start the idle handler
|
||||
# in _on_wake() from the loop and eventually call the callback.
|
||||
self.handler_async.send()
|
||||
return handle
|
||||
|
||||
|
|
Loading…
Reference in New Issue