Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-ipython: add support for %%vdkingest #2866

Merged
merged 3 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions projects/vdk-plugins/vdk-ipython/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,49 @@ select * from placeholder_todo
where completed = True
```

### Ingesting data with %%vdkingest

```toml
%%vdkingest

# Data Source Configuration
[sources.yourSourceId]
## Data Source Name. Installed data sources can be seen using vdk data-sources --list
name = "<data-source-name>"
## The singer tap we will use
config = {
## Set the configuration for the data source.
## You can see what config options are supported with vdk data-sources --config <data-source-name>
}

[sources.yourSourceId_2]
# repeat this for as many sources as you want
# ...

# Data Destination Configuration.
## Ingestion methods and targets are the same as those accepted by send_object_for_ingestion
## See https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-core/src/vdk/api/job_input.py#L183
[destinations.yourDestinationId]
## the only required parameter is method
method = "<method-name>"
## Optionally specify target
## target =

[destinations.yourDestinationId_2]
# repeat this for as many destinations as you want
# ...

# Data Flows from Source to Destination
[[flows]]
from = "yourSourceId"
to = "yourDestinationId"

[[flows]]
from = "yourSourceId_2"
to = "yourDestinationId_2"
```

Complete the full self-paced tutorial at https://bit.ly/vdk-ingest

### Build and testing

Expand Down
1 change: 1 addition & 0 deletions projects/vdk-plugins/vdk-ipython/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ ipyaggrid
# testing dependencies
pytest
vdk-core
vdk-data-sources
vdk-sqlite
vdk-test-utils
3 changes: 2 additions & 1 deletion projects/vdk-plugins/vdk-ipython/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import setuptools


__version__ = "0.1.0"
__version__ = "0.2.5"

setuptools.setup(
name="vdk-ipython",
Expand All @@ -28,4 +28,5 @@
"Programming Language :: Python :: 3.11",
"Framework :: IPython",
],
extras_require={"data-sources": ["vdk-data-sources"]},
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.plugin.ipython.ingest import vdkingest
from vdk.plugin.ipython.job import load_job
from vdk.plugin.ipython.job import magic_load_job
from vdk.plugin.ipython.sql import vdksql
Expand All @@ -20,3 +21,6 @@ def load_ipython_extension(ipython):
"""
ipython.register_magic_function(magic_load_job, magic_name="reload_VDK")
ipython.register_magic_function(vdksql, magic_kind="cell", magic_name="vdksql")
ipython.register_magic_function(
vdkingest, magic_kind="cell", magic_name="vdkingest"
)
50 changes: 50 additions & 0 deletions projects/vdk-plugins/vdk-ipython/src/vdk/plugin/ipython/ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import warnings

from IPython import get_ipython
from vdk.api.job_input import IJobInput
from vdk.plugin.ipython import job
from vdk.plugin.ipython.common import show_ipython_error
from vdk.plugin.ipython.job import JobControl

log = logging.getLogger(__name__)


def vdkingest(line, cell):
    """
    Cell magic that runs data flows described by a TOML ingestion configuration.

    The cell body is parsed as TOML, converted into flow definitions by
    ``toml_parser.definitions_from_dict`` and executed through
    ``DataFlowInput.start_flows`` using the job input of the notebook's
    ``VDK`` job control object.

    If no ``VDK`` object is present in the IPython user namespace, the job is
    auto-initialized with ``job.load_job()``. If it is still missing after
    that, an error is shown in the notebook and the magic returns ``None``
    without ingesting anything.

    :param line: the text following ``%%vdkingest`` on the magic line; unused.
    :param cell: the cell body containing the TOML configuration.
    :return: None
    :raises ImportError: if the ``vdk-data-sources`` plugin cannot be imported.
    """
    vdk: JobControl = get_ipython().user_global_ns.get("VDK", None)
    if not vdk:
        log.warning(
            "VDK is not initialized with '%reload_VDK'. "
            "Will auto-initialize now wth default parameters."
        )
        job.load_job()
        # Look the object up again: load_job() is expected to have published it.
        vdk = get_ipython().user_global_ns.get("VDK", None)
        if not vdk:
            message = "VDK cannot initialized. Please execute: %reload_VDK"
            show_ipython_error(message)
            return None

    job_input: IJobInput = vdk.get_initialized_job_input()

    # vdk-data-sources is an optional extra, so it is imported lazily here
    # rather than at module import time.
    try:
        from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
    except ImportError as e:
        # Chain explicitly so the underlying import failure stays visible
        # as the cause of the friendlier error.
        raise ImportError(
            "vdk-data-sources is not installed. %%vdkingest is not available without it"
        ) from e

    from vdk.plugin.data_sources.mapping import toml_parser
    import toml

    parsed_toml = toml.loads(cell)
    definitions = toml_parser.definitions_from_dict(parsed_toml)

    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)
5 changes: 3 additions & 2 deletions projects/vdk-plugins/vdk-ipython/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ def ip(session_ip):


@pytest.fixture(scope="function")
def sqlite_ip(ip, tmpdir):
job_dir = str(tmpdir) + "vdk-sqlite.db"
def sqlite_ip(ip, tmp_path):
job_dir = f"{tmp_path}/vdk-sqlite.db"
ip.get_ipython().run_cell("%env VDK_INGEST_METHOD_DEFAULT=sqlite")
ip.get_ipython().run_cell(f"%env VDK_SQLITE_FILE={job_dir}")
ip.get_ipython().run_cell(f"%env VDK_INGEST_TARGET_DEFAULT={job_dir}")
ip.get_ipython().run_cell("%env VDK_DB_DEFAULT_TYPE=SQLITE")
ip.get_ipython().run_cell("%env INGESTER_WAIT_TO_FINISH_AFTER_EVERY_SEND=true")
yield ip
40 changes: 40 additions & 0 deletions projects/vdk-plugins/vdk-ipython/tests/test_ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import logging
import os
from unittest.mock import patch

import ipyaggrid

_log = logging.getLogger(__name__)


@patch.dict(os.environ, {"USE_DEFAULT_CELL_TABLE_OUTPUT": "true"})
def test_ingest_cell(sqlite_ip, capsys):
    """
    Run a %%vdkingest cell that sends the auto-generated data source to
    sqlite, then read the ingested rows back with %%vdksql.
    """
    shell = sqlite_ip.get_ipython()

    ingest_cell = """%%vdkingest
[sources]
s1 = {name = "auto-generated-data"}

[destinations]
d1 = {method = "sqlite"}

[[flows]]
from="s1"
to="d1"
"""
    shell.run_cell(ingest_cell)
    # The ingest magic is expected to print nothing to stdout.
    captured = capsys.readouterr()
    assert captured.out == ""

    select_cell = """
%%vdksql
SELECT * from stream_0
"""
    capsys.readouterr()  # reset buffer
    execution = shell.run_cell(select_cell)
    rows = execution.result.values.tolist()

    expected_rows = [
        [1, "Stream_0_Name_0", 0],
        [2, "Stream_0_Name_1", 0],
    ]
    assert rows == expected_rows
5 changes: 3 additions & 2 deletions projects/vdk-plugins/vdk-ipython/tests/test_job_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ def test_calling_get_initialise_job_input_multiple_times(ip):


# uses the pytest tmp_path fixture - https://docs.pytest.org/en/6.2.x/tmpdir.html#the-tmp-path-fixture
def test_extension_with_ingestion_job(ip, tmpdir):
def test_extension_with_ingestion_job(ip, tmp_path):
# set environmental variables via Jupyter notebook
job_dir = str(tmpdir) + "vdk-sqlite.db"
job_dir = f"{tmp_path}/vdk-sqlite.db"
ip.get_ipython().run_cell("%env VDK_INGEST_METHOD_DEFAULT=sqlite")
ip.get_ipython().run_cell(f"%env VDK_SQLITE_FILE={job_dir}")
ip.get_ipython().run_cell(f"%env VDK_INGEST_TARGET_DEFAULT={job_dir}")
ip.get_ipython().run_cell("%env VDK_DB_DEFAULT_TYPE=SQLITE")
ip.get_ipython().run_cell("%env INGESTER_WAIT_TO_FINISH_AFTER_EVERY_SEND=true")

Expand Down