quickjs-tart

quickjs-based runtime for wallet-core logic

outcome_analysis.py (16820B)


      1 """Outcome file analysis code.
      2 
      3 This module is the bulk of the code of tests/scripts/analyze_outcomes.py
      4 in each consuming branch. The consuming script is expected to derive
      5 the classes with branch-specific customizations such as ignore lists.
      6 """
      7 
      8 # Copyright The Mbed TLS Contributors
      9 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
     10 
     11 import argparse
     12 import gzip
     13 import lzma
     14 import sys
     15 import traceback
     16 import re
     17 import subprocess
     18 import os
     19 import typing
     20 
     21 from . import collect_test_cases
     22 
     23 
     24 # `ComponentOutcomes` is a named tuple which is defined as:
     25 # ComponentOutcomes(
     26 #     successes = {
     27 #         "<suite_case>",
     28 #         ...
     29 #     },
     30 #     failures = {
     31 #         "<suite_case>",
     32 #         ...
     33 #     }
     34 # )
     35 # suite_case = "<suite>;<case>"
     36 ComponentOutcomes = typing.NamedTuple('ComponentOutcomes',
     37                                       [('successes', typing.Set[str]),
     38                                        ('failures', typing.Set[str])])
     39 
     40 # `Outcomes` is a representation of the outcomes file,
     41 # which is defined as:
     42 # Outcomes = {
     43 #     "<component>": ComponentOutcomes,
     44 #     ...
     45 # }
     46 Outcomes = typing.Dict[str, ComponentOutcomes]
     47 
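        # For illustration, a hypothetical Outcomes value with a single component,
        # matching the shape documented above (all component, suite and case names
        # here are made up):
        #
        #     outcomes = {
        #         'component_example_config': ComponentOutcomes(
        #             successes={'test_suite_example;Case that passed'},
        #             failures={'test_suite_example;Case that failed'},
        #         ),
        #     }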
     48 
     49 class Results:
     50     """Process analysis results."""
     51 
     52     def __init__(self,
     53                  stderr: bool = True,
     54                  log_file: str = '') -> None:
     55         """Log and count errors.
     56 
     57         Log to stderr if stderr is true.
     58         Log to log_file if specified and non-empty.
     59         """
     60         self.error_count = 0
     61         self.warning_count = 0
     62         self.stderr = stderr
     63         self.log_file = None
     64         if log_file:
     65             self.log_file = open(log_file, 'w', encoding='utf-8')
     66 
     67     def new_section(self, fmt, *args, **kwargs):
     68         self._print_line('\n*** ' + fmt + ' ***\n', *args, **kwargs)
     69 
     70     def info(self, fmt, *args, **kwargs):
     71         self._print_line('Info: ' + fmt, *args, **kwargs)
     72 
     73     def error(self, fmt, *args, **kwargs):
     74         self.error_count += 1
     75         self._print_line('Error: ' + fmt, *args, **kwargs)
     76 
     77     def warning(self, fmt, *args, **kwargs):
     78         self.warning_count += 1
     79         self._print_line('Warning: ' + fmt, *args, **kwargs)
     80 
     81     def _print_line(self, fmt, *args, **kwargs):
     82         line = (fmt + '\n').format(*args, **kwargs)
     83         if self.stderr:
     84             sys.stderr.write(line)
     85         if self.log_file:
     86             self.log_file.write(line)
     87 
     88 def execute_reference_driver_tests(results: Results, ref_component: str, driver_component: str,
     89                                    outcome_file: str) -> None:
     90     """Run the tests specified in ref_component and driver_component. Results
     91     are stored in the output_file and they will be used for the following
     92     coverage analysis"""
     93     results.new_section("Test {} and {}", ref_component, driver_component)
     94 
     95     shell_command = "tests/scripts/all.sh --outcome-file " + outcome_file + \
     96                     " " + ref_component + " " + driver_component
     97     results.info("Running: {}", shell_command)
     98     ret_val = subprocess.run(shell_command.split(), check=False).returncode
     99 
    100     if ret_val != 0:
    101         results.error("failed to run reference/driver components")
    102 
    103 IgnoreEntry = typing.Union[str, typing.Pattern]
    104 
    105 def name_matches_pattern(name: str, str_or_re: IgnoreEntry) -> bool:
    106     """Check if name matches a pattern, that may be a string or regex.
    107     - If the pattern is a string, name must be equal to match.
    108     - If the pattern is a regex, name must fully match.
    109     """
    110     # The CI's python is too old for re.Pattern
    111     #if isinstance(str_or_re, re.Pattern):
    112     if not isinstance(str_or_re, str):
    113         return str_or_re.fullmatch(name) is not None
    114     else:
    115         return str_or_re == name
    116 
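        # For illustration, the intended matching behaviour (test names are made up):
        #
        #     name_matches_pattern('ECP test #1', 'ECP test #1')                     # True: strings are equal
        #     name_matches_pattern('ECP test #1', 'ECP test')                        # False: strings must be equal
        #     name_matches_pattern('ECP test #1', re.compile(r'ECP test #\d+'))      # True: regex fully matches
        #     name_matches_pattern('ECP test #12 (x)', re.compile(r'ECP test #\d+')) # False: only a prefix matches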
    117 def open_outcome_file(outcome_file: str) -> typing.TextIO:
    118     if outcome_file.endswith('.gz'):
    119         return gzip.open(outcome_file, 'rt', encoding='utf-8')
    120     elif outcome_file.endswith('.xz'):
    121         return lzma.open(outcome_file, 'rt', encoding='utf-8')
    122     else:
    123         return open(outcome_file, 'rt', encoding='utf-8')
    124 
    125 def read_outcome_file(outcome_file: str) -> Outcomes:
    126     """Parse an outcome file and return an outcome collection.
    127     """
    128     outcomes = {}
    129     with open_outcome_file(outcome_file) as input_file:
    130         for line in input_file:
    131             (_platform, component, suite, case, result, _cause) = line.split(';')
    132             # Note that `component` is not unique. If a test case passes on Linux
    133             # and fails on FreeBSD, it'll end up in both the successes set and
    134             # the failures set.
    135             suite_case = ';'.join([suite, case])
    136             if component not in outcomes:
    137                 outcomes[component] = ComponentOutcomes(set(), set())
    138             if result == 'PASS':
    139                 outcomes[component].successes.add(suite_case)
    140             elif result == 'FAIL':
    141                 outcomes[component].failures.add(suite_case)
    142 
    143     return outcomes
    144 
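        # For illustration, two hypothetical outcome file lines and how
        # read_outcome_file above folds them into the Outcomes structure:
        #
        #     Linux-x86_64;component_example_config;test_suite_example;Case A;PASS;
        #     Linux-x86_64;component_example_config;test_suite_example;Case B;FAIL;oops
        #
        # become:
        #
        #     {'component_example_config': ComponentOutcomes(
        #         successes={'test_suite_example;Case A'},
        #         failures={'test_suite_example;Case B'})}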
    145 
    146 class Task:
    147     """Base class for outcome analysis tasks."""
    148 
    149     # Override the following in child classes.
    150     # Map test suite names (with the test_suite_ prefix) to a list of ignored
    151     # test cases. Each element in the list can be either a string or a regex;
    152     # see the `name_matches_pattern` function.
    153     IGNORED_TESTS = {} #type: typing.Dict[str, typing.List[IgnoreEntry]]
    154 
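            # For illustration, a hypothetical override in a consuming branch,
            # mixing plain strings and regexes (see name_matches_pattern above).
            # A dotted suite name such as 'test_suite_example.variant' also picks
            # up the entries of its base suite 'test_suite_example' (see
            # ignored_tests below):
            #
            #     IGNORED_TESTS = {
            #         'test_suite_example': [
            #             'Exact test description to ignore',
            #             re.compile(r'Parametrized test .* \(slow\)'),
            #         ],
            #     }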
    155     def __init__(self, options) -> None:
    156         """Pass command line options to the tasks.
    157 
    158         Each task decides which command line options it cares about.
    159         """
    160         pass
    161 
    162     def section_name(self) -> str:
    163         """The section name to use in results."""
    164         raise NotImplementedError
    165 
    166     def ignored_tests(self, test_suite: str) -> typing.Iterator[IgnoreEntry]:
    167         """Generate the ignore list for the specified test suite."""
    168         if test_suite in self.IGNORED_TESTS:
    169             yield from self.IGNORED_TESTS[test_suite]
    170         pos = test_suite.find('.')
    171         if pos != -1:
    172             base_test_suite = test_suite[:pos]
    173             if base_test_suite in self.IGNORED_TESTS:
    174                 yield from self.IGNORED_TESTS[base_test_suite]
    175 
    176     def is_test_case_ignored(self, test_suite: str, test_string: str) -> bool:
    177         """Check if the specified test case is ignored."""
    178         for str_or_re in self.ignored_tests(test_suite):
    179             if name_matches_pattern(test_string, str_or_re):
    180                 return True
    181         return False
    182 
    183     def run(self, results: Results, outcomes: Outcomes):
    184         """Run the analysis on the specified outcomes.
    185 
    186         Signal errors via the results object.
    187         """
    188         raise NotImplementedError
    189 
    190 
    191 class CoverageTask(Task):
    192     """Analyze test coverage."""
    193 
    194     # Test cases whose suite and description are matched by an entry in
    195     # IGNORED_TESTS are expected to be never executed.
    196     # All other test cases are expected to be executed at least once.
    197 
    198     def __init__(self, options) -> None:
    199         super().__init__(options)
    200         self.full_coverage = options.full_coverage #type: bool
    201 
    202     @staticmethod
    203     def section_name() -> str:
    204         return "Analyze coverage"
    205 
    206     def run(self, results: Results, outcomes: Outcomes) -> None:
    207         """Check that all available test cases are executed at least once."""
    208         # Make sure that the generated data files are present (and up-to-date).
    209         # This allows analyze_outcomes.py to run correctly on a fresh Git
    210         # checkout.
    211         cp = subprocess.run(['make', 'generated_files'],
    212                             cwd='tests',
    213                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
    214                             check=False)
    215         if cp.returncode != 0:
    216             sys.stderr.write(cp.stdout.decode('utf-8'))
    217             results.error("Failed \"make generated_files\" in tests. "
    218                           "Coverage analysis may be incorrect.")
    219         available = collect_test_cases.collect_available_test_cases()
    220         for suite_case in available:
    221             hit = any(suite_case in comp_outcomes.successes or
    222                       suite_case in comp_outcomes.failures
    223                       for comp_outcomes in outcomes.values())
    224             (test_suite, test_description) = suite_case.split(';')
    225             ignored = self.is_test_case_ignored(test_suite, test_description)
    226 
    227             if not hit and not ignored:
    228                 if self.full_coverage:
    229                     results.error('Test case not executed: {}', suite_case)
    230                 else:
    231                     results.warning('Test case not executed: {}', suite_case)
    232             elif hit and ignored:
    233                 # If a test case is no longer always skipped, we should remove
    234                 # it from the ignore list.
    235                 if self.full_coverage:
    236                     results.error('Test case was executed but marked as ignored for coverage: {}',
    237                                   suite_case)
    238                 else:
    239                     results.warning('Test case was executed but marked as ignored for coverage: {}',
    240                                     suite_case)
    241 
    242 
    243 class DriverVSReference(Task):
    244     """Compare outcomes from testing with and without a driver.
    245 
    246     There are two ways to run analyze_driver_vs_reference_xxx locally:
    247     1. Run tests and then analysis:
    248       - tests/scripts/all.sh --outcome-file "$PWD/out.csv" <component_ref> <component_driver>
    249       - tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
    250     2. Let this script run both automatically:
    251       - tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
    252     """
    253 
    254     # Override the following in child classes.
    255     # Configuration name (all.sh component) used as the reference.
    256     REFERENCE = ''
    257     # Configuration name (all.sh component) used as the driver.
    258     DRIVER = ''
    259     # Ignored test suites (without the test_suite_ prefix).
    260     IGNORED_SUITES = [] #type: typing.List[str]
    261 
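            # For illustration, a hypothetical subclass as a consuming branch
            # might define it (component, suite and test names are made up):
            #
            #     class DriverVSReferenceExample(DriverVSReference):
            #         REFERENCE = 'test_example_reference'
            #         DRIVER = 'test_example_driver'
            #         IGNORED_SUITES = ['example_legacy']  # without the test_suite_ prefix
            #         IGNORED_TESTS = {
            #             'test_suite_example': [re.compile(r'.* known driver gap .*')],
            #         }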
    262     def __init__(self, options) -> None:
    263         super().__init__(options)
    264         self.ignored_suites = frozenset('test_suite_' + x
    265                                         for x in self.IGNORED_SUITES)
    266 
    267     def section_name(self) -> str:
    268         return f"Analyze driver {self.DRIVER} vs reference {self.REFERENCE}"
    269 
    270     def run(self, results: Results, outcomes: Outcomes) -> None:
    271         """Check that all tests passing in the driver component are also
    272         passing in the corresponding reference component.
    273         Skip:
    274         - full test suites provided in ignored_suites list
    275         - only some specific test inside a test suite, for which the corresponding
    276           output string is provided
    277         """
    278         ref_outcomes = outcomes.get("component_" + self.REFERENCE)
    279         driver_outcomes = outcomes.get("component_" + self.DRIVER)
    280 
    281         if ref_outcomes is None or driver_outcomes is None:
    282             results.error("required components are missing: bad outcome file?")
    283             return
    284 
    285         if not ref_outcomes.successes:
    286             results.error("no passing test in reference component: bad outcome file?")
    287             return
    288 
    289         for suite_case in ref_outcomes.successes:
    290             # suite_case is like "test_suite_foo.bar;Description of test case"
    291             (full_test_suite, test_string) = suite_case.split(';')
    292             test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name
    293 
    294             # Immediately skip fully-ignored test suites
    295             if test_suite in self.ignored_suites or \
    296                full_test_suite in self.ignored_suites:
    297                 continue
    298 
    299             # For test cases ignored inside a suite, don't report an error
    300             # if they fail or are skipped with drivers, but do report one if
    301             # they pass (that means the ignore-list entry is stale).
    302             ignored = self.is_test_case_ignored(full_test_suite, test_string)
    303 
    304             if not ignored and suite_case not in driver_outcomes.successes:
    305                 results.error("SKIP/FAIL -> PASS: {}", suite_case)
    306             if ignored and suite_case in driver_outcomes.successes:
    307                 results.error("uselessly ignored: {}", suite_case)
    308 
    309 
    310 # Set this to False if a consuming branch can't achieve full test coverage
    311 # in its default CI run.
    312 FULL_COVERAGE_BY_DEFAULT = True
    313 
    314 def main(known_tasks: typing.Dict[str, typing.Type[Task]]) -> None:
    315     try:
    316         parser = argparse.ArgumentParser(description=__doc__)
    317         parser.add_argument('outcomes', metavar='OUTCOMES.CSV',
    318                             help='Outcome file to analyze (can be .gz or .xz)')
    319         parser.add_argument('specified_tasks', default='all', nargs='?',
    320                             help='Analysis to be done. By default, run all tasks. '
    321                                  'With one or more TASK, run only those. '
    322                                  'TASK can be the name of a single task or '
    323                                  'comma/space-separated list of tasks. ')
    324         parser.add_argument('--allow-partial-coverage', action='store_false',
    325                             dest='full_coverage', default=FULL_COVERAGE_BY_DEFAULT,
    326                             help=("Only warn if a test case is skipped in all components" +
    327                                   (" (default)" if not FULL_COVERAGE_BY_DEFAULT else "") +
    328                                   ". Only used by the 'analyze_coverage' task."))
    329         parser.add_argument('--list', action='store_true',
    330                             help='List all available tasks and exit.')
    331         parser.add_argument('--log-file',
    332                             default='tests/analyze_outcomes.log',
    333                             help='Log file (default: tests/analyze_outcomes.log;'
    334                                  ' empty means no log file)')
    335         parser.add_argument('--require-full-coverage', action='store_true',
    336                             dest='full_coverage', default=FULL_COVERAGE_BY_DEFAULT,
    337                             help=("Require all available test cases to be executed" +
    338                                   (" (default)" if FULL_COVERAGE_BY_DEFAULT else "") +
    339                                   ". Only used by the 'analyze_coverage' task."))
    340         options = parser.parse_args()
    341 
    342         if options.list:
    343             for task_name in known_tasks:
    344                 print(task_name)
    345             sys.exit(0)
    346 
    347         main_results = Results(log_file=options.log_file)
    348 
    349         if options.specified_tasks == 'all':
    350             tasks_list = list(known_tasks.keys())
    351         else:
    352             tasks_list = re.split(r'[, ]+', options.specified_tasks)
    353             for task_name in tasks_list:
    354                 if task_name not in known_tasks:
    355                     sys.stderr.write('invalid task: {}\n'.format(task_name))
    356                     sys.exit(2)
    357 
    358         # If the outcome file exists, parse it once and share the result
    359         # among tasks to improve performance.
    360         # Otherwise, it will be generated by execute_reference_driver_tests.
    361         if not os.path.exists(options.outcomes):
    362             if len(tasks_list) > 1:
    363                 sys.stderr.write("mutiple tasks found, please provide a valid outcomes file.\n")
    364                 sys.exit(2)
    365 
    366             task_name = tasks_list[0]
    367             task_class = known_tasks[task_name]
    368             if not issubclass(task_class, DriverVSReference):
    369                 sys.stderr.write("please provide valid outcomes file for {}.\n".format(task_name))
    370                 sys.exit(2)
    371             # mypy isn't smart enough to know that REFERENCE and DRIVER
    372             # are *class* attributes of all classes derived from
    373             # DriverVSReference. (It would be smart enough if we had an
    374             # instance of task_class, but we can't construct an instance
    375             # until we have the outcome data, so at this point we only
    376             # have the class.) So we use indirection to access the class
    377             # attributes.
    378             execute_reference_driver_tests(main_results,
    379                                            getattr(task_class, 'REFERENCE'),
    380                                            getattr(task_class, 'DRIVER'),
    381                                            options.outcomes)
    382 
    383         outcomes = read_outcome_file(options.outcomes)
    384 
    385         for task_name in tasks_list:
    386             task_constructor = known_tasks[task_name]
    387             task_instance = task_constructor(options)
    388             main_results.new_section(task_instance.section_name())
    389             task_instance.run(main_results, outcomes)
    390 
    391         main_results.info("Overall results: {} warnings and {} errors",
    392                           main_results.warning_count, main_results.error_count)
    393 
    394         sys.exit(0 if (main_results.error_count == 0) else 1)
    395 
    396     except Exception: # pylint: disable=broad-except
    397         # Print the backtrace and exit explicitly with our chosen status.
    398         traceback.print_exc()
    399         sys.exit(120)
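
For context, here is a minimal sketch of how a consuming branch's
tests/scripts/analyze_outcomes.py might use this module, as described in the
module docstring. The package name used for the import and all task, component
and suite names are assumptions for illustration, not taken from this file.

    #!/usr/bin/env python3
    """Analyze outcomes for this branch (hypothetical consuming script)."""

    # Assumption: the package under which outcome_analysis is importable.
    from mbedtls_framework import outcome_analysis


    class CoverageTask(outcome_analysis.CoverageTask):
        """Coverage analysis with this branch's ignore list."""
        # Hypothetical entries; real branches list test cases that are
        # expected never to run in the default CI configuration.
        IGNORED_TESTS = {
            'test_suite_example': ['Test case that never runs on this branch'],
        }


    class DriverVSReferenceExample(outcome_analysis.DriverVSReference):
        """Hypothetical driver-vs-reference comparison."""
        REFERENCE = 'test_example_reference'
        DRIVER = 'test_example_driver'
        IGNORED_SUITES = ['example_legacy']


    # Task names accepted on the command line, mapped to task classes.
    KNOWN_TASKS = {
        'analyze_coverage': CoverageTask,
        'analyze_driver_vs_reference_example': DriverVSReferenceExample,
    }

    if __name__ == '__main__':
        outcome_analysis.main(KNOWN_TASKS)

With such a script, running all tasks on an existing outcome file would look
like "tests/scripts/analyze_outcomes.py outcomes.csv.xz", and a single task
like "tests/scripts/analyze_outcomes.py outcomes.csv analyze_coverage",
matching the command-line interface defined in main() above.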