Merge branch 'release/1.41.0'
HenrikBengtsson committed Dec 18, 2024
2 parents c802281 + bdd67e1 commit bf44f96
Showing 17 changed files with 857 additions and 360 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,5 +1,5 @@
Package: parallelly
Version: 1.40.1
Version: 1.41.0
Title: Enhancing the 'parallel' Package
Imports:
parallel,
26 changes: 26 additions & 0 deletions NEWS.md
@@ -1,3 +1,29 @@
# Version 1.41.0 [2024-12-17]

## New Features

* `availableCores()` now also queries `/proc/self/status` for CPU
  affinity allotments.

* `makeClusterPSOCK()` will now produce an error, rather than a
  warning, when the local system command used to launch a parallel
  worker fails with a non-zero exit code.

* `serializedSize()` now always returns a double. Previously, it
  returned an integer whenever the value could be represented as one,
  which increased the risk of integer overflow later on, for example
  when two such values were added together (see the sketch below).
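
  A minimal sketch of the overflow risk, using two made-up values
  `a` and `b` near `.Machine$integer.max`, followed by a direct query
  of the new CPU-affinity method:

      ## Two hypothetical sizes close to the integer limit
      a <- .Machine$integer.max - 1L
      b <- 2L
      a + b             ## integer arithmetic overflows: NA with a warning
      as.numeric(a) + b ## doubles are safe: 2147483648

      ## Query only the new CPU-affinity method (Linux only)
      parallelly::availableCores(methods = "/proc/self/status")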

## Bug Fixes

* `makeClusterPSOCK()` on MS Windows failed to launch remote workers,
  with warnings such as `"In system(local_cmd, wait = FALSE, input =
  input) : 'C:\WINDOWS\System32\OpenSSH\ssh.exe' not found"`. This
  bug was introduced in version 1.38.0 (2024-07-27), when richer
  support for the `rscript_sh` argument was added.


# Version 1.40.1 [2024-12-03]

## Bug Fixes
248 changes: 150 additions & 98 deletions R/availableCores.R
@@ -46,6 +46,9 @@
#' \item `"system"` -
#' Query \code{\link[parallel]{detectCores}(logical = logical)}.
#'
#' \item `"/proc/self/status"` -
#' Query \code{Cpus_allowed_list} of `/proc/self/status`.
#'
#' \item `"cgroups.cpuset"` -
#' On Unix, query control group (cgroup v1) value \code{cpuset.set}.
#'
@@ -216,25 +219,11 @@
#'
#' @importFrom parallel detectCores
#' @export
availableCores <- function(constraints = NULL, methods = getOption2("parallelly.availableCores.methods", c("system", "cgroups.cpuset", "cgroups.cpuquota", "cgroups2.cpu.max", "nproc", "mc.cores", "BiocParallel", "_R_CHECK_LIMIT_CORES_", "Bioconductor", "LSF", "PJM", "PBS", "SGE", "Slurm", "fallback", "custom")), na.rm = TRUE, logical = getOption2("parallelly.availableCores.logical", TRUE), default = c(current = 1L), which = c("min", "max", "all"), omit = getOption2("parallelly.availableCores.omit", 0L)) {
## Local functions
getenv <- function(name, mode = "integer") {
value <- trim(getEnvVar2(name, default = NA_character_))
storage.mode(value) <- mode
value
} # getenv()

getopt <- function(name, mode = "integer") {
value <- getOption2(name, default = NA_integer_)
storage.mode(value) <- mode
value
} # getopt()

availableCores <- function(constraints = NULL, methods = getOption2("parallelly.availableCores.methods", c("system", "/proc/self/status", "cgroups.cpuset", "cgroups.cpuquota", "cgroups2.cpu.max", "nproc", "mc.cores", "BiocParallel", "_R_CHECK_LIMIT_CORES_", "Bioconductor", "LSF", "PJM", "PBS", "SGE", "Slurm", "fallback", "custom")), na.rm = TRUE, logical = getOption2("parallelly.availableCores.logical", TRUE), default = c(current = 1L), which = c("min", "max", "all"), omit = getOption2("parallelly.availableCores.omit", 0L)) {
stop_if_not(
is.null(constraints) || is.character(constraints), !anyNA(constraints)
)


if ("connections" %in% constraints) {
methods <- unique(c(methods, "connections"))
}
@@ -252,99 +241,27 @@ availableCores <- function(constraints = NULL, methods = getOption2("parallelly.
method <- methods[kk]
if (method == "Slurm") {
## Number of cores assigned by Slurm

## The assumption is that the following works regardless of
## number of nodes requested /HB 2020-09-18
## Example: --cpus-per-task={n}
n <- getenv("SLURM_CPUS_PER_TASK")
if (is.na(n)) {
## Example: --nodes={nnodes} (defaults to 1, short: -N {nnodes})
## From 'man sbatch':
## SLURM_JOB_NUM_NODES (and SLURM_NNODES for backwards compatibility)
## Total number of nodes in the job's resource allocation.
nnodes <- getenv("SLURM_JOB_NUM_NODES")
if (is.na(nnodes)) nnodes <- getenv("SLURM_NNODES")
if (is.na(nnodes)) nnodes <- 1L ## Can this happen? /HB 2020-09-18

if (nnodes == 1L) {
## Example: --nodes=1 --ntasks={n} (short: -n {n})
## IMPORTANT: 'SLURM_CPUS_ON_NODE' appears to be rounded up when nodes > 1.
## Example 1: With --nodes=2 --cpus-per-task=3 we see SLURM_CPUS_ON_NODE=4
## although SLURM_CPUS_PER_TASK=3.
## Example 2: With --nodes=2 --ntasks=7, we see SLURM_CPUS_ON_NODE=6,
## SLURM_JOB_CPUS_PER_NODE=6,2, no SLURM_CPUS_PER_TASK, and
## SLURM_TASKS_PER_NODE=5,2.
## Conclusions: We can only use 'SLURM_CPUS_ON_NODE' for nnodes = 1.
n <- getenv("SLURM_CPUS_ON_NODE")
} else {
## Parse `SLURM_TASKS_PER_NODE`
nodecounts <- getenv("SLURM_TASKS_PER_NODE", mode = "character")
if (!is.na(nodecounts)) {
## Examples:
## SLURM_TASKS_PER_NODE=5,2
## SLURM_TASKS_PER_NODE=2(x2),1(x3) # Source: 'man sbatch'
n <- slurm_expand_nodecounts(nodecounts)
if (anyNA(n)) next

## ASSUMPTION: We assume that it is the first component on the list that
## corresponds to the current machine. /HB 2021-03-05
n <- n[1]
}
}
}

## TODO?: Can we validate above assumptions/results? /HB 2020-09-18
if (FALSE && !is.na(n)) {
## Is any of the following useful?

## Example: --ntasks={ntasks} (no default, short: -n {ntasks})
## From 'man sbatch':
## SLURM_NTASKS (and SLURM_NPROCS for backwards compatibility)
## Same as -n, --ntasks
ntasks <- getenv("SLURM_NTASKS")
if (is.na(ntasks)) ntasks <- getenv("SLURM_NPROCS")
}
n <- availableCoresSlurm()
} else if (method == "PBS") {
## Number of cores assigned by TORQUE/PBS
n <- getenv("PBS_NUM_PPN")
if (is.na(n)) {
## PBSPro sets 'NCPUS' but not 'PBS_NUM_PPN'
n <- getenv("NCPUS")
}
n <- availableCoresPBS()
} else if (method == "SGE") {
## Number of cores assigned by Oracle/Son/Sun/Univa Grid Engine (SGE/UGE)
n <- getenv("NSLOTS")
n <- availableCoresSGE()
} else if (method == "LSF") {
## Number of slots assigned by LSF
n <- getenv("LSB_DJOB_NUMPROC")
n <- availableCoresLSF()
} else if (method == "PJM") {
## Number of slots assigned by Fujitsu Technical Computing Suite
## We choose to call this job scheduler "PJM" based on the prefix
## its environment variables use.
## PJM_VNODE_CORE: e.g. pjsub -L vnode-core=8
## "This environment variable is set only when virtual nodes
## are allocated, and it is not set when nodes are allocated."
n <- getenv("PJM_VNODE_CORE")
if (is.na(n)) {
## PJM_PROC_BY_NODE: e.g. pjsub -L vnode-core=8
## "Maximum number of processes that are generated per node by
## an MPI program. However, if a single node (node=1) or virtual
## node (vnode=1) is allocated and the mpi option of the pjsub
## command is not specified, this environment variable is not set."
n <- getenv("PJM_PROC_BY_NODE")
}
n <- availableCoresPJM()
} else if (method == "mc.cores") {
## Number of cores by option defined by 'parallel' package
n <- getopt("mc.cores")
n <- getopt_int("mc.cores")
if (!is.na(n) && n == 0) n <- 1L ## Because options(mc.cores = 0) may be set
} else if (method == "mc.cores+1") {
## Number of cores by option defined by 'parallel' package
n <- getopt("mc.cores") + 1L
n <- getopt_int("mc.cores") + 1L
} else if (method == "connections") {
## Number of available connections, which are needed by PSOCK clusters
n <- freeConnections()
} else if (method == "BiocParallel") {
n <- getenv("BIOCPARALLEL_WORKER_NUMBER")
n <- getenv_int("BIOCPARALLEL_WORKER_NUMBER")
} else if (method == "_R_CHECK_LIMIT_CORES_") {
## A flag set by R CMD check for constraining number of
## cores allowed to be used in package tests. Here we
@@ -368,6 +285,19 @@ availableCores <- function(constraints = NULL, methods = getOption2("parallelly.
} else if (method == "system") {
## Number of cores available according to parallel::detectCores()
n <- detectCores(logical = logical)
} else if (method == "/proc/self/status") {
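## Number of CPUs this process may run on according to its CPU
## affinity mask ('Cpus_allowed_list' in /proc/self/status on Linux)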
pathname <- "/proc/self/status"
if (file_test("-f", pathname)) {
bfr <- readLines(pathname, warn = FALSE)
bfr <- grep("^Cpus_allowed_list:", bfr, value = TRUE)
if (length(bfr) == 1) {
bfr <- sub("^Cpus_allowed_list:\t", "", bfr)
if (nzchar(bfr)) {
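## Expand ranges such as "0-3,8" by reusing the Slurm nodelist
## parser; the number of allowed CPUs is the length of that set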
bfr <- slurm_expand_nodelist(sprintf("[%s]", bfr))
n <- length(bfr)
}
}
}
} else if (method == "cgroups.cpuset") {
## Number of cores according to Unix cgroups v1 CPU set
n <- length(getCGroups1CpuSet())
@@ -410,11 +340,11 @@ availableCores <- function(constraints = NULL, methods = getOption2("parallelly.
## covr: skip=3
## Fall back to querying option and system environment variable
## with the given name
n <- getopt(method)
if (is.na(n)) n <- getenv(method)
n <- getopt_int(method)
if (is.na(n)) n <- getenv_int(method)
}
ncores[kk] <- n
}
} ## for (kk in seq_along(methods))

## Validate settings
ncoresT <- ncores[!is.na(ncores)]
@@ -577,3 +507,125 @@ checkNumberOfLocalWorkers <- function(workers) {
warning(msg)
}
} ## checkNumberOfLocalWorkers()


# --------------------------------------------------------------------------
# Utility functions
# --------------------------------------------------------------------------
getenv_int <- function(name, mode = "integer") {
value <- trim(getEnvVar2(name, default = NA_character_))
storage.mode(value) <- mode
value
} # getenv_int()

getopt_int <- function(name, mode = "integer") {
value <- getOption2(name, default = NA_integer_)
storage.mode(value) <- mode
value
} # getopt_int()
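
## Both helpers return NA_integer_ when the option or environment
## variable is not set, e.g. getopt_int("mc.cores") in a fresh session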


# --------------------------------------------------------------------------
# High-Performance Compute (HPC) Schedulers
# --------------------------------------------------------------------------
## Number of slots assigned by LSF
availableCoresLSF <- function() {
n <- getenv_int("LSB_DJOB_NUMPROC")
n
}


## Number of cores assigned by TORQUE/PBS
availableCoresPBS <- function() {
n <- getenv_int("PBS_NUM_PPN")
if (is.na(n)) {
## PBSPro sets 'NCPUS' but not 'PBS_NUM_PPN'
n <- getenv_int("NCPUS")
}
n
}


## Number of slots assigned by Fujitsu Technical Computing Suite
## We choose to call this job scheduler "PJM" based on the prefix
## its environment variables use.
availableCoresPJM <- function() {
## PJM_VNODE_CORE: e.g. pjsub -L vnode-core=8
## "This environment variable is set only when virtual nodes
## are allocated, and it is not set when nodes are allocated."
n <- getenv_int("PJM_VNODE_CORE")
if (is.na(n)) {
## PJM_PROC_BY_NODE: e.g. pjsub -L vnode-core=8
## "Maximum number of processes that are generated per node by
## an MPI program. However, if a single node (node=1) or virtual
## node (vnode=1) is allocated and the mpi option of the pjsub
## command is not specified, this environment variable is not set."
n <- getenv_int("PJM_PROC_BY_NODE")
}
n
}


## Number of cores assigned by Oracle/Son/Sun/Univa Grid Engine (SGE/UGE)
availableCoresSGE <- function() {
n <- getenv_int("NSLOTS")
n
}


## Number of cores assigned by Slurm
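## Hypothetical illustration of the lookup below: in a single-node job
## submitted with 'sbatch --cpus-per-task=4', SLURM_CPUS_PER_TASK=4 is
## set and this function returns 4; in a multi-node job it instead uses
## the first element of the expanded SLURM_TASKS_PER_NODE value.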
availableCoresSlurm <- function() {
## The assumption is that the following works regardless of
## number of nodes requested /HB 2020-09-18
## Example: --cpus-per-task={n}
n <- getenv_int("SLURM_CPUS_PER_TASK")
if (is.na(n)) {
## Example: --nodes={nnodes} (defaults to 1, short: -N {nnodes})
## From 'man sbatch':
## SLURM_JOB_NUM_NODES (and SLURM_NNODES for backwards compatibility)
## Total number of nodes in the job's resource allocation.
nnodes <- getenv_int("SLURM_JOB_NUM_NODES")
if (is.na(nnodes)) nnodes <- getenv_int("SLURM_NNODES")
if (is.na(nnodes)) nnodes <- 1L ## Can this happen? /HB 2020-09-18

if (nnodes == 1L) {
## Example: --nodes=1 --ntasks={n} (short: -n {n})
## IMPORTANT: 'SLURM_CPUS_ON_NODE' appears to be rounded up when nodes > 1.
## Example 1: With --nodes=2 --cpus-per-task=3 we see SLURM_CPUS_ON_NODE=4
## although SLURM_CPUS_PER_TASK=3.
## Example 2: With --nodes=2 --ntasks=7, we see SLURM_CPUS_ON_NODE=6,
## SLURM_JOB_CPUS_PER_NODE=6,2, no SLURM_CPUS_PER_TASK, and
## SLURM_TASKS_PER_NODE=5,2.
## Conclusions: We can only use 'SLURM_CPUS_ON_NODE' for nnodes = 1.
n <- getenv_int("SLURM_CPUS_ON_NODE")
} else {
## Parse `SLURM_TASKS_PER_NODE`
nodecounts <- getenv_int("SLURM_TASKS_PER_NODE", mode = "character")
if (!is.na(nodecounts)) {
## Examples:
## SLURM_TASKS_PER_NODE=5,2
## SLURM_TASKS_PER_NODE=2(x2),1(x3) # Source: 'man sbatch'
n <- slurm_expand_nodecounts(nodecounts)
if (anyNA(n)) return(NA_real_)

## ASSUMPTION: We assume that it is the first component on the list that
## corresponds to the current machine. /HB 2021-03-05
n <- n[1]
}
}
}

## TODO?: Can we validate above assumptions/results? /HB 2020-09-18
if (FALSE && !is.na(n)) {
## Is any of the following useful?

## Example: --ntasks={ntasks} (no default, short: -n {ntasks})
## From 'man sbatch':
## SLURM_NTASKS (and SLURM_NPROCS for backwards compatibility)
## Same as -n, --ntasks
ntasks <- getenv_int("SLURM_NTASKS")
if (is.na(ntasks)) ntasks <- getenv_int("SLURM_NPROCS")
}

n
} ## availableCoresSlurm()