"""
Module that permits generating reports by reading properties files
"""
import collections
import fnmatch
import logging
import math
import numbers
import os
import sys
from collections import defaultdict
import txt2tags
from lab import tools
from lab.reports import markup
from lab.reports.markup import ESCAPE_WORDBREAK, Document
def arithmetic_mean(values):
    """Return the arithmetic mean of *values* as a float.

    *values* must be a non-empty sequence of numbers that does not
    contain None.

    >>> arithmetic_mean([20, 30, 70])
    40.0

    """
    assert None not in values
    # math.fsum avoids the rounding error a naive float sum accumulates.
    total = math.fsum(values)
    count = len(values)
    return total / count
def geometric_mean(values):
    """Return the geometric mean of *values*.

    *values* must be a non-empty sequence of numbers that does not
    contain None.

    >>> round(geometric_mean([2, 8]), 2)
    4.0

    """
    assert None not in values
    # Take the n-th root of every factor before multiplying, instead of
    # taking the root of the full product.
    root = 1.0 / len(values)
    factors = [value**root for value in values]
    return tools.product(factors)
def finite_sum(values):
    """Return the sum of *values*, skipping None and "infinity" entries.

    An entry counts as "infinity" if it equals ``sys.maxsize``.
    """
    total = 0
    for value in values:
        if value is None or value == sys.maxsize:
            continue
        total = total + value
    return total
def function_name(f):
    """Return a human-readable name for the aggregation function *f*.

    The aggregation helpers of this module are mapped to names with
    spaces; every other callable is reported under its ``__name__``.
    """
    raw_name = f.__name__
    readable_names = {
        "arithmetic_mean": "arithmetic mean",
        "finite_sum": "finite sum",
        "geometric_mean": "geometric mean",
    }
    return readable_names.get(raw_name, raw_name)
def get_aggregation_function(function, functions):
    """
    Code for backwards compatibility.

    Resolve the aggregation function from the current *function* kwarg
    and the deprecated *functions* kwarg of :class:`Attribute`:

    * If *functions* is empty or None, return *function* unchanged.
    * If both are given, log a critical error. If the logging call
      returns, the result is None.
    * Otherwise show a deprecation warning and return the first entry
      of *functions*; more than one entry is logged as a critical error.
    """
    if not functions:
        return function

    if function:
        logging.critical(
            'You cannot use "function" and "functions" kwargs for '
            "Attribute at the same time."
        )
        return None

    tools.show_deprecation_warning(
        '"functions" kwarg for Attribute is deprecated. Use "function" instead.'
    )
    if len(functions) > 1:
        logging.critical("Using multiple aggregation functions is unsupported.")
    return functions[0]
class Attribute(str):
    """A string subclass for attributes in reports."""

    def __new__(cls, name, **kwargs):
        # str is immutable, so the text must be fixed in __new__. The
        # options passed as keyword arguments are handled by __init__.
        return super().__new__(cls, name)

    def __init__(
        self,
        name,
        absolute=False,
        min_wins=True,
        function=None,
        functions=None,
        scale=None,
        digits=2,
    ):
        """
        Use this class if your **custom** attribute needs a non-default
        value for:

        * *absolute*: if False, only include tasks for which all task
          runs have values in a per-domain table (e.g. ``coverage`` is
          absolute, whereas ``expansions`` is not, because we can't
          compare algorithms A and B for task X if B has no value for
          ``expansions``).
        * *min_wins*: set to True if a smaller value for this attribute
          is better, to False if a higher value is better and to None
          if values can't be compared. (E.g., *min_wins* is False for
          ``coverage``, but it is True for ``expansions``).
        * *function*: the function used to aggregate
          values of multiple runs for this attribute, for example, in
          domain reports. Defaults to :py:func:`sum`.
        * *functions*: deprecated. Pass a single *function* instead.
        * *scale*: default scaling. Can be one of "linear", "log" and
          "symlog". If *scale* is None (default), the reports will
          choose the scaling.
        * *digits*: number of digits after the decimal point.

        The ``downward`` package automatically uses appropriate
        settings for most attributes.

        >>> avg_h = Attribute("avg_h", min_wins=False)
        >>> abstraction_done = Attribute(
        ...     "abstraction_done", absolute=True, min_wins=False
        ... )

        """
        self.absolute = absolute
        self.min_wins = min_wins
        # Merge the current and the deprecated kwarg; fall back to sum
        # if neither yields an aggregation function.
        aggregation = get_aggregation_function(function, tools.make_list(functions))
        self.function = aggregation or sum
        self.scale = scale
        self.digits = digits

    def __repr__(self):
        return f"Attribute({str(self)!r}, min_wins={self.min_wins}, ...)"

    def copy(self, name):
        """Return a new Attribute called *name* that has the options of this one."""
        options = {
            "absolute": self.absolute,
            "min_wins": self.min_wins,
            "function": self.function,
            "scale": self.scale,
            "digits": self.digits,
        }
        return Attribute(name, **options)
class Report:
    """
    Base class for all reports.
    """

    def __init__(self, attributes=None, format="html", filter=None, **kwargs):
        """
        Inherit from this or a child class to implement a custom report.

        Depending on the type of output you want to make, you will have
        to overwrite the :meth:`.write`, :meth:`.get_text` or
        :meth:`.get_markup` method.

        *attributes* is the list of attributes you want to include in
        your report. If omitted, use all numerical attributes. Globbing
        characters * and ? are allowed. Example:

        >>> report = Report(attributes=["coverage", "translator_*"])

        If no attributes are selected, the available attributes are
        logged when the report is made.

        *format* can be one of e.g. html, tex, wiki (MediaWiki), doku
        (DokuWiki), pmw (PmWiki), moin (MoinMoin) and txt (Plain text).
        Subclasses may allow additional formats. An unknown format
        raises :class:`ValueError`.

        If given, *filter* must be a function or a list of functions
        that are passed a dictionary of a run's attribute keys and
        values. Filters must return True, False or a new dictionary.
        Depending on the returned value, the run is included or excluded
        from the report, or replaced by the new dictionary,
        respectively.

        Filters for properties can be given in shorter form without
        defining a function. To include only runs where attribute
        ``foo`` has value v, use ``filter_foo=v``. To include only runs
        where attribute ``foo`` has value v1, v2 or v3, use
        ``filter_foo=[v1, v2, v3]``.

        Filters are applied sequentially, i.e., the first filter is
        applied to all runs before the second filter is executed.
        Filters given as ``filter_*`` kwargs are applied *after* all
        filters passed via the ``filter`` kwarg.

        Examples:

        Include only the "cost" attribute in a LaTeX report:

        >>> report = Report(attributes=["cost"], format="tex")

        Only include successful runs in the report:

        >>> report = Report(filter_coverage=1)

        Only include runs in the report where the initial h value is
        at most 100:

        >>> def low_init_h(run):
        ...     return run["initial_h_value"] <= 100
        ...
        >>> report = Report(filter=low_init_h)

        Only include runs from "blocks" and "barman" with a timeout:

        >>> report = Report(filter_domain=["blocks", "barman"], filter_search_timeout=1)

        Add a new attribute:

        >>> def add_expansions_per_time(run):
        ...     expansions = run.get("expansions")
        ...     time = run.get("search_time")
        ...     if expansions is not None and time:
        ...         run["expansions_per_time"] = expansions / time
        ...     return run
        ...
        >>> report = Report(
        ...     attributes=["expansions_per_time"], filter=[add_expansions_per_time]
        ... )

        Rename, filter and sort algorithms:

        >>> def rename_algorithms(run):
        ...     name = run["algorithm"]
        ...     paper_names = {"lama11": "LAMA 2011", "fdss_sat1": "FDSS 1"}
        ...     run["algorithm"] = paper_names[name]
        ...     return run
        ...
        >>> # We want LAMA 2011 to be the leftmost column.
        >>> # filter_* filters are evaluated last, so we use the updated
        >>> # algorithm names here.
        >>> algorithms = ["LAMA 2011", "FDSS 1"]
        >>> report = Report(filter=rename_algorithms, filter_algorithm=algorithms)

        """
        # Turn strings into Attribute objects and set non-default options for some.
        self.attributes = [
            self._prepare_attribute(attr) for attr in tools.make_list(attributes)
        ]
        if format not in txt2tags.TARGETS + ["eps", "pdf", "pgf", "png", "py"]:
            raise ValueError(f"invalid format: {format}")
        self.output_format = format
        self.toc = True
        self.run_filter = tools.RunFilter(filter, **kwargs)

    def __call__(self, eval_dir, outfile):
        """Make the report.

        This method is called automatically when the report step is
        executed. It loads the data and calls :meth:`.write`.

        *eval_dir* must be a path to an evaluation directory containing
        a ``properties`` file.

        The report will be written to *outfile*.

        """
        if not eval_dir.endswith("-eval"):
            logging.info(
                'The source directory does not end with "-eval". '
                "Are you sure this is an evaluation directory?"
            )
        self.eval_dir = os.path.abspath(eval_dir)
        # It would be nice if we could infer "format" from "outfile", but the
        # former is needed before the latter is available.
        # Also we can't add the extension ".format" to "outfile" in case it's
        # missing, because "outfile" might be a directory.
        self.outfile = os.path.abspath(outfile)

        # Map from attribute to type.
        self._all_attributes = {}

        self._load_data()
        self._apply_filter()
        self._scan_data()

        # Expand glob characters.
        self.attributes = self._glob_attributes(self.attributes)

        if not self.attributes:
            logging.info(f"Available attributes: {', '.join(self.all_attributes)}")
            logging.info("Using all numerical attributes.")
            self.attributes = self._get_numerical_attributes()

        self.attributes = sorted(self.attributes)

        # Check for duplicate attributes to avoid "coverage" overwriting
        # Attribute("coverage") by accident.
        counter = collections.Counter(self.attributes)
        duplicates = [name for name, count in sorted(counter.items()) if count > 1]
        if duplicates:
            logging.critical(f"Duplicate attributes detected: {duplicates}")

        self.write()

    def _prepare_attribute(self, attr):
        """Return *attr* as an Attribute, wrapping plain strings."""
        if isinstance(attr, Attribute):
            return attr
        return Attribute(attr)

    def _glob_attributes(self, attributes):
        """Expand glob patterns in *attributes* against the scanned attributes.

        Names that match a scanned attribute literally are kept as they
        are. Every other entry is treated as an fnmatch pattern and each
        match becomes a copy of the pattern's Attribute under the
        matched name.
        """
        # The property sorts on every access, so read it once.
        available = self.all_attributes
        expanded_attrs = []
        for attr in attributes:
            # Attribute without wildcards. Filtering would reset its options.
            if attr in available:
                expanded_attrs.append(attr)
                continue
            matches = fnmatch.filter(available, attr)
            if not matches:
                logging.warning(
                    f'There is no attribute "{attr}" in the properties file.'
                )
            # Use the attribute options from the pattern for all matches, but
            # don't try to guess options for attributes that appear in the list.
            expanded_attrs.extend(
                [attr.copy(match) for match in matches if match not in attributes]
            )
        if attributes and not expanded_attrs:
            logging.critical("No attributes match your patterns.")
        return expanded_attrs

    @property
    def all_attributes(self):
        """Sorted list of all attribute names found by :meth:`_scan_data`."""
        return sorted(self._all_attributes.keys())

    def _get_numerical_attributes(self):
        """Return all scanned attributes that may hold numbers."""
        return [
            attr for attr in self._all_attributes if self.attribute_is_numeric(attr)
        ]

    def attribute_is_numeric(self, attribute):
        """Return true if the values for *attribute* are ints or floats.

        If the attribute is None in all runs it may be numeric.

        """
        attribute_type = self._all_attributes[attribute]
        return attribute_type is None or issubclass(attribute_type, numbers.Number)

    def get_markup(self):
        """
        Return `txt2tags <http://txt2tags.org/>`_ markup for the report.

        The default report is a single table with one row per run and
        one column per selected attribute.
        """
        table = Table()
        for run_id, run in self.props.items():
            table.add_row(run_id, self._get_markup_row(run))
        return str(table)

    def _get_markup_row(self, run):
        """Return the table row (attribute -> cell value) for one *run*.

        Only selected attributes are included. List and tuple values
        are joined with "-" so that they are shown as one string.
        """
        row = {}
        for key, value in run.items():
            if key not in self.attributes:
                continue
            if isinstance(value, (list, tuple)):
                # Convert the *value*. Replacing the key here would create
                # a separate column for every distinct list.
                value = "-".join(str(item) for item in value)
            row[key] = value
        return row

    def get_text(self):
        """
        Return text (e.g., HTML, LaTeX, etc.) for the report.

        By default this method calls :meth:`.get_markup` and converts
        the markup to the desired output *format*.

        """
        name, _ = os.path.splitext(os.path.basename(self.outfile))
        doc = Document(title=name)
        doc.add_text(
            self.get_markup()
            or "No tables were generated. "
            "This happens when no significant changes occured or "
            "if for all attributes and all problems never all "
            "algorithms had a value for this attribute in a "
            "per-domain report."
        )
        return doc.render(self.output_format, {"toc": self.toc})

    def write(self):
        """
        Write the report files.

        By default this method calls :meth:`.get_text` and writes the
        obtained text to *outfile*.

        Overwrite this method if you want to write the report file(s)
        directly. You should write them to *self.outfile*.

        """
        content = self.get_text()
        tools.makedirs(os.path.dirname(self.outfile))
        tools.write_file(self.outfile, content)
        logging.info(f"Wrote file://{self.outfile}")

    def _get_type(self, attribute):
        """Return the type of the first non-None value of *attribute*."""
        for run in self.props.values():
            val = run.get(attribute)
            if val is not None:
                return type(val)
        # Attribute is None in all runs.
        return None

    def _scan_data(self):
        """Collect all attribute names of all runs together with their types."""
        attributes = set()
        for run in self.props.values():
            attributes |= set(run.keys())
        self._all_attributes = {
            self._prepare_attribute(attr): self._get_type(attr) for attr in attributes
        }

    def _load_data(self):
        """Read the ``properties`` file of the evaluation directory."""
        props_file = os.path.join(self.eval_dir, "properties")
        logging.info("Reading properties file")
        self.props = tools.Properties(filename=props_file)
        if not self.props:
            logging.critical(f"No properties found in {self.eval_dir}")
        logging.info("Reading properties file finished")

    def _apply_filter(self):
        """Apply the run filter to the loaded properties."""
        self.run_filter.apply(self.props)
        if not self.props:
            logging.critical("All runs have been filtered -> Nothing to report.")
class CellFormatter:
    """Formatting information for one cell in a table."""

    def __init__(
        self, bold=False, count=None, link=None, color=None, align_right=False
    ):
        self.bold = bold
        self.count = count
        self.link = link
        self.color = color
        self.align_right = align_right

    def format_value(self, value):
        """Return *value* as a string decorated with the configured markup.

        The decorations are nested in a fixed order: link, count, bold,
        color. Finally a space is added on the left (right-aligned
        cells) or on the right (all other cells).
        """
        text = str(value)
        if self.link:
            text = f"[''{text}'' {self.link}]"
        if self.count:
            text = f"{text} ({self.count})"
        if self.bold:
            text = f"**{text}**"
        if self.color:
            text = f"{{{text}|color:{self.color}}}"
        return f" {text}" if self.align_right else f"{text} "
class Table(collections.defaultdict):
    def __init__(self, title="", min_wins=None, colored=False, digits=2):
        """
        The *Table* class can be useful for `Report` subclasses that want to
        return a table as txt2tags markup. It is realized as a dictionary of
        dictionaries mapping row names to column names to cell values. To obtain
        the markup from a table, use the ``str()`` function.

        *title* will be printed in the top left cell.

        *min_wins* can be either None, True or False. If it is True (False),
        the cell with the lowest (highest) value in each row will be
        highlighted.

        If *colored* is True, the values of each row will be given colors from a
        colormap.

        Numbers are rounded to *digits* positions after the decimal point.

        >>> t = Table(title="expansions")
        >>> t.add_cell("prob1", "cfg1", 10)
        >>> t.add_cell("prob1", "cfg2", 20)
        >>> t.add_row("prob2", {"cfg1": 15, "cfg2": 25})
        >>> def remove_quotes(s):
        ...     return s.replace('""', "")
        ...
        >>> print(remove_quotes(str(t)))
        || expansions | cfg1 | cfg2 |
        | prob1 | 10 | 20 |
        | prob2 | 15 | 25 |
        >>> t.row_names
        ['prob1', 'prob2']
        >>> t.col_names
        ['cfg1', 'cfg2']
        >>> t.get_row("prob2")
        [15, 25]
        >>> t.get_columns() == {"cfg1": [10, 15], "cfg2": [20, 25]}
        True
        >>> t.add_summary_function("SUM", sum)
        >>> print(remove_quotes(str(t)))
        || expansions | cfg1 | cfg2 |
        | prob1 | 10 | 20 |
        | prob2 | 15 | 25 |
        | **SUM** | 25 | 45 |
        >>> t.set_column_order(["cfg2", "cfg1"])
        >>> print(remove_quotes(str(t)))
        || expansions | cfg2 | cfg1 |
        | prob1 | 20 | 10 |
        | prob2 | 25 | 15 |
        | **SUM** | 45 | 25 |

        """
        collections.defaultdict.__init__(self, dict)

        self.title = title
        self.min_wins = min_wins
        self.row_min_wins = {}
        self.colored = colored
        self.digits = digits

        self.summary_funcs = {}
        self.info = []
        self.num_values = None
        self.dynamic_data_modules = []

        # Cached list of column names; reset to None whenever it may be stale.
        self._cols = None

        # For printing.
        self.header_row = "column names (never printed)"
        self.header_column = "row names (never printed)"
        self.cell_formatters = collections.defaultdict(dict)
        self.row_order = None
        self.column_order = None
        self.summary_row_order = []

    def add_cell(self, row, col, value):
        """Set Table[row][col] = value."""
        self[row][col] = value
        self._cols = None

    def add_row(self, row_name, row):
        """Add a new data row called *row_name* to the table.

        *row* must be a mapping from column names to values.
        """
        self[row_name] = row
        self._cols = None

    def add_col(self, col_name, col):
        """Add a new data column called *col_name* to the table.

        *col* must be a mapping from row names to values.
        """
        for row_name, value in col.items():
            self[row_name][col_name] = value
        self._cols = None

    @property
    def row_names(self):
        """Return all data row names.

        If a row order was set, it is returned as is. Otherwise the
        names are sorted naturally.
        """
        return self.row_order or tools.natural_sort(self.keys())

    @property
    def col_names(self):
        """Return all data column names.

        Columns listed in the column order come first, in that order.
        All remaining columns follow in natural sort order. The result
        is cached until the table is changed through its ``add_*`` or
        ``set_column_order`` methods.
        """
        if self._cols:
            return self._cols
        col_names = set()
        for row in self.values():
            col_names |= set(row.keys())
        self._cols = []
        if self.column_order:
            # First use all elements for which we know an order.
            # All remaining elements will be sorted alphabetically.
            self._cols = [c for c in self.column_order if c in col_names]
            col_names -= set(self._cols)
        self._cols += tools.natural_sort(col_names)
        return self._cols

    def get_row(self, row_name):
        """Return a list of the values in row *row_name*.

        Missing cells are reported as None. Looking up an unknown row
        does not add it to the table.
        """
        # Use get() instead of indexing: indexing a defaultdict would
        # insert an empty row as a side effect.
        row = self.get(row_name, {})
        return [row.get(col_name, None) for col_name in self.col_names]

    def get_columns(self):
        """
        Return a mapping from column names to the list of values in that column.

        Missing cells are reported as None. Reading the columns does
        not add rows to the table.
        """
        values = collections.defaultdict(list)
        col_names = self.col_names
        for row_name in self.row_names:
            # get() keeps names that only occur in the row order from
            # being inserted as empty rows.
            row = self.get(row_name, {})
            for col_name in col_names:
                values[col_name].append(row.get(col_name))
        return values

    def add_summary_function(self, name, func):
        """
        Add a bottom row with the values ``func(column_values)`` for
        each column. *func* can be e.g. :func:`sum`,
        :func:`arithmetic_mean` or :func:`geometric_mean`.

        Adding a function under an existing *name* replaces the old
        function and keeps the position of the row.
        """
        self.summary_funcs[name] = func
        # Register each name once, otherwise the row is printed repeatedly.
        if name not in self.summary_row_order:
            self.summary_row_order.append(name)

    def set_row_order(self, order):
        """Print the rows in the given *order*."""
        self.row_order = order

    def set_column_order(self, order):
        """Print the columns in *order*, followed by all unlisted columns."""
        self.column_order = order
        self._cols = None

    def get_min_wins(self, row_name=None):
        """
        The table class can store information on whether higher or
        lower values are better for each row or globally. If no row
        specific setting for *row_name* is found, the global setting is
        returned.
        """
        return self.row_min_wins.get(row_name, self.min_wins)

    def get_summary_rows(self):
        """
        Returns a dictionary mapping names of summary rows to dictionaries
        mapping column names to values.

        None values are ignored when a column is aggregated; a column
        without any values is summarized as None. As a side effect, a
        bold formatter is registered for the name cell of every summary
        row.
        """
        # The columns are the same for all summary functions.
        columns = self.get_columns()
        summary_rows = {}
        for row_name in self.summary_row_order:
            func = self.summary_funcs[row_name]
            summary_row = {}
            for col_name, column in columns.items():
                values = [val for val in column if val is not None]
                summary_row[col_name] = func(values) if values else None
            summary_row[self.header_column] = row_name
            summary_rows[row_name] = summary_row
            formatter = CellFormatter(bold=True, count=self.num_values)
            self.cell_formatters[row_name][self.header_column] = formatter
        return summary_rows

    def _get_printable_row_order(self):
        """
        Return a list of all rows (including non-data rows) in the order
        they should be printed.
        """
        row_order = [self.header_row]
        row_order.extend(self.row_names + self.summary_row_order)
        for module in self.dynamic_data_modules:
            row_order = module.modify_printable_row_order(self, row_order) or row_order
        return row_order

    def _get_printable_column_order(self):
        """
        Return a list of all columns (including non-data columns) in the order
        they should be printed.
        """
        col_order = [self.header_column]
        col_order.extend(self.col_names)
        for module in self.dynamic_data_modules:
            col_order = (
                module.modify_printable_column_order(self, col_order) or col_order
            )
        return col_order

    def _collect_cells(self):
        """
        Collect all cells that should be printed including table headers,
        row names, summary rows, etc. Returns a dictionary mapping row names
        to dictionaries mapping column names to values.
        """
        cells = collections.defaultdict(dict)
        cells[self.header_row][self.header_column] = self.title
        for col_name in self.col_names:
            cells[self.header_row][col_name] = str(col_name)
        # Add data rows and summary rows.
        for row_name, row in list(self.items()) + list(self.get_summary_rows().items()):
            cells[row_name][self.header_column] = str(row_name)
            for col_name in self.col_names:
                cells[row_name][col_name] = row.get(col_name)
        for dynamic_data_module in self.dynamic_data_modules:
            cells = dynamic_data_module.collect(self, cells) or cells
        return cells

    def _format(self, cells):
        """Format all entries in **cells** (in place)."""
        for row_name, row in cells.items():
            self._format_row(row_name, row)
        for dynamic_data_module in self.dynamic_data_modules:
            dynamic_data_module.format(self, cells)

    def _format_value(self, value):
        """Return *value* as text: floats are rounded, other text is escaped."""
        if isinstance(value, float):
            return f"{value:.{self.digits}f}"
        else:
            result = str(value)

        # Only escape text if it doesn't contain LaTeX or HTML markup.
        if "''" in result:
            return result
        else:
            return markup.escape(result)

    def _format_row(self, row_name, row):
        """Format all entries in **row** (in place)."""
        if row_name == self.header_row:
            for col_name, value in row.items():
                # Allow breaking after underlines.
                value = value.replace("_", "_" + ESCAPE_WORDBREAK)
                # Right-align headers (except the left-most one).
                if col_name != self.header_column:
                    value = " " + value
                row[col_name] = value
            return

        # Get the slice of the row that should be formatted (i.e., the data columns).
        # Note that there might be other columns (e.g., added by dynamic data
        # modules) that should not be formatted.
        row_slice = {col_name: row.get(col_name) for col_name in self.col_names}

        min_wins = self.get_min_wins(row_name)
        highlight = min_wins is not None
        colored = self.colored and highlight
        if colored:

            def try_to_round(v):
                try:
                    return round(v, self.digits)
                except TypeError:
                    return v

            rounded_row_slice = {
                col: try_to_round(val) for col, val in row_slice.items()
            }
            colors = tools.get_colors(rounded_row_slice, min_wins)

        if highlight:
            min_value, max_value = tools.get_min_max(row_slice.values())
        else:
            min_value, max_value = None, None

        def is_close(a, b):
            # Highlight based on precision visible in table, not actual values.
            return self._format_value(a) == self._format_value(b)

        for col_name, value in row.items():
            color = None
            bold = False
            # Format data columns
            if col_name in row_slice:
                if colored:
                    color = tools.rgb_fractions_to_html_color(*colors[col_name])
                elif (
                    highlight
                    and value is not None
                    and (
                        (is_close(value, min_value) and min_wins)
                        or (is_close(value, max_value) and not min_wins)
                    )
                ):
                    bold = True
            row[col_name] = self._format_cell(
                row_name, col_name, value, color=color, bold=bold
            )

    def _format_cell(self, row_name, col_name, value, color=None, bold=False):
        """
        Return the formatted value for a single cell in the table.

        *row_name* and *col_name* specify the position of the cell and *value* is the
        unformatted value.

        By default floats are rounded to ``self.digits`` decimal places, and
        numbers, None and "?" are right-aligned. The *color* to render the result
        in can be given as a string and setting *bold* to true renders the value
        in bold.

        If a custom formatter is specified for this cell, it is used instead of this
        default format.
        """
        formatter = self.cell_formatters.get(row_name, {}).get(col_name)
        if not formatter:
            align_right = (
                isinstance(value, (float, int)) or value is None or value == "?"
            )
            value = self._format_value(value)
            formatter = CellFormatter(bold=bold, color=color, align_right=align_right)
        return formatter.format_value(value)

    def _get_markup(self, cells):
        """
        Return a string containing all printable cells (see
        **_get_printable_column_order** and **_get_printable_row_order**)
        as correctly formatted markup.
        """
        parts = []
        for row_name in self._get_printable_row_order():
            if row_name == self.header_row:
                parts.append(self._get_header_markup(row_name, cells[row_name]))
            else:
                parts.append(self._get_row_markup(row_name, cells[row_name]))
        if self.info:
            parts.append(" ".join(self.info))
        return "\n".join(parts)

    def _get_header_markup(self, row_name, row):
        """Return the txt2tags table markup for the headers."""
        return self._get_row_markup(row_name, row, template="|| {} |")

    def _get_row_markup(self, row_name, row, template=" | {} |"):
        """Return the txt2tags table markup for one row."""
        formatted_cells = [
            row.get(col_name, "") for col_name in self._get_printable_column_order()
        ]
        return template.format(" | ".join(formatted_cells))

    def __str__(self):
        """Return the txt2tags markup for this table."""
        cells = self._collect_cells()
        self._format(cells)
        return self._get_markup(cells)
def extract_summary_rows(from_table, to_table, link=None):
    """
    Extract all summary rows of **from_table** and add them as data rows
    to **to_table**.

    Each new row is named ``"<title of from_table> - <summary name>"`` and
    takes over the *min_wins* setting of **from_table**. If *link* is given,
    the name cell of each new row is rendered as a link to it.
    """
    summary_rows = from_table.get_summary_rows()
    for name, summary_row in summary_rows.items():
        target_row = f"{from_table.title} - {name}"
        if link is not None:
            link_formatter = CellFormatter(link=link)
            to_table.cell_formatters[target_row][
                to_table.header_column
            ] = link_formatter
        to_table.row_min_wins[target_row] = from_table.min_wins
        for col_name, value in summary_row.items():
            # The name cell of the summary row is not a data value.
            if col_name != from_table.header_column:
                to_table.add_cell(target_row, col_name, value)
class DynamicDataModule:
    """Interface for modules that dynamically add or modify data in a table.

    The base implementation of every hook leaves the table untouched.
    """

    def collect(self, table, cells):
        """
        Called after the data collection in the table. Subclasses can
        add new values to **cells** or modify existing values.
        """
        return cells

    def format(self, table, formatted_cells):
        """
        Called after the formatting in the table. Subclasses can
        (re-)format all values in **formatted_cells**. Specifically all new
        values added by the **collect** method should be formatted.
        """
        return None

    def modify_printable_row_order(self, table, row_order):
        """
        Called after retrieving a row order in the table. Subclasses can
        modify the order or add new rows. Specifically all rows that were
        added by the **collect** method should be appended or
        inserted.
        """
        return row_order

    def modify_printable_column_order(self, table, column_order):
        """
        Called after retrieving a column order in the table. Subclasses can
        modify the order or add new columns. Specifically all columns that
        were added by the **collect** method should be appended or
        inserted.
        """
        return column_order