加入收藏 | 设为首页 | 会员中心 | 我要投稿 核心网 (https://www.hxwgxz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 建站 > 正文

Node.js多线程完全指南

发布时间:2019-03-29 23:34:32 所属栏目:建站 来源:疯狂的技术宅
导读:很多人都想知道单线程的 Node.js 怎么能与多线程后端竞争。考虑到其所谓的单线程特性,许多大公司选择 Node 作为其后端似乎违反直觉。要想知道原因,必须理解其单线程的真正含义。 JavaScript 的设计非常适合在网上做比较简单的事情,比如验证表单,或者说

在 promise 函数里,我们首先通过调用 .getInactiveWorkerId() 来检查是否存在空闲的 worker 可以来处理数据:

  1. private getInactiveWorkerId(): number {  
  2.   for (let i = 0; i < this.numberOfThreads; i += 1) {  
  3.     if (!this.activeWorkersById[i]) {  
  4.       return i;  
  5.     }  
  6.   }  
  7.   return -1;  

接下来,我们创建一个 queueItem,在其中保存传递给 .run() 方法的 getData 函数以及回调。在回调中,我们要么 resolve 或者 reject promise,这取决于 worker 是否将错误传递给回调。

如果 availableWorkerId 的值是 -1,意味着当前没有可用的 worker,我们将 queueItem 添加到 queue。如果有可用的 worker,则调用 .runWorker() 方法来执行 worker。

在 .runWorker() 方法中,我们必须把当前 worker 的 activeWorkersById 设置为使用状态;为 message 和 error 事件设置事件监听器(并在之后清理它们);最后将数据发送给 worker。

  1. private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {  
  2.  const worker = this.workersById[workerId];  
  3.  this.activeWorkersById[workerId] = true;  
  4.  const messageCallback = (result: N) => {  
  5.    queueItem.callback(null, result);  
  6.    cleanUp();  
  7.  };  
  8.  const errorCallback = (error: any) => {  
  9.    queueItem.callback(error);  
  10.    cleanUp();  
  11.  };  
  12.  const cleanUp = () => {  
  13.    worker.removeAllListeners('message');  
  14.    worker.removeAllListeners('error');  
  15.    this.activeWorkersById[workerId] = false;  
  16.    if (!this.queue.length) {  
  17.      return null;  
  18.    }  
  19.    this.runWorker(workerId, this.queue.shift());  
  20.  };  
  21.  worker.once('message', messageCallback);  
  22.  worker.once('error', errorCallback);  
  23.  worker.postMessage(await queueItem.getData());  

首先,通过使用传递的 workerId,我们从 workersById 中获得 worker 引用。然后,在 activeWorkersById 中,将 [workerId] 属性设置为true,这样我们就能知道在 worker 在忙,不要运行其他任务。

接下来,分别创建 messageCallback 和 errorCallback 用来在消息和错误事件上调用,然后注册所述函数来监听事件并将数据发送给 worker。

在回调中,我们调用 queueItem 的回调,然后调用 cleanUp 函数。在 cleanUp 函数中,要删除事件侦听器,,因为我们会多次重用同一个 worker。如果没有删除监听器的话就会发生内存泄漏,内存会被慢慢耗尽。

在 activeWorkersById 状态中,我们将 [workerId] 属性设置为 false,并检查队列是否为空。如果不是,就从 queue 中删除第一个项目,并用另一个 queueItem 再次调用 worker。

接着创建一个在收到 message 事件中的数据后进行一些计算的 worker:

  1. import { isMainThread, parentPort } from 'worker_threads';  
  2. if (isMainThread) {  
  3.  throw new Error('Its not a worker');  
  4. }  
  5. const doCalcs = (data: any) => {  
  6.  const collection = [];  
  7.  for (let i = 0; i < 1000000; i += 1) {  
  8.    collection[i] = Math.round(Math.random() * 100000);  
  9.  }  
  10.  return collection.sort((a, b) => {  
  11.    if (a > b) {  
  12.      return 1;  
  13.    }  
  14.    return -1;  
  15.  });  
  16. };  
  17. parentPort.on('message', (data: any) => {  
  18.  const result = doCalcs(data);  
  19.  parentPort.postMessage(result);  
  20. }); 

worker 创建了一个包含 100 万个随机数的数组,然后对它们进行排序。只要能够多花费一些时间才能完成,做些什么事情并不重要。

(编辑:核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读