summary refs log tree commit diff
path: root/lib/internal/process/worker_thread_only.js
blob: 3fcd052542d09d29d93359e2d66b57bf95a5478b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
'use strict';

// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
  getEnvMessagePort,
  threadId
} = internalBinding('worker');

const {
  messageTypes,
  kStdioWantsMoreDataCallback,
  kWaitingStreams,
  ReadableWorkerStdio,
  WritableWorkerStdio
} = require('internal/worker/io');

let debuglog;
function debug(...args) {
  if (!debuglog) {
    debuglog = require('util').debuglog('worker');
  }
  return debuglog(...args);
}

// Module-level holder for the worker's stdio stream objects, keyed by
// 'stdin' / 'stdout' / 'stderr'. Filled in by initializeWorkerStdio().
const workerStdio = {};

// Sets port[kWaitingStreams] to 0 on the environment message port, creates
// the three stdio stream objects on top of that port (stdin first, then
// stdout, then stderr) and returns getter functions for them.
function initializeWorkerStdio() {
  const envPort = getEnvMessagePort();
  envPort[kWaitingStreams] = 0;

  const streamClasses = [
    ['stdin', ReadableWorkerStdio],
    ['stdout', WritableWorkerStdio],
    ['stderr', WritableWorkerStdio]
  ];
  for (const [name, StdioStream] of streamClasses) {
    workerStdio[name] = new StdioStream(envPort, name);
  }

  // The getters read workerStdio at call time rather than capturing the
  // stream objects created above.
  return {
    getStdout: () => workerStdio.stdout,
    getStderr: () => workerStdio.stderr,
    getStdin: () => workerStdio.stdin
  };
}

function createMessageHandler(port) {
  const publicWorker = require('worker_threads');

  return function(message) {
    if (message.type === messageTypes.LOAD_SCRIPT) {
      const { filename, doEval, workerData, publicPort, hasStdin } = message;
      publicWorker.parentPort = publicPort;
      publicWorker.workerData = workerData;

      if (!hasStdin)
        workerStdio.stdin.push(null);

      debug(`[${threadId}] starts worker script ${filename} ` +
            `(eval = ${eval}) at cwd = ${process.cwd()}`);
      port.unref();
      port.postMessage({ type: messageTypes.UP_AND_RUNNING });
      if (doEval) {
        const { evalScript } = require('internal/process/execution');
        evalScript('[worker eval]', filename);
      } else {
        process.argv[1] = filename; // script filename
        require('module').runMain();
      }
      return;
    } else if (message.type === messageTypes.STDIO_PAYLOAD) {
      const { stream, chunk, encoding } = message;
      workerStdio[stream].push(chunk, encoding);
      return;
    } else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
      const { stream } = message;
      workerStdio[stream][kStdioWantsMoreDataCallback]();
      return;
    }

    require('assert').fail(`Unknown worker message type ${message.type}`);
  };
}

// XXX(joyeecheung): this has to be returned as an anonymous function
// wrapped in a closure, see the comment of the original
// process._fatalException in lib/internal/process/execution.js
function createWorkerFatalExeception(port) {
  const {
    fatalException: originalFatalException
  } = require('internal/process/execution');

  return (error) => {
    debug(`[${threadId}] gets fatal exception`);
    let caught = false;
    try {
      caught = originalFatalException.call(this, error);
    } catch (e) {
      error = e;
    }
    debug(`[${threadId}] fatal exception caught = ${caught}`);

    if (!caught) {
      let serialized;
      try {
        const { serializeError } = require('internal/error-serdes');
        serialized = serializeError(error);
      } catch {}
      debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
      if (serialized)
        port.postMessage({
          type: messageTypes.ERROR_MESSAGE,
          error: serialized
        });
      else
        port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });

      const { clearAsyncIdStack } = require('internal/async_hooks');
      clearAsyncIdStack();

      process.exit();
    }
  };
}

// Public surface of this module: the three worker-thread bootstrap helpers
// defined above.
// NOTE(review): `createWorkerFatalExeception` is misspelled ("Exeception");
// presumably other modules import it under this exact name, so confirm all
// call sites before renaming.
module.exports = {
  initializeWorkerStdio,
  createMessageHandler,
  createWorkerFatalExeception
};