Skip to content

Commit

Permalink
Defer admin client creation for switchEmptyCase
Browse files Browse the repository at this point in the history
  • Loading branch information
germanosin committed Feb 28, 2025
1 parent fae0628 commit 6f6a59d
Showing 1 changed file with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Service
@Slf4j
Expand All @@ -41,7 +42,7 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
}

private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
return Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
return Mono.fromSupplier(() -> {
Properties properties = new Properties();
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
Expand All @@ -52,7 +53,8 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
"kafbat-ui-admin-" + Instant.now().getEpochSecond() + "-" + CLIENT_ID_SEQ.incrementAndGet()
);
return AdminClient.create(properties);
}))).flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
}).subscribeOn(Schedulers.boundedElastic())
.flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
.onErrorMap(th -> new IllegalStateException(
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
}
Expand Down

0 comments on commit 6f6a59d

Please sign in to comment.