Skip to content

Commit

Permalink
[bugfix] Support for broker advertised listeners (#299)
Browse files Browse the repository at this point in the history
Master Issue: #300 

### Motivation

This PR reworks the broker service lookup logic for compatibility with 2.9.  The compat issues are:
1. removal of `PulserService::getLocalZkCache` and increased encapsulation of ZooKeeper.  
2. improve compatibility with `advertisedListeners` (don't compare broker service url with web service url).

### Modifications

- Update Pulsar dependency to `2.9.0-rc-202110152205`
- Update `PulsarServiceLookupHandler` to use `NamespaceService` and `MetadataStoreCacheLoader`.
- No need for `PulsarClient` in the lookup handler
- Set `amqp+ssl://` as the scheme that would be used for (as-yet unimplemented) TLS support
- New config option `brokerLookupTimeoutSeconds`

Co-authored-by: gaoran10 <[email protected]>
  • Loading branch information
EronWright and gaoran10 authored Oct 26, 2021
1 parent 8738a4c commit 1e5af44
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 246 deletions.
2 changes: 1 addition & 1 deletion amqp-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<version>2.8.0-rc-202106071430</version>
<version>2.9.0-rc-202110152205</version>
</parent>

<groupId>io.streamnative.pulsar.handlers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,13 @@
package io.streamnative.pulsar.handlers.amqp;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache;


/**
Expand Down Expand Up @@ -67,105 +55,6 @@ public AmqpBrokerDecoder getBrokerDecoder() {

abstract void close();

protected CompletableFuture<Optional<String>>
getProtocolDataToAdvertise(InetSocketAddress pulsarAddress,
TopicName topic) {
CompletableFuture<Optional<String>> returnFuture = new CompletableFuture<>();

if (pulsarAddress == null) {
log.error("[{}] failed get pulsar address, returned null.", topic.toString());

returnFuture.complete(Optional.empty());
return returnFuture;
}

if (log.isDebugEnabled()) {
log.debug("Found broker for topic {} puslarAddress: {}",
topic, pulsarAddress);
}

// advertised data is write in /loadbalance/brokers/advertisedAddress:webServicePort
// here we get the broker url, need to find related webServiceUrl.
MetadataCache<ServiceLookupData> serviceLookupDataCache =
pulsarService.getLocalMetadataStore().getMetadataCache(ServiceLookupData.class);
ZooKeeperCache zkCache = pulsarService.getLocalZkCache();
zkCache.getChildrenAsync(LoadManager.LOADBALANCE_BROKERS_ROOT, zkCache)
.whenComplete((set, throwable) -> {
if (throwable != null) {
log.error("Error in getChildrenAsync(zk://loadbalance) for {}", pulsarAddress, throwable);
returnFuture.complete(Optional.empty());
return;
}

String hostAndPort = pulsarAddress.getHostName() + ":" + pulsarAddress.getPort();
List<String> matchBrokers = Lists.newArrayList();
// match host part of url
for (String activeBroker : set) {
if (activeBroker.startsWith(pulsarAddress.getHostName() + ":")) {
matchBrokers.add(activeBroker);
}
}

if (matchBrokers.isEmpty()) {
log.error("No node for broker {} under zk://loadbalance", pulsarAddress);
returnFuture.complete(Optional.empty());
return;
}

// Get a list of ServiceLookupData for each matchBroker.
List<CompletableFuture<Optional<ServiceLookupData>>> list = matchBrokers.stream()
.map(matchBroker ->
serviceLookupDataCache.get(
String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, matchBroker)))
.collect(Collectors.toList());

FutureUtil.waitForAll(list)
.whenComplete((ignore, th) -> {
if (th != null) {
log.error("Error in getDataAsync() for {}", pulsarAddress, th);
returnFuture.complete(Optional.empty());
return;
}

try {
for (CompletableFuture<Optional<ServiceLookupData>> lookupData : list) {
ServiceLookupData data = lookupData.get().get();
if (log.isDebugEnabled()) {
log.debug("Handle getProtocolDataToAdvertise for {}, pulsarUrl: {}, "
+ "pulsarUrlTls: {}, webUrl: {}, webUrlTls: {} amqp: {}",
topic, data.getPulsarServiceUrl(), data.getPulsarServiceUrlTls(),
data.getWebServiceUrl(), data.getWebServiceUrlTls(),
data.getProtocol(AmqpProtocolHandler.PROTOCOL_NAME));
}

if (lookupDataContainsAddress(data, hostAndPort)) {
returnFuture.complete(data.getProtocol(AmqpProtocolHandler.PROTOCOL_NAME));
return;
}
}
} catch (Exception e) {
log.error("Error in {} lookupFuture get: ", pulsarAddress, e);
returnFuture.complete(Optional.empty());
return;
}

// no matching lookup data in all matchBrokers.
log.error("Not able to search {} in all child of zk://loadbalance", pulsarAddress);
returnFuture.complete(Optional.empty());
}
);
});
return returnFuture;
}

// whether a ServiceLookupData contains wanted address.
static boolean lookupDataContainsAddress(ServiceLookupData data, String hostAndPort) {
return (data.getPulsarServiceUrl() != null && data.getPulsarServiceUrl().contains(hostAndPort))
|| (data.getPulsarServiceUrlTls() != null && data.getPulsarServiceUrlTls().contains(hostAndPort))
|| (data.getWebServiceUrl() != null && data.getWebServiceUrl().contains(hostAndPort))
|| (data.getWebServiceUrlTls() != null && data.getWebServiceUrlTls().contains(hostAndPort));
}

public PulsarService getPulsarService() {
return pulsarService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.amqp;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.collect.ImmutableMap;
Expand All @@ -29,6 +30,7 @@
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;

/**
* Amqp Protocol Handler load and run by Pulsar Service.
Expand All @@ -37,7 +39,7 @@
public class AmqpProtocolHandler implements ProtocolHandler {

public static final String PROTOCOL_NAME = "amqp";
public static final String SSL_PREFIX = "SSL://";
public static final String SSL_PREFIX = "amqp+ssl://";
public static final String PLAINTEXT_PREFIX = "amqp://";
public static final String LISTENER_DEL = ",";
public static final String LISTENER_PATTEN = "^(amqp)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-0-9+]";
Expand Down Expand Up @@ -94,9 +96,12 @@ public void start(BrokerService service) {
proxyConfig.setAmqpMaxFrameSize(amqpConfig.getAmqpMaxFrameSize());
proxyConfig.setAmqpHeartBeat(amqpConfig.getAmqpHeartBeat());
proxyConfig.setAmqpProxyPort(amqpConfig.getAmqpProxyPort());
proxyConfig.setBrokerServiceURL("pulsar://"
+ ServiceConfigurationUtils.getAppliedAdvertisedAddress(amqpConfig, true) + ":"
+ amqpConfig.getBrokerServicePort().get());

AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(amqpConfig);
checkArgument(internalListener.getBrokerServiceUrl() != null,
"plaintext must be configured on internal listener");
proxyConfig.setBrokerServiceURL(internalListener.getBrokerServiceUrl().toString());

ProxyService proxyService = new ProxyService(proxyConfig, service.getPulsar());
try {
proxyService.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
*/
package io.streamnative.pulsar.handlers.amqp.proxy;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.naming.TopicName;

/**
* Lookup handler.
*/
public interface LookupHandler {
public interface LookupHandler extends Closeable {

/**
* Find broker for protocolHandler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,9 @@ public class ProxyConfiguration {
)
private String brokerServiceURL;

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
doc = "The timeout for broker lookups (in seconds)"
)
private int brokerLookupTimeoutSeconds = 30;
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap
public void handleConnect(AtomicInteger retryTimes) {
log.info("handle connect residue retryTimes: {}", retryTimes);
if (retryTimes.get() == 0) {
log.warn("Handle connect retryTime is 0.");
log.warn("Handle connect retryTimes is 0.");
close();
return;
}
if (proxyService.getVhostBrokerMap().containsKey(vhost)) {
Expand All @@ -265,12 +266,20 @@ public void handleConnect(AtomicInteger retryTimes) {
CompletableFuture<Pair<String, Integer>> lookupData = lookupHandler.findBroker(
TopicName.get(topic), AmqpProtocolHandler.PROTOCOL_NAME);
lookupData.whenComplete((pair, throwable) -> {
if (throwable != null) {
log.error("Lookup broker failed; may retry.", throwable);
retryTimes.decrementAndGet();
handleConnect(retryTimes);
return;
}
assert pair != null;
handleConnectComplete(pair.getLeft(), pair.getRight(), retryTimes);
proxyService.cacheVhostMap(vhost, pair);
});
} catch (Exception e) {
log.error("Lookup broker failed.", e);
resetProxyHandler();
close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;

Expand All @@ -41,12 +40,9 @@ public class ProxyService implements Closeable {

@Getter
private ProxyConfiguration proxyConfig;
private String serviceUrl;
@Getter
private PulsarService pulsarService;
@Getter
private PulsarClientImpl pulsarClient;
@Getter
private LookupHandler lookupHandler;

private Channel listenChannel;
Expand Down Expand Up @@ -96,9 +92,7 @@ public void start() throws Exception {
throw new IOException("Failed to bind Pulsar Proxy on port " + proxyConfig.getAmqpProxyPort(), e);
}

this.pulsarClient = (PulsarClientImpl) this.pulsarService.getClient();

this.lookupHandler = new PulsarServiceLookupHandler(pulsarService, pulsarClient);
this.lookupHandler = new PulsarServiceLookupHandler(proxyConfig, pulsarService);
}

private void releaseConnection(String namespaceName) {
Expand All @@ -121,6 +115,9 @@ public void cacheVhostMapRemove(String vhost) {

@Override
public void close() throws IOException {
if (lookupHandler != null) {
lookupHandler.close();
}
if (listenChannel != null) {
listenChannel.close();
}
Expand Down
Loading

0 comments on commit 1e5af44

Please sign in to comment.