mirror of https://github.com/MagicStack/uvloop.git
transport: Return duplicate sockets from get_extra_info('socket')
It appears that people use sockets returned from `transport.get_extra_info('socket')` with low-level APIs such as add_writer and remove_writer. If the returned socket fileno is the same as the one that transport is using, libuv will crash, since one fileno can't point to two different handles (uv_poll_t and uv_tcp_t). See also https://github.com/python/asyncio/issues/372
This commit is contained in:
parent
32f5fc7d21
commit
6e9c43bdec
|
@ -111,48 +111,6 @@ class _TestSockets:
|
|||
self.loop.run_until_complete(
|
||||
self.loop.sock_connect(sock, (b'', 0)))
|
||||
|
||||
def test_socket_handler_cleanup(self):
|
||||
# This tests recreates a rare condition where we have a socket
|
||||
# with an attached reader. We then remove the reader, and close the
|
||||
# socket. If the libuv Poll handler is still cached when we open
|
||||
# a new TCP connection, it might so happen that the new TCP connection
|
||||
# will receive a fileno that our previous socket was registered on.
|
||||
# In this case, when the cached Poll handle is finally closed,
|
||||
# we have a failed assertion in uv_poll_stop.
|
||||
# See also https://github.com/MagicStack/uvloop/issues/34
|
||||
# for details.
|
||||
|
||||
srv_sock = socket.socket()
|
||||
with srv_sock:
|
||||
srv_sock.bind(('127.0.0.1', 0))
|
||||
srv_sock.listen(100)
|
||||
|
||||
srv = self.loop.run_until_complete(
|
||||
self.loop.create_server(
|
||||
lambda: None, host='127.0.0.1', port=0))
|
||||
key_fileno = srv.sockets[0].fileno()
|
||||
srv.close()
|
||||
self.loop.run_until_complete(srv.wait_closed())
|
||||
|
||||
# Schedule create_connection task's callbacks
|
||||
tsk = self.loop.create_task(
|
||||
self.loop.create_connection(
|
||||
asyncio.Protocol, *srv_sock.getsockname()))
|
||||
|
||||
sock = socket.socket()
|
||||
with sock:
|
||||
# Add/remove readers
|
||||
if sock.fileno() != key_fileno:
|
||||
raise unittest.SkipTest()
|
||||
self.loop.add_reader(sock.fileno(), lambda: None)
|
||||
self.loop.remove_reader(sock.fileno())
|
||||
|
||||
tr, pr = self.loop.run_until_complete(
|
||||
asyncio.wait_for(tsk, loop=self.loop, timeout=0.1))
|
||||
tr.close()
|
||||
# Let the transport close
|
||||
self.loop.run_until_complete(asyncio.sleep(0, loop=self.loop))
|
||||
|
||||
|
||||
class TestUVSockets(_TestSockets, tb.UVTestCase):
|
||||
pass
|
||||
|
|
|
@ -461,9 +461,16 @@ class _TestTCP:
|
|||
self.assertFalse(t._paused)
|
||||
|
||||
sock = t.get_extra_info('socket')
|
||||
self.assertIs(sock, t.get_extra_info('socket'))
|
||||
sockname = sock.getsockname()
|
||||
peername = sock.getpeername()
|
||||
|
||||
# Test that adding a writer on the returned socket
|
||||
# does not crash uvloop. aiohttp does that to implement
|
||||
# sendfile, for instance.
|
||||
self.loop.add_writer(sock.fileno(), lambda: None)
|
||||
self.loop.remove_writer(sock.fileno())
|
||||
|
||||
self.assertTrue(isinstance(sock, socket.socket))
|
||||
self.assertEqual(t.get_extra_info('sockname'),
|
||||
sockname)
|
||||
|
|
|
@ -321,7 +321,15 @@ class _TestUnix:
|
|||
None,
|
||||
sock=sock)
|
||||
|
||||
sock = t.get_extra_info('socket')
|
||||
self.assertIs(t.get_extra_info('socket'), sock)
|
||||
|
||||
# Test that adding a writer on the returned socket
|
||||
# does not crash uvloop. aiohttp does that to implement
|
||||
# sendfile, for instance.
|
||||
self.loop.add_writer(sock.fileno(), lambda: None)
|
||||
self.loop.remove_writer(sock.fileno())
|
||||
|
||||
t.close()
|
||||
|
||||
s1, s2 = socket.socketpair(socket.AF_UNIX)
|
||||
|
|
|
@ -199,13 +199,13 @@ cdef class UVSocketHandle(UVHandle):
|
|||
raise NotImplementedError
|
||||
|
||||
cdef inline _get_socket(self):
|
||||
if self._fileobj:
|
||||
return self._fileobj
|
||||
|
||||
if self.__cached_socket is not None:
|
||||
return self.__cached_socket
|
||||
|
||||
self.__cached_socket = self._new_socket()
|
||||
if self.__cached_socket.fileno() == self._fileno():
|
||||
raise RuntimeError('new socket shares fileno with the transport')
|
||||
|
||||
return self.__cached_socket
|
||||
|
||||
cdef inline _attach_fileobj(self, object file):
|
||||
|
@ -218,7 +218,7 @@ cdef class UVSocketHandle(UVHandle):
|
|||
try:
|
||||
if self.__cached_socket is not None:
|
||||
try:
|
||||
self.__cached_socket.detach()
|
||||
self.__cached_socket.close()
|
||||
except OSError:
|
||||
pass
|
||||
self.__cached_socket = None
|
||||
|
|
|
@ -24,7 +24,8 @@ cdef __pipe_open(UVStream handle, int fd):
|
|||
|
||||
|
||||
cdef __pipe_get_socket(UVSocketHandle handle):
|
||||
return socket_socket(uv.AF_UNIX, uv.SOCK_STREAM, 0, handle._fileno())
|
||||
fileno = os_dup(handle._fileno())
|
||||
return socket_socket(uv.AF_UNIX, uv.SOCK_STREAM, 0, fileno)
|
||||
|
||||
|
||||
@cython.no_gc_clear
|
||||
|
|
|
@ -36,7 +36,7 @@ cdef __tcp_get_socket(UVSocketHandle handle):
|
|||
int err
|
||||
system.sockaddr_storage buf
|
||||
|
||||
fileno = handle._fileno()
|
||||
fileno = os_dup(handle._fileno())
|
||||
|
||||
err = uv.uv_tcp_getsockname(<uv.uv_tcp_t*>handle._handle,
|
||||
<system.sockaddr*>&buf,
|
||||
|
|
|
@ -182,7 +182,7 @@ cdef class UDPTransport(UVBaseTransport):
|
|||
raise RuntimeError(
|
||||
'UDPTransport.family is undefined; cannot create python socket')
|
||||
|
||||
fileno = self._fileno()
|
||||
fileno = os_dup(self._fileno())
|
||||
return socket_socket(self._family, uv.SOCK_STREAM, 0, fileno)
|
||||
|
||||
cdef _send(self, object data, object addr):
|
||||
|
|
Loading…
Reference in New Issue