本文译自Maciej Cieślar所写的A complete guide to threads in Node.js。
原文地址: https://blog.logrocket.com/a-complete-guide-to-threads-in-node-js-4fa3898fe74f。
翻译已得到作者许可。
不少人会疑惑单线程的Node.js怎么和多线程的后端(语言)竞争。许多大公司采用Node作为后端,似乎是违反直觉的。要找到原因,我们就不得不谈谈当我们说Node是单线程时,我们到底在指什么。
JavaScript当初创立的目的,是给web页面添加一些简单的东西,像验证表单或者给鼠标加一个彩虹尾巴 。直到2009年,Ryan Dahl创立了Node.js,才使得用js开发后端成为可能。
后端的语言通常都支持多线程,拥有在线程间同步数据的机制以及其它面向线程的特性。为JavaScript加入这类支持,意味着改变整个语言,这自然不是当时Dahl的目标。让简单的JavaScript支持多线程,不得不使用一些变通方案。我们一起探索下。
Node.js是如何工作的
Node.js中有2类线程:一个主线程,由event loop处理;几个辅助线程,在工作线程池中(worker pool)。
Event loop是这样一种机制,它接收回调(函数)并标记它们在未来某个点执行。JS代码和它运行在一个线程里。当某个JavaScript操作阻塞了线程,Event loop也会被阻塞。
工作线程池是一种创建并管理额外线程的执行模型,它同步地执行任务并将结果返回给event loop。然后event loop将收到的结果作为入参来执行对应的回调函数。
简言之,工作线程池负责异步的I/O操作 —— 主要是与系统磁盘和网络的交互。像fs
(I/O密集型)或crypto
(CPU密集型)等模块就会用到它。工作线程池由libuv实现,正因为此,每当JavaScript需要和C++通信时,都会有轻微的延迟。当然,这个延迟很难被注意到。
有了这2类机制的支持,我们得以写出这样的代码:
fs.readFile(path.join(__dirname, './package.json'), (err, content) => {
if (err) {
return null;
}
console.log(content.toString());
});
fs
模块告诉工作线程池使用它的一个线程读取文件内容并在读取完毕时告诉event loop。然后event loop取出给定的回调函数,把文件内容作为入参来执行。
上面的代码是非阻塞的,这样做的好处是 我们不必同步地等着某事发生。我们告诉工作线程池读取文件并将结果提供给对应的函数。因为工作线程池有它自己的线程,文件在读取的过程中,event loop可以继续正常地运行。
一切都很好,直到有一个需求,要同步地执行复杂的操作:任何一个需要花费很长时间的函数都会阻塞主线程。如果一个应用有许多这样的函数,它会严重地降低服务器的吞吐量,甚至让服务器直接卡死。这种情况下,我们也没法将工作交给工作线程池了。
需要复杂计算的领域 —— 例如AI、机器学习或者大数据 —— 都无法高效地使用Node.js,因为它们的操作会阻塞主线程,让服务器失去响应。Node.js v10.5.0 结束了这种局面,加入了多线程的支持。
工作线程(worker_threads)
worker_threads
模块允许我们创建完全可用的多线程Node.js应用。
一个工作者(thread worker)是一块在另一个线程中执行的代码(通常取自一个文件)。
注意,thread worker、worker和thread通常可以互换着使用,都是一个意思。
使用工作者线程前,我们必须引入worker_threads
模块。作为起步,我们写一个函数来帮助创建工作者线程,然后谈谈它们有哪些属性。
type WorkerCallback = (err: any, result?: any) => any;
export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
const worker = new Worker(path, { workerData });
worker.on('message', cb.bind(null, null));
worker.on('error', cb);
worker.on('exit', (exitCode) => {
if (exitCode === 0) {
return null;
}
return cb(new Error(`Worker has stopped with code ${exitCode}`));
});
return worker;
}
要创建一个工作者线程,我们必须构造Worker
类的一个实例。Worker
构造函数的第一个参数是所要执行的代码文件的路径;第二个参数是含有workerData
属性的对象。workerData
是我们希望线程启动后能访问到的数据。
注意,无论你用的是JavaScript或者可以编译为JavaScript的语言(比如 TypeScript), 文件路径都应该指向以.js
或.ms
为后缀的文件。
有一点我想要指出,为什么这里我用了回调而不是直接返回一个promise。这是因为工作者线程内的message
事件中可以触发多次的,而不仅仅是一次。
如上例所见,线程间的通信是基于事件的(event-based)。这意味着我们只需要设定好监听器(listeners),等着事件触发就好了。
举一些最常用的事件:
worker.on('error', (error) => {});
当线程内出现未捕获的异常时,error
事件会触发。线程会被终止,error对象作为回调的第一个参数。
worker.on('exit', (exitCode) => {});
exit
事件发生在线程退出时。如果线程内调用了process.exit()
, exitCode
会传给回调。如果线程是用worker.terminate
终止的,code的值为1。
worker.on('online', () => {});
当线程解析完JavaScript代码并开始执行时,online
事件被触发。这个不太常用,在特定情况下可以拿来获取相关信息。
worker.on('message', (data) => {});
线程向父线程发送数据时会触发message
事件。
现在,我们看看线程间是如何进行数据共享的。
在线程间交换数据
要把数据传给另一个线程,我们使用port.postMessage()
方法。它的方法签名如下:
port.postMessage(data[, transferList])
port对象可以是 parentPort
或者 MessagePort
的一个实例 —— 稍后会详细说到。
data参数
第一个参数,data
,是一个会被拷贝到其它线程的对象。它可以包含任何拷贝算法(copying algorithm)所支持的东西。
data参数根据结构化克隆算法(structured clone algorithm)
进行复制。参考 Mozilla文档(见:https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm):
It builds up a clone by recursing through the input object while maintaining a map of previously visited references in order to avoid infinitely traversing cycles.
它创建一个克隆,基于递归遍历输入的对象,并且维护了一个map,用来记录已经访问过的对象,以避免遍历进入死循环(字面翻译)。
该算法不会拷贝函数、异常、属性描述符(property descriptors)和原型链(prototype chains)。应该注意的是,这种拷贝方式和JSON是有区别的,前者可以包含相互引用(circular references)以及类型数组(typed arrays)等,而JSON则不支持这些。
借助于对类型数组的支持,该算法也使得线程间共享内存成为可能。
线程间共享内存
有人可能会争论说在很早之前像cluster
或child_process
这样的模块就支持使用线程了。怎么说呢,是也不是。
cluster
模块可以创建多个node实例,其中一个是主进程,它对进入的请求在多个实例间进行路由。让应用有多个实例(clustering an application),也能有效提高服务器的负载能力。但cluster
模块无法让我们创建额外的线程。
与自己写代码相比,人们倾向于使用像PM2这样的工具管理多实例的应用。但如果你有兴趣,可以读我的关于如何使用cluster
模块的文章(见: https://medium.freecodecamp.org/how-to-add-socket-io-to-multi-threaded-node-js-df404b424276)。
child_process
可以运行任何一个可执行文件,无论它是不是用JavaScript编写的。它和workder_threads
相当像,但也缺少后者所支持的重要特性。
尤其是,工作者线程更加轻量并且和其父线程共享一个进程ID。它们也可以与父线程共享内存,避免序列化数据带来的开销,因此,数据来回传输更加高效。
现在看一个线程间如何共享内存的例子。要共享内存,必须使用ArrayBuffer
或SharedArrayBuffer
的实例作为data参数或作为data的一部分。
下面是一个和父线程共享内存的工作者线程:
import { parentPort } from 'worker_threads';
parentPort.on('message', () => {
const numberOfElements = 100;
const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements);
const arr = new Int32Array(sharedBuffer);
for (let i = 0; i < numberOfElements; i += 1) {
arr[i] = Math.round(Math.random() * 30);
}
parentPort.postMessage({ arr });
});
首先,我们创建了一个可以容纳100个32位整型的SharedArrayBuffer
实例。接着,我们创建了一个Int32Array
对象,它将使用上面的buffer来保存结构。最后我们把array填满随机数并将其传送给父线程。
在父线程中:
import path from 'path';
import { runWorker } from '../run-worker';
const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => {
if (err) {
return null;
}
arr[0] = 5;
});
worker.postMessage({});
通过将arr[0]
修改为 5
,我们实际上在2个线程中都做了修改。
当然,共享内存时,我们也会冒着改变一个线程的数据,也会改变另一个线程的数据这样的风险。但是我们也得到了好处:数据不用序列化,在另一个线程中就能直接使用,这提高了效率。但是要记住管理好对象的引用,当任务结束时才方便垃圾回收。
共享一个整型数组很方便,但我们对共享对象(object)更感兴趣,因为我们通常将信息封装在对象中。不幸的是,没有SharedObjectBuffer
或类似的东西,但也有其它办法(见: https://stackoverflow.com/questions/51053222/nodejs-worker-threads-shared-object-store)。
transferList参数
transferList
只能包含ArrayBuffer
或MessagePort
。一旦它们被传递给另一个线程,传送方就不能再使用它。内存会被转移给接收方线程,对于传送方不再可用。
当下,我们无法将网络套接字(network sockets)包含在transferList
中(而child_process
模块支持这么做)。
创建一个通信通道(channel for communications)
线程之间通过ports通信,ports是MessagePort
的实例,支持基于事件的交流。
有两种方式来使用ports。第一种是默认的,也是两者中较简单的一个。在工作者线程中,我们从worker_threads
模块中引入parentPort
对象,使用它的.postMessage()
方法给父线程发送消息。
例子如下:
import { parentPort } from 'worker_threads';
const data = {
_// ..._
};
parentPort.postMessage(data);
parentPort
也是MessagePort
的一个实例,是Node.js帮我们创建好的,允许用来和父线程通信。这样,我们便能使用parentPort
和worker
对象在线程间通信了。
第二种通信的方法,是手动创建一个MessageChannel
对象,并传递给工作者线程。下面演示了如何创建一个新的MessagePort
并分享给工作者:
import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';
const worker = new Worker(path.join(__dirname, 'worker.js'));
const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => {
console.log('message from worker:', message);
});
worker.postMessage({ port: port2 }, [port2]);
创建好port1
和port2
后,我们给port1
创建好事件监听器并将port2
传递给工作者线程。要实现两边通信,必须把port2
放在transferList
中。
再看工作者线程这边:
import { parentPort, MessagePort } from 'worker_threads';
parentPort.on('message', (data) => {
const { port }: { port: MessagePort } = data;
port.postMessage('here is your message!');
});
这样,我们就能用到父线程传过来的port了。
使用parentPort
也不算错误,但创建一个新的MessagePort
然后共享给其它worker是更好的方式(即: 关注点分离,separation of concerns)。
注意在后续的示例中,为了使事情简单,我也用parentPort
。
使用worker的两种方式
存在2种使用worker的方式。一种是(每次有任务时都)创建一个新的worker, 执行完代码,将结果传回给父线程(,然后销毁worker)。这种方式,每次有新任务来,我们都要重新创建一个worker。
第二种方式,是创建一个worker,并设定好监听器(listeners)。一有message
事件触发,就执行任务并将结果回传给主线程。这种方式可以让worker一直存活着供后续使用。
Node.js文档中推荐使用方式二。因为创建一个工作者线程开销大,它需要创建虚拟机(viertual machine)、解析并执行代码。方式二比频繁创建工作者线程要高效得多。
这种方式也被叫做 线程池(worker pool),因为我们创建了一组线程,让它们等待message事件,在需要时执行任务。(上文只说到了单个线程的复用,多个线程的复用叫线程池更合适一些。)
下面的例子演示了线程的第一种使用方式(创建、使用、退出):
import { parentPort } from 'worker_threads';
const collection = [];
for (let i = 0; i < 10; i += 1) {
collection[i] = i;
}
parentPort.postMessage(collection);
在将collection传递给父线程后,它就退出了。
下面的例子则演示了,工作者在接到新的任务前,可能会等待一段时间(即第二种使用方式,监听message事件,允许多次使用):
import { parentPort } from 'worker_threads';
parentPort.on('message', (data: any) => {
const result = doSomething(data);
parentPort.postMessage(result);
});
worker_threads 模块中一些有用的属性
worker_threads
模块中存在一些属性。
isMainThread
当代码不是在工作者线程中执行时,该属性是true
。如果觉得有必要,你可以在工作者线程中加一个if
语句,来确保该代码是在工作者线程中运行的。
import { isMainThread } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
workerData
父线程通过Worker的构造方法传递的数据。
const worker = new Worker(path, { workerData });
在工作者线程内:
import { workerData } from 'worker_threads';
console.log(workerData.property);
parentPort
前文提过的MessagePort
的实例,用来和父线程通信。
threadId
设置给worker的惟一标识。
既然了解了技术细节,让我们实现一些功能,在实践中测试我们的知识。
实现setTimeout
setTimeout
用于在指定时间(毫秒)后执行一个函数。
import { parentPort, workerData } from 'worker_threads';
const time = Date.now();
while (true) {
if (time + workerData.time <= Date.now()) {
parentPort.postMessage({});
break;
}
}
上面的实现简单的说就是创建一个线程、执行代码然后退出。
我们尝试编写利用到上述worker的代码。首先,创建一个状态对象,用来记录工作者线程:
const timeoutState: { [key: string]: Worker } = {};
然后写一个负责创建工作者线程和记录状态的函数:
export function setTimeout(callback: (err: any) => any, time: number) {
const id = uuidv4();
const worker = runWorker(
path.join(__dirname, './timeout-worker.js'),
(err) => {
if (!timeoutState[id]) {
return null;
}
timeoutState[id] = null;
if (err) {
return callback(err);
}
callback(null);
},
{
time,
},
);
timeoutState[id] = worker;
return id;
}
首先,我们用UUID模块为worker创建一个惟一的标识,然后我们使用之前定义的工具函数runWorker
来得到一个新的worker。我们还给worker传了一个回调函数,一旦worker执行完,回调就会执行。最后,我们将worker记录在state中,并返回id
。
在回调函数内,我们必须检查worker是否还存在于state中,因为可能已经被cancelTimeout
移除了。如果还在,就从state中移除它,然后执行传递给setTimeout
的回调函数。
cancelTimeout
函数使用.terminate
方法强制worker退出,并将其从state中移除:
export function cancelTimeout(id: string) {
if (timeoutState[id]) {
timeoutState[id].terminate();
timeoutState[id] = undefined;
return true;
}
return false;
}
我写了一段小的测试代码,来检查这个实现和原生的有多少不同。下面是结果:
native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }
可以看到我们的setTimeout
有轻微的延迟 —— 大约40ms —— 原因是创建worker的开销。CPU使用也稍高一点,但仍是可承受的。
如果复用工作者线程,我们可以降低延迟和cpu消耗,接下来我们就实现一个自己的线程池。
编写一个编程池
上面说过,线程池是一组已经创建好了的线程,它们待在那里,监听着message
事件。一有message
事件触发,它们就开始工作,完成任务后将结果送回。
为了更好的演示我们将要做的,下面的代码演示了如何创建一个有8个工作者的线程池:
const pool = new WorkerPool(
path.join(__dirname, './test-worker.js'),
8
);
你如果熟悉我的另一篇文章limiting concurrent operations(见 https://medium.freecodecamp.org/how-to-limit-concurrent-operations-in-javascript-b57d7b80d573),你会发现它们逻辑几乎一样,只是使用场景不同。
在上面代码中,我们给WorkerPool
构造方法传入了worker的路径以及需要多少个worker。
export class WorkerPool<T, N> {
private queue: QueueItem<T, N>[] = [];
private workersById: { [key: number]: Worker } = {};
private activeWorkersById: { [key: number]: boolean } = {};
public constructor(public workerPath: string, public numberOfThreads: number) {
this.init();
}
}
这里,我们加了额外的属性,像workersById
和activeWorkersById
,用来保存现有的worker和查询给定worker的状态。还有一个queue
属性,用来保存以下结构的对象:
type QueueCallback<N> = (err: any, result?: N) => void;
interface QueueItem<T, N> {
callback: QueueCallback<N>;
getData: () => T;
}
callback
只是node中常见的回调,error是第一个参数,可能的结果作为第二个参数。getData
是传递给线程池的.run()
方法的(下面会解释),会在任务被处理时调用。getData
返回的数据会传递给工作者线程。
在.init()
方法内,我们创建线程并将它们保存在states中:
private init() {
if (this.numberOfThreads < 1) {
return null;
}
for (let i = 0; i < this.numberOfThreads; i += 1) {
const worker = new Worker(this.workerPath);
this.workersById[i] = worker;
this.activeWorkersById[i] = false;
}
}
为了避免死循环,我们首先确保线程数量大于1。然后我们创建给定数量的线程,并将它们按下标顺序存在workersById
中。线程的运行状态记录在activeWorkersById
中,在初始化时,它们默认为false。
现在到了实现.run()
方法的时候了,一旦有线程可用,任务就会被安排执行。
public run(getData: () => T) {
return new Promise<N>((resolve, reject) => {
const availableWorkerId = this.getInactiveWorkerId();
const queueItem: QueueItem<T, N> = {
getData,
callback: (error, result) => {
if (error) {
return reject(error);
}
return resolve(result);
},
};
if (availableWorkerId === -1) {
this.queue.push(queueItem);
return null;
}
this.runWorker(availableWorkerId, queueItem);
});
}
在promise函数内,我们首先使用.getInactiveWorkerId()
检查是否有线程可用:
private getInactiveWorkerId(): number {
for (let i = 0; i < this.numberOfThreads; i += 1) {
if (!this.activeWorkersById[i]) {
return i;
}
}
return -1;
}
接着,我们创建一个queueItem
,将getData
函数以及回调记录在其中。取决于worker是否执行成功,在回调中我们要么resolve
,要么reject
。
如果availableWorkerId
是-1,说明没有线程可用,我们将queueItem
存在queue
中。如果有线程可用,我们调用.runWorker()
来执行线程。
在.runWorker()
方法内,我们更新activeWorkersById
状态,表明线程被占用了;为线程注册message
和error
监听器(后面还要清理它们);最后,将数据传给worker。
private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
const worker = this.workersById[workerId];
this.activeWorkersById[workerId] = true;
const messageCallback = (result: N) => {
queueItem.callback(null, result);
cleanUp();
};
const errorCallback = (error: any) => {
queueItem.callback(error);
cleanUp();
};
const cleanUp = () => {
worker.removeAllListeners('message');
worker.removeAllListeners('error');
this.activeWorkersById[workerId] = false;
if (!this.queue.length) {
return null;
}
this.runWorker(workerId, this.queue.shift());
};
worker.once('message', messageCallback);
worker.once('error', errorCallback);
worker.postMessage(await queueItem.getData());
}
首先,根据传入的workerId
,我们从workersById
对象中取得worker的引用。然后,我们更新activeWorkersById
对象的[workerId]
属性为true,表明该worker处于忙碌状态。
然后,我们创建了2个回调,messageCallback
和errorCallback
,用来针对message和error事件。注册完回调后,我们把数据传给了worker。
在回调方法内,我们调用了queueItem
中的回调函数,然后再调用cleanUp
函数。在cleanUp
函数内,我们要确保移除事件监听器,因为同一个worker要复用多次。如果不移除监听器,可能会有内存泄漏;进而会慢慢地用光内存。
我们将activeWorkersById
对像的[workerId]
设置为false后,还检查了queue是否为空。如果不为空,我们从queue
中取出第一个item, 将它传给线程继续执行。
我们来创建一个线程,在监听到message事件后,它会做一些计算:
import { isMainThread, parentPort } from 'worker_threads';
if (isMainThread) {
throw new Error('Its not a worker');
}
const doCalcs = (data: any) => {
const collection = [];
for (let i = 0; i < 1000000; i += 1) {
collection[i] = Math.round(Math.random() * 100000);
}
return collection.sort((a, b) => {
if (a > b) {
return 1;
}
return -1;
});
};
parentPort.on('message', (data: any) => {
const result = doCalcs(data);
parentPort.postMessage(result);
});
该线程创建了一个有100万个随机数的数组,然后对它进行排序。做什么并不重要,只要它能消耗一点时间。
下面是应用用了上面的线程池的例子:
const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);
const items = [...new Array(100)].fill(null);
Promise.all(
items.map(async (_, i) => {
await pool.run(() => ({ i }));
console.log('finished', i);
}),
).then(() => {
console.log('finished all');
});
我们先是创建了有8个工作者的线程池。然后创建了一个有100个元素的数组,对于每个元素,我们用线程池执行一次任务。开始时,有8个任务立即被执行,剩余的任务会被放在queue中,逐步地执行。有了线程池,我们不再需要每次都创建一个线程,这将大大地提升效率。
总结
worker_threads
模块为多线程操作提供了一个简单易用的方法。通过将繁重的CPU运算代理给其它线程,服务器的吞吐量可以得到明显的提升。随着线程有了官方的支持,我们可以预想到会有更多来自AI、机器学习和大数据领域的的开发者、工程师们开始使用Node.js。