Skip to content

Commit

Permalink
MINOR: cleanup SinkNode generics (#18975)
Browse files Browse the repository at this point in the history
Reviewers: Andrew Schofield <[email protected]>, Bill Bejeck <[email protected]>
  • Loading branch information
mjsax authored Feb 21, 2025
1 parent 709bfc5 commit acea35d
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
*/
public class WrappingNullableUtils {

@SuppressWarnings({"unchecked", "rawtypes"})
private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final ProcessorContext context, final boolean isKey, final String name) {
@SuppressWarnings({"unchecked", "resource"})
private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> specificDeserializer, final ProcessorContext<?, ?> context, final boolean isKey) {
final Deserializer<T> deserializerToUse;

if (specificDeserializer == null) {
Expand All @@ -41,8 +41,8 @@ private static <T> Deserializer<T> prepareDeserializer(final Deserializer<T> spe
return deserializerToUse;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final ProcessorContext context, final boolean isKey, final String name) {
@SuppressWarnings({"unchecked", "resource"})
private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificSerializer, final ProcessorContext<?, ?> context, final boolean isKey) {
final Serializer<T> serializerToUse;
if (specificSerializer == null) {
serializerToUse = (Serializer<T>) (isKey ? context.keySerde().serializer() : context.valueSerde().serializer());
Expand All @@ -53,7 +53,7 @@ private static <T> Serializer<T> prepareSerializer(final Serializer<T> specificS
return serializerToUse;
}

@SuppressWarnings({"rawtypes", "unchecked"})
@SuppressWarnings("unchecked")
private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final SerdeGetter getter, final boolean isKey) {
final Serde<T> serdeToUse;
if (specificSerde == null) {
Expand All @@ -62,25 +62,25 @@ private static <T> Serde<T> prepareSerde(final Serde<T> specificSerde, final Ser
serdeToUse = specificSerde;
}
if (serdeToUse instanceof WrappingNullableSerde) {
((WrappingNullableSerde) serdeToUse).setIfUnset(getter);
((WrappingNullableSerde<?, ?, ?>) serdeToUse).setIfUnset(getter);
}
return serdeToUse;
}

public static <K> Deserializer<K> prepareKeyDeserializer(final Deserializer<K> specificDeserializer, final ProcessorContext<K, ?> context, final String name) {
return prepareDeserializer(specificDeserializer, context, true, name);
public static <K> Deserializer<K> prepareKeyDeserializer(final Deserializer<K> specificDeserializer, final ProcessorContext<K, ?> context) {
return prepareDeserializer(specificDeserializer, context, true);
}

public static <V> Deserializer<V> prepareValueDeserializer(final Deserializer<V> specificDeserializer, final ProcessorContext<?, V> context, final String name) {
return prepareDeserializer(specificDeserializer, context, false, name);
public static <V> Deserializer<V> prepareValueDeserializer(final Deserializer<V> specificDeserializer, final ProcessorContext<?, V> context) {
return prepareDeserializer(specificDeserializer, context, false);
}

public static <K> Serializer<K> prepareKeySerializer(final Serializer<K> specificSerializer, final ProcessorContext<K, ?> context, final String name) {
return prepareSerializer(specificSerializer, context, true, name);
public static <K> Serializer<K> prepareKeySerializer(final Serializer<K> specificSerializer, final ProcessorContext<?, ?> context) {
return prepareSerializer(specificSerializer, context, true);
}

public static <V> Serializer<V> prepareValueSerializer(final Serializer<V> specificSerializer, final ProcessorContext<?, V> context, final String name) {
return prepareSerializer(specificSerializer, context, false, name);
public static <V> Serializer<V> prepareValueSerializer(final Serializer<V> specificSerializer, final ProcessorContext<?, ?> context) {
return prepareSerializer(specificSerializer, context, false);
}

public static <K> Serde<K> prepareKeySerde(final Serde<K> specificSerde, final SerdeGetter getter) {
Expand All @@ -91,17 +91,15 @@ public static <V> Serde<V> prepareValueSerde(final Serde<V> specificSerde, final
return prepareSerde(specificSerde, getter, false);
}

@SuppressWarnings({"rawtypes", "unchecked"})
public static <T> void initNullableSerializer(final Serializer<T> specificSerializer, final SerdeGetter getter) {
if (specificSerializer instanceof WrappingNullableSerializer) {
((WrappingNullableSerializer) specificSerializer).setIfUnset(getter);
((WrappingNullableSerializer<?, ?, ?>) specificSerializer).setIfUnset(getter);
}
}

@SuppressWarnings({"rawtypes", "unchecked"})
public static <T> void initNullableDeserializer(final Deserializer<T> specificDeserializer, final SerdeGetter getter) {
if (specificDeserializer instanceof WrappingNullableDeserializer) {
((WrappingNullableDeserializer) specificDeserializer).setIfUnset(getter);
((WrappingNullableDeserializer<?, ?, ?>) specificDeserializer).setIfUnset(getter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,18 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?> child) {
throw new UnsupportedOperationException("sink node does not allow addChild");
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public void init(final InternalProcessorContext<Void, Void> context) {
super.init(context);
this.context = context;
try {
keySerializer = prepareKeySerializer(keySerializer, (InternalProcessorContext) context, this.name());
keySerializer = prepareKeySerializer(keySerializer, context);
} catch (ConfigException | StreamsException e) {
throw new StreamsException(String.format("Failed to initialize key serdes for sink node %s", name()), e, context.taskId());
}

try {
valSerializer = prepareValueSerializer(valSerializer, (InternalProcessorContext) context, this.name());
valSerializer = prepareValueSerializer(valSerializer, context);
} catch (final ConfigException | StreamsException e) {
throw new StreamsException(String.format("Failed to initialize value serdes for sink node %s", name()), e, context.taskId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ public void init(final InternalProcessorContext<KIn, VIn> context) {
this.context = context;

try {
keyDeserializer = prepareKeyDeserializer(keyDeserializer, context, name());
keyDeserializer = prepareKeyDeserializer(keyDeserializer, context);
} catch (final ConfigException | StreamsException e) {
throw new StreamsException(String.format("Failed to initialize key serdes for source node %s", name()), e, context.taskId());
}

try {
valDeserializer = prepareValueDeserializer(valDeserializer, context, name());
valDeserializer = prepareValueDeserializer(valDeserializer, context);
} catch (final ConfigException | StreamsException e) {
throw new StreamsException(String.format("Failed to initialize value serdes for source node %s", name()), e, context.taskId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public class SinkNodeTest {
new StaticTopicNameExtractor<>("any-output-topic"), anySerializer, anySerializer, null);

// Used to verify that the correct exceptions are thrown if the compiler checks are bypassed
@SuppressWarnings({"unchecked", "rawtypes"})
private final SinkNode<Object, Object> illTypedSink = (SinkNode) sink;
@SuppressWarnings("unchecked")
private final SinkNode<Object, Object> illTypedSink = (SinkNode<Object, Object>) ((SinkNode<?, ?>) sink);
private MockedStatic<WrappingNullableUtils> utilsMock;

@BeforeEach
Expand Down Expand Up @@ -77,7 +77,7 @@ public void shouldThrowStreamsExceptionOnInputRecordWithInvalidTimestamp() {

@Test
public void shouldThrowStreamsExceptionOnUndefinedKeySerde() {
utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any()))
utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any()))
.thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));

final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context));
Expand All @@ -94,7 +94,7 @@ public void shouldThrowStreamsExceptionOnUndefinedKeySerde() {

@Test
public void shouldThrowStreamsExceptionOnUndefinedValueSerde() {
utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any(), any()))
utilsMock.when(() -> WrappingNullableUtils.prepareValueSerializer(any(), any()))
.thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));

final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context));
Expand All @@ -111,7 +111,7 @@ public void shouldThrowStreamsExceptionOnUndefinedValueSerde() {

@Test
public void shouldThrowStreamsExceptionWithExplicitErrorMessage() {
utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any(), any())).thenThrow(new StreamsException(""));
utilsMock.when(() -> WrappingNullableUtils.prepareKeySerializer(any(), any())).thenThrow(new StreamsException(""));

final Throwable exception = assertThrows(StreamsException.class, () -> sink.init(context));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void shouldThrowStreamsExceptionOnUndefinedKeySerde() {
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());

utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any()))
utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any()))
.thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));

final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context));
Expand All @@ -159,7 +159,7 @@ public void shouldThrowStreamsExceptionOnUndefinedValueSerde() {
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());

utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any(), any()))
utilsMock.when(() -> WrappingNullableUtils.prepareValueDeserializer(any(), any()))
.thenThrow(new ConfigException("Please set StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG"));

final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context));
Expand All @@ -181,7 +181,7 @@ public void shouldThrowStreamsExceptionWithExplicitErrorMessage() {
final SourceNode<String, String> node =
new SourceNode<>(context.currentNode().name(), new TheDeserializer(), new TheDeserializer());

utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any(), any())).thenThrow(new StreamsException(""));
utilsMock.when(() -> WrappingNullableUtils.prepareKeyDeserializer(any(), any())).thenThrow(new StreamsException(""));

final Throwable exception = assertThrows(StreamsException.class, () -> node.init(context));

Expand Down

0 comments on commit acea35d

Please sign in to comment.