Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VersionedSchema -> ObjectSchema #583

Merged
merged 8 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- Ensure errors raised in state handlers are trapped appropriately in Cloud Runners - [#554](https://github.com/PrefectHQ/prefect/pull/554)
- Ensure unexpected errors raised in FlowRunners are robustly handled - [#568](https://github.com/PrefectHQ/prefect/pull/568)
- Fixed non-deterministic errors in mapping caused by clients resolving futures of other clients - [#569](https://github.com/PrefectHQ/prefect/pull/569)
- Older versions of Prefect will now ignore fields added by newer versions when deserializing objects - [#583](https://github.com/PrefectHQ/prefect/pull/583)

### Breaking Changes

Expand All @@ -37,6 +38,7 @@
- Convert `timeout` kwarg from `timedelta` to `integer` - [#540](https://github.com/PrefectHQ/prefect/issues/540)
- Remove `timeout` kwarg from `executor.wait` - [#569](https://github.com/PrefectHQ/prefect/pull/569)
- Serialization of States will _ignore_ any result data that hasn't been processed - [#581](https://github.com/PrefectHQ/prefect/pull/581)
- Removes `VersionedSchema` in favor of implicit versioning: serializers will ignore unknown fields and the `create_object` method is responsible for recreating missing ones - [#583](https://github.com/PrefectHQ/prefect/pull/583)

## 0.4.0 <Badge text="beta" type="success"/>

Expand Down
4 changes: 2 additions & 2 deletions docs/outline.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ functions = ["callback_factory", "slack_notifier", "gmail_notifier"]
[pages.utilities.serialization]
title = "Serialization"
module = "prefect.utilities.serialization"
classes = ["VersionedSchema", "JSONCompatible", "Nested", "Bytes", "UUID", "FunctionReference"]
functions = ["to_qualified_name", "from_qualified_name", "version"]
classes = ["JSONCompatible", "Nested", "Bytes", "UUID", "FunctionReference"]
functions = ["to_qualified_name", "from_qualified_name"]

[pages.utilities.tasks]
title = "Tasks"
Expand Down
5 changes: 2 additions & 3 deletions src/prefect/serialization/edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import prefect
from prefect.serialization.task import TaskSchema
from prefect.utilities.serialization import VersionedSchema, version
from prefect.utilities.serialization import ObjectSchema


@version("0.3.3")
class EdgeSchema(VersionedSchema):
class EdgeSchema(ObjectSchema):
class Meta:
object_class = lambda: prefect.core.Edge

Expand Down
9 changes: 3 additions & 6 deletions src/prefect/serialization/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,20 @@
Bytes,
JSONCompatible,
OneOfSchema,
VersionedSchema,
ObjectSchema,
to_qualified_name,
version,
)


@version("0.3.3")
class LocalEnvironmentSchema(VersionedSchema):
class LocalEnvironmentSchema(ObjectSchema):
class Meta:
object_class = prefect.environments.LocalEnvironment

encryption_key = Bytes(allow_none=True)
serialized_flow = Bytes(allow_none=True)


@version("0.3.3")
class ContainerEnvironmentSchema(VersionedSchema):
class ContainerEnvironmentSchema(ObjectSchema):
class Meta:
object_class = prefect.environments.ContainerEnvironment

Expand Down
8 changes: 3 additions & 5 deletions src/prefect/serialization/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
from prefect.utilities.serialization import (
JSONCompatible,
Nested,
VersionedSchema,
ObjectSchema,
to_qualified_name,
version,
)


Expand All @@ -28,11 +27,10 @@ def get_reference_tasks(obj, context):
return utils.get_value(obj, "reference_tasks")


@version("0.3.3")
class FlowSchema(VersionedSchema):
class FlowSchema(ObjectSchema):
class Meta:
object_class = lambda: prefect.core.Flow
object_class_exclude = ["id", "type", "parameters"]
exclude_fields = ["id", "type", "parameters"]
# ordered to make sure Task objects are loaded before Edge objects, due to Task caching
ordered = True

Expand Down
8 changes: 2 additions & 6 deletions src/prefect/serialization/result_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,23 @@
from prefect.utilities.serialization import (
JSONCompatible,
OneOfSchema,
VersionedSchema,
ObjectSchema,
to_qualified_name,
version,
)


@version("0.4.0")
class BaseResultHandlerSchema(VersionedSchema):
class BaseResultHandlerSchema(ObjectSchema):
class Meta:
object_class = ResultHandler


@version("0.4.0")
class CloudResultHandlerSchema(BaseResultHandlerSchema):
class Meta:
object_class = CloudResultHandler

result_handler_service = fields.String(allow_none=True)


@version("0.4.0")
class LocalResultHandlerSchema(BaseResultHandlerSchema):
class Meta:
object_class = LocalResultHandler
Expand Down
13 changes: 3 additions & 10 deletions src/prefect/serialization/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,10 @@
from marshmallow import fields

import prefect
from prefect.utilities.serialization import (
OneOfSchema,
VersionedSchema,
to_qualified_name,
version,
)
from prefect.utilities.serialization import OneOfSchema, ObjectSchema, to_qualified_name


@version("0.3.3")
class IntervalScheduleSchema(VersionedSchema):
class IntervalScheduleSchema(ObjectSchema):
class Meta:
object_class = prefect.schedules.IntervalSchedule

Expand All @@ -22,8 +16,7 @@ class Meta:
interval = fields.TimeDelta(precision="microseconds", required=True)


@version("0.3.3")
class CronScheduleSchema(VersionedSchema):
class CronScheduleSchema(ObjectSchema):
class Meta:
object_class = prefect.schedules.CronSchedule

Expand Down
27 changes: 3 additions & 24 deletions src/prefect/serialization/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
from prefect.utilities.serialization import (
JSONCompatible,
OneOfSchema,
VersionedSchema,
ObjectSchema,
to_qualified_name,
version,
)


Expand All @@ -29,13 +28,8 @@ def _serialize(self, value, attr, obj, **kwargs):
)
return super()._serialize(value, attr, obj, **kwargs)

def _deserialize(self, value, attr, data, **kwargs):
value = super()._deserialize(value, attr, data, **kwargs)
return value


@version("0.3.3")
class BaseStateSchema(VersionedSchema):
class BaseStateSchema(ObjectSchema):
class Meta:
object_class = state.State

Expand All @@ -51,23 +45,20 @@ def create_object(self, data):
return base_obj


@version("0.3.3")
class PendingSchema(BaseStateSchema):
class Meta:
object_class = state.Pending

cached_inputs = ResultHandlerField(allow_none=True)


@version("0.3.3")
class SubmittedSchema(BaseStateSchema):
class Meta:
object_class = state.Submitted

state = fields.Nested("StateSchema", allow_none=True)


@version("0.3.3")
class CachedStateSchema(PendingSchema):
class Meta:
object_class = state.CachedState
Expand All @@ -77,49 +68,42 @@ class Meta:
cached_result_expiration = fields.DateTime(allow_none=True)


@version("0.3.3")
class ScheduledSchema(PendingSchema):
class Meta:
object_class = state.Scheduled

start_time = fields.DateTime(allow_none=True)


@version("0.3.3")
class ResumeSchema(ScheduledSchema):
class Meta:
object_class = state.Resume


@version("0.3.3")
class RetryingSchema(ScheduledSchema):
class Meta:
object_class = state.Retrying

run_count = fields.Int(allow_none=True)


@version("0.3.3")
class RunningSchema(BaseStateSchema):
class Meta:
object_class = state.Running


@version("0.3.3")
class FinishedSchema(BaseStateSchema):
class Meta:
object_class = state.Finished


@version("0.3.3")
class SuccessSchema(FinishedSchema):
class Meta:
object_class = state.Success

cached = fields.Nested(CachedStateSchema, allow_none=True)


@version("0.3.3")
class MappedSchema(SuccessSchema):
class Meta:
exclude = ["result", "map_states"]
Expand All @@ -138,34 +122,29 @@ def create_object(self, data):
return super().create_object(data)


@version("0.3.3")
class FailedSchema(FinishedSchema):
class Meta:
object_class = state.Failed


@version("0.3.3")
class TimedOutSchema(FinishedSchema):
class Meta:
object_class = state.TimedOut

cached_inputs = ResultHandlerField(allow_none=True)


@version("0.3.3")
class TriggerFailedSchema(FailedSchema):
class Meta:
object_class = state.TriggerFailed


@version("0.3.3")
class SkippedSchema(SuccessSchema):
class Meta:
object_class = state.Skipped
object_class_exclude = ["cached"]
exclude_fields = ["cached"]


@version("0.3.3")
class PausedSchema(PendingSchema):
class Meta:
object_class = state.Paused
Expand Down
13 changes: 5 additions & 8 deletions src/prefect/serialization/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
UUID,
FunctionReference,
JSONCompatible,
VersionedSchema,
ObjectSchema,
from_qualified_name,
to_qualified_name,
version,
)


Expand Down Expand Up @@ -55,11 +54,10 @@ def create_object(self, data):
return self.context["task_id_cache"][task_id]


@version("0.3.3")
class TaskSchema(TaskMethodsMixin, VersionedSchema):
class TaskSchema(TaskMethodsMixin, ObjectSchema):
class Meta:
object_class = lambda: prefect.core.Task
object_class_exclude = ["id", "type"]
exclude_fields = ["id", "type"]

id = UUID()
type = fields.Function(lambda task: to_qualified_name(type(task)), lambda x: x)
Expand Down Expand Up @@ -101,11 +99,10 @@ class Meta:
)


@version("0.3.3")
class ParameterSchema(TaskMethodsMixin, VersionedSchema):
class ParameterSchema(TaskMethodsMixin, ObjectSchema):
class Meta:
object_class = lambda: prefect.core.task.Parameter
object_class_exclude = ["id", "type"]
exclude_fields = ["id", "type"]

id = UUID()
type = fields.Function(lambda task: to_qualified_name(type(task)), lambda x: x)
Expand Down
Loading