# cpython/Lib/test/test_atexit.py
#
# 139 lines
# 4.2 KiB
# Python

import atexit
import os
import textwrap
import unittest
from test import support
from test.support import script_helper
from test.support import threading_helper
class GeneralTest(unittest.TestCase):
    """Runs the companion script ``_test_atexit.py`` as a test."""

    def test_general(self):
        # _test_atexit.py calls atexit._clear(), so it is executed in a
        # subprocess instead of in the current interpreter.
        script_helper.run_test_script(support.findfile("_test_atexit.py"))
class FunctionalTest(unittest.TestCase):
    """Exercise atexit through real interpreter runs started with ``-c``."""

    def test_shutdown(self):
        # Actually test the shutdown mechanism in a subprocess
        code = textwrap.dedent("""
            import atexit

            def f(msg):
                print(msg)

            atexit.register(f, "one")
            atexit.register(f, "two")
        """)
        res = script_helper.assert_python_ok("-c", code)
        # Callbacks run in reverse registration order.
        self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
        self.assertFalse(res.err)

    def test_atexit_instances(self):
        # bpo-42639: It is safe to have more than one atexit instance.
        code = textwrap.dedent("""
            import sys
            import atexit as atexit1
            del sys.modules['atexit']
            import atexit as atexit2
            del sys.modules['atexit']

            assert atexit2 is not atexit1

            atexit1.register(print, "atexit1")
            atexit2.register(print, "atexit2")
        """)
        res = script_helper.assert_python_ok("-c", code)
        self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"])
        self.assertFalse(res.err)

    @threading_helper.requires_working_threading()
    @support.requires_resource("cpu")
    @unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful without the GIL")
    def test_atexit_thread_safety(self):
        # GH-126907: atexit was not thread safe on the free-threaded build
        #
        # The script must import atexit itself: without it every worker
        # thread stops with NameError before touching atexit at all.
        source = """
        import atexit
        from threading import Thread

        def dummy():
            pass


        def thready():
            for _ in range(100):
                atexit.register(dummy)
                atexit._clear()
                atexit.register(dummy)
                atexit.unregister(dummy)
                atexit._run_exitfuncs()


        threads = [Thread(target=thready) for _ in range(10)]
        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()
        """

        # atexit._clear() has some evil side effects, and we don't
        # want them to affect the rest of the tests.
        res = script_helper.assert_python_ok("-c", textwrap.dedent(source))
        # An exception raised inside a worker thread is only reported on
        # stderr and leaves the exit status at 0, so check stderr as well;
        # otherwise a crashed worker would let the test pass vacuously.
        self.assertFalse(res.err)
@support.cpython_only
class SubinterpreterTest(unittest.TestCase):
    """Checks of atexit behaviour for code run via support.run_in_subinterp()."""

    def test_callbacks_leak(self):
        # This test shows a leak in refleak mode if atexit doesn't
        # take care to free callbacks in its per-subinterpreter module
        # state.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            del atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The callback count seen from this interpreter must be unchanged.
        self.assertEqual(atexit._ncallbacks(), n)

    def test_callbacks_leak_refcycle(self):
        # Similar to the above, but with a refcycle through the atexit
        # module.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            atexit.__atexit = atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        self.assertEqual(atexit._ncallbacks(), n)

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_callback_on_subinterpreter_teardown(self):
        # This tests if a callback is called on
        # subinterpreter teardown.
        expected = b"The test has passed!"
        r, w = os.pipe()
        # Release the read end even when an assertion below fails.
        self.addCleanup(os.close, r)

        try:
            code = textwrap.dedent(r"""
                import os
                import atexit
                def callback():
                    os.write({:d}, b"The test has passed!")
                atexit.register(callback)
            """.format(w))
            ret = support.run_in_subinterp(code)
        finally:
            # Always close the write end, and do it before reading: with no
            # writer left, os.read() returns b"" instead of blocking when
            # the callback never ran.
            os.close(w)

        # Same success check as the other subinterpreter tests.
        self.assertEqual(ret, 0)
        self.assertEqual(os.read(r, len(expected)), expected)
if __name__ == "__main__":
    # Run the tests when this file is executed directly as a script.
    unittest.main()