Skip to content

Commit

Permalink
GH-94182: Run the PidfdChildWatcher on the running loop (#94184)
Browse files Browse the repository at this point in the history
There is no reason for this watcher to be attached to any particular loop.
This should make it safe to use regardless of the lifetime of the event loop running in the main thread
(relative to other loops).

Co-authored-by: Yury Selivanov <[email protected]>
Co-authored-by: Jelle Zijlstra <[email protected]>
  • Loading branch information
3 people authored Oct 8, 2022
1 parent 27ce45d commit 3d8b224
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 42 deletions.
44 changes: 12 additions & 32 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,46 +912,29 @@ class PidfdChildWatcher(AbstractChildWatcher):
recent (5.3+) kernels.
"""

def __init__(self):
self._loop = None
self._callbacks = {}

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
pass

def is_active(self):
return self._loop is not None and self._loop.is_running()
return True

def close(self):
self.attach_loop(None)
pass

def attach_loop(self, loop):
if self._loop is not None and loop is None and self._callbacks:
warnings.warn(
'A loop is being detached '
'from a child watcher with pending handlers',
RuntimeWarning)
for pidfd, _, _ in self._callbacks.values():
self._loop._remove_reader(pidfd)
os.close(pidfd)
self._callbacks.clear()
self._loop = loop
pass

def add_child_handler(self, pid, callback, *args):
existing = self._callbacks.get(pid)
if existing is not None:
self._callbacks[pid] = existing[0], callback, args
else:
pidfd = os.pidfd_open(pid)
self._loop._add_reader(pidfd, self._do_wait, pid)
self._callbacks[pid] = pidfd, callback, args
loop = events.get_running_loop()
pidfd = os.pidfd_open(pid)
loop._add_reader(pidfd, self._do_wait, pid, pidfd, callback, args)

def _do_wait(self, pid):
pidfd, callback, args = self._callbacks.pop(pid)
self._loop._remove_reader(pidfd)
def _do_wait(self, pid, pidfd, callback, args):
loop = events.get_running_loop()
loop._remove_reader(pidfd)
try:
_, status = os.waitpid(pid, 0)
except ChildProcessError:
Expand All @@ -969,12 +952,9 @@ def _do_wait(self, pid):
callback(pid, returncode, *args)

def remove_child_handler(self, pid):
try:
pidfd, _, _ = self._callbacks.pop(pid)
except KeyError:
return False
self._loop._remove_reader(pidfd)
os.close(pidfd)
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classes require it.
return True


Expand Down
54 changes: 44 additions & 10 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import unittest
import warnings
import functools
from unittest import mock

import asyncio
Expand All @@ -30,6 +31,19 @@
'sys.stdout.buffer.write(data)'))]


@functools.cache
def _has_pidfd_support():
if not hasattr(os, 'pidfd_open'):
return False

try:
os.close(os.pidfd_open(os.getpid()))
except OSError:
return False

return True


def tearDownModule():
asyncio.set_event_loop_policy(None)

Expand Down Expand Up @@ -708,17 +722,8 @@ class SubprocessFastWatcherTests(SubprocessWatcherMixin,

Watcher = unix_events.FastChildWatcher

def has_pidfd_support():
if not hasattr(os, 'pidfd_open'):
return False
try:
os.close(os.pidfd_open(os.getpid()))
except OSError:
return False
return True

@unittest.skipUnless(
has_pidfd_support(),
_has_pidfd_support(),
"operating system does not support pidfds",
)
class SubprocessPidfdWatcherTests(SubprocessWatcherMixin,
Expand Down Expand Up @@ -751,6 +756,35 @@ async def execute():
mock.call.__exit__(RuntimeError, mock.ANY, mock.ANY),
])


@unittest.skipUnless(
_has_pidfd_support(),
"operating system does not support pidfds",
)
def test_create_subprocess_with_pidfd(self):
async def in_thread():
proc = await asyncio.create_subprocess_exec(
*PROGRAM_CAT,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
stdout, stderr = await proc.communicate(b"some data")
return proc.returncode, stdout

async def main():
# asyncio.Runner did not call asyncio.set_event_loop()
with self.assertRaises(RuntimeError):
asyncio.get_event_loop_policy().get_event_loop()
return await asyncio.to_thread(asyncio.run, in_thread())

asyncio.set_child_watcher(asyncio.PidfdChildWatcher())
try:
with asyncio.Runner(loop_factory=asyncio.new_event_loop) as runner:
returncode, stdout = runner.run(main())
self.assertEqual(returncode, 0)
self.assertEqual(stdout, b'some data')
finally:
asyncio.set_child_watcher(None)
else:
# Windows
class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
run the :class:`asyncio.PidfdChildWatcher` on the running loop, this allows event loops to run subprocesses when there is no default event loop running on the main thread

0 comments on commit 3d8b224

Please sign in to comment.