diff options
Diffstat (limited to 'deps/v8/tools/testrunner/local/pool.py')
-rw-r--r-- | deps/v8/tools/testrunner/local/pool.py | 201 |
1 file changed, 133 insertions, 68 deletions
diff --git a/deps/v8/tools/testrunner/local/pool.py b/deps/v8/tools/testrunner/local/pool.py index 9199b62d8a..7c9a250bc3 100644 --- a/deps/v8/tools/testrunner/local/pool.py +++ b/deps/v8/tools/testrunner/local/pool.py @@ -4,42 +4,38 @@ # found in the LICENSE file. from Queue import Empty -from multiprocessing import Event, Process, Queue +from contextlib import contextmanager +from multiprocessing import Process, Queue +import os +import signal +import time import traceback +from . import command + def setup_testing(): """For testing only: Use threading under the hood instead of multiprocessing to make coverage work. """ global Queue - global Event global Process del Queue - del Event del Process from Queue import Queue - from threading import Event from threading import Thread as Process + # Monkeypatch threading Queue to look like multiprocessing Queue. + Queue.cancel_join_thread = lambda self: None class NormalResult(): def __init__(self, result): self.result = result - self.exception = False - self.break_now = False - + self.exception = None class ExceptionResult(): - def __init__(self): - self.exception = True - self.break_now = False - - -class BreakResult(): - def __init__(self): - self.exception = False - self.break_now = True + def __init__(self, exception): + self.exception = exception class MaybeResult(): @@ -56,26 +52,43 @@ class MaybeResult(): return MaybeResult(False, value) -def Worker(fn, work_queue, done_queue, done, +def Worker(fn, work_queue, done_queue, process_context_fn=None, process_context_args=None): """Worker to be run in a child process. - The worker stops on two conditions. 1. When the poison pill "STOP" is - reached or 2. when the event "done" is set.""" + The worker stops when the poison pill "STOP" is reached. 
+ """ try: kwargs = {} if process_context_fn and process_context_args is not None: kwargs.update(process_context=process_context_fn(*process_context_args)) for args in iter(work_queue.get, "STOP"): - if done.is_set(): - break try: done_queue.put(NormalResult(fn(*args, **kwargs))) + except command.AbortException: + # SIGINT, SIGTERM or internal hard timeout. + break except Exception, e: traceback.print_exc() print(">>> EXCEPTION: %s" % e) - done_queue.put(ExceptionResult()) + done_queue.put(ExceptionResult(e)) + # When we reach here on normal tear down, all items have been pulled from + # the done_queue before and this should have no effect. On fast abort, it's + # possible that a fast worker left items on the done_queue in memory, which + # will never be pulled. This call purges those to avoid a deadlock. + done_queue.cancel_join_thread() except KeyboardInterrupt: - done_queue.put(BreakResult()) + assert False, 'Unreachable' + + +@contextmanager +def without_sig(): + int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN) + term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN) + try: + yield + finally: + signal.signal(signal.SIGINT, int_handler) + signal.signal(signal.SIGTERM, term_handler) class Pool(): @@ -88,24 +101,28 @@ class Pool(): # Necessary to not overflow the queue's pipe if a keyboard interrupt happens. BUFFER_FACTOR = 4 - def __init__(self, num_workers, heartbeat_timeout=30): + def __init__(self, num_workers, heartbeat_timeout=1): self.num_workers = num_workers self.processes = [] self.terminated = False + self.abort_now = False - # Invariant: count >= #work_queue + #done_queue. It is greater when a - # worker takes an item from the work_queue and before the result is + # Invariant: processing_count >= #work_queue + #done_queue. It is greater + # when a worker takes an item from the work_queue and before the result is # submitted to the done_queue. It is equal when no worker is working, # e.g. 
when all workers have finished, and when no results are processed. # Count is only accessed by the parent process. Only the parent process is # allowed to remove items from the done_queue and to add items to the # work_queue. - self.count = 0 - self.work_queue = Queue() - self.done_queue = Queue() - self.done = Event() + self.processing_count = 0 self.heartbeat_timeout = heartbeat_timeout + # Disable sigint and sigterm to prevent subprocesses from capturing the + # signals. + with without_sig(): + self.work_queue = Queue() + self.done_queue = Queue() + def imap_unordered(self, fn, gen, process_context_fn=None, process_context_args=None): """Maps function "fn" to items in generator "gen" on the worker processes @@ -123,58 +140,63 @@ class Pool(): process_context_fn. All arguments will be pickled and sent beyond the process boundary. """ + if self.terminated: + return try: internal_error = False gen = iter(gen) self.advance = self._advance_more - for w in xrange(self.num_workers): - p = Process(target=Worker, args=(fn, - self.work_queue, - self.done_queue, - self.done, - process_context_fn, - process_context_args)) - p.start() - self.processes.append(p) + # Disable sigint and sigterm to prevent subprocesses from capturing the + # signals. + with without_sig(): + for w in xrange(self.num_workers): + p = Process(target=Worker, args=(fn, + self.work_queue, + self.done_queue, + process_context_fn, + process_context_args)) + p.start() + self.processes.append(p) self.advance(gen) - while self.count > 0: + while self.processing_count > 0: while True: try: - result = self.done_queue.get(timeout=self.heartbeat_timeout) - break - except Empty: - # Indicate a heartbeat. The iterator will continue fetching the - # next result. - yield MaybeResult.create_heartbeat() - self.count -= 1 - if result.exception: - # TODO(machenbach): Handle a few known types of internal errors - # gracefully, e.g. missing test files. 
- internal_error = True - continue - elif result.break_now: - # A keyboard interrupt happened in one of the worker processes. - raise KeyboardInterrupt - else: - yield MaybeResult.create_result(result.result) + # Read from result queue in a responsive fashion. If available, + # this will return a normal result immediately or a heartbeat on + # heartbeat timeout (default 1 second). + result = self._get_result_from_queue() + except: + # TODO(machenbach): Handle a few known types of internal errors + # gracefully, e.g. missing test files. + internal_error = True + continue + + if self.abort_now: + # SIGINT, SIGTERM or internal hard timeout. + return + + yield result + break + self.advance(gen) except KeyboardInterrupt: - raise + assert False, 'Unreachable' except Exception as e: traceback.print_exc() print(">>> EXCEPTION: %s" % e) finally: - self.terminate() + self._terminate() + if internal_error: raise Exception("Internal error in a worker process.") def _advance_more(self, gen): - while self.count < self.num_workers * self.BUFFER_FACTOR: + while self.processing_count < self.num_workers * self.BUFFER_FACTOR: try: self.work_queue.put(gen.next()) - self.count += 1 + self.processing_count += 1 except StopIteration: self.advance = self._advance_empty break @@ -185,27 +207,51 @@ class Pool(): def add(self, args): """Adds an item to the work queue. Can be called dynamically while processing the results from imap_unordered.""" + assert not self.terminated + self.work_queue.put(args) - self.count += 1 + self.processing_count += 1 + + def abort(self): + """Schedules abort on next queue read. + + This is safe to call when handling SIGINT, SIGTERM or when an internal + hard timeout is reached. + """ + self.abort_now = True - def terminate(self): + def _terminate(self): + """Terminates execution and cleans up the queues. + + If abort() was called before termination, this also terminates the + subprocesses and doesn't wait for ongoing tests. 
+ """ if self.terminated: return self.terminated = True - # For exceptional tear down set the "done" event to stop the workers before - # they empty the queue buffer. - self.done.set() + # Drain out work queue from tests + try: + while True: + self.work_queue.get(True, 0.1) + except Empty: + pass - for p in self.processes: + # Make sure all processes stop + for _ in self.processes: # During normal tear down the workers block on get(). Feed a poison pill # per worker to make them stop. self.work_queue.put("STOP") + if self.abort_now: + for p in self.processes: + os.kill(p.pid, signal.SIGTERM) + for p in self.processes: p.join() - # Drain the queues to prevent failures when queues are garbage collected. + # Drain the queues to prevent stderr chatter when queues are garbage + # collected. try: while True: self.work_queue.get(False) except: @@ -214,3 +260,22 @@ class Pool(): while True: self.done_queue.get(False) except: pass + + def _get_result_from_queue(self): + """Attempts to get the next result from the queue. + + Returns: A wrapped result if one was available within heartbeat timeout, + a heartbeat result otherwise. + Raises: + Exception: If an exception occured when processing the task on the + worker side, it is reraised here. + """ + while True: + try: + result = self.done_queue.get(timeout=self.heartbeat_timeout) + self.processing_count -= 1 + if result.exception: + raise result.exception + return MaybeResult.create_result(result.result) + except Empty: + return MaybeResult.create_heartbeat() |