Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop Tornado #876

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
- name: Install the Python dependencies
run: |
pip install .[test] codecov
pip install https://github.com/davidbrochart/jupyter_client/archive/async_stream.zip

- name: Install matplotlib
if: ${{ !startsWith(matrix.os, 'macos') && !startsWith(matrix.python-version, 'pypy') }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ jobs:
git clone https://github.com/jupyter/jupyter_kernel_test.git
cd jupyter_kernel_test
pip install -e ".[test]"
pip install https://github.com/davidbrochart/jupyter_client/archive/async_stream.zip
python test_ipykernel.py
36 changes: 0 additions & 36 deletions examples/embedding/inprocess_qtconsole.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import sys

import tornado
from IPython.lib import guisupport
from qtconsole.inprocess import QtInProcessKernelManager
from qtconsole.rich_ipython_widget import RichIPythonWidget
Expand All @@ -11,44 +9,10 @@ def print_process_id():
print("Process ID is:", os.getpid())


def init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if (
sys.platform.startswith("win")
and sys.version_info >= (3, 8)
and tornado.version_info < (6, 1)
):
import asyncio

try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def main():
# Print the ID of the main process
print_process_id()

init_asyncio_patch()
app = guisupport.get_app_qt4()

# Create an in-process kernel
Expand Down
36 changes: 0 additions & 36 deletions examples/embedding/inprocess_terminal.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import sys

import tornado
from jupyter_console.ptshell import ZMQTerminalInteractiveShell

from ipykernel.inprocess.manager import InProcessKernelManager
Expand All @@ -11,46 +9,12 @@ def print_process_id():
print("Process ID is:", os.getpid())


def init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if (
sys.platform.startswith("win")
and sys.version_info >= (3, 8)
and tornado.version_info < (6, 1)
):
import asyncio

try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def main():
print_process_id()

# Create an in-process kernel
# >>> print_process_id()
# will print the same process ID as the main process
init_asyncio_patch()
kernel_manager = InProcessKernelManager()
kernel_manager.start_kernel()
kernel = kernel_manager.kernel
Expand Down
10 changes: 5 additions & 5 deletions ipykernel/control.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import asyncio
from threading import Thread

from tornado.ioloop import IOLoop


class ControlThread(Thread):
def __init__(self, **kwargs):
Thread.__init__(self, name="Control", **kwargs)
self.io_loop = IOLoop(make_current=False)
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True
self.io_loop = asyncio.new_event_loop()

def run(self):
self.name = "Control"
asyncio.set_event_loop(self.io_loop)
try:
self.io_loop.start()
self.io_loop.run_forever()
finally:
self.io_loop.close()

Expand All @@ -22,4 +22,4 @@ def stop(self):

This method is threadsafe.
"""
self.io_loop.add_callback(self.io_loop.stop)
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
21 changes: 12 additions & 9 deletions ipykernel/debugger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import re
import sys
Expand All @@ -6,8 +7,6 @@
import zmq
from IPython.core.getipython import get_ipython
from IPython.core.inputtransformer2 import leading_empty_lines
from tornado.locks import Event
from tornado.queues import Queue
from zmq.utils import jsonapi

try:
Expand Down Expand Up @@ -86,11 +85,13 @@ class DebugpyMessageQueue:
SEPARATOR = "\r\n\r\n"
SEPARATOR_LENGTH = 4

message_queue: asyncio.Queue[t.Dict[str, t.Any]]

def __init__(self, event_callback, log):
self.tcp_buffer = ""
self._reset_tcp_pos()
self.event_callback = event_callback
self.message_queue: Queue[t.Any] = Queue()
self.message_queue = asyncio.Queue()
self.log = log

def _reset_tcp_pos(self):
Expand Down Expand Up @@ -171,7 +172,7 @@ def __init__(self, log, debugpy_stream, event_callback):
self.debugpy_port = -1
self.routing_id = None
self.wait_for_attach = True
self.init_event = Event()
self.init_event = asyncio.Event()
self.init_event_seq = -1

def _get_endpoint(self):
Expand Down Expand Up @@ -245,7 +246,7 @@ def connect_tcp_socket(self):
def disconnect_tcp_socket(self):
self.debugpy_stream.socket.disconnect(self._get_endpoint())
self.routing_id = None
self.init_event = Event()
self.init_event = asyncio.Event()
self.init_event_seq = -1
self.wait_for_attach = True

Expand Down Expand Up @@ -278,6 +279,8 @@ class Debugger:
"configurationDone",
]

stopped_queue: asyncio.Queue[t.Dict[str, t.Any]]

# Requests that can be handled even if the debugger is not running
static_debug_msg_types = ["debugInfo", "inspectVariables", "richInspectVariables", "modules"]

Expand All @@ -291,7 +294,7 @@ def __init__(
self.is_started = False
self.event_callback = event_callback
self.just_my_code = just_my_code
self.stopped_queue: Queue[t.Any] = Queue()
self.stopped_queue = asyncio.Queue()

self.started_debug_handlers = {}
for msg_type in Debugger.started_debug_msg_types:
Expand Down Expand Up @@ -367,7 +370,7 @@ async def handle_stopped_event(self):
def tcp_client(self):
return self.debugpy_client

def start(self):
async def start(self):
if not self.debugpy_initialized:
tmp_dir = get_tmp_directory()
if not os.path.exists(tmp_dir):
Expand All @@ -384,7 +387,7 @@ def start(self):
(self.shell_socket.getsockopt(ROUTING_ID)),
)

ident, msg = self.session.recv(self.shell_socket, mode=0)
ident, msg = await self.session.async_recv(self.shell_socket, mode=0)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.debugpy_initialized = msg["content"]["status"] == "ok"

# Don't remove leading empty lines when debugging so the breakpoints are correctly positioned
Expand Down Expand Up @@ -631,7 +634,7 @@ async def process_request(self, message):
if self.is_started:
self.log.info("The debugger has already started")
else:
self.is_started = self.start()
self.is_started = await self.start()
if self.is_started:
self.log.info("The debugger has started")
else:
Expand Down
4 changes: 2 additions & 2 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def on_timer(self, event):
self.func()

# We need a custom wx.App to create our Frame subclass that has the
# wx.Timer to defer back to the tornado event loop.
# wx.Timer to defer back to the event loop.
class IPWxApp(wx.App):
def OnInit(self):
self.frame = TimerFrame(wake)
Expand Down Expand Up @@ -378,7 +378,7 @@ def loop_asyncio(kernel):
import asyncio

loop = asyncio.get_event_loop()
# loop is already running (e.g. tornado 5), nothing left to do
# loop is already running, nothing left to do
if loop.is_running():
return

Expand Down
40 changes: 0 additions & 40 deletions ipykernel/inprocess/tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from io import StringIO

import pytest
import tornado
from IPython.utils.io import capture_output
from jupyter_client.session import Session

Expand All @@ -20,44 +19,6 @@
orig_msg = Session.msg


def _init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado

Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows

Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.

do this as early as possible to make it a low priority and overrideable

ref: https://github.com/tornadoweb/tornado/issues/2608

FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if (
sys.platform.startswith("win")
and sys.version_info >= (3, 8)
and tornado.version_info < (6, 1)
):
import asyncio

try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def _inject_cell_id(_self, *args, **kwargs):
"""
This patch jupyter_client.session:Session.msg to add a cell_id to the return message metadata
Expand All @@ -80,7 +41,6 @@ def patch_cell_id():

class InProcessKernelTestCase(unittest.TestCase):
def setUp(self):
_init_asyncio_patch()
self.km = InProcessKernelManager()
self.km.start_kernel()
self.kc = self.km.client()
Expand Down
25 changes: 17 additions & 8 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import atexit
import io
import os
Expand All @@ -18,8 +19,8 @@

import zmq
from jupyter_client.session import extract_header
from tornado.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

from .zmqstream import ZMQStream

# -----------------------------------------------------------------------------
# Globals
Expand Down Expand Up @@ -57,7 +58,7 @@ def __init__(self, socket, pipe=False):
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
self._pipe_flag = pipe
self.io_loop = IOLoop(make_current=False)
self.io_loop = asyncio.new_event_loop()
if pipe:
self._setup_pipe_in()
self._local = threading.local()
Expand All @@ -72,8 +73,11 @@ def __init__(self, socket, pipe=False):

def _thread_main(self):
"""The inner loop that's actually run in a thread"""
self.io_loop.start()
self.io_loop.close(all_fds=True)
asyncio.set_event_loop(self.io_loop)
try:
self.io_loop.run_forever()
finally:
self.io_loop.close()

def _setup_event_pipe(self):
"""Create the PULL socket listening for events that should fire in this thread."""
Expand Down Expand Up @@ -181,13 +185,14 @@ def stop(self):
"""Stop the IOPub thread"""
if not self.thread.is_alive():
return
self.io_loop.add_callback(self.io_loop.stop)
self.thread.join()
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
# self.thread.join()
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
for event_pipe in self._event_pipes:
event_pipe.close()
self._event_puller.socket.close()

def close(self):
if self.closed:
Expand Down Expand Up @@ -456,7 +461,11 @@ def _schedule_flush(self):

# add_timeout has to be handed to the io thread via event pipe
def _schedule_in_thread():
self._io_loop.call_later(self.flush_interval, self._flush)
# FIXME: original code was:
# self._io_loop.call_later(self.flush_interval, self._flush)
self._io_loop.call_soon_threadsafe(
self._io_loop.call_later, self.flush_interval, self._flush
)

self.pub_thread.schedule(_schedule_in_thread)

Expand Down
Loading