-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-192: Implement broker registry for new load manager #18810
[improve][broker] PIP-192: Implement broker registry for new load manager #18810
Conversation
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
Map<String, BrokerLookupData> map = new ConcurrentHashMap<>(); | ||
List<CompletableFuture<Void>> futures = new ArrayList<>(); | ||
for (String brokerId : availableBrokers) { | ||
futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's check if lookupAsync is complete or not.
If not, we probably want to print a log with the i/o waiting time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain it in detail?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In most cases, the returned future of this.lookupAsync(brokerId) should be Completed, as we expect the lookup is mainly from the cache. However, if the returned future is Incomplete, we probably need to measure the latency of the lookup i/o to the metadata store.
In general, I think every i/o to the dependent services needs to be tracked(preferably as metrics). I am not sure about this practice in the Pulsar repo, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Thanks for the explanation.
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
if (started.get()) { | ||
return; | ||
} | ||
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); | ||
try { | ||
this.register(); | ||
this.started.set(true); | ||
} catch (MetadataStoreException e) { | ||
throw new PulsarServerException(e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (started.get()) { | |
return; | |
} | |
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); | |
try { | |
this.register(); | |
this.started.set(true); | |
} catch (MetadataStoreException e) { | |
throw new PulsarServerException(e); | |
} | |
if (started.compareAndSet(false, true)) { | |
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification); | |
try { | |
this.register(); | |
} catch (MetadataStoreException e) { | |
throw new PulsarServerException(e); | |
} | |
} |
public synchronized void register() throws MetadataStoreException { | ||
if (!registered) { | ||
try { | ||
this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData) | ||
.get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); | ||
registered = true; | ||
} catch (InterruptedException | ExecutionException | TimeoutException e) { | ||
throw MetadataStoreException.unwrap(e); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized void unregister() throws MetadataStoreException { | ||
if (registered) { | ||
try { | ||
brokerLookupDataLock.release().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS); | ||
registered = false; | ||
} catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) { | ||
throw MetadataStoreException.unwrap(e); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to change registered to AtomicBoolean? So that we don't need the synchronized
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used AtomicBoolean
in the first version. #18810 (comment)
public CompletableFuture<List<String>> getAvailableBrokersAsync() { | ||
return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList); | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker) { | ||
return brokerLookupDataLockManager.readLock(keyPath(broker)); | ||
} | ||
|
||
public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() { | ||
return this.getAvailableBrokersAsync().thenCompose(availableBrokers -> { | ||
Map<String, BrokerLookupData> map = new ConcurrentHashMap<>(); | ||
List<CompletableFuture<Void>> futures = new ArrayList<>(); | ||
for (String brokerId : availableBrokers) { | ||
futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt -> { | ||
if (lookupDataOpt.isPresent()) { | ||
map.put(brokerId, lookupDataOpt.get()); | ||
} else { | ||
log.warn("Got an empty lookup data, brokerId: {}", brokerId); | ||
} | ||
})); | ||
} | ||
return FutureUtil.waitForAll(futures).thenApply(__ -> map); | ||
}); | ||
} | ||
|
||
public void listen(BiConsumer<String, NotificationType> listener) { | ||
this.listeners.add(listener); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to check if the registry is started or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
if (!started.get()) { | ||
return; | ||
} | ||
try { | ||
this.unregister(); | ||
brokerLookupDataLockManager.close(); | ||
scheduler.shutdown(); | ||
this.listeners.clear(); | ||
started.set(false); | ||
} catch (Exception ex) { | ||
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { | ||
throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex)); | ||
} else { | ||
throw new PulsarServerException(MetadataStoreException.unwrap(ex)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!started.get()) { | |
return; | |
} | |
try { | |
this.unregister(); | |
brokerLookupDataLockManager.close(); | |
scheduler.shutdown(); | |
this.listeners.clear(); | |
started.set(false); | |
} catch (Exception ex) { | |
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { | |
throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex)); | |
} else { | |
throw new PulsarServerException(MetadataStoreException.unwrap(ex)); | |
} | |
} | |
if (started.compareAndSet(true, false)) { | |
try { | |
this.unregister(); | |
brokerLookupDataLockManager.close(); | |
scheduler.shutdown(); | |
this.listeners.clear(); | |
} catch (Exception ex) { | |
if (ex.getCause() instanceof MetadataStoreException.NotFoundException) { | |
throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex)); | |
} else { | |
throw new PulsarServerException(MetadataStoreException.unwrap(ex)); | |
} | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to reset started.set(false)
when an exception is thrown?
We don't expect this register() / unregister()
(or start() / close()
in similar classes) to be called competitively many times from multiple threads. What's the practice in Pulsar for concurrency control in such cases? (I think the simple synchronized
is enough in these cases because threads waiting happens less likely )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed no synchronized
for this method.
What's the practice in Pulsar for concurrency control in such cases?
Most places change to the CLOSING state from READY, and then to the CLOSED from CLOSING after all the tasks are done. And we can try multiple times to close the resource from CLOSING to CLOSED.
if (!isVerifiedNotification(t)) { | ||
return; | ||
} | ||
try { | ||
if (log.isDebugEnabled()) { | ||
log.debug("Handle notification: [{}]", t); | ||
} | ||
if (listeners.isEmpty()) { | ||
return; | ||
} | ||
this.scheduler.submit(() -> { | ||
String brokerId = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1); | ||
for (BiConsumer<String, NotificationType> listener : listeners) { | ||
listener.accept(brokerId, t.getType()); | ||
} | ||
}); | ||
} catch (RejectedExecutionException e) { | ||
// Executor is shutting down | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to perform the listeners if the registry is closed, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
*/ | ||
@Slf4j | ||
@Test(groups = "broker") | ||
public class BrokerRegistryTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also add tests for the behavior of a closed registry. Make sure the resources are released, the containers are cleaned up.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have added a state field to store the registry state, PTAL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ping @codelipenghui
nit: Why not use
This will further reduce the metadata store dependency. Sorry for this random idea, and I just wanted to share it here. We may need a separate discussion(a PIP) to think deeper if we agree with this direction. |
@heesung-sn The lookup data will not be too heavy for Zookeeper, right? Maybe we can do some tests to find out is necessary to use |
IMO, it's ok. One broker will have one znode to store the lookup data. And we also use the znode as a mutex lock to avoid the different brokers registering with the same lookup data. |
Please fix the style check |
38c0beb
to
fdb9ebd
Compare
@BewareMyPower Fixed. |
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
This PR introduced a new flaky test #19401 . |
Master Issue: #16691
Motivation
We will start raising PRs to implement PIP-192, #16691
Modifications
This PR should not impact the existing broker load balance behavior.
For the PIP-192 project, this PR
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: Demogorgon314#7