Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to differentiate task pods from different namespaces in multi-namespace settings #17749

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,15 @@ protected enum State

protected KubernetesPeonLifecycle(
Task task,
K8sTaskId taskId,
KubernetesPeonClient kubernetesClient,
TaskLogs taskLogs,
ObjectMapper mapper,
TaskStateListener stateListener
)
{
this.taskId = new K8sTaskId(task);
this.task = task;
this.taskId = taskId;
this.kubernetesClient = kubernetesClient;
this.taskLogs = taskLogs;
this.mapper = mapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.tasklogs.TaskLogs;

Expand All @@ -42,10 +43,11 @@ public KubernetesPeonLifecycleFactory(
}

@Override
public KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener)
public KubernetesPeonLifecycle build(Task task, K8sTaskId k8sTaskId, KubernetesPeonLifecycle.TaskStateListener stateListener)
{
return new KubernetesPeonLifecycle(
task,
k8sTaskId,
client,
taskLogs,
mapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
Expand Down Expand Up @@ -154,6 +155,7 @@ public ListenableFuture<TaskStatus> run(Task task)
exec.submit(() -> runTask(task)),
peonLifecycleFactory.build(
task,
new K8sTaskId(config.getAlias(), task),
this::emitTaskStateMetrics
)
)).getResult();
Expand All @@ -168,6 +170,7 @@ protected KubernetesWorkItem joinAsync(Task task)
exec.submit(() -> joinTask(task)),
peonLifecycleFactory.build(
task,
new K8sTaskId(config.getAlias(), task),
this::emitTaskStateMetrics
)
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class KubernetesTaskRunnerConfig
@NotNull
private String namespace;

@JsonProperty
private String alias = "";

@JsonProperty
private boolean debugJobs = false;

Expand Down Expand Up @@ -131,6 +134,7 @@ public KubernetesTaskRunnerConfig()

private KubernetesTaskRunnerConfig(
@Nonnull String namespace,
String alias,
boolean debugJobs,
boolean sidecarSupport,
String primaryContainerName,
Expand All @@ -151,6 +155,7 @@ private KubernetesTaskRunnerConfig(
)
{
this.namespace = namespace;
this.alias = alias;
this.debugJobs = ObjectUtils.defaultIfNull(
debugJobs,
this.debugJobs
Expand Down Expand Up @@ -223,6 +228,11 @@ public String getNamespace()
return namespace;
}

public String getAlias()
{
return alias;
}

public boolean isDebugJobs()
{
return debugJobs;
Expand Down Expand Up @@ -317,6 +327,7 @@ public static Builder builder()
public static class Builder
{
private String namespace;
private String alias;
private boolean debugJob;
private boolean sidecarSupport;
private String primaryContainerName;
Expand Down Expand Up @@ -345,6 +356,12 @@ public Builder withNamespace(String namespace)
return this;
}

public Builder withAlias(String alias)
{
this.alias = alias;
return this;
}

public Builder withDebugJob(boolean debugJob)
{
this.debugJob = debugJob;
Expand Down Expand Up @@ -452,6 +469,7 @@ public KubernetesTaskRunnerConfig build()
{
return new KubernetesTaskRunnerConfig(
this.namespace,
this.alias,
this.debugJob,
this.sidecarSupport,
this.primaryContainerName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
package org.apache.druid.k8s.overlord;

import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.K8sTaskId;

public interface PeonLifecycleFactory
{
KubernetesPeonLifecycle build(Task task, KubernetesPeonLifecycle.TaskStateListener stateListener);
KubernetesPeonLifecycle build(Task task, K8sTaskId taskId, KubernetesPeonLifecycle.TaskStateListener stateListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ public class K8sTaskId
private final String k8sJobName;
private final String originalTaskId;

public K8sTaskId(Task task)
public K8sTaskId(String alias, Task task)
{
this(task.getId());
this(alias, task.getId());
}

public K8sTaskId(String taskId)
public K8sTaskId(String alias, String taskId)
{
this.originalTaskId = taskId;
this.k8sJobName = KubernetesOverlordUtils.convertTaskIdToJobName(taskId);
this.k8sJobName = KubernetesOverlordUtils.convertTaskIdToJobName(alias, taskId);
}

public String getK8sJobName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,22 @@ public static String convertTaskIdToK8sLabel(String taskId)
.toLowerCase(Locale.ENGLISH), 63);
}

public static String convertTaskIdToJobName(String alias, String taskId)
{
return (alias == null || alias.isEmpty())
? convertTaskIdToJobName(taskId)
: StringUtils.left(RegExUtils.replaceAll(alias, K8S_TASK_ID_PATTERN, "")
.toLowerCase(Locale.ENGLISH), 30) + "-" + hashString(taskId);
}

public static String convertTaskIdToJobName(String taskId)
{
return taskId == null ? "" : StringUtils.left(RegExUtils.replaceAll(taskId, K8S_TASK_ID_PATTERN, "")
.toLowerCase(Locale.ENGLISH), 30) + "-" + Hashing.murmur3_128().hashString(taskId, StandardCharsets.UTF_8);
.toLowerCase(Locale.ENGLISH), 30) + "-" + hashString(taskId);
}

private static String hashString(String rawString)
{
return Hashing.murmur3_128().hashString(rawString, StandardCharsets.UTF_8).toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public K8sTaskId getTaskId(Job from)
if (taskId == null) {
throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return new K8sTaskId(taskId);
return new K8sTaskId(taskRunnerConfig.getAlias(), taskId);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public MultiContainerTaskAdapter(
@Override
Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException
{
K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
K8sTaskId k8sTaskId = new K8sTaskId(taskRunnerConfig.getAlias(), task.getId());

// get the container size from java_opts array
long containerSize = getContainerMemory(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public Job fromTask(Task task) throws IOException

return new JobBuilder()
.withNewMetadata()
.withName(new K8sTaskId(task).getK8sJobName())
.withName(new K8sTaskId(taskRunnerConfig.getAlias(), task).getK8sJobName())
.addToLabels(getJobLabels(taskRunnerConfig, task))
.addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
.addToAnnotations(DruidK8sConstants.TASK_JOB_TEMPLATE, podTemplateWithName.getName())
Expand Down Expand Up @@ -203,7 +203,7 @@ public K8sTaskId getTaskId(Job from)
if (taskId == null) {
throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
}
return new K8sTaskId(taskId);
return new K8sTaskId(taskRunnerConfig.getAlias(), taskId);
}

private Collection<EnvVar> getEnv(Task task) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public SingleContainerTaskAdapter(
@Override
Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException
{
K8sTaskId k8sTaskId = new K8sTaskId(task.getId());
K8sTaskId k8sTaskId = new K8sTaskId(taskRunnerConfig.getAlias(), task.getId());

// get the container size from java_opts array
long containerSize = getContainerMemory(context);
Expand Down
Loading