summaryrefslogtreecommitdiff
path: root/deps/v8/tools/testrunner/testproc/execution.py
blob: 021b02af3e9b9341b5f99b5cecc022fb2745f882 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Copyright 2018 the V8 project authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import traceback

from . import base
from ..local import pool


# Global function for multiprocessing, because pickling a static method doesn't
# work on Windows.
def run_job(job, process_context):
  return job.run(process_context)


def create_process_context(requirement):
  return ProcessContext(base.get_reduce_result_function(requirement))


JobResult = collections.namedtuple('JobResult', ['id', 'result'])
ProcessContext = collections.namedtuple('ProcessContext', ['reduce_result_f'])


class Job(object):
  def __init__(self, test_id, cmd, outproc, keep_output):
    self.test_id = test_id
    self.cmd = cmd
    self.outproc = outproc
    self.keep_output = keep_output

  def run(self, process_ctx):
    output = self.cmd.execute()
    result = self.outproc.process(output)
    if not self.keep_output:
      result = process_ctx.reduce_result_f(result)
    return JobResult(self.test_id, result)


class ExecutionProc(base.TestProc):
  """Last processor in the chain. Instead of passing tests further it creates
  commands and output processors, executes them in multiple worker processes and
  sends results to the previous processor.
  """

  def __init__(self, jobs, context):
    super(ExecutionProc, self).__init__()
    self._pool = pool.Pool(jobs)
    self._context = context
    self._tests = {}

  def connect_to(self, next_proc):
    assert False, 'ExecutionProc cannot be connected to anything'

  def start(self):
    try:
      it = self._pool.imap_unordered(
        fn=run_job,
        gen=[],
        process_context_fn=create_process_context,
        process_context_args=[self._prev_requirement],
      )
      for pool_result in it:
        if pool_result.heartbeat:
          continue

        job_result = pool_result.value
        test_id, result = job_result

        test, result.cmd = self._tests[test_id]
        del self._tests[test_id]
        self._send_result(test, result)
    except KeyboardInterrupt:
      raise
    except:
      traceback.print_exc()
      raise
    finally:
      self._pool.terminate()

  def next_test(self, test):
    test_id = test.procid
    cmd = test.get_command(self._context)
    self._tests[test_id] = test, cmd

    # TODO(majeski): Needs factory for outproc as in local/execution.py
    outproc = test.output_proc
    self._pool.add([Job(test_id, cmd, outproc, test.keep_output)])

  def result_for(self, test, result):
    assert False, 'ExecutionProc cannot receive results'