diff --git a/dubbo-all/pom.xml b/dubbo-all/pom.xml
index edb0f361348..a4a0b5cc3cd 100644
--- a/dubbo-all/pom.xml
+++ b/dubbo-all/pom.xml
@@ -241,6 +241,13 @@
compile
true
+
+ org.apache.dubbo
+ dubbo-registry-etcd3
+ ${project.version}
+ compile
+ true
+
org.apache.dubbo
dubbo-monitor-api
@@ -360,6 +367,13 @@
compile
true
+
+ org.apache.dubbo
+ dubbo-configcenter-etcd
+ ${project.version}
+ compile
+ true
+
org.apache.dubbo
dubbo-compatible
@@ -494,6 +508,7 @@
org.apache.dubbo:dubbo-registry-zookeeper
org.apache.dubbo:dubbo-registry-redis
org.apache.dubbo:dubbo-registry-consul
+ org.apache.dubbo:dubbo-registry-etcd3
org.apache.dubbo:dubbo-monitor-api
org.apache.dubbo:dubbo-monitor-default
org.apache.dubbo:dubbo-config-api
@@ -515,6 +530,7 @@
org.apache.dubbo:dubbo-configcenter-apollo
org.apache.dubbo:dubbo-configcenter-zookeeper
org.apache.dubbo:dubbo-configcenter-consul
+ org.apache.dubbo:dubbo-configcenter-etcd
org.apache.dubbo:dubbo-metadata-report-api
org.apache.dubbo:dubbo-metadata-definition
org.apache.dubbo:dubbo-metadata-report-redis
diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml
index d04495a47ce..47875b83ffb 100644
--- a/dubbo-bom/pom.xml
+++ b/dubbo-bom/pom.xml
@@ -343,6 +343,11 @@
dubbo-configcenter-consul
${project.version}
+
+ org.apache.dubbo
+ dubbo-configcenter-etcd
+ ${project.version}
+
org.apache.dubbo
dubbo-metadata-definition
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
new file mode 100644
index 00000000000..60efc8e4bb9
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/pom.xml
@@ -0,0 +1,46 @@
+
+
+
+
+
+ dubbo-configcenter
+ org.apache.dubbo
+ 2.7.1-SNAPSHOT
+
+ 4.0.0
+
+ dubbo-configcenter-etcd
+ jar
+ ${project.artifactId}
+ The etcd implementation of the config-center api
+
+
+
+ org.apache.dubbo
+ dubbo-configcenter-api
+ ${project.parent.version}
+
+
+ org.apache.dubbo
+ dubbo-remoting-etcd3
+ ${project.parent.version}
+
+
+
\ No newline at end of file
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
new file mode 100644
index 00000000000..18e90887592
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfiguration.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.configcenter.ConfigChangeEvent;
+import org.apache.dubbo.configcenter.ConfigChangeType;
+import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.jetcd.JEtcdClient;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.dubbo.common.Constants.CONFIG_NAMESPACE_KEY;
+import static org.apache.dubbo.common.Constants.PATH_SEPARATOR;
+
+/**
+ * The etcd implementation of {@link DynamicConfiguration}
+ */
+public class EtcdDynamicConfiguration implements DynamicConfiguration {
+
+ /**
+ * The final root path would be: /$NAME_SPACE/config
+ */
+ private String rootPath;
+
+ /**
+ * The etcd client
+ */
+ private final JEtcdClient etcdClient;
+
+ /**
+ * The map store the key to {@link EtcdConfigWatcher} mapping
+ */
+ private final ConcurrentMap watchListenerMap;
+
+ EtcdDynamicConfiguration(URL url) {
+ rootPath = "/" + url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP) + "/config";
+ etcdClient = new JEtcdClient(url);
+ etcdClient.addStateListener(state -> {
+ if (state == StateListener.CONNECTED) {
+ try {
+ recover();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ });
+ watchListenerMap = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void addListener(String key, String group, ConfigurationListener listener) {
+ if (watchListenerMap.get(listener) == null) {
+ String normalizedKey = convertKey(key);
+ EtcdConfigWatcher watcher = new EtcdConfigWatcher(normalizedKey, listener);
+ watchListenerMap.put(listener, watcher);
+ watcher.watch();
+ }
+ }
+
+ @Override
+ public void removeListener(String key, String group, ConfigurationListener listener) {
+ EtcdConfigWatcher watcher = watchListenerMap.get(listener);
+ watcher.cancelWatch();
+ }
+
+ // TODO Abstract the logic into super class
+ @Override
+ public String getConfig(String key, String group, long timeout) throws IllegalStateException {
+ if (StringUtils.isNotEmpty(group)) {
+ key = group + PATH_SEPARATOR + key;
+ } else {
+ int i = key.lastIndexOf(".");
+ key = key.substring(0, i) + PATH_SEPARATOR + key.substring(i + 1);
+ }
+ return (String) getInternalProperty(rootPath + PATH_SEPARATOR + key);
+ }
+
+ @Override
+ public Object getInternalProperty(String key) {
+ return etcdClient.getKVValue(key);
+ }
+
+
+ private String convertKey(String key) {
+ int index = key.lastIndexOf('.');
+ return rootPath + PATH_SEPARATOR + key.substring(0, index) + PATH_SEPARATOR + key.substring(index + 1);
+ }
+
+ private void recover() {
+ for (EtcdConfigWatcher watcher: watchListenerMap.values()) {
+ watcher.watch();
+ }
+ }
+
+ public class EtcdConfigWatcher implements StreamObserver {
+
+ private ConfigurationListener listener;
+ protected WatchGrpc.WatchStub watchStub;
+ private StreamObserver observer;
+ protected long watchId;
+ private ManagedChannel channel;
+ private String key;
+
+ public EtcdConfigWatcher(String key, ConfigurationListener listener) {
+ this.key = key;
+ this.listener = listener;
+ this.channel = etcdClient.getChannel();
+ }
+
+ @Override
+ public void onNext(WatchResponse watchResponse) {
+ this.watchId = watchResponse.getWatchId();
+ for (Event etcdEvent : watchResponse.getEventsList()) {
+ ConfigChangeType type = ConfigChangeType.MODIFIED;
+ if (etcdEvent.getType() == Event.EventType.DELETE) {
+ type = ConfigChangeType.DELETED;
+ }
+ ConfigChangeEvent event = new ConfigChangeEvent(
+ etcdEvent.getKv().getKey().toString(UTF_8),
+ etcdEvent.getKv().getValue().toString(UTF_8), type);
+ listener.process(event);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ // ignore
+ }
+
+ @Override
+ public void onCompleted() {
+ // ignore
+ }
+
+ public long getWatchId() {
+ return watchId;
+ }
+
+ private void watch() {
+ watchStub = WatchGrpc.newStub(channel);
+ observer = watchStub.watch(this);
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFromUtf8(key))
+ .setProgressNotify(true);
+ WatchRequest req = WatchRequest.newBuilder().setCreateRequest(builder).build();
+ observer.onNext(req);
+ }
+
+ private void cancelWatch() {
+ WatchCancelRequest watchCancelRequest =
+ WatchCancelRequest.newBuilder().setWatchId(watchId).build();
+ WatchRequest cancelRequest = WatchRequest.newBuilder()
+ .setCancelRequest(watchCancelRequest).build();
+ observer.onNext(cancelRequest);
+ }
+ }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
new file mode 100644
index 00000000000..02e91a62db7
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.configcenter.AbstractDynamicConfigurationFactory;
+import org.apache.dubbo.configcenter.DynamicConfiguration;
+
+/**
+ * The etcd implementation of {@link AbstractDynamicConfigurationFactory}
+ */
+public class EtcdDynamicConfigurationFactory extends AbstractDynamicConfigurationFactory {
+
+ @Override
+ protected DynamicConfiguration createDynamicConfiguration(URL url) {
+ return new EtcdDynamicConfiguration(url);
+ }
+}
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory
new file mode 100644
index 00000000000..d84b1ae0e1a
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.configcenter.DynamicConfigurationFactory
@@ -0,0 +1 @@
+etcd=org.apache.dubbo.configcenter.support.etcd.EtcdDynamicConfigurationFactory
\ No newline at end of file
diff --git a/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
new file mode 100644
index 00000000000..87143fdcacc
--- /dev/null
+++ b/dubbo-configcenter/dubbo-configcenter-etcd/src/test/java/org/apache/dubbo/configcenter/support/etcd/EtcdDynamicConfigurationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.configcenter.support.etcd;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import org.apache.dubbo.common.Constants;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.configcenter.ConfigChangeEvent;
+import org.apache.dubbo.configcenter.ConfigurationListener;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Unit test for etcd config center support
+ * TODO Integrate with https://github.com/etcd-io/jetcd#launcher or using mock data.
+ */
+@Disabled
+public class EtcdDynamicConfigurationTest {
+
+ private static final String ENDPOINT = "http://127.0.0.1:2379";
+
+ private static EtcdDynamicConfiguration config;
+
+ private static Client etcdClient;
+
+ @Test
+ public void testGetConfig() {
+ put("/dubbo/config/org.apache.dubbo.etcd.testService/configurators", "hello");
+ put("/dubbo/config/test/dubbo.properties", "aaa=bbb");
+ Assertions.assertEquals("hello", config.getConfig("org.apache.dubbo.etcd.testService.configurators"));
+ Assertions.assertEquals("aaa=bbb", config.getConfig("dubbo.properties", "test"));
+ }
+
+ @Test
+ public void testAddListener() throws Exception {
+ CountDownLatch latch = new CountDownLatch(4);
+ TestListener listener1 = new TestListener(latch);
+ TestListener listener2 = new TestListener(latch);
+ TestListener listener3 = new TestListener(latch);
+ TestListener listener4 = new TestListener(latch);
+ config.addListener("AService.configurators", listener1);
+ config.addListener("AService.configurators", listener2);
+ config.addListener("testapp.tagrouters", listener3);
+ config.addListener("testapp.tagrouters", listener4);
+
+ put("/dubbo/config/AService/configurators", "new value1");
+ Thread.sleep(200);
+ put("/dubbo/config/testapp/tagrouters", "new value2");
+ Thread.sleep(200);
+ put("/dubbo/config/testapp", "new value3");
+
+ Thread.sleep(1000);
+
+ Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Assertions.assertEquals(1, listener1.getCount("/dubbo/config/AService/configurators"));
+ Assertions.assertEquals(1, listener2.getCount("/dubbo/config/AService/configurators"));
+ Assertions.assertEquals(1, listener3.getCount("/dubbo/config/testapp/tagrouters"));
+ Assertions.assertEquals(1, listener4.getCount("/dubbo/config/testapp/tagrouters"));
+
+ Assertions.assertEquals("new value1", listener1.getValue());
+ Assertions.assertEquals("new value1", listener2.getValue());
+ Assertions.assertEquals("new value2", listener3.getValue());
+ Assertions.assertEquals("new value2", listener4.getValue());
+ }
+
+ private class TestListener implements ConfigurationListener {
+ private CountDownLatch latch;
+ private String value;
+ private Map countMap = new HashMap<>();
+
+ public TestListener(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ @Override
+ public void process(ConfigChangeEvent event) {
+ Integer count = countMap.computeIfAbsent(event.getKey(), k -> 0);
+ countMap.put(event.getKey(), ++count);
+ value = event.getValue();
+ latch.countDown();
+ }
+
+ public int getCount(String key) {
+ return countMap.get(key);
+ }
+
+ public String getValue() {
+ return value;
+ }
+ }
+
+ static void put(String key, String value) {
+ try {
+ etcdClient.getKVClient()
+ .put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8))
+ .get();
+ } catch (Exception e) {
+ System.out.println("Error put value to etcd.");
+ }
+ }
+
+ @BeforeAll
+ static void setUp() {
+ etcdClient = Client.builder().endpoints(ENDPOINT).build();
+ // timeout in 15 seconds.
+ URL url = URL.valueOf("etcd3://127.0.0.1:2379/org.apache.dubbo.etcd.testService")
+ .addParameter(Constants.SESSION_TIMEOUT_KEY, 15000);
+ config = new EtcdDynamicConfiguration(url);
+ }
+
+ @AfterAll
+ static void tearDown() {
+ etcdClient.close();
+ }
+}
diff --git a/dubbo-configcenter/pom.xml b/dubbo-configcenter/pom.xml
index fa703be6864..92f727d2321 100644
--- a/dubbo-configcenter/pom.xml
+++ b/dubbo-configcenter/pom.xml
@@ -34,5 +34,6 @@
dubbo-configcenter-zookeeper
dubbo-configcenter-apollo
dubbo-configcenter-consul
+ dubbo-configcenter-etcd
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
index 504d521aaed..f0d94067d25 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdRegistry.java
@@ -71,7 +71,7 @@ public class EtcdRegistry extends FailbackRegistry {
private final Set anyServices = new ConcurrentHashSet();
- private final ConcurrentMap> etcdListeners = new ConcurrentHashMap>();
+ private final ConcurrentMap> etcdListeners = new ConcurrentHashMap<>();
private final EtcdClient etcdClient;
private long expirePeriod;
@@ -86,14 +86,12 @@ public EtcdRegistry(URL url, EtcdTransporter etcdTransporter) {
}
this.root = group;
etcdClient = etcdTransporter.connect(url);
- etcdClient.addStateListener(new StateListener() {
- public void stateChanged(int state) {
- if (state == CONNECTED) {
- try {
- recover();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
+ etcdClient.addStateListener(state -> {
+ if (state == StateListener.CONNECTED) {
+ try {
+ recover();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
}
}
});
@@ -345,7 +343,7 @@ protected List toUnsubscribedPath(URL url) {
}
protected List toUrlsWithoutEmpty(URL consumer, List providers) {
- List urls = new ArrayList();
+ List urls = new ArrayList<>();
if (providers != null && providers.size() > 0) {
for (String provider : providers) {
provider = URL.decode(provider);
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
index b1e765d3416..286be934469 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java
@@ -164,4 +164,20 @@ public long createLease(long ttl, long timeout, TimeUnit unit)
*/
void revokeLease(long lease);
+
+ /**
+ * Get the value of the specified key.
+ * @param key the specified key
+ * @return null if the value is not found
+ */
+ String getKVValue(String key);
+
+ /**
+ * Put the key value pair to etcd
+ * @param key the specified key
+ * @param value the paired value
+ * @return true if put success
+ */
+ boolean put(String key, String value);
+
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
new file mode 100644
index 00000000000..788aa401e9b
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/ConnectionStateListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.remoting.etcd.jetcd;
+
+import io.etcd.jetcd.Client;
+
+public interface ConnectionStateListener {
+
+ /**
+ * Called when there is a state change in the connection
+ *
+ * @param client the client
+ * @param newState the new state
+ */
+ void stateChanged(Client client, int newState);
+}
\ No newline at end of file
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
index d07cad06405..ff4c118b2a3 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.etcd.jetcd;
+import io.grpc.ManagedChannel;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
@@ -185,6 +186,20 @@ public void doClose() {
}
}
+ @Override
+ public String getKVValue(String key) {
+ return clientWrapper.getKVValue(key);
+ }
+
+ @Override
+ public boolean put(String key, String value) {
+ return clientWrapper.put(key, value);
+ }
+
+ public ManagedChannel getChannel() {
+ return clientWrapper.getChannel();
+ }
+
public class EtcdWatcher implements StreamObserver {
protected WatchGrpc.WatchStub watchStub;
@@ -233,12 +248,7 @@ public void onNext(WatchResponse response) {
}
}
if (modified > 0) {
- notifyExecutor.execute(new Runnable() {
- @Override
- public void run() {
- listener.childChanged(path, new ArrayList<>(urls));
- }
- });
+ notifyExecutor.execute(() -> listener.childChanged(path, new ArrayList<>(urls)));
}
}
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
index 8515b617ae9..c7f472d4011 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.etcd.jetcd;
+import io.etcd.jetcd.kv.PutResponse;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
@@ -31,9 +32,11 @@
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.CloseableClient;
+import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Observers;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
+import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
@@ -87,7 +90,8 @@ public class JEtcdClientWrapper {
private RuntimeException failed;
private final ScheduledFuture> retryFuture;
- private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
+ private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1,
+ new NamedThreadFactory("Etcd3RegistryKeepAliveFailedRetryTimer", true));
private final Set failedRegistered = new ConcurrentHashSet();
@@ -117,28 +121,26 @@ public JEtcdClientWrapper(URL url) {
this.failed = new IllegalStateException("Etcd3 registry is not connected yet, url:" + url);
int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
- this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
- public void run() {
- try {
- retry();
- } catch (Throwable t) {
- logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
- }
+ this.retryFuture = retryExecutor.scheduleWithFixedDelay(() -> {
+ try {
+ retry();
+ } catch (Throwable t) {
+ logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
}
}, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
private Client prepareClient(URL url) {
- int maxInboudSize = DEFAULT_INBOUT_SIZE;
- if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY))) {
- maxInboudSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUD_SIZE_KEY));
+ int maxInboundSize = DEFAULT_INBOUND_SIZE;
+ if (StringUtils.isNotEmpty(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY))) {
+ maxInboundSize = Integer.valueOf(System.getProperty(GRPC_MAX_INBOUND_SIZE_KEY));
}
ClientBuilder clientBuilder = Client.builder()
.loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
.endpoints(endPoints(url.getBackupAddress()))
- .maxInboundMessageSize(maxInboudSize);
+ .maxInboundMessageSize(maxInboundSize);
return clientBuilder.build();
}
@@ -170,29 +172,26 @@ public ManagedChannel getChannel() {
public List getChildren(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable>() {
- @Override
- public List call() throws Exception {
- requiredNotNull(client, failed);
- int len = path.length();
- return client.getKVClient()
- .get(ByteSequence.from(path, UTF_8),
- GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getKvs().stream().parallel()
- .filter(pair -> {
- String key = pair.getKey().toString(UTF_8);
- int index = len, count = 0;
- if (key.length() > len) {
- for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
- if (count++ > 1) break;
- }
+ () -> {
+ requiredNotNull(client, failed);
+ int len = path.length();
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8),
+ GetOption.newBuilder().withPrefix(ByteSequence.from(path, UTF_8)).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getKvs().stream().parallel()
+ .filter(pair -> {
+ String key = pair.getKey().toString(UTF_8);
+ int index = len, count = 0;
+ if (key.length() > len) {
+ for (; (index = key.indexOf(Constants.PATH_SEPARATOR, index)) != -1; ++index) {
+ if (count++ > 1) break;
}
- return count == 1;
- })
- .map(pair -> pair.getKey().toString(UTF_8))
- .collect(toList());
- }
+ }
+ return count == 1;
+ })
+ .map(pair -> pair.getKey().toString(UTF_8))
+ .collect(toList());
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -207,15 +206,12 @@ public boolean isConnected() {
public long createLease(long second) {
try {
return RetryLoops.invokeWithRetry(
- new Callable() {
- @Override
- public Long call() throws Exception {
- requiredNotNull(client, failed);
- return client.getLeaseClient()
- .grant(second)
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getID();
- }
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getLeaseClient()
+ .grant(second)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getID();
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -225,15 +221,12 @@ public Long call() throws Exception {
public void revokeLease(long lease) {
try {
RetryLoops.invokeWithRetry(
- new Callable() {
- @Override
- public Void call() throws Exception {
- requiredNotNull(client, failed);
- client.getLeaseClient()
- .revoke(lease)
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- return null;
- }
+ (Callable) () -> {
+ requiredNotNull(client, failed);
+ client.getLeaseClient()
+ .revoke(lease)
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return null;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -260,15 +253,12 @@ public long createLease(long ttl, long timeout, TimeUnit unit)
public boolean checkExists(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable() {
- @Override
- public Boolean call() throws Exception {
- requiredNotNull(client, failed);
- return client.getKVClient()
- .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getCount() > 0;
- }
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8), GetOption.newBuilder().withCountOnly(true).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getCount() > 0;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -281,17 +271,14 @@ public Boolean call() throws Exception {
protected Long find(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable() {
- @Override
- public Long call() throws Exception {
- requiredNotNull(client, failed);
- return client.getKVClient()
- .get(ByteSequence.from(path, UTF_8))
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
- .getKvs().stream()
- .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
- .findFirst().getAsLong();
- }
+ () -> {
+ requiredNotNull(client, failed);
+ return client.getKVClient()
+ .get(ByteSequence.from(path, UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)
+ .getKvs().stream()
+ .mapToLong(keyValue -> Long.valueOf(keyValue.getValue().toString(UTF_8)))
+ .findFirst().getAsLong();
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -301,16 +288,13 @@ public Long call() throws Exception {
public void createPersistent(String path) {
try {
RetryLoops.invokeWithRetry(
- new Callable() {
- @Override
- public Void call() throws Exception {
- requiredNotNull(client, failed);
- client.getKVClient()
- .put(ByteSequence.from(path, UTF_8),
- ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- return null;
- }
+ (Callable) () -> {
+ requiredNotNull(client, failed);
+ client.getKVClient()
+ .put(ByteSequence.from(path, UTF_8),
+ ByteSequence.from(String.valueOf(path.hashCode()), UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return null;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -328,21 +312,18 @@ public Void call() throws Exception {
public long createEphemeral(String path) {
try {
return RetryLoops.invokeWithRetry(
- new Callable() {
- @Override
- public Long call() throws Exception {
- requiredNotNull(client, failed);
-
- registeredPaths.add(path);
- keepAlive();
- final long leaseId = globalLeaseId;
- client.getKVClient()
- .put(ByteSequence.from(path, UTF_8)
- , ByteSequence.from(String.valueOf(leaseId), UTF_8)
- , PutOption.newBuilder().withLeaseId(leaseId).build())
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- return leaseId;
- }
+ () -> {
+ requiredNotNull(client, failed);
+
+ registeredPaths.add(path);
+ keepAlive();
+ final long leaseId = globalLeaseId;
+ client.getKVClient()
+ .put(ByteSequence.from(path, UTF_8)
+ , ByteSequence.from(String.valueOf(leaseId), UTF_8)
+ , PutOption.newBuilder().withLeaseId(leaseId).build())
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return leaseId;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -354,6 +335,7 @@ public void keepAlive(long lease) {
this.keepAlive(lease, null);
}
+ @SuppressWarnings("unchecked")
private void keepAlive(long lease, Consumer onFailed) {
final StreamObserver observer = new Observers.Builder()
.onError((e) -> {
@@ -471,16 +453,13 @@ private void recovery() {
public void delete(String path) {
try {
RetryLoops.invokeWithRetry(
- new Callable() {
- @Override
- public Void call() throws Exception {
- requiredNotNull(client, failed);
- client.getKVClient()
- .delete(ByteSequence.from(path, UTF_8))
- .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
- registeredPaths.remove(path);
- return null;
- }
+ (Callable) () -> {
+ requiredNotNull(client, failed);
+ client.getKVClient()
+ .delete(ByteSequence.from(path, UTF_8))
+ .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ registeredPaths.remove(path);
+ return null;
}, retryPolicy);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -494,13 +473,13 @@ public Void call() throws Exception {
public String[] endPoints(String backupAddress) {
String[] endpoints = backupAddress.split(Constants.COMMA_SEPARATOR);
- List addressess = Arrays.stream(endpoints)
- .map(address -> address.indexOf(Constants.HTTP_SUBFIX_KEY) > -1
+ List addresses = Arrays.stream(endpoints)
+ .map(address -> address.contains(Constants.HTTP_SUBFIX_KEY)
? address
: Constants.HTTP_KEY + address)
.collect(toList());
- Collections.shuffle(addressess);
- return addressess.toArray(new String[0]);
+ Collections.shuffle(addresses);
+ return addresses.toArray(new String[0]);
}
/**
@@ -527,26 +506,22 @@ public void start() {
}
try {
- this.future = reconnectNotify.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- boolean connected = isConnected();
- if (connectState != connected) {
- int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
- if (connectionStateListener != null) {
- try {
- if (connected) {
- clearKeepAlive();
- }
- connectionStateListener.stateChanged(getClient(), notifyState);
- } finally {
- cancelKeepAlive = false;
+ this.future = reconnectNotify.scheduleWithFixedDelay(() -> {
+ boolean connected = isConnected();
+ if (connectState != connected) {
+ int notifyState = connected ? StateListener.CONNECTED : StateListener.DISCONNECTED;
+ if (connectionStateListener != null) {
+ try {
+ if (connected) {
+ clearKeepAlive();
}
+ connectionStateListener.stateChanged(getClient(), notifyState);
+ } finally {
+ cancelKeepAlive = false;
}
- connectState = connected;
}
+ connectState = connected;
}
-
}, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
logger.error("monitor reconnect status failed.", t);
@@ -575,7 +550,9 @@ protected void doClose() {
try {
cancelKeepAlive = true;
- revokeLease(this.globalLeaseId);
+ if (globalLeaseId > 0) {
+ revokeLease(this.globalLeaseId);
+ }
} catch (Exception e) {
logger.warn("revoke global lease '" + globalLeaseId + "' failed, registry: " + url, e);
}
@@ -638,6 +615,41 @@ public static void requiredNotNull(Object obj, RuntimeException exeception) {
}
}
+ public String getKVValue(String key) {
+ if (null == key) {
+ return null;
+ }
+
+ CompletableFuture responseFuture = this.client.getKVClient().get(ByteSequence.from(key, UTF_8));
+
+ try {
+ List result = responseFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS).getKvs();
+ if (!result.isEmpty()) {
+ return result.get(0).getValue().toString(UTF_8);
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+
+ return null;
+ }
+
+
+ public boolean put(String key, String value) {
+ if (key == null || value == null) {
+ return false;
+ }
+ CompletableFuture putFuture =
+ this.client.getKVClient().put(ByteSequence.from(key, UTF_8), ByteSequence.from(value, UTF_8));
+ try {
+ putFuture.get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS);
+ return true;
+ } catch (Exception e) {
+ // ignore
+ }
+ return false;
+ }
+
private void retry() {
if (!failedRegistered.isEmpty()) {
Set failed = new HashSet(failedRegistered);
@@ -679,24 +691,14 @@ private void retry() {
}
}
- public interface ConnectionStateListener {
- /**
- * Called when there is a state change in the connection
- *
- * @param client the client
- * @param newState the new state
- */
- public void stateChanged(Client client, int newState);
- }
-
/**
* default request timeout
*/
public static final long DEFAULT_REQUEST_TIMEOUT = obtainRequestTimeout();
- public static final int DEFAULT_INBOUT_SIZE = 100 * 1024 * 1024;
+ public static final int DEFAULT_INBOUND_SIZE = 100 * 1024 * 1024;
- public static final String GRPC_MAX_INBOUD_SIZE_KEY = "grpc.max.inbound.size";
+ public static final String GRPC_MAX_INBOUND_SIZE_KEY = "grpc.max.inbound.size";
public static final String ETCD_REQUEST_TIMEOUT_KEY = "etcd.request.timeout";
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
index 31752bffe8f..5fecd14d395 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/support/AbstractEtcdClient.java
@@ -57,7 +57,7 @@ public abstract class AbstractEtcdClient implements EtcdClient
private final Set stateListeners = new ConcurrentHashSet<>();
- private final ConcurrentMap> childListeners = new ConcurrentHashMap>();
+ private final ConcurrentMap> childListeners = new ConcurrentHashMap<>();
private final List categroies = Arrays.asList(Constants.PROVIDERS_CATEGORY
, Constants.CONSUMERS_CATEGORY
, Constants.ROUTERS_CATEGORY
@@ -99,7 +99,7 @@ public Set getSessionListeners() {
public List addChildListener(String path, final ChildListener listener) {
ConcurrentMap listeners = childListeners.get(path);
if (listeners == null) {
- childListeners.putIfAbsent(path, new ConcurrentHashMap());
+ childListeners.putIfAbsent(path, new ConcurrentHashMap<>());
listeners = childListeners.get(path);
}
WatcherListener targetListener = listeners.get(listener);
diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
index 19254abeac7..9674feec35d 100644
--- a/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
+++ b/dubbo-remoting/dubbo-remoting-etcd3/src/test/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientTest.java
@@ -33,8 +33,21 @@
*/
package org.apache.dubbo.remoting.etcd.jetcd;
+import com.google.protobuf.ByteString;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.Watch;
+import io.etcd.jetcd.api.Event;
+import io.etcd.jetcd.api.WatchCancelRequest;
+import io.etcd.jetcd.api.WatchCreateRequest;
+import io.etcd.jetcd.api.WatchGrpc;
+import io.etcd.jetcd.api.WatchRequest;
+import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ClosedClientException;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.grpc.ManagedChannel;
import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.etcd.ChildListener;
@@ -44,8 +57,13 @@
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
@Disabled
public class JEtcdClientTest {
@@ -75,6 +93,139 @@ public void test_watch_when_create_path() throws InterruptedException {
client.delete(child);
}
+ @Test
+ public void test_watch_when_modify() {
+ String path = "/dubbo/config/jetcd-client-unit-test/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch latch = new CountDownLatch(1);
+ ByteSequence key = ByteSequence.from(path, UTF_8);
+
+ Watch.Listener listener = Watch.listener(response -> {
+ for (WatchEvent event : response.getEvents()) {
+ Assertions.assertEquals("PUT", event.getEventType().toString());
+ Assertions.assertEquals(path, event.getKeyValue().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKeyValue().getValue().toString(UTF_8));
+ latch.countDown();
+ }
+
+ });
+
+ try (Client client = Client.builder().endpoints(endpoint).build();
+ Watch watch = client.getWatchClient();
+ Watch.Watcher watcher = watch.watch(key, listener)) {
+ // try to modify the key
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+ latch.await();
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testWatchWithGrpc() {
+ String path = "/dubbo/config/test_watch_with_grpc/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch latch = new CountDownLatch(1);
+ try (Client client = Client.builder().endpoints(endpoint).build()) {
+ ManagedChannel channel = getChannel(client);
+ StreamObserver observer = WatchGrpc.newStub(channel).watch(new StreamObserver() {
+ @Override
+ public void onNext(WatchResponse response) {
+ for (Event event : response.getEventsList()) {
+ Assertions.assertEquals("PUT", event.getType().toString());
+ Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFrom(path, UTF_8));
+
+ observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+ // try to modify the key
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+ latch.await(5, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCancelWatchWithGrpc() {
+ String path = "/dubbo/config/testCancelWatchWithGrpc/configurators";
+ String endpoint = "http://127.0.0.1:2379";
+ CountDownLatch updateLatch = new CountDownLatch(1);
+ CountDownLatch cancelLatch = new CountDownLatch(1);
+ final AtomicLong watchID = new AtomicLong(-1L);
+ try (Client client = Client.builder().endpoints(endpoint).build()) {
+ ManagedChannel channel = getChannel(client);
+ StreamObserver observer = WatchGrpc.newStub(channel).watch(new StreamObserver() {
+ @Override
+ public void onNext(WatchResponse response) {
+ watchID.set(response.getWatchId());
+ for (Event event : response.getEventsList()) {
+ Assertions.assertEquals("PUT", event.getType().toString());
+ Assertions.assertEquals(path, event.getKv().getKey().toString(UTF_8));
+ Assertions.assertEquals("Hello", event.getKv().getValue().toString(UTF_8));
+ updateLatch.countDown();
+ }
+ if (response.getCanceled()) {
+ // received the cancel response
+ cancelLatch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+
+ }
+
+ @Override
+ public void onCompleted() {
+
+ }
+ });
+ // create
+ WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder()
+ .setKey(ByteString.copyFrom(path, UTF_8));
+
+ // make the grpc call to watch the key
+ observer.onNext(WatchRequest.newBuilder().setCreateRequest(builder).build());
+
+ // try to put the value
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello", UTF_8));
+
+ // response received, latch counts down to zero
+ updateLatch.await();
+
+ WatchCancelRequest watchCancelRequest =
+ WatchCancelRequest.newBuilder().setWatchId(watchID.get()).build();
+ WatchRequest cancelRequest = WatchRequest.newBuilder()
+ .setCancelRequest(watchCancelRequest).build();
+ observer.onNext(cancelRequest);
+
+ // try to put the value
+ client.getKVClient().put(ByteSequence.from(path, UTF_8), ByteSequence.from("Hello world", UTF_8));
+
+ cancelLatch.await();
+ } catch (Exception e) {
+ Assertions.fail(e.getMessage());
+ }
+
+ }
+
@Test
public void test_watch_when_create_wrong_path() throws InterruptedException {
@@ -257,4 +408,19 @@ synchronized int increaseAndGet() {
return ++value;
}
}
+
+ private ManagedChannel getChannel(Client client) {
+ try {
+ // hack, use reflection to get the shared channel.
+ Field connectionField = client.getClass().getDeclaredField("connectionManager");
+ connectionField.setAccessible(true);
+ Object connection = connectionField.get(client);
+ Method channelMethod = connection.getClass().getDeclaredMethod("getChannel");
+ channelMethod.setAccessible(true);
+ ManagedChannel channel = (ManagedChannel) channelMethod.invoke(connection);
+ return channel;
+ } catch (Exception e) {
+ return null;
+ }
+ }
}