Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 85 additions & 63 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,11 +1006,6 @@ def test_idle_process_reuse_multiple(self):
ProcessPoolForkserverMixin,
ProcessPoolSpawnMixin))

def hide_process_stderr():
import io
sys.stderr = io.StringIO()


def _crash(delay=None):
"""Induces a segfault."""
if delay:
Expand All @@ -1027,13 +1022,18 @@ def _exit():

def _raise_error(Err):
"""Function that raises an Exception in process."""
hide_process_stderr()
raise Err()


def _raise_error_ignore_stderr(Err):
"""Function that raises an Exception in process and ignores stderr."""
import io
sys.stderr = io.StringIO()
raise Err()


def _return_instance(cls):
"""Function that returns a instance of cls."""
hide_process_stderr()
return cls()


Expand Down Expand Up @@ -1072,17 +1072,12 @@ class ErrorAtUnpickle(object):
"""Bad object that triggers an error at unpickling time."""
def __reduce__(self):
from pickle import UnpicklingError
return _raise_error, (UnpicklingError, )
return _raise_error_ignore_stderr, (UnpicklingError, )


class ExecutorDeadlockTest:
TIMEOUT = support.SHORT_TIMEOUT

@classmethod
def _sleep_id(cls, x, delay):
time.sleep(delay)
return x

def _fail_on_deadlock(self, executor):
# If we did not recover before TIMEOUT seconds, consider that the
# executor is in a deadlock state and forcefully clean all its
Expand All @@ -1102,57 +1097,84 @@ def _fail_on_deadlock(self, executor):
self.fail(f"Executor deadlock:\n\n{tb}")


def test_crash(self):
# extensive testing for deadlock caused by crashes in a pool.
def _check_crash(self, error, func, *args, ignore_stderr=False):
# test for deadlock caused by crashes in a pool
self.executor.shutdown(wait=True)
crash_cases = [
# Check problem occurring while pickling a task in
# the task_handler thread
(id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
# Check problem occurring while unpickling a task on workers
(id, (ExitAtUnpickle(),), BrokenProcessPool,
"exit at task unpickle"),
(id, (ErrorAtUnpickle(),), BrokenProcessPool,
"error at task unpickle"),
(id, (CrashAtUnpickle(),), BrokenProcessPool,
"crash at task unpickle"),
# Check problem occurring during func execution on workers
(_crash, (), BrokenProcessPool,
"crash during func execution on worker"),
(_exit, (), SystemExit,
"exit during func execution on worker"),
(_raise_error, (RuntimeError, ), RuntimeError,
"error during func execution on worker"),
# Check problem occurring while pickling a task result
# on workers
(_return_instance, (CrashAtPickle,), BrokenProcessPool,
"crash during result pickle on worker"),
(_return_instance, (ExitAtPickle,), SystemExit,
"exit during result pickle on worker"),
(_return_instance, (ErrorAtPickle,), PicklingError,
"error during result pickle on worker"),
# Check problem occurring while unpickling a task in
# the result_handler thread
(_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
"error during result unpickle in result_handler"),
(_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
"exit during result unpickle in result_handler")
]
for func, args, error, name in crash_cases:
with self.subTest(name):
# The captured_stderr reduces the noise in the test report
with support.captured_stderr():
executor = self.executor_type(
max_workers=2, mp_context=get_context(self.ctx))
res = executor.submit(func, *args)
with self.assertRaises(error):
try:
res.result(timeout=self.TIMEOUT)
except futures.TimeoutError:
# If we did not recover before TIMEOUT seconds,
# consider that the executor is in a deadlock state
self._fail_on_deadlock(executor)
executor.shutdown(wait=True)

executor = self.executor_type(
max_workers=2, mp_context=get_context(self.ctx))
res = executor.submit(func, *args)

if ignore_stderr:
cm = support.captured_stderr()
else:
cm = contextlib.nullcontext()

try:
with self.assertRaises(error):
with cm:
res.result(timeout=self.TIMEOUT)
except futures.TimeoutError:
# If we did not recover before TIMEOUT seconds,
# consider that the executor is in a deadlock state
self._fail_on_deadlock(executor)
executor.shutdown(wait=True)

def test_error_at_task_pickle(self):
# Check problem occurring while pickling a task in
# the task_handler thread
self._check_crash(PicklingError, id, ErrorAtPickle())

def test_exit_at_task_unpickle(self):
# Check problem occurring while unpickling a task on workers
self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

def test_error_at_task_unpickle(self):
# Check problem occurring while unpickling a task on workers
self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

def test_crash_at_task_unpickle(self):
# Check problem occurring while unpickling a task on workers
self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

def test_crash_during_func_exec_on_worker(self):
# Check problem occurring during func execution on workers
self._check_crash(BrokenProcessPool, _crash)

def test_exit_during_func_exec_on_worker(self):
# Check problem occurring during func execution on workers
self._check_crash(SystemExit, _exit)

def test_error_during_func_exec_on_worker(self):
# Check problem occurring during func execution on workers
self._check_crash(RuntimeError, _raise_error, RuntimeError)

def test_crash_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

def test_exit_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(SystemExit, _return_instance, ExitAtPickle)

def test_error_during_result_pickle_on_worker(self):
# Check problem occurring while pickling a task result
# on workers
self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

def test_error_during_result_unpickle_in_result_handler(self):
# Check problem occurring while unpickling a task in
# the result_handler thread
self._check_crash(BrokenProcessPool,
_return_instance, ErrorAtUnpickle,
ignore_stderr=True)

def test_exit_during_result_unpickle_in_result_handler(self):
# Check problem occurring while unpickling a task in
# the result_handler thread
self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

def test_shutdown_deadlock(self):
# Test that the pool calling shutdown do not cause deadlock
Expand Down