Skip to content
Snippets Groups Projects
Select Git revision
  • 02d35fc4ef8a718e74ab97402b6d8371b69d3635
  • main default protected
  • 3.10
  • 3.11
  • revert-15688-bpo-38031-_io-FileIO-opener-crash
  • 3.8
  • 3.9
  • 3.7
  • enum-fix_auto
  • branch-v3.11.0
  • backport-c3648f4-3.11
  • gh-93963/remove-importlib-resources-abcs
  • refactor-wait_for
  • shared-testcase
  • v3.12.0a2
  • v3.12.0a1
  • v3.11.0
  • v3.8.15
  • v3.9.15
  • v3.10.8
  • v3.7.15
  • v3.11.0rc2
  • v3.8.14
  • v3.9.14
  • v3.7.14
  • v3.10.7
  • v3.11.0rc1
  • v3.10.6
  • v3.11.0b5
  • v3.11.0b4
  • v3.10.5
  • v3.11.0b3
  • v3.11.0b2
  • v3.9.13
34 results

test_threaded_import.py

Blame
  • test_threaded_import.py 9.49 KiB
    # This is a variant of the very old (early 90's) file
    # Demo/threads/bug.py.  It simply provokes a number of threads into
    # trying to import the same module "at the same time".
    # There are no pleasant failure modes -- most likely is that Python
    # complains several times about module random having no attribute
    # randrange, and then Python hangs.
    
    import _imp as imp
    import os
    import importlib
    import sys
    import time
    import shutil
    import threading
    import unittest
    from unittest import mock
    from test.support import verbose
    from test.support.import_helper import forget
    from test.support.os_helper import (TESTFN, unlink, rmtree)
    from test.support import script_helper, threading_helper
    
    def task(N, done, done_tasks, errors):
        try:
            # We don't use modulefinder but still import it in order to stress
            # importing of different modules from several threads.
            if len(done_tasks) % 2:
                import modulefinder
                import random
            else:
                import random
                import modulefinder
            # This will fail if random is not completely initialized
            x = random.randrange(1, 3)
        except Exception as e:
            errors.append(e.with_traceback(None))
        finally:
            done_tasks.append(threading.get_ident())
            finished = len(done_tasks) == N
            if finished:
                done.set()
    
    def mock_register_at_fork(func):
        # bpo-30599: Mock os.register_at_fork() when importing the random module,
        # since this function doesn't allow to unregister callbacks and would leak
        # memory.
        return mock.patch('os.register_at_fork', create=True)(func)
    
    # Create a circular import structure: A -> C -> B -> D -> A
    # NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
    
    circular_imports_modules = {
        'A': """if 1:
            import time
            time.sleep(%(delay)s)
            x = 'a'
            import C
            """,
        'B': """if 1:
            import time
            time.sleep(%(delay)s)
            x = 'b'
            import D
            """,
        'C': """import B""",
        'D': """import A""",
    }
    
    class Finder:
        """A dummy finder to detect concurrent access to its find_spec()
        method."""
    
        def __init__(self):
            self.numcalls = 0
            self.x = 0
            self.lock = threading.Lock()
    
        def find_spec(self, name, path=None, target=None):
            # Simulate some thread-unsafe behaviour. If calls to find_spec()
            # are properly serialized, `x` will end up the same as `numcalls`.
            # Otherwise not.
            assert imp.lock_held()
            with self.lock:
                self.numcalls += 1
            x = self.x
            time.sleep(0.01)
            self.x = x + 1
    
    class FlushingFinder:
        """A dummy finder which flushes sys.path_importer_cache when it gets
        called."""
    
        def find_spec(self, name, path=None, target=None):
            sys.path_importer_cache.clear()
    
    
    class ThreadedImportTests(unittest.TestCase):
    
        def setUp(self):
            self.old_random = sys.modules.pop('random', None)
    
        def tearDown(self):
            # If the `random` module was already initialized, we restore the
            # old module at the end so that pickling tests don't fail.
            # See http://bugs.python.org/issue3657#msg110461
            if self.old_random is not None:
                sys.modules['random'] = self.old_random
    
        @mock_register_at_fork
        def check_parallel_module_init(self, mock_os):
            if imp.lock_held():
                # This triggers on, e.g., from test import autotest.
                raise unittest.SkipTest("can't run when import lock is held")
    
            done = threading.Event()
            for N in (20, 50) * 3:
                if verbose:
                    print("Trying", N, "threads ...", end=' ')
                # Make sure that random and modulefinder get reimported freshly
                for modname in ['random', 'modulefinder']:
                    try:
                        del sys.modules[modname]
                    except KeyError:
                        pass
                errors = []
                done_tasks = []
                done.clear()
                t0 = time.monotonic()
                with threading_helper.start_threads(
                        threading.Thread(target=task, args=(N, done, done_tasks, errors,))
                        for i in range(N)):
                    pass
                completed = done.wait(10 * 60)
                dt = time.monotonic() - t0
                if verbose:
                    print("%.1f ms" % (dt*1e3), flush=True, end=" ")
                dbg_info = 'done: %s/%s' % (len(done_tasks), N)
                self.assertFalse(errors, dbg_info)
                self.assertTrue(completed, dbg_info)
                if verbose:
                    print("OK.")
    
        def test_parallel_module_init(self):
            self.check_parallel_module_init()
    
        def test_parallel_meta_path(self):
            finder = Finder()
            sys.meta_path.insert(0, finder)
            try:
                self.check_parallel_module_init()
                self.assertGreater(finder.numcalls, 0)
                self.assertEqual(finder.x, finder.numcalls)
            finally:
                sys.meta_path.remove(finder)
    
        def test_parallel_path_hooks(self):
            # Here the Finder instance is only used to check concurrent calls
            # to path_hook().
            finder = Finder()
            # In order for our path hook to be called at each import, we need
            # to flush the path_importer_cache, which we do by registering a
            # dedicated meta_path entry.
            flushing_finder = FlushingFinder()
            def path_hook(path):
                finder.find_spec('')
                raise ImportError
            sys.path_hooks.insert(0, path_hook)
            sys.meta_path.append(flushing_finder)
            try:
                # Flush the cache a first time
                flushing_finder.find_spec('')
                numtests = self.check_parallel_module_init()
                self.assertGreater(finder.numcalls, 0)
                self.assertEqual(finder.x, finder.numcalls)
            finally:
                sys.meta_path.remove(flushing_finder)
                sys.path_hooks.remove(path_hook)
    
        def test_import_hangers(self):
            # In case this test is run again, make sure the helper module
            # gets loaded from scratch again.
            try:
                del sys.modules['test.test_importlib.threaded_import_hangers']
            except KeyError:
                pass
            import test.test_importlib.threaded_import_hangers
            self.assertFalse(test.test_importlib.threaded_import_hangers.errors)
    
        def test_circular_imports(self):
            # The goal of this test is to exercise implementations of the import
            # lock which use a per-module lock, rather than a global lock.
            # In these implementations, there is a possible deadlock with
            # circular imports, for example:
            # - thread 1 imports A (grabbing the lock for A) which imports B
            # - thread 2 imports B (grabbing the lock for B) which imports A
            # Such implementations should be able to detect such situations and
            # resolve them one way or the other, without freezing.
            # NOTE: our test constructs a slightly less trivial import cycle,
            # in order to better stress the deadlock avoidance mechanism.
            delay = 0.5
            os.mkdir(TESTFN)
            self.addCleanup(shutil.rmtree, TESTFN)
            sys.path.insert(0, TESTFN)
            self.addCleanup(sys.path.remove, TESTFN)
            for name, contents in circular_imports_modules.items():
                contents = contents % {'delay': delay}
                with open(os.path.join(TESTFN, name + ".py"), "wb") as f:
                    f.write(contents.encode('utf-8'))
                self.addCleanup(forget, name)
    
            importlib.invalidate_caches()
            results = []
            def import_ab():
                import A
                results.append(getattr(A, 'x', None))
            def import_ba():
                import B
                results.append(getattr(B, 'x', None))
            t1 = threading.Thread(target=import_ab)
            t2 = threading.Thread(target=import_ba)
            t1.start()
            t2.start()
            t1.join()
            t2.join()
            self.assertEqual(set(results), {'a', 'b'})
    
        @mock_register_at_fork
        def test_side_effect_import(self, mock_os):
            code = """if 1:
                import threading
                def target():
                    import random
                t = threading.Thread(target=target)
                t.start()
                t.join()
                t = None"""
            sys.path.insert(0, os.curdir)
            self.addCleanup(sys.path.remove, os.curdir)
            filename = TESTFN + ".py"
            with open(filename, "wb") as f:
                f.write(code.encode('utf-8'))
            self.addCleanup(unlink, filename)
            self.addCleanup(forget, TESTFN)
            self.addCleanup(rmtree, '__pycache__')
            importlib.invalidate_caches()
            __import__(TESTFN)
            del sys.modules[TESTFN]
    
        def test_concurrent_futures_circular_import(self):
            # Regression test for bpo-43515
            fn = os.path.join(os.path.dirname(__file__),
                              'partial', 'cfimport.py')
            script_helper.assert_python_ok(fn)
    
        def test_multiprocessing_pool_circular_import(self):
            # Regression test for bpo-41567
            fn = os.path.join(os.path.dirname(__file__),
                              'partial', 'pool_in_threads.py')
            script_helper.assert_python_ok(fn)
    
    
    def setUpModule():
        thread_info = threading_helper.threading_setup()
        unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)
        try:
            old_switchinterval = sys.getswitchinterval()
            unittest.addModuleCleanup(sys.setswitchinterval, old_switchinterval)
            sys.setswitchinterval(1e-5)
        except AttributeError:
            pass
    
    
    if __name__ == "__main__":
        unittest.main()