[improve][broker] PIP-192: Implement broker registry for new load manager #18810

Merged

Conversation

Demogorgon314
Member

Master Issue: #16691

Motivation

We will start raising PRs to implement PIP-192, #16691

Modifications

This PR should not impact the existing broker load balance behavior.

For the PIP-192 project, this PR

  • Implemented broker registry.
  • Added unit tests.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: Demogorgon314#7

@Demogorgon314 Demogorgon314 added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/broker labels Dec 8, 2022
@Demogorgon314 Demogorgon314 self-assigned this Dec 8, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Dec 8, 2022
@Demogorgon314 Demogorgon314 marked this pull request as ready for review December 8, 2022 07:31
Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String brokerId : availableBrokers) {
    futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt ->
Contributor

Let's check if lookupAsync is complete or not.

If not, we probably want to print a log with the i/o waiting time.

Member Author

Can you explain it in detail?

Contributor

In most cases, the returned future of this.lookupAsync(brokerId) should be Completed, as we expect the lookup is mainly from the cache. However, if the returned future is Incomplete, we probably need to measure the latency of the lookup i/o to the metadata store.

In general, I think every i/o to the dependent services needs to be tracked (preferably as metrics). I am not sure whether this is the practice in the Pulsar repo, though.
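
A minimal sketch of the check described in this comment, assuming a hypothetical helper named `LookupLatency` that is not part of the PR. It returns an already-completed future untouched (the expected cache-hit case) and reports the elapsed time only for futures that were still pending, which is where metadata store i/o would be involved. The sink is a plain callback here; whether it feeds a log line or a metric is left open.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;

/** Hypothetical helper (not from the PR): measures latency only for lookups that are not already complete. */
final class LookupLatency {
    private LookupLatency() {
    }

    static <T> CompletableFuture<T> track(CompletableFuture<T> future, LongConsumer latencyNanosSink) {
        if (future.isDone()) {
            // Already complete (for example, served from a cache): nothing to measure.
            return future;
        }
        final long startNanos = System.nanoTime();
        // whenComplete returns a new future that carries the same result or exception.
        return future.whenComplete((value, error) ->
                latencyNanosSink.accept(System.nanoTime() - startNanos));
    }
}
```

In the loop above, this could wrap the lookup call, for example `LookupLatency.track(this.lookupAsync(brokerId), nanos -> log.info(...))`, with the exact logging or metrics call being an assumption.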

Member Author

I see. Thanks for the explanation.

Comment on lines 98 to 112
if (started.get()) {
    return;
}
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
try {
    this.register();
    this.started.set(true);
} catch (MetadataStoreException e) {
    throw new PulsarServerException(e);
}
Contributor

Suggested change
- if (started.get()) {
-     return;
- }
- pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
- try {
-     this.register();
-     this.started.set(true);
- } catch (MetadataStoreException e) {
-     throw new PulsarServerException(e);
- }
+ if (started.compareAndSet(false, true)) {
+     pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
+     try {
+         this.register();
+     } catch (MetadataStoreException e) {
+         throw new PulsarServerException(e);
+     }
+ }

Comment on lines 116 to 144
public synchronized void register() throws MetadataStoreException {
    if (!registered) {
        try {
            this.brokerLookupDataLock = brokerLookupDataLockManager.acquireLock(keyPath(brokerId), brokerLookupData)
                    .get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            registered = true;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }
}

@Override
public synchronized void unregister() throws MetadataStoreException {
    if (registered) {
        try {
            brokerLookupDataLock.release().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
            registered = false;
        } catch (CompletionException | InterruptedException | ExecutionException | TimeoutException e) {
            throw MetadataStoreException.unwrap(e);
        }
    }
}
Contributor

Is it possible to change registered to an AtomicBoolean, so that we don't need synchronized?

Member Author

I used AtomicBoolean in the first version. #18810 (comment)

Comment on lines 146 to 192
public CompletableFuture<List<String>> getAvailableBrokersAsync() {
    return brokerLookupDataLockManager.listLocks(LOOKUP_DATA_PATH).thenApply(Lists::newArrayList);
}

@Override
public CompletableFuture<Optional<BrokerLookupData>> lookupAsync(String broker) {
    return brokerLookupDataLockManager.readLock(keyPath(broker));
}

public CompletableFuture<Map<String, BrokerLookupData>> getAvailableBrokerLookupDataAsync() {
    return this.getAvailableBrokersAsync().thenCompose(availableBrokers -> {
        Map<String, BrokerLookupData> map = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String brokerId : availableBrokers) {
            futures.add(this.lookupAsync(brokerId).thenAccept(lookupDataOpt -> {
                if (lookupDataOpt.isPresent()) {
                    map.put(brokerId, lookupDataOpt.get());
                } else {
                    log.warn("Got an empty lookup data, brokerId: {}", brokerId);
                }
            }));
        }
        return FutureUtil.waitForAll(futures).thenApply(__ -> map);
    });
}

public void listen(BiConsumer<String, NotificationType> listener) {
    this.listeners.add(listener);
}
Contributor

It's better to check if the registry is started or not.

Member Author

Fixed.

Comment on lines 178 to 211
if (!started.get()) {
    return;
}
try {
    this.unregister();
    brokerLookupDataLockManager.close();
    scheduler.shutdown();
    this.listeners.clear();
    started.set(false);
} catch (Exception ex) {
    if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
        throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
    } else {
        throw new PulsarServerException(MetadataStoreException.unwrap(ex));
    }
}
Contributor

Suggested change
- if (!started.get()) {
-     return;
- }
- try {
-     this.unregister();
-     brokerLookupDataLockManager.close();
-     scheduler.shutdown();
-     this.listeners.clear();
-     started.set(false);
- } catch (Exception ex) {
-     if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
-         throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
-     } else {
-         throw new PulsarServerException(MetadataStoreException.unwrap(ex));
-     }
- }
+ if (started.compareAndSet(true, false)) {
+     try {
+         this.unregister();
+         brokerLookupDataLockManager.close();
+         scheduler.shutdown();
+         this.listeners.clear();
+     } catch (Exception ex) {
+         if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
+             throw new PulsarServerException.NotFoundException(MetadataStoreException.unwrap(ex));
+         } else {
+             throw new PulsarServerException(MetadataStoreException.unwrap(ex));
+         }
+     }
+ }

Contributor

@heesung-sn heesung-sn Dec 30, 2022

Don't we need to reset started.set(false) when an exception is thrown?

We don't expect register() / unregister() (or start() / close() in similar classes) to be called concurrently, many times, from multiple threads. What's the practice in Pulsar for concurrency control in such cases? (I think a simple synchronized is enough here, because threads are unlikely to wait on each other.)

Contributor

I noticed no synchronized for this method.

> What's the practice in Pulsar for concurrency control in such cases?

Most places move from READY to the CLOSING state, and then from CLOSING to CLOSED after all the tasks are done. That way, a close that fails partway can be retried from CLOSING until it reaches CLOSED.
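
A minimal sketch of the state transitions described here, assuming a hypothetical class named `RegistryLifecycle` and an `INIT` state that the comment does not mention. It uses plain `synchronized` methods, as suggested earlier in this thread, and leaves the state at `CLOSING` when the close action throws, so that a later call can retry. This illustrates the pattern only and is not the code that was merged.

```java
/** Hypothetical lifecycle holder (names are illustrative, not taken from the PR). */
final class RegistryLifecycle {
    enum State { INIT, READY, CLOSING, CLOSED }

    // volatile so that readers outside the synchronized methods see the latest state.
    private volatile State state = State.INIT;

    State state() {
        return state;
    }

    /** Runs startAction once. If it throws, the state stays INIT and start can be retried. */
    synchronized void start(Runnable startAction) {
        if (state != State.INIT) {
            return;
        }
        startAction.run();
        state = State.READY;
    }

    /** Moves READY -> CLOSING -> CLOSED. If closeAction throws, the state stays CLOSING for a retry. */
    synchronized void close(Runnable closeAction) {
        if (state == State.INIT || state == State.CLOSED) {
            // Never started, or already fully closed: nothing to do.
            return;
        }
        state = State.CLOSING;
        closeAction.run();
        state = State.CLOSED;
    }
}
```

Compared with flipping an AtomicBoolean before the work is done, this keeps "close was requested" and "close finished" as separate observable states, which is what makes the retry possible.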

Comment on lines 197 to 233
if (!isVerifiedNotification(t)) {
    return;
}
try {
    if (log.isDebugEnabled()) {
        log.debug("Handle notification: [{}]", t);
    }
    if (listeners.isEmpty()) {
        return;
    }
    this.scheduler.submit(() -> {
        String brokerId = t.getPath().substring(LOOKUP_DATA_PATH.length() + 1);
        for (BiConsumer<String, NotificationType> listener : listeners) {
            listener.accept(brokerId, t.getType());
        }
    });
} catch (RejectedExecutionException e) {
    // Executor is shutting down
}
Contributor

We don't need to perform the listeners if the registry is closed, right?
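
A minimal sketch of that guard, assuming a hypothetical `ListenerDispatcher` class and using a plain `String` in place of `NotificationType` so that the example is self-contained. Once closed, it drops notifications without invoking any listener and rejects new registrations; rejecting with an exception is one possible choice, not necessarily what the PR does.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

/** Hypothetical dispatcher (not from the PR): ignores notifications once it has been closed. */
final class ListenerDispatcher {
    private final List<BiConsumer<String, String>> listeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void listen(BiConsumer<String, String> listener) {
        if (closed.get()) {
            throw new IllegalStateException("The registry is closed");
        }
        listeners.add(listener);
    }

    /** Invokes every listener and returns how many were called; returns 0 when closed. */
    int dispatch(String brokerId, String notificationType) {
        if (closed.get() || listeners.isEmpty()) {
            return 0;
        }
        int invoked = 0;
        for (BiConsumer<String, String> listener : listeners) {
            listener.accept(brokerId, notificationType);
            invoked++;
        }
        return invoked;
    }

    void close() {
        // compareAndSet makes repeated close() calls harmless.
        if (closed.compareAndSet(false, true)) {
            listeners.clear();
        }
    }
}
```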

Member Author

Fixed.

*/
@Slf4j
@Test(groups = "broker")
public class BrokerRegistryTest {
Contributor

Please also add tests for the behavior of a closed registry. Make sure the resources are released and the containers are cleaned up.

Member Author

I have added a state field to store the registry state, PTAL.

Member Author

@heesung-sn
Contributor

heesung-sn commented Jan 11, 2023

nit:

Why not use LoadDataStore<BrokerLoadData> + TableView<BrokerLookupData> for this broker registry service?

LoadDataStore<BrokerLoadData>.keySet() can return available brokers.

TableView<BrokerLookupData>.get(broker) can return the pulsar version and other lookup info.

This will further reduce the metadata store dependency.

Sorry for this random idea; I just wanted to share it here. We may need a separate discussion (a PIP) to think deeper if we agree with this direction.

@Demogorgon314
Member Author

> nit:
>
> Why not use LoadDataStore<BrokerLoadData> + TableView<BrokerLookupData> for this broker registry service?
>
> LoadDataStore<BrokerLoadData>.keySet() can return available brokers.
>
> TableView<BrokerLookupData>.get(broker) can return the pulsar version and other lookup info.
>
> This will further reduce the metadata store dependency.
>
> Sorry for this random idea; I just wanted to share it here. We may need a separate discussion (a PIP) to think deeper if we agree with this direction.

@heesung-sn The lookup data will not be too heavy for ZooKeeper, right? Maybe we can run some tests to find out whether it is necessary to use LoadDataStore to store the lookup data.

@codelipenghui
Contributor

> The lookup data will not be too heavy for ZooKeeper, right? Maybe we can run some tests to find out whether it is necessary to use LoadDataStore to store the lookup data.

IMO, it's OK. Each broker has one znode to store its lookup data, and we also use that znode as a mutex lock to prevent different brokers from registering with the same lookup data.
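
A minimal in-memory sketch of the "one entry per broker, doubling as a lock" idea, assuming a hypothetical `InMemoryBrokerRegistry` class backed by a `ConcurrentHashMap`. It stands in for the metadata store only to show the semantics: the first registration of a broker id wins, and a second registration under the same id is refused until the owner unregisters. It does not model znodes, sessions, or ephemeral-node expiry.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in (not the PR's implementation) for a per-broker entry used as a lock. */
final class InMemoryBrokerRegistry {
    private final Map<String, String> entries = new ConcurrentHashMap<>();

    /** Returns true if the caller now owns the entry; false if the broker id is already registered. */
    boolean register(String brokerId, String lookupData) {
        // putIfAbsent is atomic, so only one caller can create the entry for a given id.
        return entries.putIfAbsent(brokerId, lookupData) == null;
    }

    /** Removes the entry only if it still holds the caller's lookup data. */
    boolean unregister(String brokerId, String lookupData) {
        return entries.remove(brokerId, lookupData);
    }

    Optional<String> lookup(String brokerId) {
        return Optional.ofNullable(entries.get(brokerId));
    }
}
```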

@Demogorgon314 Demogorgon314 requested review from shibd and removed request for heesung-sn January 19, 2023 01:58
@Demogorgon314 Demogorgon314 reopened this Jan 31, 2023
@BewareMyPower
Contributor

org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistryTest.MockLoadManager is not abstract and does not override abstract method setNamespaceBundleAffinity(java.lang.String,java.lang.String) in org.apache.pulsar.broker.loadbalance.LoadManager

Please fix the style check

@Demogorgon314 Demogorgon314 force-pushed the Demogorgon314/Implement_broker_registry branch from 38c0beb to fdb9ebd Compare February 1, 2023 03:27
@Demogorgon314
Member Author

@BewareMyPower Fixed.

Contributor

@Technoboy- Technoboy- left a comment

LGTM

@Technoboy- Technoboy- merged commit 8a4a8a6 into apache:master Feb 1, 2023
@lhotari
Member

lhotari commented Feb 2, 2023

This PR introduced a new flaky test: #19401.

Labels
area/broker doc-not-needed Your PR changes do not impact docs ready-to-test type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
6 participants