1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
'use strict';
const EventEmitter = require('events');
const util = require('util');
const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');
// Make MessagePort instances event emitters: `util.inherits` chains
// `MessagePort.prototype` to `EventEmitter.prototype`.
util.inherits(MessagePort, EventEmitter);
// Symbol key under which a port's current message handler is stored; the
// `onmessage` accessor defined in this file reads and writes this slot.
const kOnMessageListener = Symbol('kOnMessageListener');
// Debug logger for the 'worker' section (see `util.debuglog`; normally
// enabled through the NODE_DEBUG environment variable).
const debug = util.debuglog('worker');
// A MessagePort is made of two parts: a handle (wrapping a `uv_async_t`)
// that receives data from other threads and surfaces it as `.onmessage`
// events, and a function for sending data to a MessagePort that lives in
// some other thread.
//
// Default message handler, stored under the `kOnMessageListener` symbol:
// it logs the incoming payload through the 'worker' debug logger and then
// re-emits it as a `'message'` event on the port.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
  debug('received message', payload);
  // Hand the already-deserialized value over to userland listeners.
  this.emit('message', payload);
};
// Compatibility shim for the Web's MessagePort API. In Node.js the port is
// primarily an `EventEmitter`, but once somebody assigns `onmessage` the
// stored handler is replaced and the port follows the Web API model instead.
//
// - get: returns whatever handler is currently stored under
//   `kOnMessageListener` (the default emitter-based handler until replaced).
// - set: stores the new value; a function refs and starts the port, any
//   other value unrefs and stops it.
Object.defineProperty(MessagePort.prototype, 'onmessage', {
  configurable: true,
  enumerable: true,
  get() {
    return this[kOnMessageListener];
  },
  set(value) {
    this[kOnMessageListener] = value;

    // Anything that is not callable means "no handler": release the port.
    if (typeof value !== 'function') {
      this.unref();
      this.stop();
      return;
    }

    // A real handler was installed: keep the port referenced and receiving.
    this.ref();
    this.start();
  }
});
// Initialization hook, called from inside the `MessagePort` constructor.
// The port acts as both the handle to ref/unref and the emitter to watch,
// with `'message'` as the event whose listeners control referencing.
function oninit() {
  const port = this;
  setupPortReferencing(port, port, 'message');
}

// Expose the hook as an enumerable, read-only `oninit` property on the
// prototype.
Object.defineProperty(MessagePort.prototype, 'oninit', {
  value: oninit,
  writable: false,
  enumerable: true
});
// Close hook, called after the underlying `uv_async_t` has been closed.
// Runs the user-assigned `onclose` callback first (when it is a function)
// and then always emits the `'close'` event on the port.
function onclose() {
  // `onclose` is not part of the Web standard yet, but outside of an
  // EventEmitter-style setup there are few reasonable alternatives.
  // Refs: https://github.com/whatwg/html/issues/1766
  const hasUserCallback = typeof this.onclose === 'function';
  if (hasUserCallback) {
    this.onclose();
  }

  this.emit('close');
}
// Register `onclose` on the prototype under the `handle_onclose` symbol
// obtained from the internal 'symbols' binding. The property is hidden from
// enumeration and cannot be reassigned.
Object.defineProperty(MessagePort.prototype, handle_onclose, {
  value: onclose,
  writable: false,
  enumerable: false
});
// Keep a reference to the `close()` provided by the binding so the wrapper
// below can still delegate to it.
const originalClose = MessagePort.prototype.close;

/**
 * Closes the port, optionally notifying a callback once it has closed.
 *
 * @param {Function} [cb] When a function, it is registered as a one-time
 *   `'close'` listener before the underlying close is requested. Any other
 *   value is ignored.
 */
MessagePort.prototype.close = function(cb) {
  const wantsNotification = typeof cb === 'function';
  if (wantsNotification) {
    this.once('close', cb);
  }

  originalClose.call(this);
};
/**
 * Couples the ref/unref state of `port` to the presence of `eventName`
 * listeners on `eventEmitter`.
 *
 * The port starts out unref'ed. When the first listener for `eventName` is
 * about to be added, the port is ref'ed and started (so that, per the
 * original design note, it keeps the event loop alive). When the last such
 * listener has been removed, the port is stopped and unref'ed again so that
 * shutdown can proceed gracefully.
 *
 * @param {object} port Object exposing `ref()`, `unref()`, `start()` and
 *   `stop()`.
 * @param {EventEmitter} eventEmitter Emitter whose listeners are tracked.
 * @param {string|symbol} eventName Event whose listener count drives the
 *   port state.
 */
function setupPortReferencing(port, eventEmitter, eventName) {
  // True only for the tracked event while it currently has no listeners.
  // 'newListener' fires before the listener is added and 'removeListener'
  // fires after it is removed, so a count of zero marks the first addition
  // and the last removal respectively.
  const isTrackedAndIdle = (name) => {
    if (name !== eventName) {
      return false;
    }
    return eventEmitter.listenerCount(eventName) === 0;
  };

  const handleNewListener = (name) => {
    if (!isTrackedAndIdle(name)) {
      return;
    }
    port.ref();
    port.start();
  };

  const handleRemoveListener = (name) => {
    if (!isTrackedAndIdle(name)) {
      return;
    }
    port.stop();
    port.unref();
  };

  // Nobody is listening yet, so begin in the unreferenced state.
  port.unref();
  eventEmitter.on('newListener', handleNewListener);
  eventEmitter.on('removeListener', handleRemoveListener);
}
// Module exports: `MessagePort` and `MessageChannel` as obtained from the
// internal 'messaging' binding, with `MessagePort` augmented in this file.
module.exports = {
MessagePort,
MessageChannel
};
|