Diffstat (limited to 'deps/v8/tools/testrunner/local/pool.py')
-rw-r--r--  deps/v8/tools/testrunner/local/pool.py  201
1 file changed, 133 insertions(+), 68 deletions(-)
diff --git a/deps/v8/tools/testrunner/local/pool.py b/deps/v8/tools/testrunner/local/pool.py
index 9199b62d8a..7c9a250bc3 100644
--- a/deps/v8/tools/testrunner/local/pool.py
+++ b/deps/v8/tools/testrunner/local/pool.py
@@ -4,42 +4,38 @@
# found in the LICENSE file.
from Queue import Empty
-from multiprocessing import Event, Process, Queue
+from contextlib import contextmanager
+from multiprocessing import Process, Queue
+import os
+import signal
+import time
import traceback
+from . import command
+
def setup_testing():
"""For testing only: Use threading under the hood instead of multiprocessing
to make coverage work.
"""
global Queue
- global Event
global Process
del Queue
- del Event
del Process
from Queue import Queue
- from threading import Event
from threading import Thread as Process
+ # Monkeypatch threading Queue to look like multiprocessing Queue.
+ Queue.cancel_join_thread = lambda self: None
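The monkeypatch works because cancel_join_thread() is the only multiprocessing-specific queue method the pool calls; stubbing that one method makes threading's Queue a drop-in replacement here. A minimal sketch of the same trick in Python 3 terms (where the module is named queue rather than Queue; illustrative only, not part of the commit):

    import queue
    import threading

    def setup_testing_py3():
        # threading's Queue lacks cancel_join_thread(); stub it out so code
        # written against multiprocessing.Queue runs unchanged on threads.
        queue.Queue.cancel_join_thread = lambda self: None
        return queue.Queue, threading.Thread  # stand-ins for Queue, Process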
class NormalResult():
def __init__(self, result):
self.result = result
- self.exception = False
- self.break_now = False
-
+ self.exception = None
class ExceptionResult():
- def __init__(self):
- self.exception = True
- self.break_now = False
-
-
-class BreakResult():
- def __init__(self):
- self.exception = False
- self.break_now = True
+ def __init__(self, exception):
+ self.exception = exception
class MaybeResult():
@@ -56,26 +52,43 @@ class MaybeResult():
return MaybeResult(False, value)
-def Worker(fn, work_queue, done_queue, done,
+def Worker(fn, work_queue, done_queue,
process_context_fn=None, process_context_args=None):
"""Worker to be run in a child process.
- The worker stops on two conditions. 1. When the poison pill "STOP" is
- reached or 2. when the event "done" is set."""
+ The worker stops when the poison pill "STOP" is reached.
+ """
try:
kwargs = {}
if process_context_fn and process_context_args is not None:
kwargs.update(process_context=process_context_fn(*process_context_args))
for args in iter(work_queue.get, "STOP"):
- if done.is_set():
- break
try:
done_queue.put(NormalResult(fn(*args, **kwargs)))
+ except command.AbortException:
+ # SIGINT, SIGTERM or internal hard timeout.
+ break
except Exception, e:
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
- done_queue.put(ExceptionResult())
+ done_queue.put(ExceptionResult(e))
+ # When we reach here on normal tear down, all items have been pulled from
+ # the done_queue before and this should have no effect. On fast abort, it's
+ # possible that a fast worker left items on the done_queue in memory, which
+ # will never be pulled. This call purges those to avoid a deadlock.
+ done_queue.cancel_join_thread()
except KeyboardInterrupt:
- done_queue.put(BreakResult())
+ assert False, 'Unreachable'
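The worker's main loop relies on the two-argument form of iter(): it calls work_queue.get() repeatedly and stops as soon as the sentinel string "STOP" comes back, which is what makes the poison pills fed during tear-down work. A self-contained sketch of the pattern (Python 3 names, hypothetical squaring task):

    import multiprocessing

    def worker(work_queue, done_queue):
        # iter(callable, sentinel) keeps calling work_queue.get() and ends
        # the loop cleanly once it returns the poison pill "STOP".
        for n in iter(work_queue.get, "STOP"):
            done_queue.put(n * n)

    if __name__ == "__main__":
        work, done = multiprocessing.Queue(), multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(work, done))
        p.start()
        for n in (2, 3, 4):
            work.put(n)
        work.put("STOP")  # one pill per worker
        p.join()
        print([done.get() for _ in range(3)])  # -> [4, 9, 16]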
+
+
+@contextmanager
+def without_sig():
+ int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
+ term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
+ try:
+ yield
+ finally:
+ signal.signal(signal.SIGINT, int_handler)
+ signal.signal(signal.SIGTERM, term_handler)
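Signal dispositions are process-wide and are inherited by children spawned while they are in force. Creating the queues and worker processes inside without_sig() therefore leaves SIGINT and SIGTERM ignored in the children (and in the queues' feeder threads), while the parent gets its own handlers back when the block exits. A usage sketch, assuming a POSIX system with the fork start method (not part of the commit):

    import os
    import signal
    import time
    from contextlib import contextmanager
    from multiprocessing import Process

    @contextmanager
    def without_sig():
        # Same shape as the context manager in the diff above.
        int_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        term_handler = signal.signal(signal.SIGTERM, signal.SIG_IGN)
        try:
            yield
        finally:
            signal.signal(signal.SIGINT, int_handler)
            signal.signal(signal.SIGTERM, term_handler)

    def child():
        time.sleep(5)  # inherits SIG_IGN from the parent's block

    if __name__ == "__main__":
        with without_sig():
            p = Process(target=child)  # disposition is inherited on fork
            p.start()
        os.kill(p.pid, signal.SIGTERM)  # the child ignores this
        p.join(1)
        print("child alive:", p.is_alive())  # True: the signal had no effect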
class Pool():
@@ -88,24 +101,28 @@ class Pool():
# Necessary to not overflow the queue's pipe if a keyboard interrupt happens.
BUFFER_FACTOR = 4
- def __init__(self, num_workers, heartbeat_timeout=30):
+ def __init__(self, num_workers, heartbeat_timeout=1):
self.num_workers = num_workers
self.processes = []
self.terminated = False
+ self.abort_now = False
- # Invariant: count >= #work_queue + #done_queue. It is greater when a
- # worker takes an item from the work_queue and before the result is
+ # Invariant: processing_count >= #work_queue + #done_queue. It is greater
+ # when a worker takes an item from the work_queue and before the result is
# submitted to the done_queue. It is equal when no worker is working,
# e.g. when all workers have finished, and when no results are processed.
# Count is only accessed by the parent process. Only the parent process is
# allowed to remove items from the done_queue and to add items to the
# work_queue.
- self.count = 0
- self.work_queue = Queue()
- self.done_queue = Queue()
- self.done = Event()
+ self.processing_count = 0
self.heartbeat_timeout = heartbeat_timeout
+ # Disable sigint and sigterm to prevent subprocesses from capturing the
+ # signals.
+ with without_sig():
+ self.work_queue = Queue()
+ self.done_queue = Queue()
+
def imap_unordered(self, fn, gen,
process_context_fn=None, process_context_args=None):
"""Maps function "fn" to items in generator "gen" on the worker processes
@@ -123,58 +140,63 @@ class Pool():
process_context_fn. All arguments will be pickled and sent beyond the
process boundary.
"""
+ if self.terminated:
+ return
try:
internal_error = False
gen = iter(gen)
self.advance = self._advance_more
- for w in xrange(self.num_workers):
- p = Process(target=Worker, args=(fn,
- self.work_queue,
- self.done_queue,
- self.done,
- process_context_fn,
- process_context_args))
- p.start()
- self.processes.append(p)
+ # Disable sigint and sigterm to prevent subprocesses from capturing the
+ # signals.
+ with without_sig():
+ for w in xrange(self.num_workers):
+ p = Process(target=Worker, args=(fn,
+ self.work_queue,
+ self.done_queue,
+ process_context_fn,
+ process_context_args))
+ p.start()
+ self.processes.append(p)
self.advance(gen)
- while self.count > 0:
+ while self.processing_count > 0:
while True:
try:
- result = self.done_queue.get(timeout=self.heartbeat_timeout)
- break
- except Empty:
- # Indicate a heartbeat. The iterator will continue fetching the
- # next result.
- yield MaybeResult.create_heartbeat()
- self.count -= 1
- if result.exception:
- # TODO(machenbach): Handle a few known types of internal errors
- # gracefully, e.g. missing test files.
- internal_error = True
- continue
- elif result.break_now:
- # A keyboard interrupt happened in one of the worker processes.
- raise KeyboardInterrupt
- else:
- yield MaybeResult.create_result(result.result)
+ # Read from the result queue in a responsive fashion. If a result
+ # is available it is returned immediately; otherwise a heartbeat
+ # is returned after the heartbeat timeout (default 1 second).
+ result = self._get_result_from_queue()
+ except:
+ # TODO(machenbach): Handle a few known types of internal errors
+ # gracefully, e.g. missing test files.
+ internal_error = True
+ continue
+
+ if self.abort_now:
+ # SIGINT, SIGTERM or internal hard timeout.
+ return
+
+ yield result
+ break
+
self.advance(gen)
except KeyboardInterrupt:
- raise
+ assert False, 'Unreachable'
except Exception as e:
traceback.print_exc()
print(">>> EXCEPTION: %s" % e)
finally:
- self.terminate()
+ self._terminate()
+
if internal_error:
raise Exception("Internal error in a worker process.")
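From the caller's side, imap_unordered now yields two kinds of MaybeResult: a heartbeat whenever no result arrived within heartbeat_timeout, and a wrapped value otherwise. Work items must be tuples, because the worker applies fn(*args). A hypothetical consumer follows; run_test, handle, tests and the heartbeat/value attribute names are assumptions inferred from the MaybeResult constructor calls above:

    pool = Pool(num_workers=4)
    for maybe in pool.imap_unordered(run_test, ((t,) for t in tests)):
        if maybe.heartbeat:
            print("...still running")  # emitted roughly once per second
            continue
        handle(maybe.value)  # a completed test result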
def _advance_more(self, gen):
- while self.count < self.num_workers * self.BUFFER_FACTOR:
+ while self.processing_count < self.num_workers * self.BUFFER_FACTOR:
try:
self.work_queue.put(gen.next())
- self.count += 1
+ self.processing_count += 1
except StopIteration:
self.advance = self._advance_empty
break
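_advance_more acts as a backpressure valve: it tops the work queue up to num_workers * BUFFER_FACTOR outstanding items and consumes the generator lazily, so an arbitrarily long test list never piles up in the queue's pipe. The same idea in isolation (Python 3 spelling, next(gen) in place of gen.next(); illustrative only):

    def advance_more(work_queue, gen, in_flight, limit):
        # Refill the queue up to `limit` items in flight; report whether
        # the generator still has work so the caller can stop advancing.
        while in_flight < limit:
            try:
                work_queue.put(next(gen))
            except StopIteration:
                return in_flight, False  # generator exhausted
            in_flight += 1
        return in_flight, True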
@@ -185,27 +207,51 @@ class Pool():
def add(self, args):
"""Adds an item to the work queue. Can be called dynamically while
processing the results from imap_unordered."""
+ assert not self.terminated
+
self.work_queue.put(args)
- self.count += 1
+ self.processing_count += 1
+
+ def abort(self):
+ """Schedules abort on next queue read.
+
+ This is safe to call when handling SIGINT, SIGTERM or when an internal
+ hard timeout is reached.
+ """
+ self.abort_now = True
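abort() deliberately does nothing but set a flag, which makes it safe to call from a signal handler; imap_unordered checks the flag after its next queue read and returns. A hypothetical wiring in the parent process (the actual runner installs its handlers elsewhere; this only shows the shape):

    import signal

    def install_abort_handlers(pool):
        # Turn SIGINT/SIGTERM in the parent into a scheduled pool abort
        # rather than an asynchronous KeyboardInterrupt.
        def handler(signum, frame):
            pool.abort()
        signal.signal(signal.SIGINT, handler)
        signal.signal(signal.SIGTERM, handler)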
- def terminate(self):
+ def _terminate(self):
+ """Terminates execution and cleans up the queues.
+
+ If abort() was called before termination, this also terminates the
+ subprocesses and doesn't wait for ongoing tests.
+ """
if self.terminated:
return
self.terminated = True
- # For exceptional tear down set the "done" event to stop the workers before
- # they empty the queue buffer.
- self.done.set()
+ # Drain the work queue of any remaining tests.
+ try:
+ while True:
+ self.work_queue.get(True, 0.1)
+ except Empty:
+ pass
- for p in self.processes:
+ # Make sure all processes stop
+ for _ in self.processes:
# During normal tear down the workers block on get(). Feed a poison pill
# per worker to make them stop.
self.work_queue.put("STOP")
+ if self.abort_now:
+ for p in self.processes:
+ os.kill(p.pid, signal.SIGTERM)
+
for p in self.processes:
p.join()
- # Drain the queues to prevent failures when queues are garbage collected.
+ # Drain the queues to prevent stderr chatter when queues are garbage
+ # collected.
try:
while True: self.work_queue.get(False)
except:
@@ -214,3 +260,22 @@ class Pool():
while True: self.done_queue.get(False)
except:
pass
+
+ def _get_result_from_queue(self):
+ """Attempts to get the next result from the queue.
+
+ Returns: A wrapped result if one was available within heartbeat timeout,
+ a heartbeat result otherwise.
+ Raises:
+ Exception: If an exception occurred when processing the task on the
+ worker side, it is reraised here.
+ """
+ while True:
+ try:
+ result = self.done_queue.get(timeout=self.heartbeat_timeout)
+ self.processing_count -= 1
+ if result.exception:
+ raise result.exception
+ return MaybeResult.create_result(result.result)
+ except Empty:
+ return MaybeResult.create_heartbeat()
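This helper turns a blocking Queue.get into a responsive poll: a result arriving within heartbeat_timeout is unwrapped, re-raising any exception recorded on the worker side, while a timeout surfaces as a heartbeat instead of a hang. The same logic as a standalone function (Python 3 spelling, queue instead of Queue; processing_count bookkeeping omitted):

    import queue  # Python 3 name of the Queue module used above

    def get_result(done_queue, heartbeat_timeout=1):
        try:
            result = done_queue.get(timeout=heartbeat_timeout)
        except queue.Empty:
            # Nothing finished yet: report liveness instead of blocking.
            return MaybeResult.create_heartbeat()
        if result.exception:
            raise result.exception  # surface the worker-side failure
        return MaybeResult.create_result(result.result)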