Skip to content

Commit

Permalink
Separate the python subprocess log from task executor log (#652)
Browse files Browse the repository at this point in the history
Signed-off-by: zhangjunfan <[email protected]>
  • Loading branch information
zuston authored Mar 10, 2022
1 parent 760d5ee commit 7cd4438
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 15 deletions.
3 changes: 3 additions & 0 deletions tony-core/src/main/java/com/linkedin/tony/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ public class Constants {
public static final String AM_STDOUT_FILENAME = "amstdout.log";
public static final String AM_STDERR_FILENAME = "amstderr.log";

// File names used for the task subprocess's redirected stderr and stdout.
// TaskExecutor joins each of them onto its container log directory to build the full path.
public static final String TASK_EXECUTOR_EXECUTION_STDERR_FILENAME = "execution.err";
public static final String TASK_EXECUTOR_EXECUTION_STDOUT_FILENAME = "execution.out";

public static final String HDFS_SITE_CONF = "hdfs-site.xml";
public static final String HDFS_DEFAULT_CONF = "hdfs-default.xml";
public static final String YARN_SITE_CONF = YarnConfiguration.YARN_SITE_CONFIGURATION_FILE;
Expand Down
3 changes: 2 additions & 1 deletion tony-core/src/main/java/com/linkedin/tony/Framework.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public interface TaskExecutorAdapter {
boolean needReserveTBPort();

/**
 * Runs the executor's task command as a shell process through {@code Utils.executeShell}.
 *
 * @param executor supplies the task command, execution timeout, shell environment and the
 *                 stderr/stdout file paths passed to the shell helper
 * @return the value returned by {@code Utils.executeShell}
 * @throws IOException propagated from {@code Utils.executeShell}
 * @throws InterruptedException propagated from {@code Utils.executeShell}
 */
default int executorPythonShell(TaskExecutor executor) throws IOException, InterruptedException {
// NOTE(review): two consecutive return statements appear here; the three-argument call looks
// like the pre-change line of this diff and the five-argument call its replacement — confirm.
return Utils.executeShell(executor.getTaskCommand(), executor.getExecutionTimeout(), executor.getShellEnv());
return Utils.executeShell(executor.getTaskCommand(), executor.getExecutionTimeout(),
executor.getShellEnv(), executor.getPythonStdErrFile(), executor.getPythonStdOutFile());
}
}
}
14 changes: 14 additions & 0 deletions tony-core/src/main/java/com/linkedin/tony/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.linkedin.tony;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
Expand All @@ -26,6 +27,7 @@
import com.linkedin.tony.rpc.MetricsRpc;
import com.linkedin.tony.rpc.impl.ApplicationRpcClient;
import com.linkedin.tony.util.Utils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -78,6 +80,8 @@ public class TaskExecutor implements AutoCloseable {

private volatile boolean markedAsLostConnectionWithAM = false;

// Directory that holds this container's log files. Populated in initConfigs() from the
// YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR system property, so it stays null when that
// property is not set or initConfigs() has not run.
private String containerLogDir;

/** Creates an executor without performing any initialization; exposed for tests only. */
@VisibleForTesting
public TaskExecutor() { }

Expand Down Expand Up @@ -279,6 +283,8 @@ protected void initConfigs() {
registerToAMTimeout = tonyConf.getInt(TonyConfigurationKeys.TASK_EXECUTOR_MAX_REGISTRY_SEC,
TonyConfigurationKeys.DEFAULT_TASK_EXECUTOR_MAX_REGISTRY_SEC);

containerLogDir = System.getProperty(YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR);

Utils.initYarnConf(yarnConf);
Utils.initHdfsConf(hdfsConf);
}
Expand Down Expand Up @@ -484,4 +490,12 @@ public void setJobName(String jobName) {
/**
 * Replaces the task command stored on this executor.
 *
 * @param taskCommand the new task command
 */
public void setTaskCommand(String taskCommand) {
this.taskCommand = taskCommand;
}

/**
 * Returns the path of the file that receives the task subprocess's stderr.
 *
 * @return the container log directory joined with
 *         {@link Constants#TASK_EXECUTOR_EXECUTION_STDERR_FILENAME}, or {@code null} when no
 *         container log directory is known
 */
public String getPythonStdErrFile() {
  return containerLogFile(Constants.TASK_EXECUTOR_EXECUTION_STDERR_FILENAME);
}

/**
 * Returns the path of the file that receives the task subprocess's stdout.
 *
 * @return the container log directory joined with
 *         {@link Constants#TASK_EXECUTOR_EXECUTION_STDOUT_FILENAME}, or {@code null} when no
 *         container log directory is known
 */
public String getPythonStdOutFile() {
  return containerLogFile(Constants.TASK_EXECUTOR_EXECUTION_STDOUT_FILENAME);
}

/**
 * Joins {@code fileName} onto the container log directory.
 *
 * @param fileName the bare file name to place inside the container log directory
 * @return the joined path, or {@code null} when the container log directory is unset
 */
private String containerLogFile(String fileName) {
  if (containerLogDir == null || containerLogDir.isEmpty()) {
    // Formatting an unset directory would produce "null/<file>" (or "/<file>"), and the shell
    // helper would then create that stray file. Returning null lets Utils.executeShell fall
    // back to inheriting the parent's stream instead.
    return null;
  }
  return containerLogDir + File.separatorChar + fileName;
}
}
3 changes: 3 additions & 0 deletions tony-core/src/main/java/com/linkedin/tony/TonySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;

import static com.linkedin.tony.Constants.CHIEF_JOB_NAME;
Expand Down Expand Up @@ -79,6 +80,8 @@ public String getTaskCommand(String appIdString, String applicationName, String
StringBuilder cmd = new StringBuilder();
cmd.append("$JAVA_HOME/bin/java ")
.append(jvmArgs)
.append(" -D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "="
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR)
.append(" com.linkedin.tony.TaskExecutor ")
.append(" appId=")
.append(appIdString)
Expand Down
45 changes: 33 additions & 12 deletions tony-core/src/main/java/com/linkedin/tony/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
Expand Down Expand Up @@ -282,16 +283,8 @@ public static Options getCommonOptions() {
return opts;
}

/**
* Execute a shell command.
* @param taskCommand the shell command to execute
* @param timeout the timeout to stop running the shell command
* @param env the environment for this shell command
* @return the exit code of the shell command
* @throws IOException
* @throws InterruptedException
*/
public static int executeShell(String taskCommand, long timeout, Map<String, String> env) throws IOException, InterruptedException {
public static int executeShell(String taskCommand, long timeout, Map<String, String> env,
String errorFilePath, String standardFilePath) throws IOException, InterruptedException {
LOG.info("Executing command: " + taskCommand);
String executablePath = taskCommand.trim().split(" ")[0];
File executable = new File(executablePath);
Expand All @@ -306,8 +299,23 @@ public static int executeShell(String taskCommand, long timeout, Map<String, Str
taskCommand = Constants.HADOOP_CLASSPATH_COMMAND + taskCommand;
}
ProcessBuilder taskProcessBuilder = new ProcessBuilder("bash", "-c", taskCommand);
taskProcessBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
taskProcessBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);

if (errorFilePath != null) {
File errFile = new File(errorFilePath);
FileUtils.touch(errFile);
taskProcessBuilder.redirectError(errFile);
} else {
taskProcessBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
}

if (standardFilePath != null) {
File outFile = new File(standardFilePath);
FileUtils.touch(outFile);
taskProcessBuilder.redirectOutput(outFile);
} else {
taskProcessBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
}

// Unset MALLOC_ARENA_MAX for better performance, see https://github.com/linkedin/TonY/issues/346
taskProcessBuilder.environment().remove("MALLOC_ARENA_MAX");
if (env != null) {
Expand All @@ -330,6 +338,19 @@ public static int executeShell(String taskCommand, long timeout, Map<String, Str
return exitValue;
}

/**
 * Execute a shell command, leaving the subprocess's stdout and stderr inherited from the
 * current process.
 *
 * <p>Delegates to the five-argument overload with {@code null} for both file paths, which
 * makes that overload use {@code ProcessBuilder.Redirect.INHERIT} for both streams.
 *
 * @param taskCommand the shell command to execute
 * @param timeout the timeout to stop running the shell command
 * @param env the environment for this shell command
 * @return the exit code of the shell command
 * @throws IOException propagated from the five-argument overload
 * @throws InterruptedException propagated from the five-argument overload
 */
public static int executeShell(String taskCommand, long timeout, Map<String, String> env) throws IOException, InterruptedException {
return executeShell(taskCommand, timeout, env, null, null);
}

public static String getCurrentHostName() {
return System.getenv(ApplicationConstants.Environment.NM_HOST.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testGetTaskCommand() {
session.setJvmArgs("");

String commandline = session.getTaskCommand("application_1645363183601_0001", "tony_tensorflow", "ps", "1");
Assert.assertEquals(commandline, "$JAVA_HOME/bin/java com.linkedin.tony.TaskExecutor "
+ "appId=application_1645363183601_0001 appName=tony_tensorflow task=ps:1");
Assert.assertEquals(commandline, "$JAVA_HOME/bin/java -Dyarn.app.container.log.dir=<LOG_DIR> "
+ "com.linkedin.tony.TaskExecutor appId=application_1645363183601_0001 appName=tony_tensorflow task=ps:1");
}
}
1 change: 1 addition & 0 deletions tony-core/src/test/resources/scripts/exit_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


# Prints one line to stdout, sleeps for one second, then returns 1.
# NOTE(review): the print presumably gives the redirected-stdout test something to capture — confirm.
def return_1():
print("hello world")
time.sleep(1)
return 1

Expand Down

0 comments on commit 7cd4438

Please sign in to comment.