Spark Configuration Threshold Heuristic (#286)
skakker authored and akshayrai committed Jan 8, 2018
1 parent 8c99625 commit a208c31
Showing 12 changed files with 192 additions and 43 deletions.
@@ -0,0 +1,24 @@
/*
 * Added this class to accommodate the "PENDING" status for stages.
 *
 * TODO: remove this class once we move to a Spark version whose StageStatus includes "PENDING".
 */

package com.linkedin.drelephant.spark.fetchers.statusapiv1;

import org.apache.spark.util.EnumUtil;

public enum StageStatus {
ACTIVE,
COMPLETE,
FAILED,
SKIPPED,
PENDING;

private StageStatus() {
}

public static StageStatus fromString(String str) {
return EnumUtil.parseIgnoreCase(StageStatus.class, str);
}
}
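
For context, a minimal usage sketch (not part of the commit): EnumUtil.parseIgnoreCase resolves enum names case-insensitively, so REST payloads can carry any casing.

```scala
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

// "pending" parses regardless of case; PENDING is the value missing from the
// bundled Spark version's StageStatus, which motivated this local copy.
assert(StageStatus.fromString("pending") == StageStatus.PENDING)
```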
@@ -42,7 +42,6 @@ import java.util.Date
import scala.collection.Map

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.api.v1.StageStatus
import com.fasterxml.jackson.annotation.JsonSubTypes.Type
import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}

@@ -17,29 +17,31 @@
package com.linkedin.drelephant.spark.heuristics

import java.util.ArrayList

import com.linkedin.drelephant.math.Statistics

import scala.collection.JavaConverters
import scala.util.Try

import com.linkedin.drelephant.analysis.{HeuristicResultDetails, Heuristic, HeuristicResult, Severity}
import com.linkedin.drelephant.analysis._
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.util.MemoryFormatUtils

import com.linkedin.drelephant.math.Statistics

/**
* A heuristic based on an app's known configuration.
*
* The results from this heuristic primarily inform users about key app configuration settings, including
* driver memory, executor cores, executor instances, executor memory, and the serializer.
* driver memory, driver cores, executor cores, executor instances, executor memory, and the serializer.
*
* It also checks whether the values specified are within threshold.
*/
class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
extends Heuristic[SparkApplicationData] {

import ConfigurationHeuristic._
import JavaConverters._

val sparkOverheadMemoryThreshold: SeverityThresholds = SeverityThresholds.parse(heuristicConfigurationData.getParamMap.get(SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY), ascending = true)
.getOrElse(DEFAULT_SPARK_OVERHEAD_MEMORY_THRESHOLDS)

val serializerIfNonNullRecommendation: String =
Option(heuristicConfigurationData.getParamMap.get(SERIALIZER_IF_NON_NULL_RECOMMENDATION_KEY))
.getOrElse(DEFAULT_SERIALIZER_IF_NON_NULL_RECOMMENDATION)
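
The overhead-memory thresholds above come from the heuristic's param map. A hedged sketch, assuming SeverityThresholds.parse accepts a comma-separated low,moderate,severe,critical string (the exact format is defined elsewhere in Dr. Elephant):

```scala
// Hypothetical override of the 2G/4G/6G/8G defaults (threshold format assumed):
val overheadThresholds: SeverityThresholds =
  SeverityThresholds.parse("1G,2G,3G,4G", ascending = true)
    .getOrElse(DEFAULT_SPARK_OVERHEAD_MEMORY_THRESHOLDS)
```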
@@ -76,6 +78,18 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
new HeuristicResultDetails(
SPARK_DYNAMIC_ALLOCATION_ENABLED,
formatProperty(evaluator.isDynamicAllocationEnabled.map(_.toString))
),
new HeuristicResultDetails(
SPARK_DRIVER_CORES_KEY,
formatProperty(evaluator.driverCores.map(_.toString))
),
new HeuristicResultDetails(
SPARK_YARN_DRIVER_MEMORY_OVERHEAD,
evaluator.sparkYarnDriverMemoryOverhead
),
new HeuristicResultDetails(
SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD,
evaluator.sparkYarnExecutorMemoryOverhead
)
)
// Construct a mutable ArrayList for resultDetails; otherwise the addResultDetail method of HeuristicResult cannot be used.
@@ -93,7 +107,22 @@ class ConfigurationHeuristic(private val heuristicConfigurationData: HeuristicCo
}
if (evaluator.shuffleAndDynamicAllocationSeverity != Severity.NONE) {
result.addResultDetail(SPARK_SHUFFLE_SERVICE_ENABLED, formatProperty(evaluator.isShuffleServiceEnabled.map(_.toString)),
"Spark shuffle service is not enabled.")
"Spark shuffle service is not enabled.")
}
if (evaluator.severityMinExecutors == Severity.CRITICAL) {
result.addResultDetail("Minimum Executors", "The minimum executors for Dynamic Allocation should be <=1. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS + " field.")
}
if (evaluator.severityMaxExecutors == Severity.CRITICAL) {
result.addResultDetail("Maximum Executors", "The maximum executors for Dynamic Allocation should be <=900. Please change it in the " + SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS + " field.")
}
if (evaluator.jarsSeverity == Severity.CRITICAL) {
result.addResultDetail("Jars notation", "It is recommended to not use * notation while specifying jars in the field " + SPARK_YARN_JARS)
}
if (evaluator.severityDriverMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
result.addResultDetail("Driver Overhead Memory", "Please do not specify an excessive amount of overhead memory for the driver. Change it in the field " + SPARK_YARN_DRIVER_MEMORY_OVERHEAD)
}
if (evaluator.severityExecutorMemoryOverhead.getValue >= Severity.SEVERE.getValue) {
result.addResultDetail("Executor Overhead Memory", "Please do not specify an excessive amount of overhead memory for executors. Change it in the field " + SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD)
}
result
}
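
Taken together, the new checks flag three CRITICAL conditions: a dynamic-allocation minimum above 1 executor, a maximum above 900, and wildcard jars. A condensed restatement as a sketch (simplified names, not the committed API):

```scala
def thresholdFlags(minExecutors: Option[Int],
                   maxExecutors: Option[Int],
                   secondaryJars: String): Seq[Severity] = Seq(
  // Dynamic allocation should be able to scale down to at most one executor.
  if (minExecutors.exists(_ > 1)) Severity.CRITICAL else Severity.NONE,
  // ...and should be capped at 900 executors.
  if (maxExecutors.exists(_ > 900)) Severity.CRITICAL else Severity.NONE,
  // The commit recommends against "*" notation in spark.yarn.secondary.jars.
  if (secondaryJars.contains("*")) Severity.CRITICAL else Severity.NONE
)
```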
@@ -113,6 +142,18 @@ object ConfigurationHeuristic {
val SPARK_APPLICATION_DURATION = "spark.application.duration"
val SPARK_SHUFFLE_SERVICE_ENABLED = "spark.shuffle.service.enabled"
val SPARK_DYNAMIC_ALLOCATION_ENABLED = "spark.dynamicAllocation.enabled"
val SPARK_DRIVER_CORES_KEY = "spark.driver.cores"
val SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS = "spark.dynamicAllocation.minExecutors"
val SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS = "spark.dynamicAllocation.maxExecutors"
val SPARK_YARN_JARS = "spark.yarn.secondary.jars"
val SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD = "spark.yarn.executor.memoryOverhead"
val SPARK_YARN_DRIVER_MEMORY_OVERHEAD = "spark.yarn.driver.memoryOverhead"
val THRESHOLD_MIN_EXECUTORS: Int = 1
val THRESHOLD_MAX_EXECUTORS: Int = 900
val SPARK_OVERHEAD_MEMORY_THRESHOLD_KEY = "spark.overheadMemory.thresholds.key"
val DEFAULT_SPARK_OVERHEAD_MEMORY_THRESHOLDS =
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("2G"), moderate = MemoryFormatUtils.stringToBytes("4G"),
severe = MemoryFormatUtils.stringToBytes("6G"), critical = MemoryFormatUtils.stringToBytes("8G"), ascending = true)

class Evaluator(configurationHeuristic: ConfigurationHeuristic, data: SparkApplicationData) {
lazy val appConfigurationProperties: Map[String, String] =
@@ -130,24 +171,75 @@ object ConfigurationHeuristic {
lazy val executorCores: Option[Int] =
Try(getProperty(SPARK_EXECUTOR_CORES_KEY).map(_.toInt)).getOrElse(None)

lazy val applicationDuration : Long = {
lazy val driverCores: Option[Int] =
Try(getProperty(SPARK_DRIVER_CORES_KEY).map(_.toInt)).getOrElse(None)

lazy val dynamicMinExecutors: Option[Int] =
Try(getProperty(SPARK_DYNAMIC_ALLOCATION_MIN_EXECUTORS).map(_.toInt)).getOrElse(None)

lazy val dynamicMaxExecutors: Option[Int] =
Try(getProperty(SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS).map(_.toInt)).getOrElse(None)

lazy val applicationDuration: Long = {
require(data.applicationInfo.attempts.nonEmpty)
val lastApplicationAttemptInfo = data.applicationInfo.attempts.last
(lastApplicationAttemptInfo.endTime.getTime - lastApplicationAttemptInfo.startTime.getTime) / Statistics.SECOND_IN_MS
}

lazy val sparkYarnJars: String = getProperty(SPARK_YARN_JARS).getOrElse("")

lazy val jarsSeverity: Severity = if (sparkYarnJars.contains("*")) {
Severity.CRITICAL
} else {
Severity.NONE
}

lazy val sparkYarnExecutorMemoryOverhead: String = if (getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else (getProperty(SPARK_YARN_EXECUTOR_MEMORY_OVERHEAD).getOrElse("0"))
lazy val sparkYarnDriverMemoryOverhead: String = if (getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0").matches("(.*)[0-9]"))
MemoryFormatUtils.bytesToString(MemoryFormatUtils.stringToBytes(getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0") + "MB")) else getProperty(SPARK_YARN_DRIVER_MEMORY_OVERHEAD).getOrElse("0")

lazy val serializer: Option[String] = getProperty(SPARK_SERIALIZER_KEY)

/**
 * If the serializer is either not configured or not equal to KryoSerializer, then the severity will be moderate.
 */

lazy val serializerSeverity: Severity = serializer match {
case None => Severity.MODERATE
case Some(`serializerIfNonNullRecommendation`) => Severity.NONE
case Some(_) => DEFAULT_SERIALIZER_IF_NON_NULL_SEVERITY_IF_RECOMMENDATION_UNMET
}

// The following thresholds check whether the memory and cores values (executor and driver) are above normal. These thresholds are experimental and may change in the future.
val DEFAULT_SPARK_MEMORY_THRESHOLDS =
SeverityThresholds(low = MemoryFormatUtils.stringToBytes("10G"), moderate = MemoryFormatUtils.stringToBytes("15G"),
severe = MemoryFormatUtils.stringToBytes("20G"), critical = MemoryFormatUtils.stringToBytes("25G"), ascending = true)
val DEFAULT_SPARK_CORES_THRESHOLDS =
SeverityThresholds(low = 4, moderate = 6, severe = 8, critical = 10, ascending = true)
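// Example readings, under the same assumed severityOf semantics:
//   3 cores -> NONE, 5 -> LOW, 7 -> MODERATE, 9 -> SEVERE, 10 -> CRITICAL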

val severityExecutorMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(executorMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
val severityDriverMemory = DEFAULT_SPARK_MEMORY_THRESHOLDS.severityOf(driverMemoryBytes.getOrElse(0).asInstanceOf[Number].longValue)
val severityDriverCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(driverCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityExecutorCores = DEFAULT_SPARK_CORES_THRESHOLDS.severityOf(executorCores.getOrElse(0).asInstanceOf[Number].intValue)
val severityMinExecutors = if (dynamicMinExecutors.getOrElse(0).asInstanceOf[Number].intValue > THRESHOLD_MIN_EXECUTORS) {
Severity.CRITICAL
} else {
Severity.NONE
}
val severityMaxExecutors = if (dynamicMaxExecutors.getOrElse(0).asInstanceOf[Number].intValue > THRESHOLD_MAX_EXECUTORS) {
Severity.CRITICAL
} else {
Severity.NONE
}
val severityExecutorMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnExecutorMemoryOverhead))
val severityDriverMemoryOverhead = configurationHeuristic.sparkOverheadMemoryThreshold.severityOf(MemoryFormatUtils.stringToBytes(sparkYarnDriverMemoryOverhead))


// Severity for the configuration thresholds
val severityConfThresholds: Severity = Severity.max(severityDriverCores, severityDriverMemory, severityExecutorCores, severityExecutorMemory,
severityMinExecutors, severityMaxExecutors, jarsSeverity, severityExecutorMemoryOverhead, severityDriverMemoryOverhead)

/**
* The following logic computes severity based on shuffle service and dynamic allocation flags.
* If dynamic allocation is disabled, then the severity will be MODERATE if shuffle service is disabled or not specified.
@@ -163,10 +255,11 @@ object ConfigurationHeuristic {
case (Some(true), Some(false)) => Severity.SEVERE
}

lazy val severity: Severity = Severity.max(serializerSeverity, shuffleAndDynamicAllocationSeverity)
lazy val severity: Severity = Severity.max(serializerSeverity, shuffleAndDynamicAllocationSeverity, severityConfThresholds)

private val serializerIfNonNullRecommendation: String = configurationHeuristic.serializerIfNonNullRecommendation

private def getProperty(key: String): Option[String] = appConfigurationProperties.get(key)
}

}
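
For orientation, a compressed sketch of how the pieces fit together (construction of the HeuristicConfigurationData and SparkApplicationData inputs elided; names as defined above):

```scala
val heuristic = new ConfigurationHeuristic(heuristicConfigurationData)
val evaluator = new ConfigurationHeuristic.Evaluator(heuristic, sparkApplicationData)
// Overall severity is the max of the serializer check, the shuffle /
// dynamic-allocation check, and the new configuration-threshold checks.
val overall: Severity = evaluator.severity
```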
@@ -26,8 +26,7 @@ import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationDa
import com.linkedin.drelephant.math.Statistics
import com.linkedin.drelephant.spark.data.SparkApplicationData
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageData
import org.apache.spark.status.api.v1.StageStatus

import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

/**
* A heuristic based on metrics for a Spark app's stages.
@@ -23,7 +23,7 @@ import scala.util.Try

import com.linkedin.drelephant.spark.fetchers.statusapiv1._
import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.api.v1.StageStatus
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus

/**
* Converters for legacy SparkApplicationData to current SparkApplicationData.
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/util/SparkUtils.scala
@@ -180,7 +180,7 @@ trait SparkUtils {
}

private val IN_PROGRESS = ".inprogress"
private val DEFAULT_COMPRESSION_CODEC = "snappy"
private val DEFAULT_COMPRESSION_CODEC = "lz4"

private val compressionCodecClassNamesByShortName = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
5 changes: 4 additions & 1 deletion app/views/help/spark/helpConfigurationHeuristic.scala.html
@@ -14,5 +14,8 @@
* the License.
*@
<p>The results from this heuristic primarily inform you about key app
configuration settings, including driver memory, executor cores,
configuration settings, including driver memory, driver cores, executor cores,
executor instances, executor memory, and the serializer.</p>
<p>It also checks the configured minimum and maximum executors for dynamic allocation, the specified YARN jars, the executor and driver memory overhead, and whether other configuration values are within thresholds.</p>
<h3>Suggestions</h3>
<p>Suggestions based on the configurations you have set are given in the heuristic result itself.</p>
@@ -22,8 +22,7 @@ import java.util.zip.{ZipInputStream, ZipEntry, ZipOutputStream}
import java.util.{Calendar, Date, SimpleTimeZone}
import javax.ws.rs.client.WebTarget

import org.apache.spark.status.api.v1.StageStatus

import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
import scala.concurrent.ExecutionContext
import scala.util.Try
import com.fasterxml.jackson.databind.ObjectMapper
@@ -50,15 +50,17 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
"spark.executor.memory" -> "1g",
"spark.shuffle.memoryFraction" -> "0.5",
"spark.shuffle.service.enabled" -> "true",
"spark.dynamicAllocation.enabled" -> "true"
"spark.dynamicAllocation.enabled" -> "true",
"spark.yarn.secondary.jars" -> "something without star",
"spark.yarn.driver.memoryOverhead" -> "500"
)

val data = newFakeSparkApplicationData(configurationProperties)
val heuristicResult = configurationHeuristic.apply(data)
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("returns the size of result details") {
heuristicResultDetails.size() should be(6)
heuristicResultDetails.size() should be(9)
}

it("returns the severity") {
@@ -100,6 +102,18 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
details.getName should include("spark.dynamicAllocation.enabled")
details.getValue should be("true")
}

it("returns the driver cores") {
val details = heuristicResultDetails.get(6)
details.getName should include("spark.driver.cores")
details.getValue should include("default")
}

it("returns the driver overhead memory") {
val details = heuristicResultDetails.get(7)
details.getName should include("spark.yarn.driver.memoryOverhead")
details.getValue should include("500 MB")
}
}

describe("apply with Severity") {
@@ -114,7 +128,7 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
val heuristicResultDetails = heuristicResult.getHeuristicResultDetails

it("returns the size of result details") {
heuristicResultDetails.size() should be(8)
heuristicResultDetails.size() should be(11)
}

it("returns the severity") {
@@ -128,14 +142,14 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
}

it("returns the serializer") {
val details = heuristicResultDetails.get(6)
val details = heuristicResultDetails.get(9)
details.getName should include("spark.serializer")
details.getValue should be("dummySerializer")
details.getDetails should be("KyroSerializer is Not Enabled.")
}

it("returns the shuffle service flag") {
val details = heuristicResultDetails.get(7)
val details = heuristicResultDetails.get(10)
details.getName should include("spark.shuffle.service.enabled")
details.getValue should be("false")
details.getDetails should be("Spark shuffle service is not enabled.")
@@ -184,16 +198,36 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
evaluator.executorCores should be(Some(2))
}

it("has the driver cores when they're present") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.driver.cores" -> "3"))
evaluator.driverCores should be(Some(3))
}

it("has no executor cores when they're absent") {
val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
evaluator.executorCores should be(None)
}

it("has no driver cores when they're absent") {
val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
evaluator.driverCores should be(None)
}

it("has the serializer when it's present") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.serializer" -> "org.apache.spark.serializer.KryoSerializer"))
evaluator.serializer should be(Some("org.apache.spark.serializer.KryoSerializer"))
}

it("jars severity when NONE") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.yarn.secondary.jars" -> "somethingWithoutStar"))
evaluator.jarsSeverity should be(Severity.NONE)
}

it("jars severity when CRITICAL") {
val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.yarn.secondary.jars" -> "somethingWith*.jar"))
evaluator.jarsSeverity should be(Severity.CRITICAL)
}

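// Sketch of an analogous check (assumes the default thresholds apply, under
// which the critical cutoff for overhead memory is 8G):
it("overhead memory severity when CRITICAL") {
  val evaluator = newEvaluatorWithConfigurationProperties(Map("spark.yarn.executor.memoryOverhead" -> "9G"))
  evaluator.severityExecutorMemoryOverhead should be(Severity.CRITICAL)
}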
it("has no serializer, dynamic allocation flag, and shuffle flag when they are absent") {
val evaluator = newEvaluatorWithConfigurationProperties(Map.empty)
evaluator.serializer should be(None)
@@ -254,6 +288,7 @@ class ConfigurationHeuristicTest extends FunSpec with Matchers {
evaluator.isShuffleServiceEnabled should be(Some(false))
evaluator.serializerSeverity should be(Severity.NONE)
evaluator.shuffleAndDynamicAllocationSeverity should be(Severity.SEVERE)
evaluator.severityConfThresholds should be(Severity.NONE)
evaluator.severity should be(Severity.SEVERE)
}

@@ -23,11 +23,10 @@ import com.linkedin.drelephant.analysis.{ApplicationType, Severity}
import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl}
import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
import org.apache.spark.status.api.v1.StageStatus
import org.scalatest.{FunSpec, Matchers}


class StagesHeuristicTest extends FunSpec with Matchers {
import StagesHeuristicTest._

