From 74d381e87a98e4a58e682aa83c2dc4d81ea814a5 Mon Sep 17 00:00:00 2001 From: jensbjorgensen Date: Fri, 9 Sep 2022 09:24:41 -0500 Subject: [PATCH] Expose libuv uv_fs_event functionality (#474) * Also updated pyOpenSSL to 22.x Co-authored-by: Jens Jorgensen Co-authored-by: Fantix King --- setup.py | 2 +- tests/test_fs_event.py | 100 +++++++++++++++++++++++++++++++++ uvloop/handles/fsevent.pxd | 12 ++++ uvloop/handles/fsevent.pyx | 112 +++++++++++++++++++++++++++++++++++++ uvloop/includes/uv.pxd | 19 +++++++ uvloop/loop.pxd | 1 + uvloop/loop.pyx | 14 +++++ 7 files changed, 259 insertions(+), 1 deletion(-) create mode 100644 tests/test_fs_event.py create mode 100644 uvloop/handles/fsevent.pxd create mode 100644 uvloop/handles/fsevent.pyx diff --git a/setup.py b/setup.py index 8425962..486ea27 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ TEST_DEPENDENCIES = [ 'flake8~=3.9.2', 'psutil', 'pycodestyle~=2.7.0', - 'pyOpenSSL~=19.0.0', + 'pyOpenSSL~=22.0.0', 'mypy>=0.800', CYTHON_DEPENDENCY, ] diff --git a/tests/test_fs_event.py b/tests/test_fs_event.py new file mode 100644 index 0000000..743589b --- /dev/null +++ b/tests/test_fs_event.py @@ -0,0 +1,100 @@ +import asyncio +import os.path +import tempfile + +from uvloop import _testbase as tb +from uvloop.loop import FileSystemEvent + + +class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase): + async def _file_writer(self): + f = await self.q.get() + while True: + f.write('hello uvloop\n') + f.flush() + x = await self.q.get() + if x is None: + return + + def fs_event_setup(self): + self.change_event_count = 0 + self.fname = '' + self.q = asyncio.Queue() + + def event_cb(self, ev_fname: bytes, evt: FileSystemEvent): + _d, fn = os.path.split(self.fname) + self.assertEqual(ev_fname, fn) + self.assertEqual(evt, FileSystemEvent.CHANGE) + self.change_event_count += 1 + if self.change_event_count < 4: + self.q.put_nowait(0) + else: + self.q.put_nowait(None) + + def test_fs_event_change(self): + self.fs_event_setup() + + async def run(write_task): + self.q.put_nowait(tf) + try: + await asyncio.wait_for(write_task, 4) + except asyncio.TimeoutError: + write_task.cancel() + + with tempfile.NamedTemporaryFile('wt') as tf: + self.fname = tf.name.encode() + h = self.loop._monitor_fs(tf.name, self.event_cb) + self.assertFalse(h.cancelled()) + + self.loop.run_until_complete(run( + self.loop.create_task(self._file_writer()))) + h.cancel() + self.assertTrue(h.cancelled()) + + self.assertEqual(self.change_event_count, 4) + + +class Test_UV_FS_EVENT_RENAME(tb.UVTestCase): + async def _file_renamer(self): + await self.q.get() + os.rename(os.path.join(self.dname, self.changed_name), + os.path.join(self.dname, self.changed_name + "-new")) + await self.q.get() + + def fs_event_setup(self): + self.dname = '' + self.changed_name = "hello_fs_event.txt" + self.changed_set = {self.changed_name, self.changed_name + '-new'} + self.q = asyncio.Queue() + + def event_cb(self, ev_fname: bytes, evt: FileSystemEvent): + ev_fname = ev_fname.decode() + self.assertEqual(evt, FileSystemEvent.RENAME) + self.changed_set.remove(ev_fname) + if len(self.changed_set) == 0: + self.q.put_nowait(None) + + def test_fs_event_rename(self): + self.fs_event_setup() + + async def run(write_task): + self.q.put_nowait(0) + try: + await asyncio.wait_for(write_task, 4) + except asyncio.TimeoutError: + write_task.cancel() + + with tempfile.TemporaryDirectory() as td_name: + self.dname = td_name + f = open(os.path.join(td_name, self.changed_name), 'wt') + f.write('hello!') + f.close() + h = self.loop._monitor_fs(td_name, self.event_cb) + self.assertFalse(h.cancelled()) + + self.loop.run_until_complete(run( + self.loop.create_task(self._file_renamer()))) + h.cancel() + self.assertTrue(h.cancelled()) + + self.assertEqual(len(self.changed_set), 0) diff --git a/uvloop/handles/fsevent.pxd b/uvloop/handles/fsevent.pxd new file mode 100644 index 0000000..3a32428 --- /dev/null +++ b/uvloop/handles/fsevent.pxd @@ -0,0 +1,12 @@ +cdef class UVFSEvent(UVHandle): + cdef: + object callback + bint running + + cdef _init(self, Loop loop, object callback, object context) + cdef _close(self) + cdef start(self, char* path, int flags) + cdef stop(self) + + @staticmethod + cdef UVFSEvent new(Loop loop, object callback, object context) diff --git a/uvloop/handles/fsevent.pyx b/uvloop/handles/fsevent.pyx new file mode 100644 index 0000000..7b458c2 --- /dev/null +++ b/uvloop/handles/fsevent.pyx @@ -0,0 +1,112 @@ +import enum + + +class FileSystemEvent(enum.IntEnum): + RENAME = uv.UV_RENAME + CHANGE = uv.UV_CHANGE + RENAME_CHANGE = RENAME | CHANGE + + +@cython.no_gc_clear +cdef class UVFSEvent(UVHandle): + cdef _init(self, Loop loop, object callback, object context): + cdef int err + + self._start_init(loop) + + self._handle = PyMem_RawMalloc( + sizeof(uv.uv_fs_event_t) + ) + if self._handle is NULL: + self._abort_init() + raise MemoryError() + + err = uv.uv_fs_event_init( + self._loop.uvloop, self._handle + ) + if err < 0: + self._abort_init() + raise convert_error(err) + + self._finish_init() + + self.running = 0 + self.callback = callback + if context is None: + context = Context_CopyCurrent() + self.context = context + + cdef start(self, char* path, int flags): + cdef int err + + self._ensure_alive() + + if self.running == 0: + err = uv.uv_fs_event_start( + self._handle, + __uvfsevent_callback, + path, + flags, + ) + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + self.running = 1 + + cdef stop(self): + cdef int err + + if not self._is_alive(): + self.running = 0 + return + + if self.running == 1: + err = uv.uv_fs_event_stop(self._handle) + self.running = 0 + if err < 0: + exc = convert_error(err) + self._fatal_error(exc, True) + return + + cdef _close(self): + try: + self.stop() + finally: + UVHandle._close(self) + + def cancel(self): + self._close() + + def cancelled(self): + return self.running == 0 + + @staticmethod + cdef UVFSEvent new(Loop loop, object callback, object context): + cdef UVFSEvent handle + handle = UVFSEvent.__new__(UVFSEvent) + handle._init(loop, callback, context) + return handle + + +cdef void __uvfsevent_callback(uv.uv_fs_event_t* handle, const char *filename, + int events, int status) with gil: + if __ensure_handle_data( + handle, "UVFSEvent callback" + ) == 0: + return + + cdef: + UVFSEvent fs_event = handle.data + Handle h + + try: + h = new_Handle( + fs_event._loop, + fs_event.callback, + (filename, FileSystemEvent(events)), + fs_event.context, + ) + h._run() + except BaseException as ex: + fs_event._error(ex, False) diff --git a/uvloop/includes/uv.pxd b/uvloop/includes/uv.pxd index 0b0f1da..2f2f1e8 100644 --- a/uvloop/includes/uv.pxd +++ b/uvloop/includes/uv.pxd @@ -183,6 +183,10 @@ cdef extern from "uv.h" nogil: int pid # ... + ctypedef struct uv_fs_event_t: + void* data + # ... + ctypedef enum uv_req_type: UV_UNKNOWN_REQ = 0, UV_REQ, @@ -215,6 +219,10 @@ cdef extern from "uv.h" nogil: UV_LEAVE_GROUP = 0, UV_JOIN_GROUP + cpdef enum uv_fs_event: + UV_RENAME = 1, + UV_CHANGE = 2 + const char* uv_strerror(int err) const char* uv_err_name(int err) @@ -253,6 +261,10 @@ cdef extern from "uv.h" nogil: const uv_buf_t* buf, const system.sockaddr* addr, unsigned flags) with gil + ctypedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, + const char *filename, + int events, + int status) with gil # Generic request functions int uv_cancel(uv_req_t* req) @@ -397,6 +409,13 @@ cdef extern from "uv.h" nogil: int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb) int uv_poll_stop(uv_poll_t* poll) + # FS Event + + int uv_fs_event_init(uv_loop_t *loop, uv_fs_event_t *handle) + int uv_fs_event_start(uv_fs_event_t *handle, uv_fs_event_cb cb, + const char *path, unsigned int flags) + int uv_fs_event_stop(uv_fs_event_t *handle) + # Misc ctypedef struct uv_timeval_t: diff --git a/uvloop/loop.pxd b/uvloop/loop.pxd index d3c9170..2080dfe 100644 --- a/uvloop/loop.pxd +++ b/uvloop/loop.pxd @@ -219,6 +219,7 @@ include "handles/streamserver.pxd" include "handles/tcp.pxd" include "handles/pipe.pxd" include "handles/process.pxd" +include "handles/fsevent.pxd" include "request.pxd" include "sslproto.pxd" diff --git a/uvloop/loop.pyx b/uvloop/loop.pyx index 05f9f0e..ad6657a 100644 --- a/uvloop/loop.pyx +++ b/uvloop/loop.pyx @@ -3149,6 +3149,19 @@ cdef class Loop: await waiter return udp, protocol + def _monitor_fs(self, path: str, callback) -> asyncio.Handle: + cdef: + UVFSEvent fs_handle + char* c_str_path + + self._check_closed() + fs_handle = UVFSEvent.new(self, callback, None) + p_bytes = path.encode('UTF-8') + c_str_path = p_bytes + flags = 0 + fs_handle.start(c_str_path, flags) + return fs_handle + def _check_default_executor(self): if self._executor_shutdown_called: raise RuntimeError('Executor shutdown has been called') @@ -3301,6 +3314,7 @@ include "handles/streamserver.pyx" include "handles/tcp.pyx" include "handles/pipe.pyx" include "handles/process.pyx" +include "handles/fsevent.pyx" include "request.pyx" include "dns.pyx"