"""Main module for creating experiments."""
import logging
import os
import re
import sys
from collections import OrderedDict
from pathlib import Path
from lab import environments, tools
from lab.fetcher import Fetcher
from lab.parser import Parser
from lab.steps import Step, get_step, get_steps_text
# How many tasks to group into one top-level directory.
SHARD_SIZE = 100
# Make argparser available globally so users can add custom arguments.
ARGPARSER = tools.get_argument_parser()
ARGPARSER.epilog = "The list of available steps will be added later."
steps_group = ARGPARSER.add_mutually_exclusive_group()
steps_group.add_argument(
"steps",
metavar="step",
nargs="*",
default=[],
help="Name or number of a step below. If none is given, print help.",
)
steps_group.add_argument(
"--all", dest="run_all_steps", action="store_true", help="Run all steps."
)
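# A user script can register its own options on ARGPARSER before calling
# Experiment.run_steps(). Minimal sketch (the "--dry-run" flag is made up
# for illustration and not part of Lab):
#
#     from lab.experiment import ARGPARSER
#     ARGPARSER.add_argument("--dry-run", action="store_true")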
STATIC_EXPERIMENT_PROPERTIES_FILENAME = "static-experiment-properties"
STATIC_RUN_PROPERTIES_FILENAME = "static-properties"
def get_default_data_dir():
"""E.g. "ham/spam/eggs.py" => "ham/spam/data/"."""
return os.path.join(os.path.dirname(tools.get_script_path()), "data")
def _get_default_experiment_name():
"""Get default name for experiment.
Derived from the filename of the main script, e.g.
"ham/spam/eggs.py" => "eggs".
"""
return os.path.splitext(os.path.basename(tools.get_script_path()))[0]
def _get_default_experiment_dir():
"""E.g. "ham/spam/eggs.py" => "ham/spam/data/eggs"."""
return os.path.join(get_default_data_dir(), _get_default_experiment_name())
def get_run_dir(task_id):
lower = ((task_id - 1) // SHARD_SIZE) * SHARD_SIZE + 1
upper = ((task_id + SHARD_SIZE - 1) // SHARD_SIZE) * SHARD_SIZE
return f"runs-{lower:0>5}-{upper:0>5}/{task_id:0>5}"
def _check_name(name, typ, extra_chars=""):
if not isinstance(name, str):
logging.critical(f"Name for {typ} must be a string: {name}")
if not name:
logging.critical(f"Name for {typ} must not be empty")
alpha_num_name = name
for c in extra_chars:
alpha_num_name = alpha_num_name.replace(c, "")
if not name[0].isalpha():
logging.critical(f"Name for {typ} must start with a letter.")
if not alpha_num_name.isalnum():
logging.critical(
f"Name for {typ} may only use characters from"
f" [A-Z], [a-z], [0-9], [{extra_chars}]: {name}"
)
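# For example, _check_name("algo_1", "command", extra_chars="_-") passes,
# whereas "1-algo" aborts because the name does not start with a letter
# (illustrative values only).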
class _Resource:
def __init__(self, name, source, dest, symlink):
self.name = name
self.source = source
self.dest = dest
self.symlink = symlink
class _Buildable:
"""Abstract base class for Experiment and Run."""
def __init__(self):
self.resources = []
self.new_files = []
self.env_vars_relative = {}
self.commands = OrderedDict()
self.properties = tools.Properties()
def set_property(self, name, value):
"""Add a key-value property.
These can be used later, for example, in reports. ::
>>> exp = Experiment()
>>> exp.set_property("suite", ["gripper", "grid"])
>>> run = exp.add_run()
>>> run.set_property("domain", "gripper")
>>> run.set_property("problem", "prob01.pddl")
Each run must have the property *id*, which must be a *unique*
list of strings. It determines where the results for this run
will land in the combined properties file. ::
>>> run.set_property("id", ["algo1", "task1"])
>>> run.set_property("id", ["algo2", "domain1", "problem1"])
"""
self.properties[name] = value
def _check_alias(self, name):
_check_name(name, "parser or resource", extra_chars="_")
if name in self.env_vars_relative:
logging.critical(f"Parser and resource names must be unique: {name!r}")
def add_resource(self, name, source, dest="", symlink=False):
"""Include the file or directory *source* in the experiment or run.
*name* is an alias for the resource in commands. It must start with a
letter and consist exclusively of letters, numbers and underscores.
If you don't need an alias for the resource, set name=''.
*source* is copied to /path/to/exp-or-run/*dest*. If *dest* is
omitted, the last part of the path to *source* will be taken as the
destination filename. If you only want an alias for your resource, but
don't want to copy or link it, set *dest* to None.
Example::
>>> exp = Experiment()
>>> exp.add_resource("planner", "path/to/my-planner")
includes my-planner in the experiment directory. You can use
``{planner}`` to reference my-planner in a run's commands::
>>> run = exp.add_run()
>>> run.add_resource("domain", "path-to/gripper/domain.pddl")
>>> run.add_resource("task", "path-to/gripper/prob01.pddl")
>>> run.add_command("plan", ["{planner}", "{domain}", "{task}"])
"""
if dest == "":
dest = os.path.basename(source)
if dest is None:
dest = os.path.abspath(source)
if name:
self._check_alias(name)
self.env_vars_relative[name] = dest
self.resources.append(_Resource(name, source, dest, symlink))
def add_new_file(self, name, dest, content, permissions=0o644):
"""
Write *content* to /path/to/exp-or-run/*dest* and make the new file
available to the commands as *name*.
*name* is an alias for the resource in commands. It must start with a
letter and consist exclusively of letters, numbers and underscores. ::
>>> exp = Experiment()
>>> run = exp.add_run()
>>> run.add_new_file("learn", "learn.txt", "a = 5; b = 2; c = 5")
>>> run.add_command("print-trainingset", ["cat", "{learn}"])
"""
if name:
self._check_alias(name)
self.env_vars_relative[name] = dest
self.new_files.append((dest, content, permissions))
def add_command(
self,
name,
command,
time_limit=None,
memory_limit=None,
soft_stdout_limit=1024,
hard_stdout_limit=10 * 1024,
soft_stderr_limit=64,
hard_stderr_limit=10 * 1024,
**kwargs,
):
"""Call an executable.
If invoked on a *run*, this method adds the command to the
**specific** run. If invoked on the experiment, the command is
appended to the list of commands of **all** runs.
*name* is a string describing the command. It must start with a
letter and consist exclusively of letters, numbers, underscores
and hyphens.
*command* has to be a list of strings where the first item is
the executable.
After *time_limit* seconds the signal SIGXCPU is sent to the
command. The process can catch this signal and exit gracefully.
If it doesn't catch the SIGXCPU signal, the command is aborted
with SIGKILL after five additional seconds. The time spent by a
command is the sum of time spent across all threads of the
process.
The command is aborted with SIGKILL when it uses more than
*memory_limit* MiB.
You can limit the log size (in KiB) with a soft and hard limit
for both stdout and stderr. When the soft limit is hit, an
unexplained error is registered for this run, but the command is
allowed to continue running. When the hard limit is hit, the
command is killed with SIGTERM. This signal can be caught and
handled by the process.
By default, there are limits for the log and error output, but
time and memory are not restricted.
All *kwargs* (except ``stdin``) are passed to `subprocess.Popen
<http://docs.python.org/library/subprocess.html>`_. Instead of
file handles you can also pass filenames for the ``stdout`` and
``stderr`` keyword arguments. Specifying the ``stdin`` kwarg is
not supported.
>>> exp = Experiment()
>>> run = exp.add_run()
>>> # Add commands to a *specific* run.
>>> run.add_command("solver", ["mysolver", "input-file"], time_limit=60)
>>> # Add a command to *all* runs.
>>> exp.add_command("cleanup", ["rm", "my-temp-file"])
Make sure to call all Python programs from the currently active
Python interpreter, i.e., ``sys.executable``. Otherwise, the
system Python version might be used instead of the Python version
from the virtual environment.
>>> run.add_command("myplanner", [sys.executable, "planner.py", "input-file"])
"""
_check_name(name, "command", extra_chars="_-")
if name in self.commands:
logging.critical(f"Command names must be unique: {name}")
if not isinstance(command, list):
logging.critical(f"The command for {name} is not a list: {command}")
if not command:
logging.critical(f'Command "{name}" must not be empty')
# Raise an error if the command calls Python directly.
if re.match(r"^python[0-9\.]*$", command[0]):
msg = (
'Command "{name}" calls Python directly. '
"To make sure the command uses the correct Python interpreter, "
"please use {part} instead."
)
raise ValueError(
msg.format(name=name, part=["sys.executable"] + command[1:])
)
elif command[0].endswith(".py"):
msg = (
'Command "{name}" calls the Python script "{part}" directly. '
"To make sure the script uses the correct Python interpreter, "
'please use "[sys.executable, "{part}", ...] instead.'
)
raise ValueError(msg.format(name=name, part=command[0]))
if "stdin" in kwargs:
logging.critical("redirecting stdin is not supported")
kwargs["time_limit"] = time_limit
kwargs["memory_limit"] = memory_limit
kwargs["soft_stdout_limit"] = soft_stdout_limit
kwargs["hard_stdout_limit"] = hard_stdout_limit
kwargs["soft_stderr_limit"] = soft_stderr_limit
kwargs["hard_stderr_limit"] = hard_stderr_limit
self.commands[name] = (command, kwargs)
@property
def _env_vars(self):
return {
name: self._get_abs_path(dest)
for name, dest in self.env_vars_relative.items()
}
def _get_abs_path(self, rel_path):
"""Return absolute path by applying rel_path to the base dir."""
return os.path.join(self.path, rel_path)
def _get_rel_path(self, abs_path):
return os.path.relpath(abs_path, start=self.path)
def _build_properties_file(self, properties_filename):
combined_props = tools.Properties(self._get_abs_path(properties_filename))
combined_props.update(self.properties)
combined_props.write()
def _build_new_files(self):
for dest, content, permissions in self.new_files:
filename = self._get_abs_path(dest)
tools.makedirs(os.path.dirname(filename))
logging.debug(f'Writing file "{filename}"')
tools.write_file(filename, content)
os.chmod(filename, permissions)
def _build_resources(self):
for resource in self.resources:
if not os.path.exists(resource.source):
logging.critical(f"Resource not found: {resource.source}")
dest = self._get_abs_path(resource.dest)
if not dest.startswith(self.path):
# Only copy resources that reside in the experiment/run dir.
continue
if resource.symlink:
# Do not create a symlink if the file doesn't exist.
if not os.path.exists(resource.source):
continue
source = self._get_rel_path(resource.source)
os.symlink(source, dest)
logging.debug(f"Linking from {source} to {dest}")
continue
# Even if the directory containing a resource has already been added,
# we copy the resource since we might want to overwrite it.
logging.debug(f"Copying {resource.source} to {dest}")
tools.copy(resource.source, dest)
class Experiment(_Buildable):
"""Base class for Lab experiments.
See :ref:`concepts` for a description of how Lab experiments are
structured.
"""
def __init__(self, path=None, environment=None):
"""
The experiment will be built at *path*. It defaults to
``<scriptdir>/data/<scriptname>/``. E.g., for the script
``experiments/myexp.py``, the default *path* will be
``experiments/data/myexp/``.
*environment* must be an :ref:`Environment <environments>`
instance. You can use
:class:`~lab.environments.LocalEnvironment` to run your
experiment on a single computer (default). If you have access to
the computer grid in Basel you can use the predefined grid
environment :class:`~lab.environments.BaselSlurmEnvironment`.
Alternatively, you can derive your own class from
:ref:`Environment <environments>`.
"""
tools.configure_logging()
_Buildable.__init__(self)
path = path or _get_default_experiment_dir()
self.path = os.path.abspath(path)
if any(char in self.path for char in (":", ",")):
logging.critical(f"Path contains commas or colons: {self.path}")
self.environment = environment or environments.LocalEnvironment()
self.environment.exp = self
self.steps = []
self.runs = []
self.parsers = []
self.set_property("experiment_file", self._script)
@property
def name(self):
"""Return the directory name of the experiment's ``path``."""
return os.path.basename(self.path)
@property
def eval_dir(self):
"""Return the name of the default evaluation directory.
This is the directory where the fetched and parsed results will land by
default.
"""
return self.path + "-eval"
@property
def _script(self):
"""Return the filename of the experiment script."""
return os.path.basename(sys.argv[0])
def add_step(self, name, function, *args, **kwargs):
"""Add a step to the list of experiment steps.
Use this method to add experiment steps like writing the
experiment file to disk, removing directories and publishing
results. To add fetch and report steps, use the convenience
methods :meth:`.add_fetcher` and :meth:`.add_report`.
*name* is a descriptive name for the step. When selecting steps
on the command line, you may either use step names or their
indices.
*function* must be a callable Python object, e.g., a function
or a class implementing `__call__`.
*args* and *kwargs* will be passed to *function* when the step
is executed.
>>> import shutil
>>> import subprocess
>>> from lab.experiment import Experiment
>>> exp = Experiment("/tmp/myexp")
>>> exp.add_step("build", exp.build)
>>> exp.add_step("start", exp.start_runs)
>>> exp.add_step("rm-eval-dir", shutil.rmtree, exp.eval_dir)
>>> exp.add_step("greet", subprocess.call, ["echo", "Hello"])
"""
if not isinstance(name, str):
logging.critical(f"Step name must be a string: {name}")
if not name:
logging.critical("Step name must not be empty")
if any(step.name == name for step in self.steps):
raise ValueError(f"Step names must be unique: {name}")
self.steps.append(Step(name, function, *args, **kwargs))
def add_parser(self, parser):
"""
Add a :class:`lab.parser.Parser` to each run of the experiment.
Each parser is executed in each run directory and manipulates the run's
"properties" file. For information about how to write parsers see
:ref:`parsing`.
"""
if not isinstance(parser, Parser):
raise TypeError(f'"{parser}" must be a Parser instance')
self.parsers.append(parser)
def parse(self):
"""
Run all parsers that have been added to the experiment with
:meth:`.add_parser`.
After parsing, you'll want to run a "fetch" step to collect the parsed
data from the experiment into the evaluation directory.
"""
if not os.path.isdir(self.path):
logging.critical(f"{self.path} is missing or not a directory")
run_dirs = sorted(Path(self.path).glob("runs-*-*/*"))
num_runs = len(run_dirs)
logging.info(
f"Running {len(self.parsers)} parsers in {num_runs:d} run directories."
)
for index, run_dir in enumerate(run_dirs, start=1):
props_path = run_dir / "properties"
if props_path.is_file():
props_path.unlink()
loglevel = logging.INFO if index % 100 == 0 else logging.DEBUG
logging.log(loglevel, f"Parsing run: {index:6d}/{num_runs:d}")
props = tools.Properties(filename=props_path)
for parser in self.parsers:
parser.parse(run_dir, props)
props.write()
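# Typical wiring (sketch based on the docstring above): register parsing as an
# explicit step and follow it with a fetch step, e.g.
#     exp.add_step("parse", exp.parse)
#     exp.add_fetcher(name="fetch")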
def add_fetcher(
self, src=None, dest=None, merge=None, name=None, filter=None, **kwargs
):
"""
Add a step that fetches results from an experiment or evaluation
directory into a new or existing evaluation directory.
You can use this method to combine results from multiple
experiments.
*src* can be an experiment or evaluation directory or a properties
file. It defaults to ``exp.path``.
*dest* must be a new or existing evaluation directory. It
defaults to ``exp.eval_dir``. If *dest* already contains
data and *merge* is None, the user is prompted whether to
overwrite the existing data or to merge the old and new
data. Setting *merge* to True merges the old and new data,
while setting it to False replaces the old data; in either
case the user is not prompted.
If no *name* is given, call this step "fetch-``basename(src)``".
You can fetch only a subset of runs (e.g., runs for specific
domains or algorithms) by passing :py:class:`filters <.Report>`
with the *filter* argument.
Example setup:
>>> exp = Experiment("/tmp/exp")
Fetch all results and write a single combined properties file
to the default evaluation directory (this step is added by
default):
>>> exp.add_fetcher(name="fetch")
Merge the results from "other-exp" into this experiment's
results:
>>> exp.add_fetcher(src="/path/to/other-exp-eval")
Fetch only the runs for certain algorithms:
>>> exp.add_fetcher(filter_algorithm=["algo_1", "algo_5"])
"""
src = src or self.path
dest = dest or self.eval_dir
name = name or f"fetch-{os.path.basename(src.rstrip('/'))}"
self.add_step(name, Fetcher(), src, dest, merge=merge, filter=filter, **kwargs)
def add_report(self, report, name="", eval_dir="", outfile=""):
"""Add *report* to the list of experiment steps.
This method is a shortcut for ``add_step(name, report,
eval_dir, outfile)`` and uses sensible defaults for omitted
arguments.
If no *name* is given, use *outfile* or the *report*'s class name.
By default, use the experiment's standard *eval_dir*.
If *outfile* is omitted, compose a filename from *name* and the
*report*'s format. If *outfile* is a relative path, put it under
*eval_dir*.
>>> from downward.reports.absolute import AbsoluteReport
>>> exp = Experiment("/tmp/exp")
>>> exp.add_report(AbsoluteReport(attributes=["coverage"]))
"""
name = name or os.path.basename(outfile) or report.__class__.__name__.lower()
eval_dir = eval_dir or self.eval_dir
outfile = outfile or f"{name}.{report.output_format}"
if not os.path.isabs(outfile):
outfile = os.path.join(eval_dir, outfile)
self.add_step(name, report, eval_dir, outfile)
def add_run(self, run=None):
"""Schedule *run* to be part of the experiment.
If *run* is None, create a new run, add it to the experiment
and return it.
"""
run = run or Run(self)
self.runs.append(run)
return run
def run_steps(self):
"""Parse the commandline and run selected steps."""
ARGPARSER.epilog = get_steps_text(self.steps)
args = ARGPARSER.parse_args()
assert not args.steps or not args.run_all_steps
if not args.steps and not args.run_all_steps:
ARGPARSER.print_help()
return
# Run all steps if --all is passed.
steps = [get_step(self.steps, name) for name in args.steps] or self.steps
# Use LocalEnvironment if the main experiment step is inactive.
if any(environments.is_run_step(step) for step in steps):
env = self.environment
else:
env = environments.LocalEnvironment()
env.run_steps(steps)
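# Command-line behavior implied by the parser above (sketch): calling the
# experiment script without arguments prints the help, "myexp.py build start"
# runs the steps named "build" and "start" (step numbers work as well), and
# "myexp.py --all" runs every step.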
def _remove_experiment_dir(self):
if os.path.exists(self.path):
tools.confirm_overwrite_or_abort(self.path)
tools.remove_path(self.path)
def build(self, write_to_disk=True):
"""
Finalize the internal data structures, then write all files
needed for the experiment to disk.
If *write_to_disk* is False, only compute the internal data
structures. This is only needed on grids for
FastDownwardExperiment.build(), which turns the added algorithms
and benchmarks into Runs.
"""
if not write_to_disk:
return
logging.info(f'Experiment path: "{tools.get_relative_path(self.path)}"')
self._remove_experiment_dir()
tools.makedirs(self.path)
self._build_resources()
self._build_runs()
self._build_properties_file(STATIC_EXPERIMENT_PROPERTIES_FILENAME)
# The main script may need other experiment files and may itself add new files.
self.environment.write_main_script()
self._build_new_files()
def start_runs(self):
"""Execute all runs that were added to the experiment.
Depending on the selected environment this method will start
the runs locally or on a computer grid.
"""
self.environment.start_runs()
def _build_runs(self):
"""
Use the relative directory information and write all runs to disk.
"""
if not self.runs:
logging.critical("No runs have been added to the experiment.")
num_runs = len(self.runs)
self.set_property("runs", num_runs)
logging.info(f"Building {num_runs} runs")
for index, run in enumerate(self.runs, 1):
if index % 100 == 0:
logging.info(f"Build run {index:6}/{num_runs}")
for name, (command, kwargs) in self.commands.items():
run.add_command(name, command, **kwargs)
run.build(index)
logging.info("Finished building runs")
class Run(_Buildable):
"""
An experiment consists of multiple runs. There should be one run
for each (algorithm, benchmark) pair.
A run consists of one or more commands.
"""
def __init__(self, experiment):
"""
*experiment* must be an :class:`~lab.experiment.Experiment` instance.
"""
_Buildable.__init__(self)
self.experiment = experiment
self.path = None
def build(self, run_id):
"""Write the run's files to disk.
This method is called automatically by the experiment.
"""
rel_run_dir = get_run_dir(run_id)
self.set_property("run_dir", rel_run_dir)
self.path = os.path.join(self.experiment.path, rel_run_dir)
os.makedirs(self.path)
# We need to build the run script before the resources, because
# the run script is added as a resource.
self._build_run_script()
self._build_new_files()
self._build_resources()
self._check_id()
self._build_properties_file(STATIC_RUN_PROPERTIES_FILENAME)
def _build_run_script(self):
if not self.commands:
logging.critical("Please add at least one command")
exp_vars = self.experiment._env_vars
run_vars = self._env_vars
doubly_used_vars = set(exp_vars) & set(run_vars)
if doubly_used_vars:
logging.critical(
f"Resource names cannot be shared between experiments "
f"and runs, they must be unique: {doubly_used_vars}"
)
env_vars = exp_vars
env_vars.update(run_vars)
env_vars = self._prepare_env_vars(env_vars)
def make_call(name, cmd, kwargs):
kwargs["name"] = name
# Support running globally installed binaries.
def format_arg(arg):
if isinstance(arg, str):
try:
return repr(arg.format(**env_vars))
except KeyError as err:
logging.critical(f"Resource {err} is undefined.")
else:
return repr(str(arg))
def format_key_value_pair(key, val):
formatted_value = format_arg(val) if isinstance(val, str) else repr(val)
return f"{key}={formatted_value}"
cmd_string = f"[{', '.join([format_arg(arg) for arg in cmd])}]"
kwargs_string = ", ".join(
format_key_value_pair(key, value)
for key, value in sorted(kwargs.items())
)
parts = [cmd_string]
if kwargs_string:
parts.append(kwargs_string)
return f"Call({', '.join(parts)}, **redirects).wait()\n"
calls_text = "\n".join(
make_call(name, cmd, kwargs)
for name, (cmd, kwargs) in self.commands.items()
)
run_script = tools.fill_template("run.py", calls=calls_text)
self.add_new_file("", "run", run_script, permissions=0o755)
def _prepare_env_vars(self, env_vars):
"""Use relative filenames for paths in the experiment dir."""
new_env_vars = {}
for var, path in env_vars.items():
abspath = self._get_abs_path(path)
if abspath.startswith(self.experiment.path):
new_env_vars[var] = self._get_rel_path(path)
else:
new_env_vars[var] = abspath
return new_env_vars
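# Illustration (assumed layout): a run resource copied to <run dir>/domain.pddl
# is referenced by the relative name "domain.pddl" in the run script, while a
# resource outside the experiment directory keeps its absolute path.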
def _check_id(self):
run_id = self.properties.get("id")
if run_id is None:
logging.critical("Each run must have an id")
if not isinstance(run_id, (list, tuple)):
logging.critical(f"id must be a list: {run_id}")
for id_part in run_id:
if not isinstance(id_part, str):
logging.critical(f"run IDs must be a list of strings: {run_id}")