outcome_analysis.py (16820B)
1 """Outcome file analysis code. 2 3 This module is the bulk of the code of tests/scripts/analyze_outcomes.py 4 in each consuming branch. The consuming script is expected to derive 5 the classes with branch-specific customizations such as ignore lists. 6 """ 7 8 # Copyright The Mbed TLS Contributors 9 # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later 10 11 import argparse 12 import gzip 13 import lzma 14 import sys 15 import traceback 16 import re 17 import subprocess 18 import os 19 import typing 20 21 from . import collect_test_cases 22 23 24 # `ComponentOutcomes` is a named tuple which is defined as: 25 # ComponentOutcomes( 26 # successes = { 27 # "<suite_case>", 28 # ... 29 # }, 30 # failures = { 31 # "<suite_case>", 32 # ... 33 # } 34 # ) 35 # suite_case = "<suite>;<case>" 36 ComponentOutcomes = typing.NamedTuple('ComponentOutcomes', 37 [('successes', typing.Set[str]), 38 ('failures', typing.Set[str])]) 39 40 # `Outcomes` is a representation of the outcomes file, 41 # which defined as: 42 # Outcomes = { 43 # "<component>": ComponentOutcomes, 44 # ... 45 # } 46 Outcomes = typing.Dict[str, ComponentOutcomes] 47 48 49 class Results: 50 """Process analysis results.""" 51 52 def __init__(self, 53 stderr: bool = True, 54 log_file: str = '') -> None: 55 """Log and count errors. 56 57 Log to stderr if stderr is true. 58 Log to log_file if specified and non-empty. 59 """ 60 self.error_count = 0 61 self.warning_count = 0 62 self.stderr = stderr 63 self.log_file = None 64 if log_file: 65 self.log_file = open(log_file, 'w', encoding='utf-8') 66 67 def new_section(self, fmt, *args, **kwargs): 68 self._print_line('\n*** ' + fmt + ' ***\n', *args, **kwargs) 69 70 def info(self, fmt, *args, **kwargs): 71 self._print_line('Info: ' + fmt, *args, **kwargs) 72 73 def error(self, fmt, *args, **kwargs): 74 self.error_count += 1 75 self._print_line('Error: ' + fmt, *args, **kwargs) 76 77 def warning(self, fmt, *args, **kwargs): 78 self.warning_count += 1 79 self._print_line('Warning: ' + fmt, *args, **kwargs) 80 81 def _print_line(self, fmt, *args, **kwargs): 82 line = (fmt + '\n').format(*args, **kwargs) 83 if self.stderr: 84 sys.stderr.write(line) 85 if self.log_file: 86 self.log_file.write(line) 87 88 def execute_reference_driver_tests(results: Results, ref_component: str, driver_component: str, \ 89 outcome_file: str) -> None: 90 """Run the tests specified in ref_component and driver_component. Results 91 are stored in the output_file and they will be used for the following 92 coverage analysis""" 93 results.new_section("Test {} and {}", ref_component, driver_component) 94 95 shell_command = "tests/scripts/all.sh --outcome-file " + outcome_file + \ 96 " " + ref_component + " " + driver_component 97 results.info("Running: {}", shell_command) 98 ret_val = subprocess.run(shell_command.split(), check=False).returncode 99 100 if ret_val != 0: 101 results.error("failed to run reference/driver components") 102 103 IgnoreEntry = typing.Union[str, typing.Pattern] 104 105 def name_matches_pattern(name: str, str_or_re: IgnoreEntry) -> bool: 106 """Check if name matches a pattern, that may be a string or regex. 107 - If the pattern is a string, name must be equal to match. 108 - If the pattern is a regex, name must fully match. 
109 """ 110 # The CI's python is too old for re.Pattern 111 #if isinstance(str_or_re, re.Pattern): 112 if not isinstance(str_or_re, str): 113 return str_or_re.fullmatch(name) is not None 114 else: 115 return str_or_re == name 116 117 def open_outcome_file(outcome_file: str) -> typing.TextIO: 118 if outcome_file.endswith('.gz'): 119 return gzip.open(outcome_file, 'rt', encoding='utf-8') 120 elif outcome_file.endswith('.xz'): 121 return lzma.open(outcome_file, 'rt', encoding='utf-8') 122 else: 123 return open(outcome_file, 'rt', encoding='utf-8') 124 125 def read_outcome_file(outcome_file: str) -> Outcomes: 126 """Parse an outcome file and return an outcome collection. 127 """ 128 outcomes = {} 129 with open_outcome_file(outcome_file) as input_file: 130 for line in input_file: 131 (_platform, component, suite, case, result, _cause) = line.split(';') 132 # Note that `component` is not unique. If a test case passes on Linux 133 # and fails on FreeBSD, it'll end up in both the successes set and 134 # the failures set. 135 suite_case = ';'.join([suite, case]) 136 if component not in outcomes: 137 outcomes[component] = ComponentOutcomes(set(), set()) 138 if result == 'PASS': 139 outcomes[component].successes.add(suite_case) 140 elif result == 'FAIL': 141 outcomes[component].failures.add(suite_case) 142 143 return outcomes 144 145 146 class Task: 147 """Base class for outcome analysis tasks.""" 148 149 # Override the following in child classes. 150 # Map test suite names (with the test_suite_prefix) to a list of ignored 151 # test cases. Each element in the list can be either a string or a regex; 152 # see the `name_matches_pattern` function. 153 IGNORED_TESTS = {} #type: typing.Dict[str, typing.List[IgnoreEntry]] 154 155 def __init__(self, options) -> None: 156 """Pass command line options to the tasks. 157 158 Each task decides which command line options it cares about. 159 """ 160 pass 161 162 def section_name(self) -> str: 163 """The section name to use in results.""" 164 raise NotImplementedError 165 166 def ignored_tests(self, test_suite: str) -> typing.Iterator[IgnoreEntry]: 167 """Generate the ignore list for the specified test suite.""" 168 if test_suite in self.IGNORED_TESTS: 169 yield from self.IGNORED_TESTS[test_suite] 170 pos = test_suite.find('.') 171 if pos != -1: 172 base_test_suite = test_suite[:pos] 173 if base_test_suite in self.IGNORED_TESTS: 174 yield from self.IGNORED_TESTS[base_test_suite] 175 176 def is_test_case_ignored(self, test_suite: str, test_string: str) -> bool: 177 """Check if the specified test case is ignored.""" 178 for str_or_re in self.ignored_tests(test_suite): 179 if name_matches_pattern(test_string, str_or_re): 180 return True 181 return False 182 183 def run(self, results: Results, outcomes: Outcomes): 184 """Run the analysis on the specified outcomes. 185 186 Signal errors via the results objects 187 """ 188 raise NotImplementedError 189 190 191 class CoverageTask(Task): 192 """Analyze test coverage.""" 193 194 # Test cases whose suite and description are matched by an entry in 195 # IGNORED_TESTS are expected to be never executed. 196 # All other test cases are expected to be executed at least once. 


class CoverageTask(Task):
    """Analyze test coverage."""

    # Test cases whose suite and description are matched by an entry in
    # IGNORED_TESTS are expected never to be executed.
    # All other test cases are expected to be executed at least once.

    def __init__(self, options) -> None:
        super().__init__(options)
        self.full_coverage = options.full_coverage #type: bool

    @staticmethod
    def section_name() -> str:
        return "Analyze coverage"

    def run(self, results: Results, outcomes: Outcomes) -> None:
        """Check that all available test cases are executed at least once."""
        # Make sure that the generated data files are present (and up-to-date).
        # This allows analyze_outcomes.py to run correctly on a fresh Git
        # checkout.
        cp = subprocess.run(['make', 'generated_files'],
                            cwd='tests',
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                            check=False)
        if cp.returncode != 0:
            sys.stderr.write(cp.stdout.decode('utf-8'))
            results.error("Failed \"make generated_files\" in tests. "
                          "Coverage analysis may be incorrect.")
        available = collect_test_cases.collect_available_test_cases()
        for suite_case in available:
            hit = any(suite_case in comp_outcomes.successes or
                      suite_case in comp_outcomes.failures
                      for comp_outcomes in outcomes.values())
            (test_suite, test_description) = suite_case.split(';')
            ignored = self.is_test_case_ignored(test_suite, test_description)

            if not hit and not ignored:
                if self.full_coverage:
                    results.error('Test case not executed: {}', suite_case)
                else:
                    results.warning('Test case not executed: {}', suite_case)
            elif hit and ignored:
                # If a test case is no longer always skipped, we should remove
                # it from the ignore list.
                if self.full_coverage:
                    results.error('Test case was executed but marked as ignored for coverage: {}',
                                  suite_case)
                else:
                    results.warning('Test case was executed but marked as ignored for coverage: {}',
                                    suite_case)


class DriverVSReference(Task):
    """Compare outcomes from testing with and without a driver.

    There are two options to run analyze_driver_vs_reference_xxx locally:
    1. Run the tests, then the analysis:
       - tests/scripts/all.sh --outcome-file "$PWD/out.csv" <component_ref> <component_driver>
       - tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
    2. Let this script run both automatically:
       - tests/scripts/analyze_outcomes.py out.csv analyze_driver_vs_reference_xxx
    """

    # Override the following in child classes.
    # Configuration name (all.sh component) used as the reference.
    REFERENCE = ''
    # Configuration name (all.sh component) used as the driver.
    DRIVER = ''
    # Ignored test suites (without the test_suite_ prefix).
    IGNORED_SUITES = [] #type: typing.List[str]

    def __init__(self, options) -> None:
        super().__init__(options)
        self.ignored_suites = frozenset('test_suite_' + x
                                        for x in self.IGNORED_SUITES)

    def section_name(self) -> str:
        return f"Analyze driver {self.DRIVER} vs reference {self.REFERENCE}"

    def run(self, results: Results, outcomes: Outcomes) -> None:
        """Check that all tests passing in the driver component are also
        passing in the corresponding reference component.
        Skip:
        - full test suites listed in IGNORED_SUITES;
        - specific test cases inside a suite that are matched by an entry
          in IGNORED_TESTS.
        """
        ref_outcomes = outcomes.get("component_" + self.REFERENCE)
        driver_outcomes = outcomes.get("component_" + self.DRIVER)

        if ref_outcomes is None or driver_outcomes is None:
            results.error("required components are missing: bad outcome file?")
            return

        if not ref_outcomes.successes:
            results.error("no passing test in reference component: bad outcome file?")
            return

        for suite_case in ref_outcomes.successes:
            # suite_case is like "test_suite_foo.bar;Description of test case"
            (full_test_suite, test_string) = suite_case.split(';')
            test_suite = full_test_suite.split('.')[0] # retrieve main part of test suite name

            # Immediately skip fully-ignored test suites
            if test_suite in self.ignored_suites or \
               full_test_suite in self.ignored_suites:
                continue

            # For ignored test cases inside test suites, just remember and:
            # don't issue an error if they're skipped with drivers,
            # but issue an error if they're not (that means we have a bad entry).
            ignored = self.is_test_case_ignored(full_test_suite, test_string)

            if not ignored and suite_case not in driver_outcomes.successes:
                results.error("SKIP/FAIL -> PASS: {}", suite_case)
            if ignored and suite_case in driver_outcomes.successes:
                results.error("uselessly ignored: {}", suite_case)
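
# Illustrative sketch of a consuming branch's subclass (component and suite
# names are hypothetical). REFERENCE and DRIVER omit the "component_" prefix,
# which run() prepends; IGNORED_SUITES omits the "test_suite_" prefix:
#
#     class DriverVSReferenceXyz(DriverVSReference):
#         REFERENCE = 'reference_xyz_component'
#         DRIVER = 'accelerated_xyz_component'
#         IGNORED_SUITES = ['xyz']  # skipped as a whole suite
#         IGNORED_TESTS = {
#             'test_suite_platform': [
#                 re.compile(r'Check mbedtls_calloc .*'),
#             ],
#         }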


# Set this to False if a consuming branch can't achieve full test coverage
# in its default CI run.
FULL_COVERAGE_BY_DEFAULT = True
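
# Illustrative command lines for a consuming script built on main() below
# (file and task names are made up):
#
#     tests/scripts/analyze_outcomes.py outcomes.csv
#     tests/scripts/analyze_outcomes.py outcomes.csv.xz analyze_coverage
#     tests/scripts/analyze_outcomes.py out.csv task_a,task_b --allow-partial-coverage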

def main(known_tasks: typing.Dict[str, typing.Type[Task]]) -> None:
    try:
        parser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument('outcomes', metavar='OUTCOMES.CSV',
                            help='Outcome file to analyze (can be .gz or .xz)')
        parser.add_argument('specified_tasks', default='all', nargs='?',
                            help='Analysis to be done. By default, run all tasks. '
                                 'With one or more TASK, run only those. '
                                 'TASK can be the name of a single task or a '
                                 'comma/space-separated list of tasks.')
        parser.add_argument('--allow-partial-coverage', action='store_false',
                            dest='full_coverage', default=FULL_COVERAGE_BY_DEFAULT,
                            help=("Only warn if a test case is skipped in all components" +
                                  (" (default)" if not FULL_COVERAGE_BY_DEFAULT else "") +
                                  ". Only used by the 'analyze_coverage' task."))
        parser.add_argument('--list', action='store_true',
                            help='List all available tasks and exit.')
        parser.add_argument('--log-file',
                            default='tests/analyze_outcomes.log',
                            help='Log file (default: tests/analyze_outcomes.log;'
                                 ' empty means no log file)')
        parser.add_argument('--require-full-coverage', action='store_true',
                            dest='full_coverage', default=FULL_COVERAGE_BY_DEFAULT,
                            help=("Require all available test cases to be executed" +
                                  (" (default)" if FULL_COVERAGE_BY_DEFAULT else "") +
                                  ". Only used by the 'analyze_coverage' task."))
        options = parser.parse_args()

        if options.list:
            for task_name in known_tasks:
                print(task_name)
            sys.exit(0)

        main_results = Results(log_file=options.log_file)

        if options.specified_tasks == 'all':
            tasks_list = list(known_tasks.keys())
        else:
            tasks_list = re.split(r'[, ]+', options.specified_tasks)
            for task_name in tasks_list:
                if task_name not in known_tasks:
                    sys.stderr.write('invalid task: {}\n'.format(task_name))
                    sys.exit(2)

        # If the outcome file exists, parse it once and share the result
        # among tasks to improve performance.
        # Otherwise, it will be generated by execute_reference_driver_tests.
        if not os.path.exists(options.outcomes):
            if len(tasks_list) > 1:
                sys.stderr.write("multiple tasks found, please provide a valid outcomes file.\n")
                sys.exit(2)

            task_name = tasks_list[0]
            task_class = known_tasks[task_name]
            if not issubclass(task_class, DriverVSReference):
                sys.stderr.write("please provide a valid outcomes file for {}.\n".format(task_name))
                sys.exit(2)
            # mypy isn't smart enough to know that REFERENCE and DRIVER
            # are *class* attributes of all classes derived from
            # DriverVSReference. (It would be smart enough if we had an
            # instance of task_class, but we can't construct an instance
            # until we have the outcome data, so at this point we only
            # have the class.) So we use indirection to access the class
            # attributes.
            execute_reference_driver_tests(main_results,
                                           getattr(task_class, 'REFERENCE'),
                                           getattr(task_class, 'DRIVER'),
                                           options.outcomes)

        outcomes = read_outcome_file(options.outcomes)

        for task_name in tasks_list:
            task_constructor = known_tasks[task_name]
            task_instance = task_constructor(options)
            main_results.new_section(task_instance.section_name())
            task_instance.run(main_results, outcomes)

        main_results.info("Overall results: {} warnings and {} errors",
                          main_results.warning_count, main_results.error_count)

        sys.exit(0 if (main_results.error_count == 0) else 1)

    except Exception: # pylint: disable=broad-except
        # Print the backtrace and exit explicitly with our chosen status.
        traceback.print_exc()
        sys.exit(120)
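
# A minimal consuming-script sketch (the package name and task names are
# assumptions for illustration), showing how a branch is expected to wire
# this module up:
#
#     #!/usr/bin/env python3
#     from mbedtls_framework import outcome_analysis
#
#     class CoverageTask(outcome_analysis.CoverageTask):
#         IGNORED_TESTS = {
#             'test_suite_foo': ['Never-run test case'],
#         }
#
#     KNOWN_TASKS = {
#         'analyze_coverage': CoverageTask,
#         # 'analyze_driver_vs_reference_xyz': DriverVSReferenceXyz,
#     }
#
#     if __name__ == '__main__':
#         outcome_analysis.main(KNOWN_TASKS)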