Implementations: pyspark and ray #90

Merged: 17 commits, Feb 6, 2025
20 changes: 20 additions & 0 deletions .github/envs/environment.yml
@@ -0,0 +1,20 @@
name: test-environment
channels:
- conda-forge
dependencies:
- dask >=2025
- pandas
- polars
- pyspark
- pyarrow >=15
- numpy
- pytest
- pytest-cov
- numba
- awkward
- distributed
- openjdk ==20
- pip
- pip:
- ray[data]
- git+https://github.com/dask-contrib/dask-awkward
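As a quick sanity check (not part of the PR), the new CI environment can be exercised with one import per pinned dependency; Spark needs the JDK pin, and Ray comes in via pip as ``ray[data]``:

```python
# Hypothetical smoke test for the CI environment above; each import maps to a
# dependency pinned in environment.yml.
import awkward, dask, distributed, numba, pandas, polars, pyarrow, pyspark
import ray.data  # installed via pip as ray[data]

for mod in (awkward, dask, pandas, polars, pyarrow, pyspark):
    print(mod.__name__, mod.__version__)
```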
14 changes: 9 additions & 5 deletions .github/workflows/pypi.yml
@@ -18,20 +18,24 @@ jobs:
fail-fast: false
matrix:
platform: [ubuntu-latest, macos-latest, windows-latest]
python-version: ["3.9", "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12"]
runs-on: ${{matrix.platform}}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: setup Python ${{matrix.python-version}}
uses: actions/setup-python@v4
- name: Setup Conda Environment
uses: conda-incubator/setup-miniconda@v3
with:
python-version: ${{matrix.python-version}}
python-version: ${{ matrix.python-version }}
environment-file: .github/envs/environment.yml
activate-environment: test-environment
- name: install
shell: bash -l {0}
run: |
pip install pip wheel -U
pip install -q --no-cache-dir .[test]
pip install -q --no-cache-dir -e .[test]
pip list
- name: test
shell: bash -l {0}
run: |
python -m pytest -v --cov-config=.coveragerc --cov akimbo
10 changes: 10 additions & 0 deletions docs/demo/akimbo-demo.ipynb → docs/akimbo-demo.ipynb
@@ -1,5 +1,15 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "8b1be0e8",
"metadata": {},
"source": [
"# HEP Demo\n",
"\n",
"Here we show a plausible small workflow on a real excerpt of particle data."
]
},
{
"cell_type": "code",
"execution_count": 1,
36 changes: 21 additions & 15 deletions docs/api.rst
@@ -1,21 +1,6 @@
akimbo
==============

.. currentmodule:: akimbo

Top Level Functions
~~~~~~~~~~~~~~~~~~~

.. autosummary::
:toctree: generated/

read_parquet
read_json
read_avro
get_parquet_schema
get_json_schema
get_avro_schema

Accessor
~~~~~~~~

@@ -38,6 +23,8 @@ Backends
akimbo.dask.DaskAwkwardAccessor
akimbo.polars.PolarsAwkwardAccessor
akimbo.cudf.CudfAwkwardAccessor
akimbo.ray.RayAccessor
akimbo.spark.SparkAccessor

.. autoclass:: akimbo.pandas.PandasAwkwardAccessor

@@ -47,6 +34,25 @@

.. autoclass:: akimbo.cudf.CudfAwkwardAccessor

.. autoclass:: akimbo.ray.RayAccessor

.. autoclass:: akimbo.spark.SparkAccessor

Top Level Functions
~~~~~~~~~~~~~~~~~~~
.. currentmodule:: akimbo


.. autosummary::
:toctree: generated/

read_parquet
read_json
read_avro
get_parquet_schema
get_json_schema
get_avro_schema


Extensions
~~~~~~~~~~
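The top-level readers listed above all take a backend keyword and hand back an object carrying the ``.ak`` accessor. A hedged sketch of a typical call (the path is a placeholder, and the exact return type depends on the data):

```python
# Sketch: top-level readers return a backend-native object with the .ak
# accessor attached; the path and backend values are illustrative placeholders.
import akimbo

data = akimbo.read_parquet("data.parquet", backend="pandas")
schema = akimbo.get_parquet_schema("data.parquet")
```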
8 changes: 1 addition & 7 deletions docs/conf.py
@@ -24,13 +24,7 @@
]

templates_path = ["_templates"]
exclude_patterns = [
"_build",
"Thumbs.db",
".DS_Store",
"**.ipynb_checkpoints",
"**akimbo-demo.ipynb",
]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store", "**.ipynb_checkpoints"]


# -- Options for HTML output -------------------------------------------------
1 change: 1 addition & 0 deletions docs/cudf-ak.ipynb
1 change: 0 additions & 1 deletion docs/demo/.gitignore

This file was deleted.

File renamed without changes.
16 changes: 15 additions & 1 deletion example/cudf-ak.ipynb → docs/example/cudf-ak.ipynb
@@ -1,10 +1,19 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "ee00a3e2",
"metadata": {},
"source": [
"# GPU backend"
]
},
{
"cell_type": "markdown",
"id": "58d18a3a-45b1-425a-b822-e8be0a6c0bc0",
"metadata": {},
"source": [
"This example depends on data in a file that can be made in the following way.\n",
"\n",
"```python\n",
"import awkward as ak\n",
@@ -14,6 +23,11 @@
" [[6, 7]]] * N\n",
" arr = ak.Array({\"a\": part})\n",
" ak.to_parquet(arr, fn, extensionarray=False)\n",
"```\n",
"\n",
"The file cuda-env.yaml can be used to create a functional environment using conda:\n",
"```bash\n",
"$ conda env create -f example/cuda-env.yaml\n",
"```"
]
},
@@ -617,7 +631,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.0"
"version": "3.10.9"
}
},
"nbformat": 4,
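The data-generation snippet in the notebook is truncated by the diff view. A hypothetical reconstruction, with ``N`` and ``fn`` as placeholders rather than the notebook's actual values:

```python
# Hypothetical reconstruction of the truncated snippet; N and fn are
# placeholders, not values taken from the notebook.
import awkward as ak

N = 1000
fn = "data.parquet"
part = [[[1, 2, 3], [], [4, 5]],
        [[6, 7]]] * N
arr = ak.Array({"a": part})
ak.to_parquet(arr, fn, extensionarray=False)
```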
9 changes: 9 additions & 0 deletions docs/index.rst
@@ -24,6 +24,8 @@ identical syntax:
- dask.dataframe
- polars
- cuDF
- ray dataset
- pyspark


numpy-like API
@@ -111,6 +113,13 @@ the ``akimbo`` system, you can apply these methods to ragged/nested dataframes.
install.rst
quickstart.ipynb

.. toctree::
:maxdepth: 1
:caption: Demos

akimbo-demo.ipynb
cudf-ak.ipynb

.. toctree::
:maxdepth: 1
:caption: API Reference
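The backend list above is the point of the PR: the same ``.ak`` calls are meant to work unchanged across libraries, now including ray datasets and pyspark. A hedged sketch of the identical-syntax claim using two of the established backends:

```python
# Sketch of the "identical syntax" claim; the ray and spark accessors added
# by this PR are intended to behave the same way on dataframes.
import pandas as pd
import polars as pl
import akimbo.pandas  # importing registers the .ak accessor
import akimbo.polars

data = [[1, 2], [], [3]]
print(pd.Series(data).ak.num())  # per-row lengths of the ragged values
print(pl.Series(data).ak.num())  # same call, same semantics, different backend
```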
6 changes: 5 additions & 1 deletion docs/install.rst
@@ -5,7 +5,11 @@ Requirements
~~~~~~~~~~~~

To install ``akimbo`` you will need ``awkward`` and
one of the backend libraries: ``pandas``, ``dask`` or ``polars``.
one of the backend libraries: ``pandas``, ``dask``, ``cuDF``, ``ray.data``,
``pyspark`` or ``polars``. Each of these has its own installation options;
please see their respective documentation.

``akimbo`` depends on ``pyarrow`` and ``awkward``.


From PyPI
File renamed without changes
File renamed without changes
6 changes: 5 additions & 1 deletion src/akimbo/apply_tree.py
@@ -80,7 +80,11 @@ def dec(
match: function to determine if a part of the data structure matches the type we want to
operate on
outtype: postprocessing function after transform
inmode: how ``func`` expects its inputs: as awkward arrays (ak), numpy or arrow
inmode: how ``func`` expects its inputs:
- ak: awkward arrays
- numpy: numpy arrays
- arrow: arrow arrays
- other: anything that can be cast to ak arrays, e.g., number literals
"""

@functools.wraps(func)
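To make the ``inmode`` options concrete, here is a small self-contained sketch of the ``inmode="numpy"`` case: the wrapped function sees raw numpy leaf buffers and the ragged structure is rebuilt around the result. The helper name is illustrative, not akimbo's actual decorator.

```python
# Illustrative only: a hand-rolled version of the inmode="numpy" behaviour
# described in the docstring above.
import awkward as ak
import numpy as np

def apply_to_leaves(func, arr):
    def transform(layout, **kwargs):
        if layout.is_numpy:  # a leaf node: hand its numpy buffer to func
            return ak.contents.NumpyArray(func(np.asarray(layout.data)))
    return ak.transform(transform, arr)

arr = ak.Array([[1.0, 4.0], [], [9.0]])
print(apply_to_leaves(np.sqrt, arr).tolist())  # [[1.0, 2.0], [], [3.0]]
```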
18 changes: 16 additions & 2 deletions src/akimbo/cudf.py
@@ -107,6 +107,12 @@ def f(lay, method=meth, **kwargs):


class CudfAwkwardAccessor(Accessor):
"""Operations on cuDF dataframes on the GPU.

Data are kept in GPU memory and use views rather than copies where
possible.
"""

series_type = Series
dataframe_type = DataFrame

@@ -145,9 +151,17 @@ def str(self):
try:
cast = dec_cu(libcudf.unary.cast, match=leaf)
except AttributeError:

def cast_inner(col, dtype):
return cudf.core.column.ColumnBase(col.data, size=len(col), dtype=np.dtype(dtype),
mask=None, offset=0, children=())
return cudf.core.column.ColumnBase(
col.data,
size=len(col),
dtype=np.dtype(dtype),
mask=None,
offset=0,
children=(),
)

cast = dec_cu(cast_inner, match=leaf)

@property
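A hedged usage sketch for the accessor documented above (requires a CUDA GPU and a working cuDF install; registration on import is assumed to match the other backends):

```python
# Sketch only: exercising the cuDF accessor on a list column, on-GPU.
import cudf
import akimbo.cudf  # registers the .ak accessor on cuDF objects

s = cudf.Series([[1, 2], [3], []])
print(s.ak.num())  # per-row list lengths, computed without leaving the GPU
```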
2 changes: 1 addition & 1 deletion src/akimbo/dask.py
@@ -69,7 +69,7 @@ def run(self, *args, **kwargs):
ar = [self._to_tt(ar) if hasattr(ar, "ak") else ar for ar in ar]
out = op(tt, *ar, **kwargs)
meta = PandasAwkwardAccessor._to_output(
ak.typetracer.length_zero_if_typetracer(out)
ak.typetracer.length_one_if_typetracer(out)
)
except (ValueError, TypeError):
meta = None
2 changes: 1 addition & 1 deletion src/akimbo/datetimes.py
@@ -24,7 +24,7 @@ def __init__(self, accessor) -> None:
floor_temporal = dec_t(pc.floor_temporal)
round_temporal = dec_t(pc.round_temporal)
strftime = dec_t(pc.strftime)
strptime = dec_t(pc.strptime)
# strptime = dec_t(pc.strptime) # this is in .str instead
day = dec_t(pc.day)
day_of_week = dec_t(pc.day_of_week)
day_of_year = dec_t(pc.day_of_year)
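The commented-out ``strptime`` reflects a real split in the arrow kernels: ``pc.strptime`` parses strings, so it belongs with the string methods, while this namespace wraps the temporal kernels. A hedged sketch (the ``.ak.dt`` spelling is an assumption based on the accessor class above):

```python
# Sketch, assuming the datetime namespace is reachable as .ak.dt; the kernels
# shown (pc.day_of_week, pc.strftime) are the pyarrow.compute ones wrapped above.
import pandas as pd
import akimbo.pandas

s = pd.Series(pd.to_datetime(["2025-02-06", "2025-02-07"]))
print(s.ak.dt.day_of_week())
print(s.ak.dt.strftime("%Y-%m-%d"))
```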
14 changes: 12 additions & 2 deletions src/akimbo/io.py
@@ -6,6 +6,7 @@


def ak_to_series(ds, backend="pandas", extract=True):
"""Make backend-specific series from data"""
if backend == "pandas":
import akimbo.pandas

@@ -23,13 +24,18 @@ def ak_to_series(ds, backend="pandas", extract=True):
import akimbo.cudf

s = akimbo.cudf.CudfAwkwardAccessor._to_output(ds)
elif backend in ["ray", "spark"]:
raise ValueError("Backend only supports dataframes, not series")
else:
raise ValueError("Backend must be in {'pandas', 'polars', 'dask', 'cudf'}")
if extract and ds.fields:
return s.ak.unpack()
return s


# TODO: read_parquet should use native versions rather than convert. This version
# is OK for pandas
def read_parquet(
url: str,
storage_options: dict | None = None,
@@ -60,6 +66,8 @@
return ak_to_series(ds, backend, extract=extract)


# TODO: should be a map over input files, maybe with newline byte blocks
# as in dask
def read_json(
url: str,
storage_options: dict | None = None,
@@ -124,6 +132,8 @@
return layout_to_jsonschema(arr.layout)


# TODO: should be a map over input files, maybe with newline byte blocks
# as in dask
def read_avro(
url: str,
storage_options: dict | None = None,
@@ -205,9 +215,9 @@
merge = _merge

counts = np.empty(len(table1), dtype="uint64")
# TODO: the line below over-allocates, can switch to somehing growable
# TODO: the line below over-allocates, can switch to something growable
matches = np.empty(len(table2), dtype="uint64")
# TODO: to_numpy(allow_missong) makes this a bit faster, but is not
# TODO: to_numpy(allow_missing) makes this a bit faster, but is not
# GPU general
counts, matches, ind = merge(table1[key], table2[key], counts, matches)
matches.resize(int(ind), refcheck=False)
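A short sketch of ``ak_to_series`` as changed above: the series-less backends now raise, and record-typed data is unpacked into columns when ``extract=True``:

```python
# Sketch of the ak_to_series behaviour shown in the diff above.
import awkward as ak
from akimbo.io import ak_to_series

arr = ak.Array([{"a": [1, 2], "b": 1}, {"a": [3], "b": 2}])
df = ak_to_series(arr, backend="pandas")  # record fields unpack to columns
try:
    ak_to_series(arr, backend="spark")
except ValueError as e:
    print(e)  # "Backend only supports dataframes, not series"
```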