# Source code for downward.reports.absolute

from collections import defaultdict
import logging
import re

from downward import outcomes
from downward.reports import PlanningReport
from lab import reports


def _abbreviate_node_names(nodes):
    """
    ase05.cluster.bc2.ch -> ase05
    {ase10, ase11, ase12, ase13, ase14} -> {ase10, ..., ase14}
    """
    abbrev_nodes = []
    sequence_buffer = []

    def flush_buffer():
        if len(sequence_buffer) <= 2:
            abbrev_nodes.extend(sequence_buffer)
        else:
            abbrev_nodes.extend([sequence_buffer[0], "...", sequence_buffer[-1]])
        del sequence_buffer[:]

    for node in sorted(nodes):
        node = node.replace(".cluster.bc2.ch", "")
        match = re.match(r"ase(\d{2})", node)
        if match:
            infai_node_id = int(match.group(1))
            if sequence_buffer:
                if sequence_buffer[-1] == f"ase{infai_node_id - 1:02d}":
                    sequence_buffer.append(node)
                elif len(sequence_buffer) in [1, 2]:
                    flush_buffer()
                    sequence_buffer = [node]
                else:
                    flush_buffer()
                    sequence_buffer = [node]
            else:
                sequence_buffer.append(node)
        else:
            flush_buffer()
            abbrev_nodes.append(node)
    flush_buffer()
    return abbrev_nodes


class AbsoluteReport(PlanningReport):
    """
    Report absolute values for the selected attributes.

    This report should be part of all your Fast Downward experiments as it
    includes a table of unexplained errors, e.g. invalid solutions,
    segmentation faults, etc.

    >>> from downward.experiment import FastDownwardExperiment
    >>> exp = FastDownwardExperiment()
    >>> exp.add_report(AbsoluteReport(attributes=["expansions"]), outfile="report.html")

    Example output:

        +------------+--------+--------+
        | expansions | hFF    | hCEA   |
        +============+========+========+
        | gripper    | 118    | 72     |
        +------------+--------+--------+
        | zenotravel | 21     | 17     |
        +------------+--------+--------+

    """

    def __init__(self, **kwargs):
        """Pass *kwargs* on to PlanningReport and set the display options."""
        PlanningReport.__init__(self, **kwargs)
        # Coloring and per-domain links are only enabled for HTML output.
        self.colored = "html" in self.output_format
        self.use_domain_links = "html" in self.output_format
        # NOTE(review): presumably disables a table of contents generated
        # elsewhere, since get_markup() writes its own -- confirm.
        self.toc = False

    def get_markup(self):
        """
        Return the markup of the complete report as one string.

        The result starts with a table of contents, followed by the
        sections "unexplained-errors" (only if there are warnings), "info",
        "summary" and one section per attribute in ``self.attributes``.
        """
        sections = []
        toc_lines = []

        warnings = self._get_warnings_text_and_table()
        if warnings:
            toc_lines.append("- **[Unexplained Errors #unexplained-errors]**")
            sections.append(("unexplained-errors", warnings))

        toc_lines.append("- **[Info #info]**")
        sections.append(("info", self._get_general_info()))

        # Index of summary section.
        summary_index = len(sections)

        # Build a table containing summary functions of all other tables.
        # The actual section is added at position summary_index after creating
        # all other tables.
        summary = self._get_empty_table(title="Summary")
        summary.colored = self.colored
        toc_lines.append("- **[Summary #summary]**")

        for attribute in self.attributes:
            logging.info(f"Creating table(s) for {attribute}")
            # List of (name, table) pairs. An empty name marks the
            # per-attribute overview entry, which gets no heading of its own.
            tables = []
            if attribute == "error":
                # Count how often each error occurs per algorithm and domain.
                seen_errors = set()
                error_counter = defaultdict(int)
                for run in self.runs.values():
                    error = run.get("error", "attribute-error-missing")
                    seen_errors.add(error)
                    error_counter[(run["algorithm"], run["domain"], error)] += 1
                error_to_min_wins = {
                    outcome.msg: outcome.min_wins for outcome in outcomes.OUTCOMES
                }
                # One table per distinct error, each with a "Sum" row.
                for error in sorted(seen_errors):
                    # Txt2tags seems to only allow letters, "-" and "_" in anchors.
                    pseudo_attribute = "error-" + error
                    table = self._get_empty_table(title=pseudo_attribute)
                    min_wins = error_to_min_wins.get(error, None)
                    table.min_wins = min_wins
                    # Errors without a known outcome are not highlighted.
                    table.colored = min_wins is not None
                    for domain in self.domains:
                        if self.use_domain_links:
                            table.cell_formatters[domain][
                                table.header_column
                            ] = reports.CellFormatter(link=f"#error-{domain}")
                        for algorithm in self.algorithms:
                            # .get() avoids inserting keys into the defaultdict.
                            count = error_counter.get((algorithm, domain, error), 0)
                            table.add_cell(domain, algorithm, count)
                    table.add_summary_function("Sum", sum)
                    # The table is emitted below under the anchor
                    # "error-<pseudo_attribute>", hence the doubled prefix.
                    reports.extract_summary_rows(
                        table, summary, link=f"#error-{pseudo_attribute}"
                    )
                    tables.append((pseudo_attribute, table))
            elif self.attribute_is_numeric(attribute):
                domain_table = self._get_suite_table(attribute)
                tables.append(("", domain_table))
                reports.extract_summary_rows(
                    domain_table, summary, link="#" + attribute
                )
            else:
                tables.append(
                    (
                        "",
                        f"Per-domain reports only support numeric "
                        f"attributes, but {attribute} has type "
                        f"{self._all_attributes[attribute].__name__}.",
                    )
                )

            # The per-domain tables are created for every attribute; the
            # "#error-<domain>" and "#<attribute>-<domain>" links point at them.
            for domain in sorted(self.domains):
                tables.append((domain, self._get_domain_table(attribute, domain)))

            parts = []
            toc_line = []
            for name, table in tables:
                if name:
                    assert table
                    toc_line.append(f"[''{name}'' #{attribute}-{name}]")
                    parts.append(f"=== {name} ===[{attribute}-{name}]\n{table}\n")
                elif table:
                    parts.append(f"{table}\n")
                else:
                    parts.append(
                        f"No task was found where all algorithms "
                        f'have a value for "{attribute}". Therefore no '
                        f"per-domain table can be generated.\n"
                    )

            toc_lines.append(f"- **[''{attribute}'' #{attribute}]**")
            # NOTE(review): this literal is reproduced as found; the listing
            # this was recovered from may have collapsed repeated spaces, so
            # verify the intended indentation of the nested list item.
            toc_lines.append(" - " + " ".join(toc_line))
            sections.append((attribute, "\n".join(parts)))

        # Add summary before main content. This is done after creating the main
        # content because the summary table is extracted from all other tables.
        sections.insert(summary_index, ("summary", summary))

        toc = "\n".join(toc_lines)
        content = "\n".join(
            f"== {attr} ==[{attr}]\n\n{section}" for (attr, section) in sections
        )
        return f"{toc}\n\n\n{content}"

    def _get_general_info(self):
        """
        Return the contents of the "info" section.

        This is a table with the non-empty INFO_ATTRIBUTES of every algorithm
        followed by a line listing the (abbreviated) node names. If the table
        is empty, a warning is logged and only the node line is returned.
        """
        table = reports.Table(title="algorithm")
        for algo, info in self.algorithm_info.items():
            for attr in self.INFO_ATTRIBUTES:
                if info[attr]:
                    table.add_cell(algo, attr, info[attr])
        table.set_column_order(self.INFO_ATTRIBUTES)
        used_nodes = ", ".join(_abbreviate_node_names(self._get_node_names()))
        # The doubled braces produce literal "{" and "}" around the names.
        node_info = f"Used nodes: {{{used_nodes}}}"
        if table:
            return str(table) + "\n" + node_info
        logging.warning("Table containing algorithm information is empty.")
        return node_info

    def _get_aggregation_function(self, attribute):
        """Return the pair ``(name, function)`` used to aggregate *attribute*."""
        func = attribute.function
        return (reports.function_name(func), func)

    def _add_table_info(self, attribute, func_name, table):
        """
        Add some information to the table for attributes where data is missing.

        Additionally describe the summary rows registered on *table*, if any.
        """
        if not attribute.absolute:
            table.info.append(
                f"Only tasks where all algorithms have a "
                f'value for "{attribute}" are considered.'
            )
            # NOTE(review): nesting reconstructed from a listing without
            # indentation -- confirm that this message is meant to be added
            # only for non-absolute attributes.
            table.info.append(
                f'Each table entry gives the {func_name} of "{attribute}" for that '
                f"domain."
            )
        summary_names = [name.lower() for name, _ in table.summary_funcs.items()]
        if len(summary_names) == 1:
            table.info.append(
                f"The bottom row reports the {summary_names[0]} across all domains."
            )
        elif len(summary_names) > 1:
            names = " and ".join(summary_names)
            table.info.append(f"The bottom rows report the {names} across all domains.")

    def _get_suite_table(self, attribute):
        """
        Return the table with one aggregated value per domain and algorithm.

        *attribute* must be numeric. The values of all considered tasks of a
        domain are combined with the attribute's aggregation function; cells
        without any value are set to None.
        """
        assert self.attribute_is_numeric(attribute), attribute
        table = self._get_empty_table(attribute)
        # The summary functions must be registered before _add_table_info()
        # runs because it reads them from the table.
        self._add_summary_functions(table, attribute)
        func_name, func = self._get_aggregation_function(attribute)
        num_probs = 0
        self._add_table_info(attribute, func_name, table)
        domain_algo_values = {
            (domain, algorithm): []
            for domain in self.domains
            for algorithm in self.algorithms
        }
        for (domain, _), runs in self.problem_runs.items():
            # If the attribute is absolute, no runs must have been filtered and
            # no values must be missing.
            if not attribute.absolute and (
                len(runs) < len(self.algorithms)
                or any(run.get(attribute) is None for run in runs)
            ):
                continue
            num_probs += 1
            for run in runs:
                value = run.get(attribute)
                if value is not None:
                    domain_algo_values[(domain, run["algorithm"])].append(value)

        # If the attribute is absolute (e.g. coverage) we may have
        # added problems for which not all algorithms have a value. Therefore, we
        # can only print the number of instances (in brackets after the domain
        # name) if that number is the same for all algorithms. If not all
        # algorithms have values for the same number of problems, we write the
        # full list of different problem numbers.
        for domain in self.domains:
            task_counts = [
                str(len(domain_algo_values[(domain, algo)]))
                for algo in self.algorithms
            ]
            if len(set(task_counts)) == 1:
                count = task_counts[0]
            else:
                count = ", ".join(task_counts)
            link = None
            if self.use_domain_links:
                link = f"#{attribute}-{domain}"
            formatter = reports.CellFormatter(link=link, count=count)
            table.cell_formatters[domain][table.header_column] = formatter

        for (domain, algo), values in domain_algo_values.items():
            domain_value = func(values) if values else None
            table.add_cell(domain, algo, domain_value)

        table.num_values = num_probs
        return table

    def _get_domain_table(self, attribute, domain):
        """Return the table of *attribute* values per problem of *domain*."""
        table = self._get_empty_table(attribute)
        for algo in self.algorithms:
            for run in self.domain_algorithm_runs[domain, algo]:
                table.add_cell(run["problem"], algo, run.get(attribute))
        return table

    def _get_empty_table(self, attribute=None, title=None, columns=None):
        """
        Return an empty table.

        *title* defaults to *attribute* and *columns* to ``self.algorithms``.
        For numeric attributes the table takes over the attribute's min_wins
        and digits settings. The top-left cell links to the anchor "#<title>".
        """
        if title is None:
            assert attribute is not None
            title = attribute
            # NOTE(review): nesting reconstructed from a listing without
            # indentation -- confirm that explicitly passed titles are not
            # meant to be prettified for tex output as well.
            if self.output_format == "tex":
                title = title.capitalize().replace("_", " ")
        if columns is None:
            columns = self.algorithms
        if attribute is not None and self.attribute_is_numeric(attribute):
            # Decide whether we want to highlight minima or maxima.
            kwargs = {
                "min_wins": attribute.min_wins,
                "colored": self.colored and attribute.min_wins is not None,
                "digits": attribute.digits,
            }
        else:
            # Do not highlight anything.
            kwargs = {}
        table = reports.Table(title=title, **kwargs)
        table.set_column_order(columns)
        link = f"#{title}"
        formatter = reports.CellFormatter(link=link)
        table.cell_formatters[table.header_row][table.header_column] = formatter
        return table

    def _add_summary_functions(self, table, attribute):
        """Register the attribute's aggregation function as a summary row."""
        funcname, func = self._get_aggregation_function(attribute)
        table.add_summary_function(funcname.capitalize(), func)