Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
Fix #93: failed ListOffsets in Integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
PierreZ committed Feb 10, 2020
1 parent 7583f1f commit 18d70a1
Showing 1 changed file with 3 additions and 12 deletions.
15 changes: 3 additions & 12 deletions src/main/java/io/streamnative/kop/KafkaRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -810,18 +810,9 @@ private CompletableFuture<AbstractResponse> handleListOffsetRequestV0(KafkaHeade
Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET)));
}

// topic not exist, return UNKNOWN_TOPIC_OR_PARTITION
if (!topicManager.topicExists(pulsarTopic.toString())) {
log.warn("Topic {} not exist in topic manager while list offset.", pulsarTopic.toString());
partitionData = new CompletableFuture<>();
partitionData.complete(new ListOffsetResponse
.PartitionData(
Errors.UNKNOWN_TOPIC_OR_PARTITION,
Collections.singletonList(ListOffsetResponse.UNKNOWN_OFFSET)));
} else {
CompletableFuture<PersistentTopic> persistentTopic = topicManager.getTopic(pulsarTopic.toString());
partitionData = fetchOffsetForTimestamp(persistentTopic, times, true);
}
CompletableFuture<PersistentTopic> persistentTopic = topicManager.getTopic(pulsarTopic.toString());
partitionData = fetchOffsetForTimestamp(persistentTopic, times, true);

responseData.put(topic, partitionData);
});

Expand Down

0 comments on commit 18d70a1

Please sign in to comment.