暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

NodeJS 10.5.0 中的线程:实用介绍

IT实战联盟 2018-06-27
129

几天前,Node.js版本10.5.0发布,其中包含的主要功能之一是添加了(实验)线程支持。

这很有趣,特别是来自一种总以不需要的线程感到自豪的语言,这要归功于它非常棒的异步I O。那么为什么我们需要Node中的线程呢?

简洁的答案是:过去的Node中唯一的区域中表现突出的问题:处理繁重的CPU密集型计算这主要是为什么Node.js在人工智能,机器学习,数据科学等领域不够强大的原因。有很多努力正在解决这个问题,但我们仍然没有像部署微服务时那样表现出色。

因此,我将尝试将官方文档提供的技术文档简化为更实用,更简单的示例。希望这足以让你开始。

那么我们如何使用新的线程模块呢?

首先,您将需要一个名为“worker_threads”的模块。

请注意,只有 --experimental-worker
在执行脚本时使用该标志才能使用,否则将不会找到该模块。

注意flag是指worker而不是线程,这是他们在整个文档中被引用的方式:工作者线程或简单的worker。

如果您以前使用过multi-processing,您会发现这种方法有很多相似之处,但如果您没有,请不要担心,我会尽可能多地解释。

你可以用他们做什么?

就像我之前提到的,工作线程是指CPU密集型任务,使用它们进行I / O会浪费资源,因为根据官方文档,Node提供的处理异步I / O的内部机制要多得多比使用工作线程更有效率,所以...不要麻烦。

我们从一个简单的例子开始,介绍如何创建一个 worker
并使用它。

实例1

  1. const { Worker, isMainThread,  workerData } = require('worker_threads');


  2. let currentVal = 0;

  3. let intervals = [100,1000, 500]


  4. function counter(id, i){

  5.    console.log("[", id, "]", i)

  6.    return i;

  7. }


  8. if(isMainThread) {

  9.    console.log("this is the main thread")

  10.    for(let i = 0; i < 2; i++) {

  11.        let w = new Worker(__filename, {workerData: i});

  12.    }


  13.    setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[2], "MainThread");

  14. } else {


  15.    console.log("this isn't")


  16.    setInterval((a) => currentVal = counter(a,currentVal + 1), intervals[workerData], workerData);


  17. }

复制

上面的例子将简单地输出一组显示递增计数器的行,它们将使用不同的速度增加它们的值。

让我们分解一下:

  1. IF语句中的代码创建2个工作线程[worker threads],由于__filename 参数传递,它们的代码从同一个文件中获取。工作线程[Workers]现在需要完整的文件路径,他们不能处理相对路径,所以这就是为什么使用这个值。

  2. 这两名worker将作为全局参数发送一个值,其形式为workerData,您在第二个参数中看到的属性。然后可以通过具有相同名称的常量访问该值(请参阅常量是如何在文件的第一行中创建的,并在以后的最后一行中使用)。

这个例子是你可以用这个模块做的最基本的事情之一,但它不是很有趣,是吗?我们来看另一个例子。

实例2

让我们现在尝试做一些“繁重”的计算,同时在主线程中做一些异步的东西。

  1. const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

  2. const request = require("request");



  3. if(isMainThread) {

  4.    console.log("This is the main thread")


  5.    let w = new Worker(__filename, {workerData: null});

  6.    w.on('message', (msg) => { //A message from the worker!

  7.        console.log("First value is: ", msg.val);

  8.        console.log("Took: ", (msg.timeDiff / 1000), " seconds");

  9.    })

  10.    w.on('error', console.error);

  11.    w.on('exit', (code) => {

  12.        if(code != 0)

  13.            console.error(new Error(`Worker stopped with exit code ${code}`))

  14.   });


  15.    request.get('http://www.google.com', (err, resp) => {

  16.        if(err) {

  17.            return console.error(err);

  18.        }

  19.    console.log("Total bytes received: ", resp.body.length);

  20.    })


  21. } else { //the worker's code


  22.    function random(min, max) {

  23.        return Math.random() * (max - min) + min

  24.    }


  25.    const sorter = require("./test2-worker");


  26.    const start = Date.now()

  27.    let bigList = Array(1000000).fill().map( (_) => random(1,10000))


  28.    sorter.sort(bigList);

  29.   parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start});


  30. }

复制
  1. <!--test2-worker-->


  2. module.exports = {

  3.    firstValue: null,

  4.    sort: function(list) {

  5.        let sorted = list.sort();

  6.        this.firstValue = sorted[0]

  7.    }

  8. }

复制

这一次,我们访问Google.com的首页,同时对随机生成的100万个数字进行排序。这花费了几秒钟的时间,所以对我们来说,这是完美的表现。我们还将测量工作线程执行排序所需的时间,并且我们将把该值(以及第一个排序后的值)发送到主线程,在那里我们将显示结果。

这个例子的主要内容是线程之间的通信。

Workers可以通过该 on方法在主线程
接收消息
。我们可以听到的事件是代码中显示的事件。该message每当我们使用从实际线程发送消息触发事件 parentPort.postMessag
e的方法。您也可以使用相同的方法在您的Workers实例上向线程的代码发送消息,并使用 parentPort对象捕获它们。

现在让我们看看一个非常相似的例子,但是使用更简洁的代码,给你一个关于如何构建工作线程代码的最终想法。

例3:将它们放在一起

作为最后一个例子,我将继续使用相同的功能,但向您展示如何将其清理干净并拥有更易维护的版本。

  1. const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

  2. const request = require("request");


  3. function startWorker(path, cb) {

  4.    let w = new Worker(path, {workerData: null});

  5.    w.on('message', (msg) => {

  6.        cb(null, msg)

  7.    })

  8.    w.on('error', cb);

  9.    w.on('exit', (code) => {

  10.        if(code != 0)

  11.            console.error(new Error(`Worker stopped with exit code ${code}`))

  12.   });

  13.    return w;

  14. }


  15. console.log("this is the main thread")


  16. let myWorker = startWorker(__dirname + '/workerCode.js', (err, result) => {

  17.    if(err) return console.error(err);

  18.    console.log("[[Heavy computation function finished]]")

  19.    console.log("First value is: ", result.val);

  20.    console.log("Took: ", (result.timeDiff / 1000), " seconds");

  21. })


  22. const start = Date.now();

  23. request.get('http://www.google.com', (err, resp) => {

  24.    if(err) {

  25.        return console.error(err);

  26.    }

  27.    console.log("Total bytes received: ", resp.body.length);

  28.    //myWorker.postMessage({finished: true, timeDiff: Date.now() - start}) //you could send messages to your workers like this

  29. })

复制

你的线程代码可以在另一个文件中,比如:

  1. const {  parentPort } = require('worker_threads');


  2. function random(min, max) {

  3.    return Math.random() * (max - min) + min

  4. }


  5. const sorter = require("./test2-worker");


  6. const start = Date.now()

  7. let bigList = Array(1000000).fill().map( (_) => random(1,10000))


  8. /**

  9. //you can receive messages from the main thread this way:

  10. parentPort.on('message', (msg) => {

  11.    console.log("Main thread finished on: ", (msg.timeDiff / 1000), " seconds...");

  12. })

  13. */


  14. sorter.sort(bigList);

  15. parentPort.postMessage({ val: sorter.firstValue, timeDiff: Date.now() - start});

复制

我们看到:

  1. 主线程和工作线程现在将他们的代码放在不同的文件中。这更容易维护和扩展。

  2. 该startWorker函数返回新的实例,如果您愿意,您可以稍后向其发送消息。

  3. 如果主线程的代码实际上是主线程(我们删除了主要的IF语句),则不再需要担心。

  4. 您可以在worder的代码中看到如何从主线程接收消息,从而实现双向异步通信。

这将是这篇文章的重点,我希望你已经足够了解如何开始玩这个新模块。请记住:

  1. 这仍然是高度实验性的,这些东西在未来的版本中可能会改变;

  2. 去阅读评论和文档,那里有更多关于这方面的信息,我只关注实现它的基本步骤。

  3. 玩的开心!报告错误并提出改进建议,这才刚刚开始!

欢迎关注IT实战联盟


文章转载自IT实战联盟,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论