Skip to content

Commit

Permalink
Proxy and SchemaRegistry: allow to run on a only-TLS Pulsar cluster (s…
Browse files Browse the repository at this point in the history
…treamnative#15)

Contents:

- Clean up and factorise the code to bootstrap TLS endpoints (Broker/Proxy, SchemaRegistry on Broker and Proxy)
- Allow to use Pulsar Broker configuration for TLS even with KOP (keep legacy KOP custom configuration)
- Allow to bind the Broker SchemaRegistry on TLS
- Allow the proxy to work with brokerServiceURLTLS (and without brokerServiceURL)
- Implement support for tlsHostnameVerificationEnabled
- Add Docker tests with TLS
  • Loading branch information
eolivelli authored Jun 10, 2022
1 parent cb659ab commit df957c1
Show file tree
Hide file tree
Showing 57 changed files with 1,504 additions and 1,571 deletions.
52 changes: 52 additions & 0 deletions .github/workflows/ci-cancel-duplicate-workflows.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: CI - Cancel duplicate workflows
on:
workflow_run:
# this could be any workflow that is always executed by PUSH/PR operation
workflows: ["kop tests"]
types: ['requested']

jobs:

cancel-workflow-runs:
runs-on: ubuntu-20.04
steps:
# the potiuk/cancel-workflow-run action has been allow-listed by
# the Apache Infrastructure
- name: cancel duplicate pr-docker-tests.yml
uses: potiuk/cancel-workflow-runs@953e057dc81d3458935a18d1184c386b0f6b5738
with:
token: ${{ secrets.GITHUB_TOKEN }}
cancelMode: allDuplicates
workflowFileName: pr-docker-tests.yml
- name: cancel duplicate pr-impl-test.yml
uses: potiuk/cancel-workflow-runs@953e057dc81d3458935a18d1184c386b0f6b5738
with:
token: ${{ secrets.GITHUB_TOKEN }}
cancelMode: allDuplicates
workflowFileName: pr-impl-test.yml
- name: cancel duplicate pr-tests.yml
uses: potiuk/cancel-workflow-runs@953e057dc81d3458935a18d1184c386b0f6b5738
with:
token: ${{ secrets.GITHUB_TOKEN }}
cancelMode: allDuplicates
workflowFileName: pr-tests.yaml

46 changes: 46 additions & 0 deletions .github/workflows/pr-docker-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: docker tests

on:
push:
branches:
- 2.10_ds
pull_request:


jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v1
- name: Set up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11

- name: Start and init the oauth server
run: ./ci/init_hydra_oauth_server.sh
timeout-minutes: 5

- name: Build with Maven skipTests
run: mvn clean checkstyle:check install -ntp -B -DskipTests

- name: tests module
run: mvn test -ntp -B -DfailIfNoTests=false '-Dtest=DockerTest' -pl tests
timeout-minutes: 120

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
2 changes: 1 addition & 1 deletion .github/workflows/pr-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
run: mvn clean checkstyle:check install -ntp -B -DskipTests

- name: tests module
run: mvn test -ntp -B -DfailIfNoTests=false '-Dtest=!KafkaIntegration*Test' -pl tests
run: mvn test -ntp -B -DfailIfNoTests=false '-Dtest=!Docker*Test' -pl tests
timeout-minutes: 120

- name: package surefire artifacts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@
*/
package io.streamnative.pulsar.handlers.kop;

import static io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler.TLS_HANDLER;

import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.streamnative.pulsar.handlers.kop.format.SchemaManager;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
Expand All @@ -31,7 +28,6 @@
import lombok.Getter;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
import org.eclipse.jetty.util.ssl.SslContextFactory;

/**
* A channel initializer that initialize channels for kafka protocol.
Expand Down Expand Up @@ -60,8 +56,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
@Getter
private final EndPoint advertisedEndPoint;
private final boolean skipMessagesWithoutIndex;
@Getter
private final SslContextFactory.Server sslContextFactory;
private SSLUtils.ServerSideTLSSupport tlsSupport;
@Getter
private final RequestStats requestStats;
private final OrderedScheduler sendResponseScheduler;
Expand Down Expand Up @@ -94,9 +89,9 @@ public KafkaChannelInitializer(PulsarService pulsarService,
this.skipMessagesWithoutIndex = skipMessagesWithoutIndex;
this.requestStats = requestStats;
if (enableTls) {
sslContextFactory = SSLUtils.createSslContextFactory(kafkaConfig);
tlsSupport = new SSLUtils.ServerSideTLSSupport(kafkaConfig);
} else {
sslContextFactory = null;
tlsSupport = null;
}
this.sendResponseScheduler = sendResponseScheduler;
this.kafkaTopicManagerSharedState = kafkaTopicManagerSharedState;
Expand All @@ -110,8 +105,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
kafkaConfig.getConnectionMaxIdleMs(),
0,
TimeUnit.MILLISECONDS));
if (this.enableTls) {
ch.pipeline().addLast(TLS_HANDLER, new SslHandler(SSLUtils.createSslEngine(sslContextFactory)));
if (tlsSupport != null) {
tlsSupport.addTlsHandler(ch);
}
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast("frameDecoder",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.group.GroupCoordinator;
Expand All @@ -38,12 +39,14 @@
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import io.streamnative.pulsar.handlers.kop.utils.timer.SystemTimer;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -448,7 +451,15 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
forEach((listener, endPoint) ->
builder.put(endPoint.getInetAddress(), newKafkaChannelInitializer(endPoint))
);
Optional<SchemaRegistryChannelInitializer> schemaRegistryChannelInitializer = schemaRegistryManager.build();
Consumer<ChannelPipeline> tlsConfigurator = null;
if (kafkaConfig.isKopSchemaRegistryProxyEnableTls()) {
SSLUtils.ServerSideTLSSupport tlsSupport = new SSLUtils.ServerSideTLSSupport(kafkaConfig);
tlsConfigurator = (ChannelPipeline pipeline) ->{
tlsSupport.addTlsHandler((SocketChannel) pipeline.channel());
};
}
Optional<SchemaRegistryChannelInitializer> schemaRegistryChannelInitializer =
schemaRegistryManager.build(tlsConfigurator);
if (schemaRegistryChannelInitializer.isPresent()) {
builder.put(schemaRegistryManager.getAddress(), schemaRegistryChannelInitializer.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
)
private boolean kopTlsEnabledWithBroker = false;

@FieldContext(
category = CATEGORY_KOP_SSL,
doc = "Enable hostname verification while connecting to other brokers"
)
private boolean tlsHostnameVerificationEnabled = false;

@FieldContext(
category = CATEGORY_KOP_SSL,
doc = "Kafka ssl configuration map with: SSL_KEYSTORE_LOCATION_CONFIG = \"ssl.keystore.location\""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -39,6 +40,7 @@
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import javax.naming.AuthenticationException;
import lombok.AllArgsConstructor;
import lombok.Getter;
Expand Down Expand Up @@ -199,7 +201,8 @@ public InetSocketAddress getAddress() {
return new InetSocketAddress(kafkaConfig.getKopSchemaRegistryPort());
}

public Optional<SchemaRegistryChannelInitializer> build() throws Exception {
public Optional<SchemaRegistryChannelInitializer> build(Consumer<ChannelPipeline> tlsConfigurator)
throws Exception {
if (!kafkaConfig.isKopSchemaRegistryEnable()) {
return Optional.empty();
}
Expand Down Expand Up @@ -230,7 +233,7 @@ public Optional<SchemaRegistryChannelInitializer> build() throws Exception {
new CompatibilityResource(schemaStorage, schemaRegistryRequestAuthenticator).register(handler);
handler.addProcessor(new DummyOptionsCORSProcessor());

return Optional.of(new SchemaRegistryChannelInitializer(handler));
return Optional.of(new SchemaRegistryChannelInitializer(handler, tlsConfigurator));
}

public void close() {
Expand Down
Loading

0 comments on commit df957c1

Please sign in to comment.