Skip to content

Commit

Permalink
Merge branch 'apache:master' into 19671-consumerimpl-compare-and-set
Browse files Browse the repository at this point in the history
  • Loading branch information
JooHyukKim authored May 8, 2023
2 parents 0950d86 + 2f9f5df commit 4075a39
Show file tree
Hide file tree
Showing 11 changed files with 398 additions and 25 deletions.
2 changes: 1 addition & 1 deletion distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ The Apache Software License, Version 2.0
* RabbitMQ Java Client
- com.rabbitmq-amqp-client-5.5.3.jar
* RoaringBitmap
- org.roaringbitmap-RoaringBitmap-0.9.15.jar
- org.roaringbitmap-RoaringBitmap-0.9.44.jar

BSD 3-clause "New" or "Revised" License
* Google auth library
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ flexible messaging model and an intuitive client API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>8.1.2</dependency-check-maven.version>
<roaringbitmap.version>0.9.15</roaringbitmap.version>
<roaringbitmap.version>0.9.44</roaringbitmap.version>
<extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
<oshi.version>6.4.0</oshi.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissions
String topicUri = topicName.toString();
AuthPolicies auth = policies.get().auth_policies;
// First add namespace level permissions
auth.getNamespaceAuthentication().forEach(permissions::put);
permissions.putAll(auth.getNamespaceAuthentication());

// Then add topic level permissions
if (auth.getTopicAuthentication().containsKey(topicUri)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ private void loadAllResourceGroups() {
final Set<String> existingSet = rgService.resourceGroupGetAll();
HashSet<String> newSet = new HashSet<>();

for (String rgName : rgList) {
newSet.add(rgName);
}
newSet.addAll(rgList);

final Sets.SetView<String> deleteList = Sets.difference(existingSet, newSet);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,10 @@ public synchronized void updateStats(
}
});
if (clusterReplicationMetrics.isMetricsEnabled()) {
clusterReplicationMetrics.get().forEach(clusterMetric -> tempMetricsCollection.add(clusterMetric));
tempMetricsCollection.addAll(clusterReplicationMetrics.get());
clusterReplicationMetrics.reset();
}
brokerOperabilityMetrics.getMetrics()
.forEach(brokerOperabilityMetric -> tempMetricsCollection.add(brokerOperabilityMetric));
tempMetricsCollection.addAll(brokerOperabilityMetrics.getMetrics());

// json end
topicStatsStream.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,13 @@ public void deleteComplete(Object ctx) {
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx,
exception.getMessage(), exception);
if (exception instanceof CursorAlreadyClosedException) {
log.error("[{}] Asynchronous ack failure because replicator is already deleted and cursor is already"
+ " closed {}, ({})", replicatorId, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be stopped
closeProducerAsync();
return;
}
if (ctx instanceof PositionImpl) {
PositionImpl deletedEntry = (PositionImpl) ctx;
if (deletedEntry.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2226,9 +2226,8 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog

if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.addPublisher(publisherStats);
}
stats.addPublisher(publisherStats);
});

stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar;

import static org.testng.Assert.assertTrue;
import org.roaringbitmap.RoaringBitmap;
import org.testng.annotations.Test;

public class RoaringbitmapTest {

@Test
public void testRoaringBitmapContains() {
RoaringBitmap roaringBitmap = new RoaringBitmap();
for (long i = 1; i <= 100_000; i++) {
roaringBitmap.add(i, i + 1);
}

for (long i = 1; i <= 100_000; i++) {
assertTrue(roaringBitmap.contains(i, i + 1));
}

RoaringBitmap roaringBitmap2 = new RoaringBitmap();
for (long i = 1; i <= 1000_000; i++) {
roaringBitmap2.add(i, i + 1);
}

for (long i = 1; i <= 1000_000; i++) {
assertTrue(roaringBitmap2.contains(i, i + 1));
}

RoaringBitmap roaringBitmap3 = new RoaringBitmap();
for (long i = 1; i <= 10_000_000; i++) {
roaringBitmap3.add(i, i + 1);
}

for (long i = 1; i <= 10_000_000; i++) {
assertTrue(roaringBitmap3.contains(i, i + 1));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class OneWayReplicatorTest extends OneWayReplicatorTestBase {

@Override
@BeforeClass(alwaysRun = true, timeOut = 300000)
public void setup() throws Exception {
super.setup();
}

@Override
@AfterClass(alwaysRun = true, timeOut = 300000)
public void cleanup() throws Exception {
super.cleanup();
}

@Test
public void testReplicatorProducerStatInTopic() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp_");
final String subscribeName = "subscribe_1";
final byte[] msgValue = "test".getBytes();

admin1.topics().createNonPartitionedTopic(topicName);
admin2.topics().createNonPartitionedTopic(topicName);
admin1.topics().createSubscription(topicName, subscribeName, MessageId.earliest);
admin2.topics().createSubscription(topicName, subscribeName, MessageId.earliest);

// Verify replicator works.
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
producer1.newMessage().value(msgValue).send();
pulsar1.getBrokerService().checkReplicationPolicies();
assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue);

// Verify there has one item in the attribute "publishers" or "replications"
TopicStats topicStats2 = admin2.topics().getStats(topicName);
Assert.assertTrue(topicStats2.getPublishers().size() + topicStats2.getReplication().size() > 0);

// cleanup.
consumer2.close();
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
}
}
Loading

0 comments on commit 4075a39

Please sign in to comment.