From 072ecc72dff5170cbe97599b9ba170c169e48ffd Mon Sep 17 00:00:00 2001 From: mercyblitz Date: Fri, 23 Aug 2019 02:07:09 +0800 Subject: [PATCH] Polish apache/dubbo#4713 : Add Service registration and discovery implementation for Eureka --- .../client/ServiceDiscoveryRegistry.java | 10 ++- .../ServiceInstanceMetadataUtils.java | 16 ++++- .../eureka/EurekaServiceDiscovery.java | 64 ++++++++++++++++++- 3 files changed, 86 insertions(+), 4 deletions(-) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java index 0a39fb13496..4eac6a780fa 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java @@ -29,6 +29,7 @@ import org.apache.dubbo.registry.Registry; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; +import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils; import org.apache.dubbo.registry.client.metadata.proxy.MetadataServiceProxyFactory; import org.apache.dubbo.registry.client.selector.ServiceInstanceSelector; import org.apache.dubbo.registry.support.FailbackRegistry; @@ -112,7 +113,13 @@ public ServiceDiscoveryRegistry(URL registryURL) { this.writableMetadataService = WritableMetadataService.getExtension(metadataStorageType); } - protected Set getSubscribedServices(URL registryURL) { + /** + * Get the subscribed services from the specified registry {@link URL url} + * + * @param registryURL the specified registry {@link URL url} + * @return non-null + */ + public static Set getSubscribedServices(URL registryURL) { String subscribedServiceNames = registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY); return isBlank(subscribedServiceNames) ? emptySet() : unmodifiableSet(of(subscribedServiceNames.split(",")) @@ -307,6 +314,7 @@ private List getSubscribedURLs(URL subscribedURL, Collection serviceInstances = instances.stream() .filter(ServiceInstance::isEnabled) .filter(ServiceInstance::isHealthy) + .filter(ServiceInstanceMetadataUtils::isDubboServiceInstance) .collect(Collectors.toList()); /** diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java index 1d7fca9531c..55cdfd1d2d3 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java @@ -208,14 +208,26 @@ public static String getMetadataStorageType(ServiceInstance serviceInstance) { /** * Set the metadata storage type in specified {@link ServiceInstance service instance} * - * @param serviceInstance {@link ServiceInstance service instance} - * @param metadataType remote or local + * @param serviceInstance {@link ServiceInstance service instance} + * @param metadataType remote or local */ public static void setMetadataStorageType(ServiceInstance serviceInstance, String metadataType) { Map metadata = serviceInstance.getMetadata(); metadata.put(METADATA_STORAGE_TYPE_KEY, metadataType); } + /** + * Is Dubbo Service instance or not + * + * @param serviceInstance {@link ServiceInstance service instance} + * @return if Dubbo Service instance, return true, or false + */ + public static boolean isDubboServiceInstance(ServiceInstance serviceInstance) { + Map metadata = serviceInstance.getMetadata(); + return metadata.containsKey(METADATA_SERVICE_URL_PARAMS_KEY) + || metadata.containsKey(METADATA_SERVICE_URLS_PROPERTY_NAME); + } + private static void setProviderHostParam(Map params, URL providerURL) { params.put(HOST_PARAM_NAME, providerURL.getHost()); } diff --git a/dubbo-registry/dubbo-registry-eureka/src/main/java/org/apache/dubbo/registry/eureka/EurekaServiceDiscovery.java b/dubbo-registry/dubbo-registry-eureka/src/main/java/org/apache/dubbo/registry/eureka/EurekaServiceDiscovery.java index ea482e4b558..07fc37d50b8 100644 --- a/dubbo-registry/dubbo-registry-eureka/src/main/java/org/apache/dubbo/registry/eureka/EurekaServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-eureka/src/main/java/org/apache/dubbo/registry/eureka/EurekaServiceDiscovery.java @@ -17,18 +17,22 @@ package org.apache.dubbo.registry.eureka; import org.apache.dubbo.common.URL; +import org.apache.dubbo.event.EventDispatcher; import org.apache.dubbo.registry.client.DefaultServiceInstance; import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; +import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import com.netflix.appinfo.ApplicationInfoManager; import com.netflix.appinfo.EurekaInstanceConfig; import com.netflix.appinfo.InstanceInfo; import com.netflix.config.ConfigurationManager; +import com.netflix.discovery.CacheRefreshedEvent; import com.netflix.discovery.DefaultEurekaClientConfig; import com.netflix.discovery.DiscoveryClient; import com.netflix.discovery.EurekaClient; import com.netflix.discovery.EurekaClientConfig; +import com.netflix.discovery.EurekaEvent; import com.netflix.discovery.shared.Application; import com.netflix.discovery.shared.Applications; @@ -37,24 +41,37 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import static java.util.Collections.emptyList; +import static org.apache.dubbo.event.EventDispatcher.getDefaultExtension; +import static org.apache.dubbo.registry.client.ServiceDiscoveryRegistry.getSubscribedServices; /** * Eureka {@link ServiceDiscovery} implementation based on Eureka API */ public class EurekaServiceDiscovery implements ServiceDiscovery { + private final EventDispatcher eventDispatcher = getDefaultExtension(); + private ApplicationInfoManager applicationInfoManager; private EurekaClient eurekaClient; + private Set subscribedServices; + + /** + * last apps hash code is used to identify the {@link Applications} is changed or not + */ + private String lastAppsHashCode; + @Override public void initialize(URL registryURL) throws Exception { Properties eurekaConfigProperties = buildEurekaConfigProperties(registryURL); initConfigurationManager(eurekaConfigProperties); + initSubscribedServices(registryURL); } /** @@ -76,6 +93,15 @@ private Properties buildEurekaConfigProperties(URL registryURL) { return properties; } + /** + * Initialize {@link #subscribedServices} property + * + * @param registryURL the {@link URL url} to connect Eureka + */ + private void initSubscribedServices(URL registryURL) { + this.subscribedServices = getSubscribedServices(registryURL); + } + private boolean filterEurekaProperty(Map.Entry propertyEntry) { String propertyName = propertyEntry.getKey(); return propertyName.startsWith("eureka."); @@ -129,8 +155,44 @@ private void initEurekaClient(ServiceInstance serviceInstance) { return; } initApplicationInfoManager(serviceInstance); + EurekaClient eurekaClient = createEurekaClient(); + registerEurekaEventListener(eurekaClient); + // set eurekaClient + this.eurekaClient = eurekaClient; + } + + private void registerEurekaEventListener(EurekaClient eurekaClient) { + eurekaClient.registerEventListener(this::onEurekaEvent); + } + + private void onEurekaEvent(EurekaEvent event) { + if (event instanceof CacheRefreshedEvent) { + onCacheRefreshedEvent(CacheRefreshedEvent.class.cast(event)); + } + } + + private void onCacheRefreshedEvent(CacheRefreshedEvent event) { + synchronized (this) { // Make sure thread-safe in async execution + Applications applications = eurekaClient.getApplications(); + String appsHashCode = applications.getAppsHashCode(); + if (!Objects.equals(lastAppsHashCode, appsHashCode)) { // Changed + // Dispatch Events + dispatchServiceInstancesChangedEvent(); + lastAppsHashCode = appsHashCode; // update current result + } + } + } + + private void dispatchServiceInstancesChangedEvent() { + subscribedServices.forEach(serviceName -> { + eventDispatcher.dispatch(new ServiceInstancesChangedEvent(serviceName, getInstances(serviceName))); + }); + } + + private EurekaClient createEurekaClient() { EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); - this.eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); + DiscoveryClient eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); + return eurekaClient; } @Override