import logging
import math
import multiprocessing
import os
import platform
import random
import re
import subprocess
import sys
from lab import tools
def _get_job_prefix(exp_name):
assert exp_name
escape_char = "j" if exp_name[0].isdigit() else ""
return "".join([escape_char, exp_name, "-"])
def is_build_step(step):
"""Return true iff the given step is the "build" step."""
return step._funcname == "build"
def is_run_step(step):
"""Return true iff the given step is the "run" step."""
return step._funcname == "start_runs"
class Environment:
"""Abstract base class for all environments."""
def __init__(self, randomize_task_order=True):
"""
If *randomize_task_order* is True (default), tasks for runs are
started in a random order. This is useful to avoid systematic
noise due to, e.g., one of the algorithms being run on a
machine with heavy load. Note that due to the randomization,
run directories may be pristine while the experiment is running
even though the logs say the runs are finished.
"""
self.exp = None
self.randomize_task_order = randomize_task_order
def _get_task_order(self, num_tasks):
task_order = list(range(1, num_tasks + 1))
if self.randomize_task_order:
random.shuffle(task_order)
return task_order
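# Example (illustrative): _get_task_order(4) returns a permutation such as
# [3, 1, 4, 2] if randomize_task_order is True, and [1, 2, 3, 4] otherwise.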
def write_main_script(self):
raise NotImplementedError
def start_runs(self):
"""
Execute all runs that are part of the experiment.
"""
raise NotImplementedError
def run_steps(self):
raise NotImplementedError
class LocalEnvironment(Environment):
"""
Environment for running experiments locally on a single machine.
"""
EXP_RUN_SCRIPT = "run"
def __init__(self, processes=None, **kwargs):
"""
If given, *processes* must be between 1 and #CPUs. If omitted,
it will be set to #CPUs.
See :py:class:`~lab.environments.Environment` for inherited
parameters.
"""
Environment.__init__(self, **kwargs)
cores = multiprocessing.cpu_count()
if processes is None:
processes = cores
if not 1 <= processes <= cores:
raise ValueError("processes must be in the range [1, ..., #CPUs].")
self.processes = processes
def write_main_script(self):
script = tools.fill_template(
"local-job.py",
task_order=self._get_task_order(len(self.exp.runs)),
processes=self.processes,
)
self.exp.add_new_file("", self.EXP_RUN_SCRIPT, script, permissions=0o755)
def start_runs(self):
tools.run_command(
[tools.get_python_executable(), self.EXP_RUN_SCRIPT], cwd=self.exp.path
)
def run_steps(self, steps):
for step in steps:
step()
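# Minimal usage sketch (illustrative only; see the lab documentation for
# complete experiment scripts):
#
#     from lab.experiment import Experiment
#     exp = Experiment(environment=LocalEnvironment(processes=2))
#     exp.run_steps()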
class SlurmEnvironment(Environment):
"""Abstract base class for Slurm environments.
If the main experiment step is part of the selected steps, the
selected steps are submitted to Slurm. Otherwise, the selected steps
are run locally.
.. note::
If the steps are run by Slurm, this class writes job files to
the directory ``<exppath>-grid-steps`` and makes them depend on
one another. Please inspect the \\*.log and \\*.err files in
this directory if something goes wrong. Since the job files call
the experiment script during execution, the script must not be
changed while the experiment is running.
If *email* is provided and the steps run on the grid, a message will
be sent when the last experiment step finishes.
Use *extra_options* to pass additional options. The *extra_options*
string may contain newlines. The first example below uses only a given
set of nodes (additional nodes will be used if the given ones don't
satisfy the resource constraints). The second example shows how to
specify a project account (needed on NSC if you're part of multiple
projects). ::
extra_options="#SBATCH --nodelist=ase[1-5,7,10]"
extra_options="#SBATCH --account=snic2021-5-330"
*partition* must be a valid Slurm partition name. In Basel you
can choose from
* "infai_1": 24 nodes with 16 cores, 64GB memory, 500GB Sata (default)
* "infai_2": 24 nodes with 20 cores, 128GB memory, 240GB SSD
* "infai_3": 12 nodes with 128 cores, 512GB memory, 240GB SSD
*qos* must be a valid Slurm QOS name. In Basel this must be
"normal".
*time_limit_per_task* sets the wall-clock time limit for each Slurm task.
The BaselSlurmEnvironment subclass uses a default of "0", i.e., no limit.
(Note that there may still be an external limit set in slurm.conf.)
The TetralithEnvironment subclass uses a default of "24:00:00", i.e., 24
hours, because the scheduler sometimes prefers to schedule tasks that
request less than 24 hours.
*memory_per_cpu* must be a string specifying the memory
allocated for each core. The string must end with one of the
letters K, M or G. The default is "3872M". The value for
*memory_per_cpu* should not surpass the amount of memory that is
available per core, which is "3872M" for infai_1, "6354M" for
infai_2, and "4028M" for infai_3. Processes that surpass the
*memory_per_cpu* limit are terminated with SIGKILL. To impose a
soft limit that can be caught from within your programs, you can
use the ``memory_limit`` kwarg of
:py:func:`~lab.experiment.Run.add_command`. Fast Downward users
should set memory limits via the ``driver_options``.
Slurm limits the memory with cgroups. Unfortunately, this often
fails on our nodes, so we set our own soft memory limit for all
Slurm jobs. We derive the soft memory limit by multiplying the
value denoted by the *memory_per_cpu* parameter with 0.98 (the
Slurm config file contains "AllowedRAMSpace=99" and we add some
slack). We use a soft instead of a hard limit so that child
processes can raise the limit.
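As an illustrative calculation (not an additional option): with the
default *memory_per_cpu* of "3872M" and ``cpus_per_task=1``, the soft
limit is ``int(3872 * 1024 * 0.98) = 3885629`` KiB, i.e., roughly 3.7 GiB.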
*cpus_per_task* sets the number of cores to be allocated per Slurm
task (default: 1).
Examples that reserve the maximum amount of memory available per core:
>>> env1 = BaselSlurmEnvironment(partition="infai_1", memory_per_cpu="3872M")
>>> env2 = BaselSlurmEnvironment(partition="infai_2", memory_per_cpu="6354M")
>>> env3 = BaselSlurmEnvironment(partition="infai_3", memory_per_cpu="4028M")
Example that reserves 12 GiB of memory on infai_1:
>>> # 12 * 1024 / 3872 = 3.17 -> round to next int -> 4 cores per task
>>> # 12G / 4 = 3G per core
>>> env = BaselSlurmEnvironment(
... partition="infai_1",
... memory_per_cpu="3G",
... cpus_per_task=4,
... )
Example that reserves 12 GiB of memory on infai_2:
>>> # 12 * 1024 / 6354 = 1.93 -> round to next int -> 2 cores per task
>>> # 12G / 2 = 6G per core
>>> env = BaselSlurmEnvironment(
... partition="infai_2",
... memory_per_cpu="6G",
... cpus_per_task=2,
... )
Example that reserves 12 GiB of memory on infai_3:
>>> # 12 * 1024 / 4028 = 3.05 -> round to next int -> 4 cores per task
>>> # 12G / 4 = 3G per core
>>> env = BaselSlurmEnvironment(
... partition="infai_3",
... memory_per_cpu="3G",
... cpus_per_task=4,
... )
Use *export* to specify a list of environment variables that
should be exported from the login node to the compute nodes
(default: ["PATH"]).
You can alter the environment in which the experiment runs with
the *setup* argument. If given, it must be a string of Bash
commands. Example::
# Load Singularity module.
setup="module load Singularity/2.6.1 2> /dev/null"
Slurm limits the number of job array tasks. You must set the
appropriate value for your cluster in the *MAX_TASKS* class
variable. Lab groups ``ceil(runs/MAX_TASKS)`` runs in one array
task.
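For example (purely illustrative numbers), with ``MAX_TASKS = 2000``
and 3000 runs, each array task executes ``ceil(3000/2000) = 2`` runs,
so the run step is submitted as ``ceil(3000/2) = 1500`` array tasks.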
See :py:class:`~lab.environments.Environment` for inherited
parameters.
"""
# Must be overridden in derived classes.
MAX_TASKS: int = None # Value between 1 and MaxArraySize-1 (from slurm.conf).
DEFAULT_PARTITION = None
DEFAULT_QOS = None
DEFAULT_MEMORY_PER_CPU = None
# Can be overridden in derived classes.
DEFAULT_TIME_LIMIT_PER_TASK = "0" # No limit.
DEFAULT_EXPORT = ["PATH"]
DEFAULT_SETUP = ""
NICE_VALUE = 0
JOB_HEADER_TEMPLATE_FILE = "slurm-job-header"
RUN_JOB_BODY_TEMPLATE_FILE = "slurm-run-job-body"
STEP_JOB_BODY_TEMPLATE_FILE = "slurm-step-job-body"
def __init__(
self,
email=None,
extra_options=None,
partition=None,
qos=None,
time_limit_per_task=None,
memory_per_cpu=None,
cpus_per_task=1,
export=None,
setup=None,
**kwargs,
):
super().__init__(**kwargs)
self.email = email
self.extra_options = extra_options or "## (not used)"
if partition is None:
partition = self.DEFAULT_PARTITION
if qos is None:
qos = self.DEFAULT_QOS
if time_limit_per_task is None:
time_limit_per_task = self.DEFAULT_TIME_LIMIT_PER_TASK
if memory_per_cpu is None:
memory_per_cpu = self.DEFAULT_MEMORY_PER_CPU
if export is None:
export = self.DEFAULT_EXPORT
if setup is None:
setup = self.DEFAULT_SETUP
self.partition = partition
self.qos = qos
self.time_limit_per_task = time_limit_per_task
self.memory_per_cpu = memory_per_cpu
self.cpus_per_task = cpus_per_task
self.export = export
self.setup = setup
@staticmethod
def _get_memory_in_kb(limit):
match = re.match(r"^(\d+)(k|m|g)?$", limit, flags=re.I)
if not match:
logging.critical(f"malformed memory_per_cpu parameter: {limit}")
memory = int(match.group(1))
suffix = match.group(2)
if suffix is not None:
suffix = suffix.lower()
if suffix == "k":
pass
elif suffix is None or suffix == "m":
memory *= 1024
elif suffix == "g":
memory *= 1024 * 1024
return memory
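# Examples (illustrative): _get_memory_in_kb("3872M") -> 3964928,
# _get_memory_in_kb("2G") -> 2097152, _get_memory_in_kb("512") -> 524288.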
def start_runs(self):
# The queue will start the experiment by itself.
pass
def _get_job_name(self, step):
return (
f"{_get_job_prefix(self.exp.name)}"
f"{self.exp.steps.index(step) + 1:02d}-{step.name}"
)
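# Example (illustrative): for an experiment named "exp1" whose second step
# is called "build", this returns "exp1-02-build".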
def _get_num_runs_per_task(self):
return math.ceil(len(self.exp.runs) / self.MAX_TASKS)
def _get_num_tasks(self, step):
if is_run_step(step):
num_runs = len(self.exp.runs)
num_tasks = math.ceil(num_runs / self._get_num_runs_per_task())
else:
num_tasks = 1
return num_tasks
def _get_job_header(self, step, is_last):
job_params = self._get_job_params(step, is_last)
return tools.fill_template(self.JOB_HEADER_TEMPLATE_FILE, **job_params)
def _get_run_job_body(self, run_step):
num_runs = len(self.exp.runs)
num_tasks = self._get_num_tasks(run_step)
logging.info(f"Grouping {num_runs} runs into {num_tasks} Slurm tasks.")
return tools.fill_template(
self.RUN_JOB_BODY_TEMPLATE_FILE,
exp_path="../" + self.exp.name,
num_runs=num_runs,
python=tools.get_python_executable(),
runs_per_task=self._get_num_runs_per_task(),
task_order=" ".join(str(i) for i in self._get_task_order(num_tasks)),
)
def _get_step_job_body(self, step):
return tools.fill_template(
self.STEP_JOB_BODY_TEMPLATE_FILE,
cwd=os.getcwd(),
python=tools.get_python_executable(),
script=sys.argv[0],
step_name=step.name,
)
def _get_job_body(self, step):
if is_run_step(step):
return self._get_run_job_body(step)
return self._get_step_job_body(step)
def _get_job(self, step, is_last):
return f"{self._get_job_header(step, is_last)}\n\n{self._get_job_body(step)}"
def write_main_script(self):
# The main script is written by the run_steps() method.
pass
def run_steps(self, steps):
"""
We can't submit jobs from within the grid, so we submit them
all at once with dependencies. We also can't rewrite the job
files after they have been submitted.
"""
self.exp.build(write_to_disk=False)
# Prepare job dir.
job_dir = self.exp.path + "-grid-steps"
if os.path.exists(job_dir):
tools.confirm_or_abort(
f'The path "{job_dir}" already exists, so the experiment has '
f"already been submitted. Are you sure you want to "
f"delete the grid-steps and submit it again?"
)
tools.remove_path(job_dir)
# Overwrite exp dir if it exists.
if any(is_build_step(step) for step in steps):
self.exp._remove_experiment_dir()
# Remove eval dir if it exists.
if os.path.exists(self.exp.eval_dir):
tools.confirm_or_abort(
f'The evaluation directory "{self.exp.eval_dir}" already exists. '
f"Do you want to remove it?"
)
tools.remove_path(self.exp.eval_dir)
# Create job dir only when we need it.
tools.makedirs(job_dir)
prev_job_id = None
for step in steps:
job_name = self._get_job_name(step)
job_file = os.path.join(job_dir, job_name)
job_content = self._get_job(step, is_last=(step == steps[-1]))
tools.write_file(job_file, job_content)
prev_job_id = self._submit_job(
job_name, job_file, job_dir, dependency=prev_job_id
)
def _get_job_params(self, step, is_last):
# Let all tasks write into the same two files. We could use %a
# (which Slurm replaces with the array task ID) to avoid interleaved
# logs, but we don't want that many files.
job_params = {
"errfile": "slurm.err",
"extra_options": self.extra_options,
"logfile": "slurm.log",
"name": self._get_job_name(step),
"num_tasks": self._get_num_tasks(step),
}
job_params["partition"] = self.partition
job_params["qos"] = self.qos
job_params["time_limit_per_task"] = self.time_limit_per_task
job_params["memory_per_cpu"] = self.memory_per_cpu
job_params["cpus_per_task"] = self.cpus_per_task
memory_per_cpu_kb = SlurmEnvironment._get_memory_in_kb(self.memory_per_cpu)
job_params["soft_memory_limit"] = int(
self.cpus_per_task * memory_per_cpu_kb * 0.98
)
job_params["nice"] = self.NICE_VALUE if is_run_step(step) else 0
job_params["environment_setup"] = self.setup
if is_last and self.email:
job_params["mailtype"] = "END,FAIL,REQUEUE,STAGE_OUT"
job_params["mailuser"] = self.email
else:
job_params["mailtype"] = "NONE"
job_params["mailuser"] = ""
return job_params
def _submit_job(self, job_name, job_file, job_dir, dependency=None):
submit = ["sbatch"]
if self.export:
submit += ["--export", ",".join(self.export)]
if dependency:
submit.extend(["-d", "afterany:" + dependency, "--kill-on-invalid-dep=yes"])
submit.append(job_file)
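# At this point the command looks like, e.g. (illustrative values):
# sbatch --export PATH -d afterany:12345 --kill-on-invalid-dep=yes <job_file>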
logging.info(f"Executing {' '.join(submit)}")
out = subprocess.check_output(submit, cwd=job_dir).decode()
logging.info(f"Output: {out.strip()}")
match = re.match(r"Submitted batch job (\d*)", out)
assert match, f"Submitting job with sbatch failed: '{out}'"
return match.group(1)
class BaselSlurmEnvironment(SlurmEnvironment):
"""Environment for Basel's AI group."""
DEFAULT_PARTITION = "infai_1"
DEFAULT_QOS = "normal"
# infai_1 nodes have 61964 MiB and 16 cores => 3872.75 MiB per core
# (see http://issues.fast-downward.org/issue733).
DEFAULT_MEMORY_PER_CPU = "3872M"
MAX_TASKS = 150000 - 1 # see slurm.conf
# Prioritize jobs from Autonice users on Basel grid.
NICE_VALUE = 5000
class TetralithEnvironment(SlurmEnvironment):
"""Environment for the NSC Tetralith cluster in Linköping."""
DEFAULT_PARTITION = "tetralith"
DEFAULT_QOS = "normal"
# The maximum wall-clock time limit for a task is 7 days. The default
# is 2 hours. In certain situations, the scheduler prefers to schedule
# tasks shorter than 24 hours.
DEFAULT_TIME_LIMIT_PER_TASK = "24:00:00"
# There are 1908 nodes. 1844 nodes have 93.1 GiB (97637616 KiB) of
# memory and 64 nodes have 384 GB of memory. All nodes have 32 cores.
# So for the vast majority of nodes, we have 2979 MiB per core. The
# slurm.conf file sets DefMemPerCPU=2904. Since this is rather low, we
# use the default value from the BaselSlurmEnvironment. This also
# allows us to keep the default memory limit in the
# FastDownwardExperiment class.
DEFAULT_MEMORY_PER_CPU = "3872M"
# See slurm.conf
MAX_TASKS = 2000
@classmethod
def is_present(cls):
node = platform.node()
return re.match(r"tetralith\d+\.nsc\.liu\.se|n\d+", node)