Coordinator pipeline draft #7

Open · wants to merge 1 commit into main
agents/build.gradle (10 changes: 9 additions, 1 deletion)

@@ -21,7 +21,7 @@ dependencies {
     implementation 'org.apache.beam:beam-sdks-java-extensions-sql:2.27.0'
     implementation 'org.apache.beam:beam-sdks-java-extensions-sql-zetasql:2.27.0'
     implementation 'org.apache.beam:beam-sdks-java-test-utils:2.27.0'
-    implementation 'org.apache.beam:beam-runners-flink-1.11:2.27.0'
+    implementation 'org.apache.beam:beam-runners-flink-1.10:2.27.0'
     implementation 'org.apache.beam:beam-runners-spark:2.27.0'
     implementation 'org.apache.beam:beam-sdks-java-io-kafka:2.27.0'
     implementation 'io.confluent:kafka-avro-serializer:5.3.2'
@@ -31,4 +31,12 @@ dependencies {
     annotationProcessor 'com.google.auto.value:auto-value:1.6'
     testCompile group: 'org.hamcrest', name: 'hamcrest', version: '2.2'
     implementation group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.30'
+    // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
+    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.12.2'
+    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.12.2'
+    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.12.2'
+    implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-joda', version: '2.12.2'
+    implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jdk8', version: '2.12.2'
+    implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.12.2'
+    implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-jaxb-annotations', version: '2.12.2'
 }
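
Note that the Jackson datatype and JAXB modules pulled in above are not active until they are registered on an ObjectMapper. A minimal sketch of the wiring these dependencies enable; the Mappers holder class is an illustration, not part of this PR:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
    import com.fasterxml.jackson.datatype.joda.JodaModule;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import com.fasterxml.jackson.module.jaxb.JaxbAnnotationModule;

    public final class Mappers {
        // One shared mapper with every module from the build file registered.
        public static final ObjectMapper MAPPER = new ObjectMapper()
                .registerModule(new JodaModule())            // jackson-datatype-joda: org.joda.time types
                .registerModule(new Jdk8Module())            // jackson-datatype-jdk8: Optional and friends
                .registerModule(new JavaTimeModule())        // jackson-datatype-jsr310: java.time types
                .registerModule(new JaxbAnnotationModule()); // jackson-module-jaxb-annotations

        private Mappers() {}
    }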
Coordinator.java

@@ -1,15 +1,15 @@
 package com.flamestream.optimizer.sql.agents;

 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.*;
-import org.apache.calcite.tools.Planner;
-import org.checkerframework.checker.nullness.compatqual.NonNullType;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.NonNull;

 import java.util.stream.Stream;

 interface Coordinator {
@@ -25,14 +25,14 @@ interface SqlQueryJob {
 }

 interface QueryJobBuilder {
-    QueryJobBuilder addOutput(PTransform<PCollection<Row>, PDone> sink);
+    QueryJobBuilder addOutput(PTransform<@NonNull PCollection<Row>, @NonNull PDone> sink);

-    QueryJobBuilder setPreHook(PTransform<PCollection<Row>, PCollection<Row>> hook);
-    QueryJobBuilder setPostHook(PTransform<PCollection<Row>, PCollection<Row>> hook);
+    QueryJobBuilder setPreHook(PTransform<@NonNull PCollection<Row>, @NonNull PCollection<Row>> hook);
+    QueryJobBuilder setPostHook(PTransform<@NonNull PCollection<Row>, @NonNull PCollection<Row>> hook);

-    QueryJobBuilder registerUdf(String functionName, SerializableFunction sfn);
+    QueryJobBuilder registerUdf(String functionName, SerializableFunction<?, ?> sfn);
     QueryJobBuilder registerUdf(String functionName, Class<?> clazz, String method);
-    QueryJobBuilder registerUdaf(String functionName, Combine.CombineFn combineFn);
+    QueryJobBuilder registerUdaf(String functionName, Combine.CombineFn<?, ?, ?> combineFn);

     SqlQueryJob build(String query);
 }
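
For orientation, a hedged sketch of how a caller might drive QueryJobBuilder once a concrete implementation exists; the builder instance, the hook, the sink, and the CombineFn below are all assumed, since the PR defines only the interface:

    // Hypothetical usage: 'builder', 'preHook', 'sink' and 'countFn' stand in for
    // objects this PR does not yet provide.
    SqlQueryJob job = builder
            .setPreHook(preHook)                                 // PCollection<Row> -> PCollection<Row>
            .addOutput(sink)                                     // PCollection<Row> -> PDone
            .registerUdf("toUpper", String.class, "toUpperCase") // UDF by class and method name
            .registerUdaf("myCount", countFn)                    // a Combine.CombineFn<?, ?, ?>
            .build("SELECT * FROM PCOLLECTION");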
CoordinatorExecutorPipeline.java (new file)

@@ -0,0 +1,56 @@
package com.flamestream.optimizer.sql.agents;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.calcite.tools.Planner;
import org.checkerframework.checker.nullness.qual.NonNull;

import java.util.Collection;

public class CoordinatorExecutorPipeline {
    public static void fromUserQuerySource(
            final @NonNull Pipeline pipeline,
            final Planner planner, // could possibly be removed in the near future
            final QueryPlanner queryPlanner,
            final CostEstimator costEstimator,
            final @NonNull PTransform<@NonNull ? super PBegin, @NonNull PCollection<Coordinator.SqlQueryJob>> userQuerySource,
            final @NonNull Collection<UserSource> inputs) {

        final PCollection<Coordinator.SqlQueryJob> queries = pipeline.apply("ReadQuery", userQuerySource);

        final Executor executor = new ExecutorImpl();
        final Coordinator coordinator = new CoordinatorImpl(planner, queryPlanner, costEstimator, executor);
        for (UserSource input : inputs) {
            coordinator.registerInput(input.getTag(), input.getSource());
        }

        queries.apply("SubmitToCoordinator", ParDo.of(new CoordinatorExecutorDoFn(coordinator)));
    }
}

// TODO should it return the result or simply send the pipeline to the cluster? I would expect the result
class CoordinatorExecutorDoFn extends DoFn<Coordinator.SqlQueryJob, Void> {
    // NOTE: DoFn instances are serialized before execution, so the Coordinator passed in must be Serializable.
    private final Coordinator coordinator;

    public CoordinatorExecutorDoFn(@NonNull Coordinator coordinator) {
        this.coordinator = coordinator;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        final Coordinator.SqlQueryJob queryJob = c.element();
        if (queryJob == null) {
            return;
        }

        // submits the resulting pipeline to the executor, which submits it to the cluster
        coordinator.start(queryJob);

        // TODO are we getting the results here? if so, how?
    }
}
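
A hedged end-to-end sketch of how this entry point might be invoked from the same package; queryStream (a PTransform<PBegin, PCollection<Coordinator.SqlQueryJob>>) and rowSource (an UnboundedSource<Row, ?>) are stand-ins, since the PR ships no concrete implementation of either:

    // Hypothetical wiring under the assumptions named above.
    Pipeline pipeline = Pipeline.create(options);
    CoordinatorExecutorPipeline.fromUserQuerySource(
            pipeline,
            planner,        // org.apache.calcite.tools.Planner
            queryPlanner,   // org.apache.beam.sdk.extensions.sql.impl.QueryPlanner
            costEstimator,  // the PR's CostEstimator
            queryStream,
            List.of(new UserSource("purchases", rowSource)));
    pipeline.run();         // fromUserQuerySource only builds the graph; running it is left to the caller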
ExecutorImpl.java (new file)

@@ -0,0 +1,29 @@
package com.flamestream.optimizer.sql.agents;

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.checkerframework.checker.nullness.qual.NonNull;

import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.function.Consumer;

public class ExecutorImpl implements Executor, Serializable {
    private Pipeline currentPipeline;

    @Override
    public void startOrUpdate(Pipeline pipeline, Consumer<ChangingStatus> statusConsumer) {
        currentPipeline = pipeline;

        // TODO statusConsumer is not consumed yet; the draft submits the pipeline and reports nothing back
        final PipelineRunner<@NonNull PipelineResult> runner = FlinkRunner.fromOptions(pipeline.getOptions());
        runner.run(pipeline);
    }

    @Nullable
    @Override
    public Pipeline current() {
        return currentPipeline;
    }
}
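
ExecutorImpl derives the runner from the pipeline's own options, so those options must already name the Flink runner before startOrUpdate is called. A minimal sketch, assuming a local Flink master; FlinkPipelineOptions, FlinkRunner and PipelineOptionsFactory are standard Beam APIs, the rest is illustrative:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public final class SubmitSketch {
        public static void main(String[] args) {
            final FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
            options.setRunner(FlinkRunner.class);     // matches FlinkRunner.fromOptions(...) in ExecutorImpl
            options.setFlinkMaster("localhost:8081"); // illustrative address
            final Pipeline pipeline = Pipeline.create(options);
            // ... build the SQL pipeline here, then hand it over:
            new ExecutorImpl().startOrUpdate(pipeline, status -> { /* ignored by the draft */ });
        }
    }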
UserSource.java (new file)

@@ -0,0 +1,24 @@
package com.flamestream.optimizer.sql.agents;

import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.NonNull;

class UserSource {
    private final String tag;
    // TODO should there be any limitations on CheckpointT? should UserSource be a generic class?
    private final UnboundedSource<Row, @NonNull ?> source;

    public UserSource(String tag, UnboundedSource<Row, @NonNull ?> source) {
        this.tag = tag;
        this.source = source;
    }

    public String getTag() {
        return tag;
    }

    public UnboundedSource<Row, @NonNull ?> getSource() {
        return source;
    }
}