-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-187: Add API to analyze a subscription backlog and provide a accurate value #16545
Conversation
The related PR support the precise backlog #14958 |
cbe6ac2
to
b7bb192
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work
I wonder if analise
is a correct word in english, analyze
should be the right verb
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
Outdated
Show resolved
Hide resolved
...broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
Outdated
Show resolved
Hide resolved
8318f6f
to
ff182d0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nicoloboschi thanks for your review.
I have addressed your comments and also added one test about partitioned topics
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM, a couple of comments to address
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
ff182d0
to
d15e6b8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
} finally { | ||
entry.release(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this could be finally block of outer try (on line 74)
d15e6b8
to
9cb51d1
Compare
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
ba722e6
to
4c41c1b
Compare
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
5ba6e19
to
bb35022
Compare
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
Outdated
Show resolved
Hide resolved
subscriptionBacklogScanMaxTimeMs=120000 | ||
|
||
# Maximum number of entries to be read within a Analise backlog operation | ||
subscriptionBacklogScanMaxEntries=10000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can support these 2 options in the http endpoint, instead of support ns/topic polices.
@Test(timeOut = 20000) | ||
void testScanSingleEntry() throws Exception { | ||
testScan(10,1); | ||
} | ||
|
||
@Test(timeOut = 30000) | ||
void testScanBatchesWithSomeRemainder() throws Exception { | ||
testScan(10,3); | ||
} | ||
|
||
@Test(timeOut = 30000) | ||
void testScanBatches() throws Exception { | ||
testScan(10,5); | ||
} | ||
|
||
@Test(timeOut = 30000) | ||
void testScanBatchWholeLedger() throws Exception { | ||
testScan(10,1000); | ||
} | ||
|
||
@Test(timeOut = 1000) | ||
void testScanBatchEmptyLedger() throws Exception { | ||
testScan(0,10); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to add a data provider instead of separate methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.
I usually prefer this form while developing tests, because it is easier to run single combination of parameters and the method name explains the meaning of the test.
I have updated the patch
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicated license header.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I don't know how it happened. removed
@@ -1569,6 +1590,63 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn | |||
}); | |||
} | |||
|
|||
private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to return CompletableFuture here to avoid passing the AsyncResponse to the internal of implement, it will help us to understand the returned type without looking at the implementation details
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, but this whole class is coded this way.
I am not sure it is worth to change the code style only for one method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have changed most of the admin implements with future returned types.
if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, | ||
authoritative, false).partitions > 0) { | ||
throw new RestException(Status.METHOD_NOT_ALLOWED, | ||
"Analyze backlog on a partitioned topic is not allowed, " | ||
+ "please try do it on specific topic partition"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check the topic is a partitioned topic or not first? If it's a partitioned topic, the ownership checking is redundant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
other methods check if the topic is partitioned in this same place.
if the broker is not the owner we will be redirected to another broker and we won't reach this point.
I notice that getPartitionedTopicMetadata is sync, I have reworked this part to make it fully async
class OpScan implements ReadEntriesCallback { | ||
private final ManagedCursorImpl cursor; | ||
private final ManagedLedgerImpl ledger; | ||
private final PositionImpl startPosition; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks we can remove this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, removed
fa14b9b
to
67873e0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui I have addressed your comments and posted some answers.
thanks
class OpScan implements ReadEntriesCallback { | ||
private final ManagedCursorImpl cursor; | ||
private final ManagedLedgerImpl ledger; | ||
private final PositionImpl startPosition; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, removed
@Test(timeOut = 20000) | ||
void testScanSingleEntry() throws Exception { | ||
testScan(10,1); | ||
} | ||
|
||
@Test(timeOut = 30000) | ||
void testScanBatchesWithSomeRemainder() throws Exception { | ||
testScan(10,3); | ||
} | ||
|
||
@Test(timeOut = 30000) | ||
void testScanBatches() throws Exception { | ||
testScan(10,5); | ||
} | ||
|
||
@Test(timeOut = 30000) | ||
void testScanBatchWholeLedger() throws Exception { | ||
testScan(10,1000); | ||
} | ||
|
||
@Test(timeOut = 1000) | ||
void testScanBatchEmptyLedger() throws Exception { | ||
testScan(0,10); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.
I usually prefer this form while developing tests, because it is easier to run single combination of parameters and the method name explains the meaning of the test.
I have updated the patch
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I don't know how it happened. removed
@@ -1569,6 +1590,63 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn | |||
}); | |||
} | |||
|
|||
private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, but this whole class is coded this way.
I am not sure it is worth to change the code style only for one method.
if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName, | ||
authoritative, false).partitions > 0) { | ||
throw new RestException(Status.METHOD_NOT_ALLOWED, | ||
"Analyze backlog on a partitioned topic is not allowed, " | ||
+ "please try do it on specific topic partition"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
other methods check if the topic is partitioned in this same place.
if the broker is not the owner we will be redirected to another broker and we won't reach this point.
I notice that getPartitionedTopicMetadata is sync, I have reworked this part to make it fully async
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
Show resolved
Hide resolved
@Technoboy- I have updated ManagedCursorImpl. |
PIP link: #16597
Also fixes #7623
Documentation
The new command is automatically documented for the REST API and for pulsar-admin
doc-not-needed