From fb68f00ea0421e723ac46447cfcba7dfaafc1beb Mon Sep 17 00:00:00 2001 From: Anmol Khurana Date: Thu, 11 Aug 2022 11:43:41 -0700 Subject: [PATCH 1/3] Add User Queue Level Routing --- baseapp/pom.xml | 2 +- gateway-ha/pom.xml | 2 +- .../clustermonitor/ActiveClusterMonitor.java | 75 +++++++++--- .../ha/clustermonitor/ClusterStats.java | 3 + .../PrestoQueueLengthChecker.java | 12 +- .../handler/QueryIdCachingProxyHandler.java | 7 +- .../router/PrestoQueueLengthRoutingTable.java | 63 ++++++++-- .../gateway/ha/router/RoutingManager.java | 6 +- .../TestPrestoQueueLengthRoutingTable.java | 111 +++++++++++++++++- pom.xml | 2 +- proxyserver/pom.xml | 2 +- 11 files changed, 249 insertions(+), 36 deletions(-) diff --git a/baseapp/pom.xml b/baseapp/pom.xml index 148eec29..97b74b4c 100644 --- a/baseapp/pom.xml +++ b/baseapp/pom.xml @@ -7,7 +7,7 @@ com.lyft.data prestogateway-parent - 1.8.9 + 1.8.10-SNAPSHOT ../ diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index 4fe7c473..cb6648b9 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -8,7 +8,7 @@ com.lyft.data prestogateway-parent - 1.8.9 + 1.8.10-SNAPSHOT ../ diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java index a52aeab1..1099ba34 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java @@ -1,7 +1,9 @@ package com.lyft.data.gateway.ha.clustermonitor; +import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_QUEUED_LIST_PATH; import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_STATS_PATH; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.lyft.data.gateway.ha.config.MonitorConfiguration; @@ -16,6 +18,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -31,6 +34,8 @@ public class ActiveClusterMonitor implements Managed { public static final int MONITOR_TASK_DELAY_MIN = 1; public static final int DEFAULT_THREAD_POOL_SIZE = 10; + private static final String SESSION_USER = "sessionUser"; + private final List clusterStatsObservers; private final GatewayBackendManager gatewayBackendManager; private final int connectionTimeout; @@ -94,10 +99,7 @@ public void start() { }); } - private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) { - ClusterStats clusterStats = new ClusterStats(); - clusterStats.setClusterId(backend.getName()); - String target = backend.getProxyTo() + UI_API_STATS_PATH; + private String queryCluster(String target) { HttpURLConnection conn = null; try { URL url = new URL(target); @@ -108,22 +110,15 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) { conn.connect(); int responseCode = conn.getResponseCode(); if (responseCode == HttpStatus.SC_OK) { - clusterStats.setHealthy(true); BufferedReader reader = - new BufferedReader(new InputStreamReader((InputStream) conn.getContent())); + new BufferedReader(new InputStreamReader((InputStream) conn.getContent())); StringBuilder sb = new StringBuilder(); String line; while ((line = reader.readLine()) != null) { sb.append(line + "\n"); } - HashMap result = OBJECT_MAPPER.readValue(sb.toString(), HashMap.class); - clusterStats.setNumWorkerNodes((int) result.get("activeWorkers")); - clusterStats.setQueuedQueryCount((int) result.get("queuedQueries")); - clusterStats.setRunningQueryCount((int) result.get("runningQueries")); - clusterStats.setBlockedQueryCount((int) result.get("blockedQueries")); - clusterStats.setProxyTo(backend.getProxyTo()); - clusterStats.setExternalUrl(backend.getExternalUrl()); - clusterStats.setRoutingGroup(backend.getRoutingGroup()); + + return sb.toString(); } else { log.warn("Received non 200 response, response code: {}", responseCode); } @@ -134,6 +129,58 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) { conn.disconnect(); } } + return null; + } + + private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) { + ClusterStats clusterStats = new ClusterStats(); + clusterStats.setClusterId(backend.getName()); + + // Fetch Cluster level Stats. + String target = backend.getProxyTo() + UI_API_STATS_PATH; + String response = queryCluster(target); + if (response == null) { + log.error("Received null response for {}", target); + return clusterStats; + } + clusterStats.setHealthy(true); + try { + HashMap result = null; + result = OBJECT_MAPPER.readValue(response, HashMap.class); + + clusterStats.setNumWorkerNodes((int) result.get("activeWorkers")); + clusterStats.setQueuedQueryCount((int) result.get("queuedQueries")); + clusterStats.setRunningQueryCount((int) result.get("runningQueries")); + clusterStats.setBlockedQueryCount((int) result.get("blockedQueries")); + clusterStats.setProxyTo(backend.getProxyTo()); + clusterStats.setExternalUrl(backend.getExternalUrl()); + clusterStats.setRoutingGroup(backend.getRoutingGroup()); + + } catch (Exception e) { + log.error("Error parsing cluster stats from [{}]", response, e); + } + + // Fetch User Level Stats. + Map clusterUserStats = new HashMap<>(); + target = backend.getProxyTo() + UI_API_QUEUED_LIST_PATH; + response = queryCluster(target); + if (response == null) { + log.error("Received null response for {}", target); + return clusterStats; + } + try { + List> queries = OBJECT_MAPPER.readValue(response, + new TypeReference>>(){}); + + for (Map q : queries) { + String user = (String) q.get(SESSION_USER); + clusterUserStats.put(user, clusterUserStats.getOrDefault(user, 0) + 1); + } + } catch (Exception e) { + log.error("Error parsing cluster user stats: {}", e); + } + clusterStats.setUserQueuedCount(clusterUserStats); + return clusterStats; } diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ClusterStats.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ClusterStats.java index f241f35c..807693d1 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ClusterStats.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ClusterStats.java @@ -1,5 +1,7 @@ package com.lyft.data.gateway.ha.clustermonitor; +import java.util.Map; + import lombok.Data; import lombok.ToString; @@ -15,4 +17,5 @@ public class ClusterStats { private String proxyTo; private String externalUrl; private String routingGroup; + private Map userQueuedCount; } diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java index bdb51d34..ab3a28ba 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/PrestoQueueLengthChecker.java @@ -22,6 +22,8 @@ public void observe(List stats) { Map> clusterQueueMap = new HashMap>(); Map> clusterRunningMap = new HashMap>(); + Map> userClusterQueuedCount + = new HashMap<>(); for (ClusterStats stat : stats) { if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) { @@ -43,8 +45,16 @@ public void observe(List stats) { clusterRunningMap.get(stat.getRoutingGroup()).put(stat.getClusterId(), stat.getRunningQueryCount()); } + + // Create inverse map from user -> {cluster-> count} + for (Map.Entry queueCount : stat.getUserQueuedCount().entrySet()) { + Map clusterQueue = userClusterQueuedCount.getOrDefault(queueCount.getKey(), + new HashMap<>()); + clusterQueue.put(stat.getClusterId(), queueCount.getValue()); + userClusterQueuedCount.put(queueCount.getKey(), clusterQueue); + } } - routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap); + routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap, userClusterQueuedCount); } } diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/handler/QueryIdCachingProxyHandler.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/handler/QueryIdCachingProxyHandler.java index 0bb5b1cd..9ce04f72 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/handler/QueryIdCachingProxyHandler.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/handler/QueryIdCachingProxyHandler.java @@ -35,6 +35,7 @@ public class QueryIdCachingProxyHandler extends ProxyHandler { public static final String V1_QUERY_PATH = "/v1/query"; public static final String V1_INFO_PATH = "/v1/info"; public static final String UI_API_STATS_PATH = "/ui/api/stats"; + public static final String UI_API_QUEUED_LIST_PATH = "/ui/api/query?state=QUEUED"; public static final String PRESTO_UI_PATH = "/ui"; public static final String USER_HEADER = "X-Trino-User"; public static final String ALTERNATE_USER_HEADER = "X-Presto-User"; @@ -121,11 +122,13 @@ public String rewriteTarget(HttpServletRequest request) { backendAddress = routingManager.findBackendForQueryId(queryId); } else { String routingGroup = routingGroupSelector.findRoutingGroup(request); + String user = Optional.ofNullable(request.getHeader(USER_HEADER)) + .orElse(request.getHeader(ALTERNATE_USER_HEADER)); if (!Strings.isNullOrEmpty(routingGroup)) { // This falls back on adhoc backend if there are no cluster found for the routing group. - backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup); + backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup, user); } else { - backendAddress = routingManager.provideAdhocBackend(); + backendAddress = routingManager.provideAdhocBackend(user); } } // set target backend so that we could save queryId to backend mapping later. diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java index 6bec8382..9fd6654e 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java @@ -1,5 +1,6 @@ package com.lyft.data.gateway.ha.router; +import com.google.common.base.Strings; import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration; import java.util.Collection; import java.util.Collections; @@ -14,6 +15,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; /** @@ -35,6 +37,8 @@ public class PrestoQueueLengthRoutingTable extends HaRoutingManager { private ConcurrentHashMap routingGroupWeightSum; private ConcurrentHashMap> clusterQueueLengthMap; + private ConcurrentHashMap> userClusterQueueLengthMap; + private Map> weightedDistributionRouting; /** @@ -47,6 +51,7 @@ public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager routingGroupWeightSum = new ConcurrentHashMap(); clusterQueueLengthMap = new ConcurrentHashMap>(); weightedDistributionRouting = new HashMap>(); + userClusterQueueLengthMap = new ConcurrentHashMap<>(); } /** @@ -182,7 +187,7 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap backends) { @@ -212,11 +217,23 @@ public void updateRoutingTable(String routingGroup, Set backends) { * Update routing Table with new Queue Lengths. */ public void updateRoutingTable(Map> updatedQueueLengthMap, - Map> updatedRunningLengthMap) { + Map> updatedRunningLengthMap, + Map> updatedUserQueueLengthMap) { synchronized (lockObject) { log.debug("Update Routing table with new cluster queue lengths : [{}]", updatedQueueLengthMap.toString()); clusterQueueLengthMap.clear(); + userClusterQueueLengthMap.clear(); + + if (updatedUserQueueLengthMap != null) { + log.debug("Update user queue sizes:[{}]", updatedUserQueueLengthMap.toString()); + + for (String user : updatedUserQueueLengthMap.keySet()) { + ConcurrentHashMap clusterQueueMap = + new ConcurrentHashMap<>(updatedUserQueueLengthMap.get(user)); + userClusterQueueLengthMap.put(user, clusterQueueMap); + } + } for (String grp : updatedQueueLengthMap.keySet()) { if (grp == null) { @@ -270,7 +287,37 @@ public Map getInternalClusterQueueLength(String routingGroup) { /** * Looks up the closest weight to random number generated for a given routing group. */ - public String getEligibleBackEnd(String routingGroup) { + public String getEligibleBackEnd(String routingGroup, String user) { + + // Route to the least queued backend for the user out of all backends for that group + if (!Strings.isNullOrEmpty(user)) { + Map clusterQueueCountForUser = userClusterQueueLengthMap.get(user); + + if (clusterQueueCountForUser != null && !clusterQueueCountForUser.isEmpty()) { + Set backends = clusterQueueLengthMap.get(routingGroup).keySet(); + String leastQueuedCluster = null; + Integer minQueueCount = Integer.MAX_VALUE; + Integer maxQueueCount = Integer.MIN_VALUE; + for (String b : backends) { + // If missing, we assume no queued queries for the user on that cluster. + Integer queueCount = clusterQueueCountForUser.getOrDefault(b, 0); + + if (queueCount < minQueueCount) { + leastQueuedCluster = b; + minQueueCount = queueCount; + } + if (queueCount > maxQueueCount) { + maxQueueCount = queueCount; + } + } + // If all clusters have the same queue count, then fallback to the older weighted logic. + if (!Strings.isNullOrEmpty(leastQueuedCluster) && minQueueCount != maxQueueCount) { + log.debug("Routing to:{} with userQueueCount:{}", leastQueuedCluster, minQueueCount); + + return leastQueuedCluster; + } + } + } if (routingGroupWeightSum.containsKey(routingGroup) && weightedDistributionRouting.containsKey(routingGroup)) { int rnd = RANDOM.nextInt(routingGroupWeightSum.get(routingGroup)); @@ -285,12 +332,12 @@ public String getEligibleBackEnd(String routingGroup) { * backend is found. */ @Override - public String provideBackendForRoutingGroup(String routingGroup) { + public String provideBackendForRoutingGroup(String routingGroup, String user) { List backends = getGatewayBackendManager().getActiveBackends(routingGroup); if (backends.isEmpty()) { - return provideAdhocBackend(); + return provideAdhocBackend(user); } Map proxyMap = new HashMap<>(); for (ProxyBackendConfiguration backend : backends) { @@ -298,7 +345,7 @@ public String provideBackendForRoutingGroup(String routingGroup) { } updateRoutingTable(routingGroup, proxyMap.keySet()); - String clusterId = getEligibleBackEnd(routingGroup); + String clusterId = getEligibleBackEnd(routingGroup, user); log.debug("Routing to eligible backend : [{}] for routing group: [{}]", clusterId, routingGroup); @@ -318,7 +365,7 @@ public String provideBackendForRoutingGroup(String routingGroup) { *

d. */ @Override - public String provideAdhocBackend() { + public String provideAdhocBackend(String user) { Map proxyMap = new HashMap<>(); List backends = getGatewayBackendManager().getActiveAdhocBackends(); if (backends.size() == 0) { @@ -331,7 +378,7 @@ public String provideAdhocBackend() { updateRoutingTable("adhoc", proxyMap.keySet()); - String clusterId = getEligibleBackEnd("adhoc"); + String clusterId = getEligibleBackEnd("adhoc", user); log.debug("Routing to eligible backend : " + clusterId + " for routing group: adhoc"); if (clusterId != null) { return proxyMap.get(clusterId); diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java index 0ae6441c..d7b44f2c 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/RoutingManager.java @@ -61,7 +61,7 @@ public void setBackendForQueryId(String queryId, String backend) { * * @return */ - public String provideAdhocBackend() { + public String provideAdhocBackend(String user) { List backends = this.gatewayBackendManager.getActiveAdhocBackends(); if (backends.size() == 0) { throw new IllegalStateException("Number of active backends found zero"); @@ -76,11 +76,11 @@ public String provideAdhocBackend() { * * @return */ - public String provideBackendForRoutingGroup(String routingGroup) { + public String provideBackendForRoutingGroup(String routingGroup, String user) { List backends = gatewayBackendManager.getActiveBackends(routingGroup); if (backends.isEmpty()) { - return provideAdhocBackend(); + return provideAdhocBackend(user); } int backendId = Math.abs(RANDOM.nextInt()) % backends.size(); return backends.get(backendId).getProxyTo(); diff --git a/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java b/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java index c9d38d8d..96ae2e59 100644 --- a/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java +++ b/gateway-ha/src/test/java/com/lyft/data/gateway/ha/router/TestPrestoQueueLengthRoutingTable.java @@ -1,13 +1,17 @@ package com.lyft.data.gateway.ha.router; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.Assert.assertEquals; import com.lyft.data.gateway.ha.HaGatewayTestUtils; import com.lyft.data.gateway.ha.config.DataStoreConfiguration; import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration; import com.lyft.data.gateway.ha.persistence.JdbcConnectionManager; import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.Executors; @@ -25,6 +29,9 @@ public class TestPrestoQueueLengthRoutingTable { QueryHistoryManager historyManager; String[] mockRoutingGroups = {"adhoc", "scheduled"}; String mockRoutingGroup = "adhoc"; + + String mockUser = "user"; + Map> clusterQueueMap; Map> clusterRunningMap; @@ -86,7 +93,7 @@ private void registerBackEndsWithRandomQueueLengths(String groupName, int numBac clusterQueueMap.put(groupName, queueLengths); // Running counts don't matter if queue lengths are random. - routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap); + routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap, null); } private void registerBackEnds(String groupName, int numBackends, @@ -108,7 +115,39 @@ private void registerBackEnds(String groupName, int numBackends, clusterQueueMap.put(groupName, queueLengths); clusterRunningMap.put(groupName, runningLengths); - routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap); + routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap, null); + } + + + + private void registerBackEndsWithUserQueue(String groupName, int numBackends, + List userQueues) { + + deactiveAllBackends(); + int mockQueueLength = 0; + int mockRunningLength = 0; + String backend; + + Map queueLengths = new HashMap<>(); + Map runningLengths = new HashMap<>(); + Map> userClusterQueue = new HashMap<>(); + + for (int i = 0; i < numBackends; i++) { + backend = groupName + i; + backendManager.activateBackend(backend); + queueLengths.put(backend, mockQueueLength); + runningLengths.put(backend, mockRunningLength); + if (userQueues.size() > i) { + Map userQueueMap = + userClusterQueue.getOrDefault(mockUser, new HashMap<>()); + userQueueMap.put(backend, userQueues.get(i)); + userClusterQueue.put(mockUser, userQueueMap); + } + } + + clusterQueueMap.put(groupName, queueLengths); + clusterRunningMap.put(groupName, runningLengths); + routingTable.updateRoutingTable(clusterQueueMap, clusterRunningMap, userClusterQueue); } private void resetBackends(String groupName, int numBk, @@ -124,7 +163,7 @@ private Map routeQueries(String groupName, int numRequests) { for (int i = 0; i < numRequests; i++) { - eligibleBackend = routingTable.getEligibleBackEnd(groupName); + eligibleBackend = routingTable.getEligibleBackEnd(groupName, null); if (!routingDistribution.containsKey(eligibleBackend)) { routingDistribution.put(eligibleBackend, 1); @@ -197,6 +236,70 @@ public void testRoutingWithSkewedWeightDistribution() { } } + @Test + public void testRoutingWithUserQueuedLength() { + int numBackends = 2; + int queryVolume = 10000; + + // Case 1: All user queue counts Present. + // Validate always routed to cluster with lowest user queue + registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, Arrays.asList(1, 2)); + for (int i = 0; i < queryVolume; i++) { + assertEquals(routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser), + mockRoutingGroup + "0"); + } + + // Case 2: Not all user queue counts Present. + // Validate always routed to cluster with zero queue length i.e. the missing cluster. + registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, Arrays.asList(1)); + for (int i = 0; i < queryVolume; i++) { + assertEquals(routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser), + mockRoutingGroup + "1"); + } + + // Case 3: All user queue counts Present but equal + // Validate equally routed to all clusters. + registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, Arrays.asList(2, 2)); + Map counts = new HashMap<>(); + for (int i = 0; i < queryVolume; i++) { + String cluster = routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser); + counts.put(cluster, counts.getOrDefault(cluster, 0) + 1); + } + double variance = 0.1; + double expectedLowerBound = (queryVolume / numBackends) * (1 - variance); + double expectedUpperBound = (queryVolume / numBackends) * (1 + variance); + + for (Integer c : counts.values()) { + assert c >= expectedLowerBound && c <= expectedUpperBound; + } + + // Case 4: NO user queue lengths present + // Validate equally routed to all clusters. + registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, new ArrayList<>()); + counts = new HashMap<>(); + for (int i = 0; i < queryVolume; i++) { + String cluster = routingTable.getEligibleBackEnd(mockRoutingGroup, mockUser); + counts.put(cluster, counts.getOrDefault(cluster, 0) + 1); + } + for (Integer c : counts.values()) { + assert c >= expectedLowerBound && c <= expectedUpperBound; + } + + // Case 5: Null or empty users + // Validate equally routed to all clusters. + registerBackEndsWithUserQueue(mockRoutingGroup, numBackends, new ArrayList<>()); + counts = new HashMap<>(); + for (int i = 0; i < queryVolume; i++) { + String cluster = routingTable.getEligibleBackEnd(mockRoutingGroup, null); + counts.put(cluster, counts.getOrDefault(cluster, 0) + 1); + } + for (Integer c : counts.values()) { + assert c >= expectedLowerBound && c <= expectedUpperBound; + } + + + } + @Test public void testRoutingWithEqualWeightDistribution() { int queueDistribution = 0; @@ -316,7 +419,7 @@ public void testActiveClusterMonitorUpdateAndRouting() throws InterruptedExcepti } globalToggle.set(!globalToggle.get()); clusterQueueMap.put(mockRoutingGroup, queueLenghts); - routingTable.updateRoutingTable(clusterQueueMap, clusterQueueMap); + routingTable.updateRoutingTable(clusterQueueMap, clusterQueueMap, null); }; resetBackends(mockRoutingGroup, numBk, 0, 0); diff --git a/pom.xml b/pom.xml index 886b7390..9ab09421 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ prestogateway-parent prestogateway-parent pom - 1.8.9 + 1.8.10-SNAPSHOT 1.8 diff --git a/proxyserver/pom.xml b/proxyserver/pom.xml index a4472f93..86934b61 100644 --- a/proxyserver/pom.xml +++ b/proxyserver/pom.xml @@ -8,7 +8,7 @@ com.lyft.data prestogateway-parent - 1.8.9 + 1.8.10-SNAPSHOT ../ From b7b27637d82642d384f12f0ff9aa52aec5b19970 Mon Sep 17 00:00:00 2001 From: Anmol Khurana Date: Mon, 22 Aug 2022 15:23:49 -0700 Subject: [PATCH 2/3] PR comments --- .../gateway/ha/clustermonitor/ActiveClusterMonitor.java | 9 +++++---- .../gateway/ha/router/PrestoQueueLengthRoutingTable.java | 3 ++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java index 1099ba34..66dfd203 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/clustermonitor/ActiveClusterMonitor.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Strings; import com.google.inject.Inject; import com.lyft.data.gateway.ha.config.MonitorConfiguration; import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration; @@ -139,8 +140,8 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) { // Fetch Cluster level Stats. String target = backend.getProxyTo() + UI_API_STATS_PATH; String response = queryCluster(target); - if (response == null) { - log.error("Received null response for {}", target); + if (Strings.isNullOrEmpty(response)) { + log.error("Received null/empty response for {}", target); return clusterStats; } clusterStats.setHealthy(true); @@ -164,8 +165,8 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) { Map clusterUserStats = new HashMap<>(); target = backend.getProxyTo() + UI_API_QUEUED_LIST_PATH; response = queryCluster(target); - if (response == null) { - log.error("Received null response for {}", target); + if (Strings.isNullOrEmpty(response)) { + log.error("Received null/empty response for {}", target); return clusterStats; } try { diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java index 9fd6654e..9a4c377f 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java @@ -285,7 +285,7 @@ public Map getInternalClusterQueueLength(String routingGroup) { } /** - * Looks up the closest weight to random number generated for a given routing group. + * Find the cluster with least user queue else fall back to overall cluster weight based routing. */ public String getEligibleBackEnd(String routingGroup, String user) { @@ -318,6 +318,7 @@ public String getEligibleBackEnd(String routingGroup, String user) { } } } + // Looks up the closest weight to random number generated for a given routing group. if (routingGroupWeightSum.containsKey(routingGroup) && weightedDistributionRouting.containsKey(routingGroup)) { int rnd = RANDOM.nextInt(routingGroupWeightSum.get(routingGroup)); From 9e33f463f128b6dfedf0a87ab911b87e0c874421 Mon Sep 17 00:00:00 2001 From: Anmol Khurana Date: Mon, 29 Aug 2022 14:59:34 -0700 Subject: [PATCH 3/3] Bump version to 1.9.0 --- baseapp/pom.xml | 2 +- gateway-ha/pom.xml | 2 +- .../gateway/ha/router/PrestoQueueLengthRoutingTable.java | 7 +++---- pom.xml | 4 ++-- proxyserver/pom.xml | 2 +- 5 files changed, 8 insertions(+), 9 deletions(-) diff --git a/baseapp/pom.xml b/baseapp/pom.xml index 97b74b4c..29a2805d 100644 --- a/baseapp/pom.xml +++ b/baseapp/pom.xml @@ -7,7 +7,7 @@ com.lyft.data prestogateway-parent - 1.8.10-SNAPSHOT + 1.9.0 ../ diff --git a/gateway-ha/pom.xml b/gateway-ha/pom.xml index cb6648b9..408cb829 100644 --- a/gateway-ha/pom.xml +++ b/gateway-ha/pom.xml @@ -8,7 +8,7 @@ com.lyft.data prestogateway-parent - 1.8.10-SNAPSHOT + 1.9.0 ../ diff --git a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java index 9a4c377f..c99e9eae 100644 --- a/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java +++ b/gateway-ha/src/main/java/com/lyft/data/gateway/ha/router/PrestoQueueLengthRoutingTable.java @@ -226,8 +226,6 @@ public void updateRoutingTable(Map> updatedQueueLen userClusterQueueLengthMap.clear(); if (updatedUserQueueLengthMap != null) { - log.debug("Update user queue sizes:[{}]", updatedUserQueueLengthMap.toString()); - for (String user : updatedUserQueueLengthMap.keySet()) { ConcurrentHashMap clusterQueueMap = new ConcurrentHashMap<>(updatedUserQueueLengthMap.get(user)); @@ -244,7 +242,8 @@ public void updateRoutingTable(Map> updatedQueueLen int maxQueueLen = Collections.max(updatedQueueLengthMap.get(grp).values()); int minQueueLen = Collections.min(updatedQueueLengthMap.get(grp).values()); - if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1) { + if (minQueueLen == maxQueueLen && updatedQueueLengthMap.get(grp).size() > 1 + && updatedRunningLengthMap.containsKey(grp)) { log.info("Queue lengths equal: {} for all clusters in the group {}." + " Falling back to Running Counts : {}", maxQueueLen, grp, updatedRunningLengthMap.get(grp)); @@ -312,7 +311,7 @@ public String getEligibleBackEnd(String routingGroup, String user) { } // If all clusters have the same queue count, then fallback to the older weighted logic. if (!Strings.isNullOrEmpty(leastQueuedCluster) && minQueueCount != maxQueueCount) { - log.debug("Routing to:{} with userQueueCount:{}", leastQueuedCluster, minQueueCount); + log.debug("{} routing to:{}. userQueueCount:{}", user, leastQueuedCluster, minQueueCount); return leastQueuedCluster; } diff --git a/pom.xml b/pom.xml index 9ab09421..c7a9dcdd 100644 --- a/pom.xml +++ b/pom.xml @@ -9,8 +9,8 @@ prestogateway-parent prestogateway-parent pom - 1.8.10-SNAPSHOT - + 1.9.0 + 1.8 1.8 diff --git a/proxyserver/pom.xml b/proxyserver/pom.xml index 86934b61..9d94a5bb 100644 --- a/proxyserver/pom.xml +++ b/proxyserver/pom.xml @@ -8,7 +8,7 @@ com.lyft.data prestogateway-parent - 1.8.10-SNAPSHOT + 1.9.0 ../