This repository has been archived by the owner on Jan 24, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 139
Add ssl listeners in channelInitializer #45
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
259cbeb
add ssl listeners and sslCtxRefresher in channelInitializer
jiazhai f966443
fix typo prefix
jiazhai fd09f4b
pass test
jiazhai a535f33
pass ut
jiazhai 027ea77
fix check
jiazhai 3332486
change following sijie's comments
jiazhai 63bde88
fix check
jiazhai File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,6 @@ | |
import java.net.InetSocketAddress; | ||
import java.nio.ByteBuffer; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.TimeUnit; | ||
import lombok.Getter; | ||
|
@@ -37,6 +36,7 @@ | |
import org.apache.kafka.common.utils.Time; | ||
import org.apache.pulsar.broker.PulsarServerException; | ||
import org.apache.pulsar.broker.ServiceConfiguration; | ||
import org.apache.pulsar.broker.ServiceConfigurationUtils; | ||
import org.apache.pulsar.broker.protocol.ProtocolHandler; | ||
import org.apache.pulsar.broker.service.BrokerService; | ||
import org.apache.pulsar.client.admin.PulsarAdmin; | ||
|
@@ -58,6 +58,19 @@ | |
public class KafkaProtocolHandler implements ProtocolHandler { | ||
|
||
public static final String PROTOCOL_NAME = "kafka"; | ||
public static final String SSL_PREFIX = "SSL://"; | ||
public static final String PLAINTEXT_PREFIX = "PLAINTEXT://"; | ||
public static final String LISTENER_DEL = ","; | ||
public static final String TLS_HANDLER = "tls"; | ||
public static final String LISTENER_PATTEN = "^(PLAINTEXT?|SSL)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]"; | ||
|
||
/** | ||
* Kafka Listener Type. | ||
*/ | ||
public enum ListenerType { | ||
PLAINTEXT, | ||
SSL | ||
} | ||
|
||
@Getter | ||
private KafkaServiceConfiguration kafkaConfig; | ||
|
@@ -67,6 +80,9 @@ public class KafkaProtocolHandler implements ProtocolHandler { | |
private KafkaTopicManager kafkaTopicManager; | ||
@Getter | ||
private GroupCoordinator groupCoordinator; | ||
@Getter | ||
private String bindAddress; | ||
|
||
|
||
@Override | ||
public String protocolName() { | ||
|
@@ -88,12 +104,16 @@ public void initialize(ServiceConfiguration conf) throws Exception { | |
// when loaded with PulsarService as NAR, `conf` will be type of ServiceConfiguration | ||
kafkaConfig = ConfigurationUtils.create(conf.getProperties(), KafkaServiceConfiguration.class); | ||
} | ||
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(kafkaConfig.getBindAddress()); | ||
} | ||
|
||
// This method is called after initialize | ||
@Override | ||
public String getProtocolDataToAdvertise() { | ||
// TODO: support data register, when do https://github.com/streamnative/kop/issues/2 | ||
return "mock-data-for-kafka"; | ||
if (log.isDebugEnabled()) { | ||
log.debug("Get configured listeners", kafkaConfig.getListeners()); | ||
} | ||
return kafkaConfig.getListeners(); | ||
} | ||
|
||
@Override | ||
|
@@ -114,30 +134,50 @@ public void start(BrokerService service) { | |
} | ||
} | ||
|
||
// this is called after init, and with kafkaTopicManager, kafkaConfig, brokerService all set. | ||
// this is called after initialize, and with kafkaTopicManager, kafkaConfig, brokerService all set. | ||
@Override | ||
public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelInitializers() { | ||
checkState(kafkaConfig != null); | ||
checkState(kafkaConfig.getListeners() != null); | ||
checkState(brokerService != null); | ||
checkState(kafkaTopicManager != null); | ||
if (kafkaConfig.isEnableGroupCoordinator()) { | ||
checkState(groupCoordinator != null); | ||
} | ||
|
||
Optional<Integer> port = kafkaConfig.getKafkaServicePort(); | ||
InetSocketAddress addr = new InetSocketAddress(brokerService.pulsar().getBindAddress(), port.get()); | ||
String listeners = kafkaConfig.getListeners(); | ||
String[] parts = listeners.split(LISTENER_DEL); | ||
|
||
try { | ||
Map<InetSocketAddress, ChannelInitializer<SocketChannel>> initializerMap = | ||
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder() | ||
.put(addr, | ||
(new KafkaChannelInitializer(brokerService.pulsar(), | ||
ImmutableMap.Builder<InetSocketAddress, ChannelInitializer<SocketChannel>> builder = | ||
ImmutableMap.<InetSocketAddress, ChannelInitializer<SocketChannel>>builder(); | ||
|
||
for (String listener: parts) { | ||
if (listener.startsWith(PLAINTEXT_PREFIX)) { | ||
builder.put( | ||
// TODO: consider using the address in the listener as the bind address. | ||
// https://github.com/streamnative/kop/issues/46 | ||
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we use the address in the listener as the bind address? |
||
new KafkaChannelInitializer(brokerService.pulsar(), | ||
kafkaConfig, | ||
kafkaTopicManager, | ||
groupCoordinator, | ||
false)); | ||
} else if (listener.startsWith(SSL_PREFIX)) { | ||
builder.put( | ||
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)), | ||
new KafkaChannelInitializer(brokerService.pulsar(), | ||
kafkaConfig, | ||
kafkaTopicManager, | ||
groupCoordinator, | ||
false))) | ||
.build(); | ||
return initializerMap; | ||
true)); | ||
} else { | ||
log.error("Kafka listener {} not supported. supports {} and {}", | ||
listener, PLAINTEXT_PREFIX, SSL_PREFIX); | ||
} | ||
} | ||
|
||
return builder.build(); | ||
} catch (Exception e){ | ||
log.error("KafkaProtocolHandler newChannelInitializers failed with", e); | ||
return null; | ||
|
@@ -256,4 +296,43 @@ private String createKafkaOffsetsTopic(BrokerService service) throws PulsarServe | |
|
||
return offsetsTopic; | ||
} | ||
|
||
public static int getListenerPort(String listener) { | ||
checkState(listener.matches(LISTENER_PATTEN), "listener not match patten"); | ||
|
||
int lastIndex = listener.lastIndexOf(':'); | ||
return Integer.parseInt(listener.substring(lastIndex + 1)); | ||
} | ||
|
||
public static int getListenerPort(String listeners, ListenerType type) { | ||
String[] parts = listeners.split(LISTENER_DEL); | ||
|
||
for (String listener: parts) { | ||
if (type == ListenerType.PLAINTEXT && listener.startsWith(PLAINTEXT_PREFIX)) { | ||
return getListenerPort(listener); | ||
} | ||
if (type == ListenerType.SSL && listener.startsWith(SSL_PREFIX)) { | ||
return getListenerPort(listener); | ||
} | ||
} | ||
|
||
log.error("KafkaProtocolHandler listeners {} not contains type {}", listeners, type); | ||
return -1; | ||
} | ||
|
||
public static String getBrokerUrl(String listeners, Boolean tlsEnabled) { | ||
String[] parts = listeners.split(LISTENER_DEL); | ||
|
||
for (String listener: parts) { | ||
if (tlsEnabled && listener.startsWith(SSL_PREFIX)) { | ||
return listener; | ||
} | ||
if (!tlsEnabled && listener.startsWith(PLAINTEXT_PREFIX)) { | ||
return listener; | ||
} | ||
} | ||
|
||
log.error("listener {} not contains a valid SSL or PLAINTEXT address", listeners); | ||
return null; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: