Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36324] Harden runtime error cases for table builtin functions #26190

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.flink.table.planner.functions;

import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
Expand All @@ -38,27 +40,30 @@
import org.apache.flink.table.operations.AggregateQueryOperation;
import org.apache.flink.table.operations.ProjectQueryOperation;
import org.apache.flink.table.planner.factories.TableFactoryHarness;
import org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.TestCase;
import org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.TestCaseWithClusterClient;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;

import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -86,14 +91,15 @@ abstract class BuiltInAggregateFunctionTestBase {

abstract Stream<TestSpec> getTestCaseSpecs();

final Stream<BuiltInFunctionTestBase.TestCase> getTestCases() {
final Stream<TestCase> getTestCases() {
return this.getTestCaseSpecs().flatMap(TestSpec::getTestCases);
}

@ParameterizedTest
@MethodSource("getTestCases")
final void test(BuiltInFunctionTestBase.TestCase testCase) throws Throwable {
testCase.execute();
final void test(TestCase testCase, @InjectMiniCluster MiniCluster miniCluster)
throws Throwable {
testCase.execute(new MiniClusterClient(miniCluster.getConfiguration(), miniCluster));
}

protected static Table asTable(TableEnvironment tEnv, DataType sourceRowType, List<Row> rows) {
Expand Down Expand Up @@ -308,8 +314,9 @@ TestSpec testSqlRuntimeError(
return this;
}

private Executable createTestItemExecutable(TestItem testItem, String stateBackend) {
return () -> {
private TestCaseWithClusterClient createTestItemExecutable(
TestItem testItem, String stateBackend) {
return (clusterClient) -> {
Configuration conf = new Configuration();
conf.set(StateBackendOptions.STATE_BACKEND, stateBackend);
final TableEnvironment tEnv =
Expand All @@ -320,23 +327,23 @@ private Executable createTestItemExecutable(TestItem testItem, String stateBacke
.build());
final Table sourceTable = asTable(tEnv, sourceRowType, sourceRows);

testItem.execute(tEnv, sourceTable);
testItem.execute(tEnv, sourceTable, clusterClient);
};
}

Stream<BuiltInFunctionTestBase.TestCase> getTestCases() {
Stream<TestCase> getTestCases() {
return Stream.concat(
testItems.stream()
.map(
testItem ->
new BuiltInFunctionTestBase.TestCase(
new TestCase(
testItem.toString(),
createTestItemExecutable(
testItem, HASHMAP_STATE_BACKEND_NAME))),
testItems.stream()
.map(
testItem ->
new BuiltInFunctionTestBase.TestCase(
new TestCase(
testItem.toString(),
createTestItemExecutable(
testItem,
Expand All @@ -362,7 +369,7 @@ public String toString() {
// ---------------------------------------------------------------------------------------------

private interface TestItem {
void execute(TableEnvironment tEnv, Table sourceTable);
void execute(TableEnvironment tEnv, Table sourceTable, MiniClusterClient clusterClient);
}

// ---------------------------------------------------------------------------------------------
Expand All @@ -377,7 +384,8 @@ public SuccessItem(@Nullable DataType expectedRowType, @Nullable List<Row> expec
}

@Override
public void execute(TableEnvironment tEnv, Table sourceTable) {
public void execute(
TableEnvironment tEnv, Table sourceTable, MiniClusterClient clusterClient) {
final TableResult tableResult = getResult(tEnv, sourceTable);

if (expectedRowType != null) {
Expand Down Expand Up @@ -495,7 +503,6 @@ protected TableResult getResult(TableEnvironment tEnv, Table sourceTable) {
return tEnv.sqlQuery(stringBuilder.toString()).execute();
}

@Nonnull
private static List<ResolvedExpression> recreateSelectList(
AggregateQueryOperation aggQueryOperation,
ProjectQueryOperation projectQueryOperation) {
Expand Down Expand Up @@ -579,7 +586,8 @@ Consumer<? super Throwable> errorMatcher() {
}

@Override
public void execute(TableEnvironment tEnv, Table sourceTable) {
public void execute(
TableEnvironment tEnv, Table sourceTable, MiniClusterClient clusterClient) {
AtomicReference<TableResult> tableResult = new AtomicReference<>();

Throwable t =
Expand All @@ -604,7 +612,22 @@ public void execute(TableEnvironment tEnv, Table sourceTable) {
.containsExactlyElementsOf(getFieldDataTypes(expectedRowType));
}

assertThatThrownBy(() -> tableResult.get().await())
assertThatThrownBy(
() -> {
final TableResult result = tableResult.get();
result.await();
final Optional<SerializedThrowable> serializedThrowable =
clusterClient
.requestJobResult(
result.getJobClient().get().getJobID())
.get()
.getSerializedThrowable();
if (serializedThrowable.isPresent()) {
throw serializedThrowable
.get()
.deserializeError(getClass().getClassLoader());
}
})
.isNotNull()
.satisfies(this.errorMatcher());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.flink.table.planner.functions;

import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
Expand All @@ -34,14 +36,15 @@
import org.apache.flink.table.operations.ProjectQueryOperation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;

import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -51,6 +54,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -95,28 +99,32 @@ private Stream<TestCase> getTestCases() {

@ParameterizedTest
@MethodSource("getTestCases")
final void test(TestCase testCase) throws Throwable {
testCase.execute();
final void test(TestCase testCase, @InjectMiniCluster MiniCluster miniCluster)
throws Throwable {
testCase.execute(new MiniClusterClient(miniCluster.getConfiguration(), miniCluster));
}

// --------------------------------------------------------------------------------------------
// Test model
// --------------------------------------------------------------------------------------------

interface TestCaseWithClusterClient {
void execute(MiniClusterClient clusterClient) throws Throwable;
}

/** Single test case. */
static class TestCase implements Executable {
static class TestCase implements TestCaseWithClusterClient {

private final String name;
private final Executable executable;
private final TestCaseWithClusterClient executable;

TestCase(String name, Executable executable) {
TestCase(String name, TestCaseWithClusterClient executable) {
this.name = name;
this.executable = executable;
}

@Override
public void execute() throws Throwable {
this.executable.execute();
public void execute(MiniClusterClient clusterClient) throws Throwable {
this.executable.execute(clusterClient);
}

@Override
Expand Down Expand Up @@ -324,7 +332,7 @@ Stream<TestCase> getTestCases(Configuration configuration) {
private TestCase getTestCase(Configuration configuration, TestItem testItem) {
return new TestCase(
testItem.toString(),
() -> {
(clusterClient) -> {
final TableEnvironmentInternal env =
(TableEnvironmentInternal)
TableEnvironment.create(
Expand Down Expand Up @@ -353,7 +361,7 @@ private TestCase getTestCase(Configuration configuration, TestItem testItem) {
inputTable = env.fromValues(DataTypes.ROW(fields), Row.of(fieldData));
}

testItem.test(env, inputTable);
testItem.test(env, inputTable, clusterClient);
});
}

Expand All @@ -370,7 +378,11 @@ private interface TestItem {
* @param inputTable The input table of this test that contains input data and data type. If
* it is null, the test is not dependent on the input data.
*/
void test(TableEnvironmentInternal env, @Nullable Table inputTable) throws Exception;
void test(
TableEnvironmentInternal env,
@Nullable Table inputTable,
MiniClusterClient clusterClient)
throws Exception;
}

private abstract static class ResultTestItem<T> implements TestItem {
Expand All @@ -387,7 +399,10 @@ private abstract static class ResultTestItem<T> implements TestItem {
abstract Table query(TableEnvironment env, @Nullable Table inputTable);

@Override
public void test(TableEnvironmentInternal env, @Nullable Table inputTable)
public void test(
TableEnvironmentInternal env,
@Nullable Table inputTable,
MiniClusterClient clusterClient)
throws Exception {
final Table resultTable = this.query(env, inputTable);

Expand Down Expand Up @@ -459,7 +474,10 @@ Consumer<? super Throwable> errorMatcher() {
}

@Override
public void test(TableEnvironmentInternal env, @Nullable Table inputTable) {
public void test(
TableEnvironmentInternal env,
@Nullable Table inputTable,
MiniClusterClient clusterClient) {
AtomicReference<TableResult> tableResult = new AtomicReference<>();

Throwable t =
Expand All @@ -475,7 +493,22 @@ public void test(TableEnvironmentInternal env, @Nullable Table inputTable) {
assertThat(t).as("Error while validating the query").isNull();
}

assertThatThrownBy(() -> tableResult.get().await())
assertThatThrownBy(
() -> {
final TableResult result = tableResult.get();
result.await();
final Optional<SerializedThrowable> serializedThrowable =
clusterClient
.requestJobResult(
result.getJobClient().get().getJobID())
.get()
.getSerializedThrowable();
if (serializedThrowable.isPresent()) {
throw serializedThrowable
.get()
.deserializeError(getClass().getClassLoader());
}
})
.isNotNull()
.satisfies(this.errorMatcher());
}
Expand Down