// path: root/deps/v8/src/heap/page-parallel-job.h
// blob: eb215efbb489fe89120108a2abe0645aefe6a4e7
// Copyright 2016 the V8 project authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef V8_HEAP_PAGE_PARALLEL_JOB_
#define V8_HEAP_PAGE_PARALLEL_JOB_

#include "src/allocation.h"
#include "src/cancelable-task.h"
#include "src/utils.h"
#include "src/v8.h"

namespace v8 {
namespace internal {

class Heap;
class Isolate;

// This class manages background tasks that process a set of pages in parallel.
// The JobTraits class needs to define:
// - PerPageData type - state associated with each page.
// - PerTaskData type - state associated with each task.
// - static void ProcessPageInParallel(Heap* heap,
//                                     PerTaskData task_data,
//                                     MemoryChunk* page,
//                                     PerPageData page_data)
template <typename JobTraits>
class PageParallelJob {
 public:
  // Creates an empty job; pages are registered afterwards through AddPage().
  //
  // The semaphore is supplied by the caller instead of being created here
  // because of a bug in glibc; see http://crbug.com/609249 and
  // https://sourceware.org/bugzilla/show_bug.cgi?id=12674.
  // Requirements on the caller-provided semaphore:
  // - its value is 0 when it is handed to the job, and
  // - its lifetime is the same as the lifetime of the Isolate.
  // In return, the semaphore value is guaranteed to be 0 again after Run().
  PageParallelJob(Heap* heap, CancelableTaskManager* cancelable_task_manager,
                  base::Semaphore* semaphore)
      : heap_(heap),
        cancelable_task_manager_(cancelable_task_manager),
        items_(nullptr),
        num_items_(0),
        num_tasks_(0),
        pending_tasks_(semaphore) {}

  // Frees every list node that AddPage() allocated.
  ~PageParallelJob() {
    for (Item* node = items_; node != nullptr;) {
      // Read the link before the node is destroyed.
      Item* const successor = node->next;
      delete node;
      node = successor;
    }
  }

  // Registers |chunk| together with its per-page |data|. The new entry is
  // pushed onto the front of the singly linked item list.
  void AddPage(MemoryChunk* chunk, typename JobTraits::PerPageData data) {
    items_ = new Item(chunk, data, items_);
    num_items_++;
  }

  // Returns how many pages have been added so far.
  int NumberOfPages() const { return num_items_; }

  // Returns the number of tasks that were spawned when running the job.
  int NumberOfTasks() const { return num_tasks_; }

  // Processes all previously added pages using up to |num_tasks| tasks and
  // blocks until every task is done. Returns immediately when no page was
  // added.
  //
  // |per_task_data_callback| is invoked once per task with the task index and
  // must return the PerTaskData for that task.
  //
  // The effective task count is |num_tasks| clamped to the range
  // [1, min(kMaxNumberOfTasks, available background threads)].
  template <typename Callback>
  void Run(int num_tasks, Callback per_task_data_callback) {
    if (num_items_ == 0) return;
    DCHECK_GE(num_tasks, 1);

    uint32_t task_ids[kMaxNumberOfTasks];
    const int available_threads = static_cast<int>(
        V8::GetCurrentPlatform()->NumberOfAvailableBackgroundThreads());
    const int task_limit = Min(kMaxNumberOfTasks, available_threads);
    num_tasks_ = Max(1, Min(num_tasks, task_limit));

    // Ceiling division: the distance between the start offsets of two
    // consecutive tasks.
    const int stride = (num_items_ + num_tasks_ - 1) / num_tasks_;

    Task* foreground_task = nullptr;
    int first_item = 0;
    for (int task_index = 0; task_index < num_tasks_; ++task_index) {
      // Keep the offset inside the list; one subtraction is enough because
      // the stride never exceeds the number of items.
      if (first_item >= num_items_) {
        first_item -= num_items_;
      }
      Task* const task =
          new Task(heap_, items_, num_items_, first_item, pending_tasks_,
                   per_task_data_callback(task_index));
      task_ids[task_index] = task->id();
      if (task_index == 0) {
        // The first task is kept back and executed by this thread below.
        foreground_task = task;
      } else {
        V8::GetCurrentPlatform()->CallOnBackgroundThread(
            task, v8::Platform::kShortRunningTask);
      }
      first_item += stride;
    }

    // Contribute on main thread.
    foreground_task->Run();
    delete foreground_task;

    // Wait for background tasks. A task that TryAbort() reports as
    // kTaskAborted is not waited for; for every other task (including the one
    // that ran on this thread) one Signal() from RunInternal() is consumed.
    for (int task_index = 0; task_index < num_tasks_; ++task_index) {
      const bool aborted =
          cancelable_task_manager_->TryAbort(task_ids[task_index]) ==
          CancelableTaskManager::kTaskAborted;
      if (!aborted) {
        pending_tasks_->Wait();
      }
    }
  }

 private:
  // Upper bound on the number of tasks; also the capacity of the id array
  // used by Run().
  static const int kMaxNumberOfTasks = 32;

  // Per-item processing state. Tasks claim an item by switching it from
  // kAvailable to kProcessing and mark it kFinished afterwards.
  enum ProcessingState { kAvailable, kProcessing, kFinished };

  // Node of the singly linked list of pages to process.
  struct Item : public Malloced {
    Item(MemoryChunk* chunk, typename JobTraits::PerPageData data, Item* next)
        : chunk(chunk), state(kAvailable), data(data), next(next) {}
    MemoryChunk* chunk;                         // Page to process.
    base::AtomicValue<ProcessingState> state;   // Claimed via TrySetValue().
    typename JobTraits::PerPageData data;       // State attached to the page.
    Item* next;                                 // Next node or nullptr.
  };

  // A cancelable task that walks the complete item list once, beginning at
  // its own start offset, and processes every item it manages to claim.
  class Task : public CancelableTask {
   public:
    Task(Heap* heap, Item* items, int num_items, int start_index,
         base::Semaphore* on_finish, typename JobTraits::PerTaskData data)
        : CancelableTask(heap->isolate()),
          heap_(heap),
          items_(items),
          num_items_(num_items),
          start_index_(start_index),
          on_finish_(on_finish),
          data_(data) {}

    virtual ~Task() {}

   private:
    // v8::internal::CancelableTask overrides.
    void RunInternal() override {
      // Each task starts at a different index to improve parallelization.
      Item* cursor = items_;
      for (int remaining = start_index_; remaining > 0; --remaining) {
        cursor = cursor->next;
      }

      // Visit each of the |num_items_| nodes exactly once.
      for (int visited = 0; visited < num_items_; ++visited) {
        // Only the task that wins the kAvailable -> kProcessing transition
        // processes the page.
        if (cursor->state.TrySetValue(kAvailable, kProcessing)) {
          JobTraits::ProcessPageInParallel(heap_, data_, cursor->chunk,
                                           cursor->data);
          cursor->state.SetValue(kFinished);
        }
        // Advance, wrapping around to the head at the end of the list.
        Item* const successor = cursor->next;
        cursor = (successor != nullptr) ? successor : items_;
      }

      // Tell Run() that this task is done.
      on_finish_->Signal();
    }

    Heap* heap_;
    Item* items_;
    int num_items_;
    int start_index_;
    base::Semaphore* on_finish_;
    typename JobTraits::PerTaskData data_;
    DISALLOW_COPY_AND_ASSIGN(Task);
  };

  Heap* heap_;
  CancelableTaskManager* cancelable_task_manager_;
  Item* items_;     // Head of the item list; owned by this job.
  int num_items_;   // Length of the item list.
  int num_tasks_;   // Task count chosen by the most recent Run().
  base::Semaphore* pending_tasks_;  // Caller-provided; signaled by tasks.
  DISALLOW_COPY_AND_ASSIGN(PageParallelJob);
};

}  // namespace internal
}  // namespace v8

#endif  // V8_HEAP_PAGE_PARALLEL_JOB_