Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds unwind processor. #167

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dataflows/processors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .stream import stream
from .unpivot import unpivot
from .unstream import unstream
from .unwind import unwind
from .update_package import update_package, add_metadata
from .update_resource import update_resource
from .update_schema import update_schema
Expand Down
54 changes: 54 additions & 0 deletions dataflows/processors/unwind.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from dataflows.helpers.resource_matcher import ResourceMatcher
from dataflows.processors.add_computed_field import get_new_fields


def unwind(from_key: str, to_key: dict, transformer=None, resources=None, source_delete=True):
    """Generate one row per value held in ``from_key``, stored under ``to_key``.

    For every row of a matching resource, the value at ``row[from_key]`` is
    iterated and one output row is emitted per element. Each output row is a
    shallow copy of the input row with the element written to
    ``to_key['name']``. A value that cannot be iterated (for example ``None``
    or a number) is treated as a single element, and so are ``str`` and
    ``bytes`` values, so text is never exploded into characters. An empty
    iterable produces no rows.

    Args:
        from_key: Name of the field holding the values to unwind.
        to_key: Descriptor of the target field; must provide ``'name'`` and
            ``'type'`` (both are used to declare the field in the schema).
        transformer: Optional callable applied to each unwound value before
            it is stored. Exceptions it raises propagate to the caller.
        resources: Resource selector handed to ``ResourceMatcher``; only
            matching resources are unwound, all others pass through as-is.
        source_delete: When ``True``, ``from_key`` is removed from the emitted
            rows and from the schema, unless it is also the target field.

    Returns:
        A processor function taking the package wrapper and yielding the
        updated datapackage followed by one row iterator per resource.

    Raises:
        KeyError: If ``to_key`` has no ``'name'`` entry (raised immediately),
            or if a processed row has no ``from_key`` entry.
    """
    target_name = to_key['name']
    # Review feedback on this change: only remove the from_key field when
    # source_delete is set and from_key != to_key. Deleting it when both names
    # are equal would throw away the value that was just written.
    drop_source = source_delete is True and from_key != target_name

    def _convert(value):
        # Apply the optional per-value transformer.
        return value if transformer is None else transformer(value)

    def _values_of(source):
        # Text is iterable but is a single value as far as unwinding goes.
        if isinstance(source, (str, bytes)):
            return (source,)
        # Only the iterability probe is guarded, so a TypeError raised later
        # by the transformer is not mistaken for "nothing to unwind".
        try:
            return iter(source)
        except TypeError:
            # No iterable to unwind: use the value itself as the only element.
            return (source,)

    def _unwinder(rows):
        for row in rows:
            for value in _values_of(row[from_key]):
                ret = dict(row)
                ret[target_name] = _convert(value)
                if drop_source:
                    del ret[from_key]
                yield ret

    def func(package):
        matcher = ResourceMatcher(resources, package.pkg)
        for resource in package.pkg.descriptor['resources']:
            if matcher.match(resource['name']):
                # Computed before the source field is filtered out below.
                # NOTE(review): when from_key == to_key['name'] the existing
                # field presumably keeps its original declared type - confirm
                # against get_new_fields and retype it if needed.
                new_fields = get_new_fields(
                    resource, [{'target': {'name': target_name, 'type': to_key['type']}}]
                )
                if drop_source:
                    resource['schema']['fields'] = [
                        field
                        for field in resource['schema']['fields']
                        if field['name'] != from_key
                    ]
                resource['schema']['fields'].extend(new_fields)

        yield package.pkg

        for resource in package:
            if matcher.match(resource.res.name):
                yield _unwinder(resource)
            else:
                yield resource

    return func
104 changes: 103 additions & 1 deletion tests/test_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2309,4 +2309,106 @@ def mult(row):
add_field('c', 'integer'),
parallelize(mult),
).results()[0][0][:100]
print(res)
print(res)


def test_unwind_basic():
    """Unwinding ``tags`` emits one row per tag and declares ``tag`` in the schema."""
    from dataflows import Flow, unwind

    post = {
        'id': 1,
        'title': 'Blog Post',
        'tags': ['hello', 'world'],
        'comments': ['Nice post.', 'Well written'],
    }
    flow = Flow(
        [post],
        unwind('tags', {'name': 'tag', 'type': 'string'}),
    )
    results, dp, _ = flow.results()

    rows = results[0]
    field_names = [f.name for f in dp.resources[0].schema.fields]

    assert len(rows) == 2
    assert rows[0]['tag'] == 'hello'
    assert 'tag' in field_names


def test_unwind_twice_in_flow():
    """Chaining two unwinds yields every tag/comment combination (2 x 2 rows)."""
    from dataflows import Flow, unwind

    post = {
        'id': 1,
        'title': 'Blog Post',
        'tags': ['hello', 'world'],
        'comments': ['Nice post.', 'Well written'],
    }
    unwind_tags = unwind('tags', {'name': 'tag', 'type': 'string'})
    unwind_comments = unwind('comments', {'name': 'comment', 'type': 'string'})
    results, dp, _ = Flow([post], unwind_tags, unwind_comments).results()

    rows = results[0]
    first = rows[0]

    assert len(rows) == 4
    assert first['tag'] == 'hello'
    assert first['comment'] == 'Nice post.'


def test_unwind_from_key_not_iterable():
    """A non-iterable source value (``None``) is passed through as a single row."""
    from dataflows import Flow, unwind

    data = [
        {'id': 1, 'title': 'Blog Post', 'tags': None, 'comments': ['Nice post.', 'Well written']}
    ]
    results, dp, _ = Flow(
        data,
        unwind('tags', {'name': 'tag', 'type': 'string'}),
    ).results()

    assert len(results[0]) == 1
    # Identity comparison is the correct way to test for the None singleton.
    assert results[0][0]['tag'] is None


def test_unwind_source_delete():
    """By default the source field is removed from both the rows and the schema."""
    from dataflows import Flow, unwind

    post = {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}
    flow = Flow(
        [post],
        unwind('tags', {'name': 'tag', 'type': 'string'}),
    )
    results, dp, _ = flow.results()

    rows = results[0]
    first = rows[0]
    field_names = [f.name for f in dp.resources[0].schema.fields]

    assert len(rows) == 2
    assert first['tag']
    assert not first.get('tags')
    assert 'tags' not in field_names


def test_unwind_source_keep():
    """With ``source_delete=False`` the source field stays in the rows and the schema."""
    from dataflows import Flow, unwind

    data = [{'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}]
    keep_source = unwind('tags', {'name': 'tag', 'type': 'string'}, source_delete=False)
    results, dp, _ = Flow(data, keep_source).results()

    rows = results[0]
    first = rows[0]
    field_names = [f.name for f in dp.resources[0].schema.fields]

    assert len(rows) == 2
    assert first['tag']
    assert first['tags'] == data[0]['tags']
    assert 'tags' in field_names


def test_unwind_with_transformer():
    """The transformer is applied to each unwound value before it is stored."""
    from dataflows import Flow, unwind

    post = {'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}
    title_case = unwind('tags', {'name': 'tag', 'type': 'string'}, lambda v: v.title())
    results, dp, _ = Flow([post], title_case).results()

    rows = results[0]
    tags = [row['tag'] for row in rows[:2]]

    assert len(rows) == 2
    assert tags[0] == 'Hello'
    assert tags[1] == 'World'