-
Notifications
You must be signed in to change notification settings - Fork 139
Add ssl listeners in channelInitializer #45
Conversation
jiazhai
commented
Nov 11, 2019
- Add ssl listeners and sslCtxRefresher in channelInitializer
// TODO: support data register, when do https://github.com/streamnative/kop/issues/2 | ||
return "mock-data-for-kafka"; | ||
log.debug("Get configured listeners", kafkaConfig.getListeners()); | ||
return kafkaConfig.getListeners(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
if (log.isDebugEnabled()) {
log.debug("Get configured listeners", kafkaConfig.getListeners());
}
for (String listener: parts) { | ||
if (listener.startsWith(PLAINTEXT_PREFIX)) { | ||
builder.put( | ||
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we use the address in the listener as the bind address?
true)); | ||
} else { | ||
log.error("KafkaProtocolHandler listeners {} not supported. supports {} and {}", | ||
listeners, PLAINTEXT_PREFIX, SSL_PREFIX); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
listeners, PLAINTEXT_PREFIX, SSL_PREFIX); | |
listener, PLAINTEXT_PREFIX, SSL_PREFIX); |
return initializerMap; | ||
true)); | ||
} else { | ||
log.error("KafkaProtocolHandler listeners {} not supported. supports {} and {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.error("KafkaProtocolHandler listeners {} not supported. supports {} and {}", | |
log.error("Kafka listener {} is not supported. Only {} and {} are supported now.", |
@@ -119,11 +127,15 @@ | |||
private final NamespaceName kafkaNamespace; | |||
private final ExecutorService executor; | |||
private final PulsarAdmin admin; | |||
private final Boolean tlsEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need tlsEnabled
if we already support listeners?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In some method, it will need construct a Node(contains broker host and port), the port should be identified separately(9092 or 9093), so for this handler we need to know it is created with SSL enabled or not.
if (!lookupResult.isPresent()) { | ||
log.error("Can't find broker for topic {}", topic); | ||
CompletableFuture<Optional<String>> future = new CompletableFuture<>(); | ||
future.completeExceptionally(new KeeperException.NoNodeException()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should return a zookeeper exception here. We should return a Pulsar exception.
} catch (Exception e) { | ||
log.error("Failed to get URI from {} for topic {}", candidateBroker, topic); | ||
CompletableFuture<Optional<String>> future = new CompletableFuture<>(); | ||
future.completeExceptionally(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convert the exception into a broker exception?
} catch (Exception e) { | ||
log.error("Caught error while find Broker for topic:{} ", topic, e); | ||
resultFuture.completeExceptionally(e); | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we return null
here, did we handle null
in the caller?