Skip to content

Commit

Permalink
[improve][client] Make table view support read the non-persistent topic
Browse files Browse the repository at this point in the history
### Motivation

Currently, the table view only supports persistent topics with read compacted.

However, some data don't require persistent storage, like load data in PIP-192 [#16691](#16691). 

### Modifications

Make table view support non-persistent topic
  • Loading branch information
Demogorgon314 authored Nov 10, 2022
1 parent 8407986 commit 65baa6c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
Expand Down Expand Up @@ -79,6 +80,11 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@DataProvider(name = "topicDomain")
public static Object[] topicDomain() {
return new Object[]{ TopicDomain.persistent.value(), TopicDomain.non_persistent.value()};
}

private Set<String> publishMessages(String topic, int count, boolean enableBatch) throws Exception {
Set<String> keys = new HashSet<>();
ProducerBuilder<byte[]> builder = pulsarClient.newProducer();
Expand Down Expand Up @@ -155,18 +161,22 @@ public void testTableView() throws Exception {
}
}

@Test(timeOut = 30 * 1000)
public void testTableViewUpdatePartitions() throws Exception {
String topic = "persistent://public/default/tableview-test-update-partitions";
@Test(timeOut = 30 * 1000, dataProvider = "topicDomain")
public void testTableViewUpdatePartitions(String topicDomain) throws Exception {
String topic = topicDomain + "://public/default/tableview-test-update-partitions";
admin.topics().createPartitionedTopic(topic, 3);
int count = 20;
// For non-persistent topic, this keys will never be received.
Set<String> keys = this.publishMessages(topic, count, false);
@Cleanup
TableView<byte[]> tv = pulsarClient.newTableViewBuilder(Schema.BYTES)
.topic(topic)
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
.create();
log.info("start tv size: {}", tv.size());
if (topicDomain.equals(TopicDomain.non_persistent.value())) {
keys = this.publishMessages(topic, count, false);
}
tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
Awaitility.await().untilAsserted(() -> {
log.info("Current tv size: {}", tv.size());
Expand All @@ -178,6 +188,10 @@ public void testTableViewUpdatePartitions() throws Exception {
admin.topics().updatePartitionedTopic(topic, 4);
TopicName topicName = TopicName.get(topic);

// Make sure the new partition-3 consumer already started.
if (topic.startsWith(TopicDomain.non_persistent.toString())) {
TimeUnit.SECONDS.sleep(6);
}
// Send more data to partition 3, which is not in the current TableView, need update partitions
Set<String> keys2 =
this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
Expand All @@ -188,9 +202,9 @@ public void testTableViewUpdatePartitions() throws Exception {
assertEquals(tv.keySet(), keys2);
}

@Test(timeOut = 30 * 1000)
public void testPublishNullValue() throws Exception {
String topic = "persistent://public/default/tableview-test-publish-null-value";
@Test(timeOut = 30 * 1000, dataProvider = "topicDomain")
public void testPublishNullValue(String topicDomain) throws Exception {
String topic = topicDomain + "://public/default/tableview-test-publish-null-value";
admin.topics().createPartitionedTopic(topic, 3);

final TableView<String> tv = pulsarClient.newTableViewBuilder(Schema.STRING)
Expand Down Expand Up @@ -221,8 +235,12 @@ public void testPublishNullValue() throws Exception {
.autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
.create();

assertEquals(tv1.size(), 1);
assertEquals(tv.get("key2"), "value2");
if (topicDomain.equals(TopicDomain.persistent.value())) {
assertEquals(tv1.size(), 1);
assertEquals(tv.get("key2"), "value2");
} else {
assertEquals(tv1.size(), 0);
}
}

@DataProvider(name = "partitionedTopic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicDomain;

@Slf4j
public class TableViewImpl<T> implements TableView<T> {
Expand All @@ -51,26 +53,35 @@ public class TableViewImpl<T> implements TableView<T> {

private final List<BiConsumer<String, T>> listeners;
private final ReentrantLock listenersMutex;
private final boolean isPersistentTopic;

TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
this.conf = conf;
this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString());
this.data = new ConcurrentHashMap<>();
this.immutableData = Collections.unmodifiableMap(data);
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
this.reader = client.newReader(schema)
ReaderBuilder<T> readerBuilder = client.newReader(schema)
.topic(conf.getTopicName())
.startMessageId(MessageId.earliest)
.autoUpdatePartitions(true)
.autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
.readCompacted(true)
.poolMessages(true)
.createAsync();
.poolMessages(true);
if (isPersistentTopic) {
readerBuilder.readCompacted(true);
}
this.reader = readerBuilder.createAsync();
}

CompletableFuture<TableView<T>> start() {
return reader.thenCompose(this::readAllExistingMessages)
.thenApply(__ -> this);
return reader.thenCompose((reader) -> {
if (!isPersistentTopic) {
readTailMessages(reader);
return CompletableFuture.completedFuture(reader);
}
return this.readAllExistingMessages(reader);
}).thenApply(__ -> this);
}

@Override
Expand Down

0 comments on commit 65baa6c

Please sign in to comment.