Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Support for User Queue Level Routing #176

Merged
merged 3 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion baseapp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.8.9</version>
<version>1.8.10-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion gateway-ha/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>com.lyft.data</groupId>
<artifactId>prestogateway-parent</artifactId>
<version>1.8.9</version>
<version>1.8.10-SNAPSHOT</version>
<relativePath>../</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.lyft.data.gateway.ha.clustermonitor;

import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_QUEUED_LIST_PATH;
import static com.lyft.data.gateway.ha.handler.QueryIdCachingProxyHandler.UI_API_STATS_PATH;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.inject.Inject;
import com.lyft.data.gateway.ha.config.MonitorConfiguration;
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
Expand All @@ -16,6 +19,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -31,6 +35,8 @@ public class ActiveClusterMonitor implements Managed {
public static final int MONITOR_TASK_DELAY_MIN = 1;
public static final int DEFAULT_THREAD_POOL_SIZE = 10;

private static final String SESSION_USER = "sessionUser";

private final List<PrestoClusterStatsObserver> clusterStatsObservers;
private final GatewayBackendManager gatewayBackendManager;
private final int connectionTimeout;
Expand Down Expand Up @@ -94,10 +100,7 @@ public void start() {
});
}

private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
ClusterStats clusterStats = new ClusterStats();
clusterStats.setClusterId(backend.getName());
String target = backend.getProxyTo() + UI_API_STATS_PATH;
private String queryCluster(String target) {
HttpURLConnection conn = null;
try {
URL url = new URL(target);
Expand All @@ -108,22 +111,15 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
conn.connect();
int responseCode = conn.getResponseCode();
if (responseCode == HttpStatus.SC_OK) {
clusterStats.setHealthy(true);
BufferedReader reader =
new BufferedReader(new InputStreamReader((InputStream) conn.getContent()));
new BufferedReader(new InputStreamReader((InputStream) conn.getContent()));
StringBuilder sb = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
sb.append(line + "\n");
}
HashMap<String, Object> result = OBJECT_MAPPER.readValue(sb.toString(), HashMap.class);
clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
clusterStats.setProxyTo(backend.getProxyTo());
clusterStats.setExternalUrl(backend.getExternalUrl());
clusterStats.setRoutingGroup(backend.getRoutingGroup());

return sb.toString();
} else {
log.warn("Received non 200 response, response code: {}", responseCode);
}
Expand All @@ -134,6 +130,58 @@ private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
conn.disconnect();
}
}
return null;
}

/**
 * Collects cluster-level and per-user queued-query statistics for one backend.
 *
 * <p>Two requests are issued through {@code queryCluster}: one to
 * {@code UI_API_STATS_PATH} for the aggregate counters and one to
 * {@code UI_API_QUEUED_LIST_PATH} for the list of queued queries, which is
 * folded into a user -> queued-count map.
 *
 * <p>The backend is flagged healthy as soon as the stats request yields a
 * non-empty body, i.e. before that body is parsed. A parse failure is logged and
 * leaves the remaining fields at their defaults.
 *
 * @param backend backend whose stats endpoints are polled
 * @return stats for the backend, never {@code null}; {@code userQueuedCount} is
 *     always a non-null (possibly empty) map, even when a request or parse fails
 */
private ClusterStats getPrestoClusterStats(ProxyBackendConfiguration backend) {
  ClusterStats clusterStats = new ClusterStats();
  clusterStats.setClusterId(backend.getName());

  // Install the (initially empty) per-user map up front: every early return below
  // must still hand observers a non-null map, since they iterate it unconditionally.
  Map<String, Integer> clusterUserStats = new HashMap<>();
  clusterStats.setUserQueuedCount(clusterUserStats);

  // Fetch Cluster level Stats.
  String target = backend.getProxyTo() + UI_API_STATS_PATH;
  String response = queryCluster(target);
  if (Strings.isNullOrEmpty(response)) {
    log.error("Received null/empty response for {}", target);
    return clusterStats;
  }
  clusterStats.setHealthy(true);
  try {
    Map<String, Object> result = OBJECT_MAPPER.readValue(response,
        new TypeReference<Map<String, Object>>() {});

    // A missing or non-integer counter throws here and is handled by the catch below.
    clusterStats.setNumWorkerNodes((int) result.get("activeWorkers"));
    clusterStats.setQueuedQueryCount((int) result.get("queuedQueries"));
    clusterStats.setRunningQueryCount((int) result.get("runningQueries"));
    clusterStats.setBlockedQueryCount((int) result.get("blockedQueries"));
    clusterStats.setProxyTo(backend.getProxyTo());
    clusterStats.setExternalUrl(backend.getExternalUrl());
    clusterStats.setRoutingGroup(backend.getRoutingGroup());
  } catch (Exception e) {
    log.error("Error parsing cluster stats from [{}]", response, e);
  }

  // Fetch User Level Stats.
  target = backend.getProxyTo() + UI_API_QUEUED_LIST_PATH;
  response = queryCluster(target);
  if (Strings.isNullOrEmpty(response)) {
    log.error("Received null/empty response for {}", target);
    return clusterStats;
  }
  try {
    List<Map<String, Object>> queries = OBJECT_MAPPER.readValue(response,
        new TypeReference<List<Map<String, Object>>>() {});

    for (Map<String, Object> query : queries) {
      String user = (String) query.get(SESSION_USER);
      // Skip entries without a user: a null key would later be rejected when the
      // routing table copies this data into a ConcurrentHashMap.
      if (Strings.isNullOrEmpty(user)) {
        continue;
      }
      clusterUserStats.merge(user, 1, Integer::sum);
    }
  } catch (Exception e) {
    // The throwable is passed as the last argument so SLF4J logs its stack trace.
    log.error("Error parsing cluster user stats from [{}]", response, e);
  }

  return clusterStats;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.lyft.data.gateway.ha.clustermonitor;

import java.util.Map;

import lombok.Data;
import lombok.ToString;

Expand All @@ -15,4 +17,5 @@ public class ClusterStats {
private String proxyTo;
private String externalUrl;
private String routingGroup;
private Map<String, Integer> userQueuedCount;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public void observe(List<ClusterStats> stats) {
Map<String, Map<String, Integer>> clusterQueueMap = new HashMap<String, Map<String, Integer>>();
Map<String, Map<String, Integer>> clusterRunningMap
= new HashMap<String, Map<String, Integer>>();
Map<String, Map<String, Integer>> userClusterQueuedCount
= new HashMap<>();

for (ClusterStats stat : stats) {
if (!clusterQueueMap.containsKey(stat.getRoutingGroup())) {
Expand All @@ -43,8 +45,16 @@ public void observe(List<ClusterStats> stats) {
clusterRunningMap.get(stat.getRoutingGroup()).put(stat.getClusterId(),
stat.getRunningQueryCount());
}

// Create inverse map from user -> {cluster-> count}
for (Map.Entry<String, Integer> queueCount : stat.getUserQueuedCount().entrySet()) {
Map<String, Integer> clusterQueue = userClusterQueuedCount.getOrDefault(queueCount.getKey(),
new HashMap<>());
clusterQueue.put(stat.getClusterId(), queueCount.getValue());
userClusterQueuedCount.put(queueCount.getKey(), clusterQueue);
}
}

routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap);
routingManager.updateRoutingTable(clusterQueueMap, clusterRunningMap, userClusterQueuedCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class QueryIdCachingProxyHandler extends ProxyHandler {
public static final String V1_QUERY_PATH = "/v1/query";
public static final String V1_INFO_PATH = "/v1/info";
public static final String UI_API_STATS_PATH = "/ui/api/stats";
public static final String UI_API_QUEUED_LIST_PATH = "/ui/api/query?state=QUEUED";
public static final String PRESTO_UI_PATH = "/ui";
public static final String USER_HEADER = "X-Trino-User";
public static final String ALTERNATE_USER_HEADER = "X-Presto-User";
Expand Down Expand Up @@ -121,11 +122,13 @@ public String rewriteTarget(HttpServletRequest request) {
backendAddress = routingManager.findBackendForQueryId(queryId);
} else {
String routingGroup = routingGroupSelector.findRoutingGroup(request);
String user = Optional.ofNullable(request.getHeader(USER_HEADER))
.orElse(request.getHeader(ALTERNATE_USER_HEADER));
if (!Strings.isNullOrEmpty(routingGroup)) {
// This falls back on adhoc backend if there are no cluster found for the routing group.
backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup);
backendAddress = routingManager.provideBackendForRoutingGroup(routingGroup, user);
} else {
backendAddress = routingManager.provideAdhocBackend();
backendAddress = routingManager.provideAdhocBackend(user);
}
}
// set target backend so that we could save queryId to backend mapping later.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.lyft.data.gateway.ha.router;

import com.google.common.base.Strings;
import com.lyft.data.gateway.ha.config.ProxyBackendConfiguration;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -14,6 +15,7 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -35,6 +37,8 @@ public class PrestoQueueLengthRoutingTable extends HaRoutingManager {
private ConcurrentHashMap<String, Integer> routingGroupWeightSum;
private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> clusterQueueLengthMap;

private ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>> userClusterQueueLengthMap;

private Map<String, TreeMap<Integer, String>> weightedDistributionRouting;

/**
Expand All @@ -47,6 +51,7 @@ public PrestoQueueLengthRoutingTable(GatewayBackendManager gatewayBackendManager
routingGroupWeightSum = new ConcurrentHashMap<String, Integer>();
clusterQueueLengthMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, Integer>>();
weightedDistributionRouting = new HashMap<String, TreeMap<Integer, String>>();
userClusterQueueLengthMap = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -182,7 +187,7 @@ private void computeWeightsBasedOnQueueLength(ConcurrentHashMap<String,
/**
* Update the Routing Table only if a previously known backend has been deactivated.
* Newly added backends are handled through
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map, Map)}
* {@link PrestoQueueLengthRoutingTable#updateRoutingTable(Map, Map, Map)}
* updateRoutingTable}
*/
public void updateRoutingTable(String routingGroup, Set<String> backends) {
Expand Down Expand Up @@ -212,11 +217,23 @@ public void updateRoutingTable(String routingGroup, Set<String> backends) {
* Update routing Table with new Queue Lengths.
*/
public void updateRoutingTable(Map<String, Map<String, Integer>> updatedQueueLengthMap,
Map<String, Map<String, Integer>> updatedRunningLengthMap) {
Map<String, Map<String, Integer>> updatedRunningLengthMap,
Map<String, Map<String, Integer>> updatedUserQueueLengthMap) {
synchronized (lockObject) {
log.debug("Update Routing table with new cluster queue lengths : [{}]",
updatedQueueLengthMap.toString());
clusterQueueLengthMap.clear();
userClusterQueueLengthMap.clear();

if (updatedUserQueueLengthMap != null) {
log.debug("Update user queue sizes:[{}]", updatedUserQueueLengthMap.toString());

for (String user : updatedUserQueueLengthMap.keySet()) {
ConcurrentHashMap<String, Integer> clusterQueueMap =
new ConcurrentHashMap<>(updatedUserQueueLengthMap.get(user));
userClusterQueueLengthMap.put(user, clusterQueueMap);
}
}

for (String grp : updatedQueueLengthMap.keySet()) {
if (grp == null) {
Expand Down Expand Up @@ -268,9 +285,40 @@ public Map<String, Integer> getInternalClusterQueueLength(String routingGroup) {
}

/**
* Looks up the closest weight to random number generated for a given routing group.
* Find the cluster with least user queue else fall back to overall cluster weight based routing.
*/
public String getEligibleBackEnd(String routingGroup) {
public String getEligibleBackEnd(String routingGroup, String user) {

// Route to the least queued backend for the user out of all backends for that group
if (!Strings.isNullOrEmpty(user)) {
Map<String, Integer> clusterQueueCountForUser = userClusterQueueLengthMap.get(user);

if (clusterQueueCountForUser != null && !clusterQueueCountForUser.isEmpty()) {
Set<String> backends = clusterQueueLengthMap.get(routingGroup).keySet();
String leastQueuedCluster = null;
Integer minQueueCount = Integer.MAX_VALUE;
Integer maxQueueCount = Integer.MIN_VALUE;
for (String b : backends) {
// If missing, we assume no queued queries for the user on that cluster.
Integer queueCount = clusterQueueCountForUser.getOrDefault(b, 0);

if (queueCount < minQueueCount) {
leastQueuedCluster = b;
minQueueCount = queueCount;
}
if (queueCount > maxQueueCount) {
maxQueueCount = queueCount;
}
}
// If all clusters have the same queue count, then fallback to the older weighted logic.
if (!Strings.isNullOrEmpty(leastQueuedCluster) && minQueueCount != maxQueueCount) {
log.debug("Routing to:{} with userQueueCount:{}", leastQueuedCluster, minQueueCount);

return leastQueuedCluster;
}
}
}
// Looks up the closest weight to random number generated for a given routing group.
if (routingGroupWeightSum.containsKey(routingGroup)
&& weightedDistributionRouting.containsKey(routingGroup)) {
int rnd = RANDOM.nextInt(routingGroupWeightSum.get(routingGroup));
Expand All @@ -285,20 +333,20 @@ public String getEligibleBackEnd(String routingGroup) {
* backend is found.
*/
@Override
public String provideBackendForRoutingGroup(String routingGroup) {
public String provideBackendForRoutingGroup(String routingGroup, String user) {
List<ProxyBackendConfiguration> backends =
getGatewayBackendManager().getActiveBackends(routingGroup);

if (backends.isEmpty()) {
return provideAdhocBackend();
return provideAdhocBackend(user);
}
Map<String, String> proxyMap = new HashMap<>();
for (ProxyBackendConfiguration backend : backends) {
proxyMap.put(backend.getName(), backend.getProxyTo());
}

updateRoutingTable(routingGroup, proxyMap.keySet());
String clusterId = getEligibleBackEnd(routingGroup);
String clusterId = getEligibleBackEnd(routingGroup, user);
log.debug("Routing to eligible backend : [{}] for routing group: [{}]",
clusterId, routingGroup);

Expand All @@ -318,7 +366,7 @@ public String provideBackendForRoutingGroup(String routingGroup) {
* <p>d.
*/
@Override
public String provideAdhocBackend() {
public String provideAdhocBackend(String user) {
Map<String, String> proxyMap = new HashMap<>();
List<ProxyBackendConfiguration> backends = getGatewayBackendManager().getActiveAdhocBackends();
if (backends.size() == 0) {
Expand All @@ -331,7 +379,7 @@ public String provideAdhocBackend() {

updateRoutingTable("adhoc", proxyMap.keySet());

String clusterId = getEligibleBackEnd("adhoc");
String clusterId = getEligibleBackEnd("adhoc", user);
log.debug("Routing to eligible backend : " + clusterId + " for routing group: adhoc");
if (clusterId != null) {
return proxyMap.get(clusterId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void setBackendForQueryId(String queryId, String backend) {
*
* @return
*/
public String provideAdhocBackend() {
public String provideAdhocBackend(String user) {
List<ProxyBackendConfiguration> backends = this.gatewayBackendManager.getActiveAdhocBackends();
if (backends.size() == 0) {
throw new IllegalStateException("Number of active backends found zero");
Expand All @@ -76,11 +76,11 @@ public String provideAdhocBackend() {
*
* @return
*/
public String provideBackendForRoutingGroup(String routingGroup) {
public String provideBackendForRoutingGroup(String routingGroup, String user) {
List<ProxyBackendConfiguration> backends =
gatewayBackendManager.getActiveBackends(routingGroup);
if (backends.isEmpty()) {
return provideAdhocBackend();
return provideAdhocBackend(user);
}
int backendId = Math.abs(RANDOM.nextInt()) % backends.size();
return backends.get(backendId).getProxyTo();
Expand Down
Loading