Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Add ssl listeners in channelInitializer #45

Merged
merged 7 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,60 @@ bin/java-consumer-demo.sh
```
bin/java-producer-demo.sh
```

#### SSL Connection

KOP support Kafka listeners config of type "PLAINTEXT" and "SSL".
You could set config like `listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093`.
Please reference [Kafka SSL document](https://kafka.apache.org/documentation/#security_ssl) for how to config SSL keys.
Here is some steps that you need to be able to connect KOP through SSL.

1. create SSL related Keys.

Here is an example of a bash script to create related CA and jks files.
```access transformers
#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
```

2. config KOP Broker.

In configration file, e.g. [`kop_standalone.conf`](https://github.com/streamnative/kop/blob/master/conf/kop_standalone.conf),
Add related configurations that using the jks configs that create in step1:
```access transformers
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

kopSslKeystoreLocation=/Users/kop/server.keystore.jks
kopSslKeystorePassword=test1234
kopSslKeyPassword=test1234
kopSslTruststoreLocation=/Users/kop/server.truststore.jks
kopSslTruststorePassword=test1234
```

3. config kafka clients

This is similar to [Kafka client config doc](https://kafka.apache.org/documentation/#security_configclients).

Prepare a file named `client-ssl.properties`, which contains:
```
security.protocol=SSL
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=test1234
ssl.endpoint.identification.algorithm=
```

And verify us console-producer and console-consumer:
```access transformers
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties
```
2 changes: 1 addition & 1 deletion conf/kop_standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

### --- Kafka broker settings --- ###

kafkaServicePort=9092

enableGroupCoordinator=true

messagingProtocols=kafka

listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
### --- General broker settings --- ###

# Zookeeper quorum connection string
Expand Down
24 changes: 20 additions & 4 deletions src/main/java/io/streamnative/kop/KafkaChannelInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,25 @@
*/
package io.streamnative.kop;

import static io.streamnative.kop.KafkaProtocolHandler.TLS_HANDLER;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.ssl.SslHandler;
import io.streamnative.kop.coordinator.group.GroupCoordinator;
import io.streamnative.kop.utils.ssl.SSLUtils;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
import org.eclipse.jetty.util.ssl.SslContextFactory;

/**
* A channel initializer that initialize channels for kafka protocol.
*/
public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {

static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB
public static final int MAX_FRAME_LENGTH = 100 * 1024 * 1024; // 100MB

@Getter
private final PulsarService pulsarService;
Expand All @@ -36,9 +41,10 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final KafkaTopicManager kafkaTopicManager;
@Getter
private final GroupCoordinator groupCoordinator;
// TODO: handle TLS -- https://github.com/streamnative/kop/issues/2
// can turn into get this config from kafkaConfig.
@Getter
private final boolean enableTls;
@Getter
private final SslContextFactory sslContextFactory;

public KafkaChannelInitializer(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
Expand All @@ -51,14 +57,24 @@ public KafkaChannelInitializer(PulsarService pulsarService,
this.kafkaTopicManager = kafkaTopicManager;
this.groupCoordinator = groupCoordinator;
this.enableTls = enableTLS;

if (enableTls) {
sslContextFactory = SSLUtils.createSslContextFactory(kafkaConfig);
} else {
sslContextFactory = null;
}
}

@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (this.enableTls) {
ch.pipeline().addLast(TLS_HANDLER, new SslHandler(SSLUtils.createSslEngine(sslContextFactory)));
}
ch.pipeline().addLast(new LengthFieldPrepender(4));
ch.pipeline().addLast("frameDecoder",
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
ch.pipeline().addLast("handler",
new KafkaRequestHandler(pulsarService, kafkaConfig, kafkaTopicManager, groupCoordinator));
new KafkaRequestHandler(pulsarService, kafkaConfig, kafkaTopicManager, groupCoordinator, enableTls));
}

}
105 changes: 92 additions & 13 deletions src/main/java/io/streamnative/kop/KafkaProtocolHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
Expand All @@ -37,6 +36,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand All @@ -58,6 +58,19 @@
public class KafkaProtocolHandler implements ProtocolHandler {

public static final String PROTOCOL_NAME = "kafka";
public static final String SSL_PREFIX = "SSL://";
public static final String PLAINTEXT_PREFIX = "PLAINTEXT://";
public static final String LISTENER_DEL = ",";
public static final String TLS_HANDLER = "tls";
public static final String LISTENER_PATTEN = "^(PLAINTEXT?|SSL)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";

/**
* Kafka Listener Type.
*/
public enum ListenerType {
PLAINTEXT,
SSL
}

@Getter
private KafkaServiceConfiguration kafkaConfig;
Expand All @@ -67,6 +80,9 @@ public class KafkaProtocolHandler implements ProtocolHandler {
private KafkaTopicManager kafkaTopicManager;
@Getter
private GroupCoordinator groupCoordinator;
@Getter
private String bindAddress;


@Override
public String protocolName() {
Expand All @@ -88,12 +104,16 @@ public void initialize(ServiceConfiguration conf) throws Exception {
// when loaded with PulsarService as NAR, `conf` will be type of ServiceConfiguration
kafkaConfig = ConfigurationUtils.create(conf.getProperties(), KafkaServiceConfiguration.class);
}
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(kafkaConfig.getBindAddress());
}

// This method is called after initialize
@Override
public String getProtocolDataToAdvertise() {
// TODO: support data register, when do https://github.com/streamnative/kop/issues/2
return "mock-data-for-kafka";
if (log.isDebugEnabled()) {
log.debug("Get configured listeners", kafkaConfig.getListeners());
}
return kafkaConfig.getListeners();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

if (log.isDebugEnabled()) {
    log.debug("Get configured listeners", kafkaConfig.getListeners());
}

}

@Override
Expand All @@ -114,30 +134,50 @@ public void start(BrokerService service) {
}
}

// this is called after init, and with kafkaTopicManager, kafkaConfig, brokerService all set.
// this is called after initialize, and with kafkaTopicManager, kafkaConfig, brokerService all set.
@Override
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() {
checkState(kafkaConfig != null);
checkState(kafkaConfig.getListeners() != null);
checkState(brokerService != null);
checkState(kafkaTopicManager != null);
if (kafkaConfig.isEnableGroupCoordinator()) {
checkState(groupCoordinator != null);
}

Optional<Integer> port = kafkaConfig.getKafkaServicePort();
InetSocketAddress addr = new InetSocketAddress(brokerService.pulsar().getBindAddress(), port.get());
String listeners = kafkaConfig.getListeners();
String[] parts = listeners.split(LISTENER_DEL);

try {
Map<InetSocketAddress, ChannelInitializer<SocketChannel>> initializerMap =
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder()
.put(addr,
(new KafkaChannelInitializer(brokerService.pulsar(),
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder =
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder();

for (String listener: parts) {
if (listener.startsWith(PLAINTEXT_PREFIX)) {
builder.put(
// TODO: consider using the address in the listener as the bind address.
// https://github.com/streamnative/kop/issues/46
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we use the address in the listener as the bind address?

new KafkaChannelInitializer(brokerService.pulsar(),
kafkaConfig,
kafkaTopicManager,
groupCoordinator,
false));
} else if (listener.startsWith(SSL_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new KafkaChannelInitializer(brokerService.pulsar(),
kafkaConfig,
kafkaTopicManager,
groupCoordinator,
false)))
.build();
return initializerMap;
true));
} else {
log.error("Kafka listener {} not supported. supports {} and {}",
listener, PLAINTEXT_PREFIX, SSL_PREFIX);
}
}

return builder.build();
} catch (Exception e){
log.error("KafkaProtocolHandler newChannelInitializers failed with", e);
return null;
Expand Down Expand Up @@ -256,4 +296,43 @@ private String createKafkaOffsetsTopic(BrokerService service) throws PulsarServe

return offsetsTopic;
}

public static int getListenerPort(String listener) {
checkState(listener.matches(LISTENER_PATTEN), "listener not match patten");

int lastIndex = listener.lastIndexOf(':');
return Integer.parseInt(listener.substring(lastIndex + 1));
}

public static int getListenerPort(String listeners, ListenerType type) {
String[] parts = listeners.split(LISTENER_DEL);

for (String listener: parts) {
if (type == ListenerType.PLAINTEXT && listener.startsWith(PLAINTEXT_PREFIX)) {
return getListenerPort(listener);
}
if (type == ListenerType.SSL && listener.startsWith(SSL_PREFIX)) {
return getListenerPort(listener);
}
}

log.error("KafkaProtocolHandler listeners {} not contains type {}", listeners, type);
return -1;
}

public static String getBrokerUrl(String listeners, Boolean tlsEnabled) {
String[] parts = listeners.split(LISTENER_DEL);

for (String listener: parts) {
if (tlsEnabled && listener.startsWith(SSL_PREFIX)) {
return listener;
}
if (!tlsEnabled && listener.startsWith(PLAINTEXT_PREFIX)) {
return listener;
}
}

log.error("listener {} not contains a valid SSL or PLAINTEXT address", listeners);
return null;
}
}
Loading