From 18418858b2cfca221613af1c58bf657f8bfb6d6c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Langa?= <lukasz@langa.pl>
Date: Fri, 29 Jul 2022 14:36:58 +0200
Subject: [PATCH] [3.10] gh-95166: cancel map waited on future on timeout
 (GH-95169) (GH-95375)

Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Co-authored-by: Kumar Aditya <59607654+kumaraditya303@users.noreply.github.com>
---
 Lib/concurrent/futures/_base.py               | 16 +++++++++--
 Lib/test/test_concurrent_futures.py           | 27 +++++++++++++++++++
 ...2-07-23-10-42-05.gh-issue-95166.xw6p3C.rst |  1 +
 3 files changed, 42 insertions(+), 2 deletions(-)
 create mode 100644 Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst

diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index cf119ac6437..a329e74d11f 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -312,6 +312,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
     done.update(waiter.finished_futures)
     return DoneAndNotDoneFutures(done, fs - done)
 
+
+def _result_or_cancel(fut, timeout=None):
+    try:
+        try:
+            return fut.result(timeout)
+        finally:
+            fut.cancel()
+    finally:
+        # Break a reference cycle with the exception in self._exception
+        del fut
+
+
 class Future(object):
     """Represents the result of an asynchronous computation."""
 
@@ -606,9 +618,9 @@ def result_iterator():
                 while fs:
                     # Careful not to keep a reference to the popped future
                     if timeout is None:
-                        yield fs.pop().result()
+                        yield _result_or_cancel(fs.pop())
                     else:
-                        yield fs.pop().result(end_time - time.monotonic())
+                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
             finally:
                 for future in fs:
                     future.cancel()
diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index e5eed995811..e174d5464d2 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -932,6 +932,33 @@ def submit(pool):
                 with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                     workers.submit(tuple)
 
+    def test_executor_map_current_future_cancel(self):
+        stop_event = threading.Event()
+        log = []
+
+        def log_n_wait(ident):
+            log.append(f"{ident=} started")
+            try:
+                stop_event.wait()
+            finally:
+                log.append(f"{ident=} stopped")
+
+        with self.executor_type(max_workers=1) as pool:
+            # submit work to saturate the pool
+            fut = pool.submit(log_n_wait, ident="first")
+            try:
+                with contextlib.closing(
+                    pool.map(log_n_wait, ["second", "third"], timeout=0)
+                ) as gen:
+                    with self.assertRaises(futures.TimeoutError):
+                        next(gen)
+            finally:
+                stop_event.set()
+            fut.result()
+        # ident='second' is cancelled as a result of raising a TimeoutError
+        # ident='third' is cancelled because it remained in the collection of futures
+        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
+
 
 class ProcessPoolExecutorTest(ExecutorTest):
 
diff --git a/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst
new file mode 100644
index 00000000000..34b01707843
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-07-23-10-42-05.gh-issue-95166.xw6p3C.rst
@@ -0,0 +1 @@
+Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.
-- 
GitLab