Expose libuv uv_fs_event functionality (#474)

* Also updated pyOpenSSL to 22.x

Co-authored-by: Jens Jorgensen <jens@consiliumb.sg>
Co-authored-by: Fantix King <fantix.king@gmail.com>
This commit is contained in:
jensbjorgensen 2022-09-09 09:24:41 -05:00 committed by GitHub
parent 637a77a3a4
commit 74d381e87a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 259 additions and 1 deletions

View File

@ -34,7 +34,7 @@ TEST_DEPENDENCIES = [
'flake8~=3.9.2',
'psutil',
'pycodestyle~=2.7.0',
'pyOpenSSL~=19.0.0',
'pyOpenSSL~=22.0.0',
'mypy>=0.800',
CYTHON_DEPENDENCY,
]

100
tests/test_fs_event.py Normal file
View File

@ -0,0 +1,100 @@
import asyncio
import os.path
import tempfile
from uvloop import _testbase as tb
from uvloop.loop import FileSystemEvent
class Test_UV_FS_EVENT_CHANGE(tb.UVTestCase):
async def _file_writer(self):
f = await self.q.get()
while True:
f.write('hello uvloop\n')
f.flush()
x = await self.q.get()
if x is None:
return
def fs_event_setup(self):
self.change_event_count = 0
self.fname = ''
self.q = asyncio.Queue()
def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
_d, fn = os.path.split(self.fname)
self.assertEqual(ev_fname, fn)
self.assertEqual(evt, FileSystemEvent.CHANGE)
self.change_event_count += 1
if self.change_event_count < 4:
self.q.put_nowait(0)
else:
self.q.put_nowait(None)
def test_fs_event_change(self):
self.fs_event_setup()
async def run(write_task):
self.q.put_nowait(tf)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()
with tempfile.NamedTemporaryFile('wt') as tf:
self.fname = tf.name.encode()
h = self.loop._monitor_fs(tf.name, self.event_cb)
self.assertFalse(h.cancelled())
self.loop.run_until_complete(run(
self.loop.create_task(self._file_writer())))
h.cancel()
self.assertTrue(h.cancelled())
self.assertEqual(self.change_event_count, 4)
class Test_UV_FS_EVENT_RENAME(tb.UVTestCase):
async def _file_renamer(self):
await self.q.get()
os.rename(os.path.join(self.dname, self.changed_name),
os.path.join(self.dname, self.changed_name + "-new"))
await self.q.get()
def fs_event_setup(self):
self.dname = ''
self.changed_name = "hello_fs_event.txt"
self.changed_set = {self.changed_name, self.changed_name + '-new'}
self.q = asyncio.Queue()
def event_cb(self, ev_fname: bytes, evt: FileSystemEvent):
ev_fname = ev_fname.decode()
self.assertEqual(evt, FileSystemEvent.RENAME)
self.changed_set.remove(ev_fname)
if len(self.changed_set) == 0:
self.q.put_nowait(None)
def test_fs_event_rename(self):
self.fs_event_setup()
async def run(write_task):
self.q.put_nowait(0)
try:
await asyncio.wait_for(write_task, 4)
except asyncio.TimeoutError:
write_task.cancel()
with tempfile.TemporaryDirectory() as td_name:
self.dname = td_name
f = open(os.path.join(td_name, self.changed_name), 'wt')
f.write('hello!')
f.close()
h = self.loop._monitor_fs(td_name, self.event_cb)
self.assertFalse(h.cancelled())
self.loop.run_until_complete(run(
self.loop.create_task(self._file_renamer())))
h.cancel()
self.assertTrue(h.cancelled())
self.assertEqual(len(self.changed_set), 0)

View File

@ -0,0 +1,12 @@
cdef class UVFSEvent(UVHandle):
cdef:
object callback
bint running
cdef _init(self, Loop loop, object callback, object context)
cdef _close(self)
cdef start(self, char* path, int flags)
cdef stop(self)
@staticmethod
cdef UVFSEvent new(Loop loop, object callback, object context)

112
uvloop/handles/fsevent.pyx Normal file
View File

@ -0,0 +1,112 @@
import enum
class FileSystemEvent(enum.IntEnum):
RENAME = uv.UV_RENAME
CHANGE = uv.UV_CHANGE
RENAME_CHANGE = RENAME | CHANGE
@cython.no_gc_clear
cdef class UVFSEvent(UVHandle):
cdef _init(self, Loop loop, object callback, object context):
cdef int err
self._start_init(loop)
self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(
sizeof(uv.uv_fs_event_t)
)
if self._handle is NULL:
self._abort_init()
raise MemoryError()
err = uv.uv_fs_event_init(
self._loop.uvloop, <uv.uv_fs_event_t*>self._handle
)
if err < 0:
self._abort_init()
raise convert_error(err)
self._finish_init()
self.running = 0
self.callback = callback
if context is None:
context = Context_CopyCurrent()
self.context = context
cdef start(self, char* path, int flags):
cdef int err
self._ensure_alive()
if self.running == 0:
err = uv.uv_fs_event_start(
<uv.uv_fs_event_t*>self._handle,
__uvfsevent_callback,
path,
flags,
)
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
return
self.running = 1
cdef stop(self):
cdef int err
if not self._is_alive():
self.running = 0
return
if self.running == 1:
err = uv.uv_fs_event_stop(<uv.uv_fs_event_t*>self._handle)
self.running = 0
if err < 0:
exc = convert_error(err)
self._fatal_error(exc, True)
return
cdef _close(self):
try:
self.stop()
finally:
UVHandle._close(<UVHandle>self)
def cancel(self):
self._close()
def cancelled(self):
return self.running == 0
@staticmethod
cdef UVFSEvent new(Loop loop, object callback, object context):
cdef UVFSEvent handle
handle = UVFSEvent.__new__(UVFSEvent)
handle._init(loop, callback, context)
return handle
cdef void __uvfsevent_callback(uv.uv_fs_event_t* handle, const char *filename,
int events, int status) with gil:
if __ensure_handle_data(
<uv.uv_handle_t*>handle, "UVFSEvent callback"
) == 0:
return
cdef:
UVFSEvent fs_event = <UVFSEvent> handle.data
Handle h
try:
h = new_Handle(
fs_event._loop,
fs_event.callback,
(filename, FileSystemEvent(events)),
fs_event.context,
)
h._run()
except BaseException as ex:
fs_event._error(ex, False)

View File

@ -183,6 +183,10 @@ cdef extern from "uv.h" nogil:
int pid
# ...
ctypedef struct uv_fs_event_t:
void* data
# ...
ctypedef enum uv_req_type:
UV_UNKNOWN_REQ = 0,
UV_REQ,
@ -215,6 +219,10 @@ cdef extern from "uv.h" nogil:
UV_LEAVE_GROUP = 0,
UV_JOIN_GROUP
cpdef enum uv_fs_event:
UV_RENAME = 1,
UV_CHANGE = 2
const char* uv_strerror(int err)
const char* uv_err_name(int err)
@ -253,6 +261,10 @@ cdef extern from "uv.h" nogil:
const uv_buf_t* buf,
const system.sockaddr* addr,
unsigned flags) with gil
ctypedef void (*uv_fs_event_cb)(uv_fs_event_t* handle,
const char *filename,
int events,
int status) with gil
# Generic request functions
int uv_cancel(uv_req_t* req)
@ -397,6 +409,13 @@ cdef extern from "uv.h" nogil:
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb)
int uv_poll_stop(uv_poll_t* poll)
# FS Event
int uv_fs_event_init(uv_loop_t *loop, uv_fs_event_t *handle)
int uv_fs_event_start(uv_fs_event_t *handle, uv_fs_event_cb cb,
const char *path, unsigned int flags)
int uv_fs_event_stop(uv_fs_event_t *handle)
# Misc
ctypedef struct uv_timeval_t:

View File

@ -219,6 +219,7 @@ include "handles/streamserver.pxd"
include "handles/tcp.pxd"
include "handles/pipe.pxd"
include "handles/process.pxd"
include "handles/fsevent.pxd"
include "request.pxd"
include "sslproto.pxd"

View File

@ -3149,6 +3149,19 @@ cdef class Loop:
await waiter
return udp, protocol
def _monitor_fs(self, path: str, callback) -> asyncio.Handle:
cdef:
UVFSEvent fs_handle
char* c_str_path
self._check_closed()
fs_handle = UVFSEvent.new(self, callback, None)
p_bytes = path.encode('UTF-8')
c_str_path = p_bytes
flags = 0
fs_handle.start(c_str_path, flags)
return fs_handle
def _check_default_executor(self):
if self._executor_shutdown_called:
raise RuntimeError('Executor shutdown has been called')
@ -3301,6 +3314,7 @@ include "handles/streamserver.pyx"
include "handles/tcp.pyx"
include "handles/pipe.pyx"
include "handles/process.pyx"
include "handles/fsevent.pyx"
include "request.pyx"
include "dns.pyx"