From 5541d467cb9233b3e8b1fe5b80eba2f17eb75446 Mon Sep 17 00:00:00 2001 From: Darya Sharkova Date: Sat, 12 Jun 2021 13:49:31 +0300 Subject: [PATCH] Coordinator pipeline draft. --- agents/build.gradle | 10 +++- .../optimizer/sql/agents/Coordinator.java | 20 +++---- .../agents/CoordinatorExecutorPipeline.java | 56 +++++++++++++++++++ .../optimizer/sql/agents/ExecutorImpl.java | 29 ++++++++++ .../optimizer/sql/agents/UserSource.java | 24 ++++++++ 5 files changed, 128 insertions(+), 11 deletions(-) create mode 100644 agents/src/main/java/com/flamestream/optimizer/sql/agents/CoordinatorExecutorPipeline.java create mode 100644 agents/src/main/java/com/flamestream/optimizer/sql/agents/ExecutorImpl.java create mode 100644 agents/src/main/java/com/flamestream/optimizer/sql/agents/UserSource.java diff --git a/agents/build.gradle b/agents/build.gradle index d8c649d..39dba5a 100644 --- a/agents/build.gradle +++ b/agents/build.gradle @@ -21,7 +21,7 @@ dependencies { implementation 'org.apache.beam:beam-sdks-java-extensions-sql:2.27.0' implementation 'org.apache.beam:beam-sdks-java-extensions-sql-zetasql:2.27.0' implementation 'org.apache.beam:beam-sdks-java-test-utils:2.27.0' - implementation 'org.apache.beam:beam-runners-flink-1.11:2.27.0' + implementation 'org.apache.beam:beam-runners-flink-1.10:2.27.0' implementation 'org.apache.beam:beam-runners-spark:2.27.0' implementation 'org.apache.beam:beam-sdks-java-io-kafka:2.27.0' implementation 'io.confluent:kafka-avro-serializer:5.3.2' @@ -31,4 +31,12 @@ dependencies { annotationProcessor 'com.google.auto.value:auto-value:1.6' testCompile group: 'org.hamcrest', name: 'hamcrest', version: '2.2' implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30' + // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.2' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: 
'2.12.2' + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.12.2' + implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.12.2' + implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jdk8', version: '2.12.2' + implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.2' + implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.12.2' } \ No newline at end of file diff --git a/agents/src/main/java/com/flamestream/optimizer/sql/agents/Coordinator.java b/agents/src/main/java/com/flamestream/optimizer/sql/agents/Coordinator.java index 467e5e8..d1e01c5 100644 --- a/agents/src/main/java/com/flamestream/optimizer/sql/agents/Coordinator.java +++ b/agents/src/main/java/com/flamestream/optimizer/sql/agents/Coordinator.java @@ -1,15 +1,15 @@ package com.flamestream.optimizer.sql.agents; -import org.apache.beam.sdk.extensions.sql.impl.ParseException; -import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner; -import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.*; -import org.apache.calcite.tools.Planner; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.beam.sdk.values.Row; import org.checkerframework.checker.nullness.compatqual.NonNullType; +import org.checkerframework.checker.nullness.qual.NonNull; + import java.util.stream.Stream; interface Coordinator { @@ -25,14 +25,14 @@ interface SqlQueryJob { } interface QueryJobBuilder { - QueryJobBuilder addOutput(PTransform, PDone> sink); + QueryJobBuilder addOutput(PTransform<@NonNull PCollection, @NonNull PDone> 
sink); - QueryJobBuilder setPreHook(PTransform, PCollection> hook); - QueryJobBuilder setPostHook(PTransform, PCollection> hook); + QueryJobBuilder setPreHook(PTransform<@NonNull PCollection, @NonNull PCollection> hook); + QueryJobBuilder setPostHook(PTransform<@NonNull PCollection, @NonNull PCollection> hook); - QueryJobBuilder registerUdf(String functionName, SerializableFunction sfn); + QueryJobBuilder registerUdf(String functionName, SerializableFunction sfn); QueryJobBuilder registerUdf(String functionName, Class clazz, String method); - QueryJobBuilder registerUdaf(String functionName, Combine.CombineFn combineFn); + QueryJobBuilder registerUdf(String functionName, Combine.CombineFn combineFn); SqlQueryJob build(String query); } diff --git a/agents/src/main/java/com/flamestream/optimizer/sql/agents/CoordinatorExecutorPipeline.java b/agents/src/main/java/com/flamestream/optimizer/sql/agents/CoordinatorExecutorPipeline.java new file mode 100644 index 0000000..f0a2f70 --- /dev/null +++ b/agents/src/main/java/com/flamestream/optimizer/sql/agents/CoordinatorExecutorPipeline.java @@ -0,0 +1,56 @@ +package com.flamestream.optimizer.sql.agents; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.calcite.tools.Planner; +import org.checkerframework.checker.nullness.qual.NonNull; + +import java.util.Collection; + +public class CoordinatorExecutorPipeline { + public static void fromUserQuerySource( + final @NonNull Pipeline pipeline, + final Planner planner, // could possibly be removed in the near future + final QueryPlanner queryPlanner, + final CostEstimator costEstimator, + final @NonNull PTransform<@NonNull ? 
super PBegin, @NonNull PCollection> userQuerySource,
+            final @NonNull Collection inputs) {
+
+        final PCollection queries = pipeline.apply("ReadQuery", userQuerySource);
+
+        final Executor executor = new ExecutorImpl();
+        final Coordinator coordinator = new CoordinatorImpl(planner, queryPlanner, costEstimator, executor);
+        for (UserSource input : inputs) {
+            coordinator.registerInput(input.getTag(), input.getSource());
+        }
+
+        queries.apply("SubmitToCoordinator", ParDo.of(new CoordinatorExecutorDoFn(coordinator)));
+    }
+}
+
+// TODO should it return the result or simply send the pipeline to the cluster? I would expect the result
+class CoordinatorExecutorDoFn extends DoFn {
+    private final Coordinator coordinator;
+
+    public CoordinatorExecutorDoFn(@NonNull Coordinator coordinator) {
+        this.coordinator = coordinator;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+        final Coordinator.SqlQueryJob queryJob = c.element();
+        if (queryJob == null) {
+            return;
+        }
+
+        // submits the resulting pipeline to executor, which submits it to the cluster
+        coordinator.start(queryJob);
+
+        // TODO are we getting the results here? if so, how?
+ } +} \ No newline at end of file diff --git a/agents/src/main/java/com/flamestream/optimizer/sql/agents/ExecutorImpl.java b/agents/src/main/java/com/flamestream/optimizer/sql/agents/ExecutorImpl.java new file mode 100644 index 0000000..878c418 --- /dev/null +++ b/agents/src/main/java/com/flamestream/optimizer/sql/agents/ExecutorImpl.java @@ -0,0 +1,29 @@ +package com.flamestream.optimizer.sql.agents; + +import org.apache.beam.runners.flink.FlinkRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.PipelineRunner; +import org.checkerframework.checker.nullness.qual.NonNull; + +import javax.annotation.Nullable; +import java.io.Serializable; +import java.util.function.Consumer; + +public class ExecutorImpl implements Executor, Serializable { + private Pipeline currentPipeline; + + @Override + public void startOrUpdate(Pipeline pipeline, Consumer statusConsumer) { + currentPipeline = pipeline; + + final PipelineRunner<@NonNull PipelineResult> runner = FlinkRunner.fromOptions(pipeline.getOptions()); + runner.run(pipeline); + } + + @Nullable + @Override + public Pipeline current() { + return currentPipeline; + } +} diff --git a/agents/src/main/java/com/flamestream/optimizer/sql/agents/UserSource.java b/agents/src/main/java/com/flamestream/optimizer/sql/agents/UserSource.java new file mode 100644 index 0000000..25dc1c1 --- /dev/null +++ b/agents/src/main/java/com/flamestream/optimizer/sql/agents/UserSource.java @@ -0,0 +1,24 @@ +package com.flamestream.optimizer.sql.agents; + +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.values.Row; +import org.checkerframework.checker.nullness.qual.NonNull; + +class UserSource { + private final String tag; + // TODO should there be any limitations on CheckpointT? should UserSource be a generic class? 
+    private final UnboundedSource source;
+
+    public UserSource(String tag, UnboundedSource source) {
+        this.tag = tag;
+        this.source = source;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public UnboundedSource getSource() {
+        return source;
+    }
+}