Worker Threads in Node.js

Introduction

Worker threads in Node.js are a way to offload CPU-intensive tasks from the single thread that Node.js runs your JavaScript on.

First, we need to understand why you shouldn't run a CPU-intensive task on the main thread of your Node.js instance. Node.js runs your JavaScript on a single thread, and you only get one process out of the box. In Node.js, process is a global object that holds information about the program being executed at the time.

I have but One Thread to give - Node.js

The decision to make Node.js single-threaded came from the decision not to change the language design itself. Adding a multithreading module to JavaScript could change the way the language itself is written.

Node.js has one event loop. This is what gives Node its asynchronous nature: operations are offloaded to the system's kernel, and their results come back through callbacks, promises and async/await, so we don't have to worry about concurrency problems.

This can become an issue when you have a CPU-intensive task to execute, for example a long-running synchronous task or a complex mathematical calculation. Such a task blocks the thread while it runs, which means every other task scheduled at that time has to wait. If it runs while handling an API request, any other HTTP request that comes in at that time is blocked, which keeps the end user waiting. A solution for this is the use of worker threads.
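To make the blocking concrete, here is a minimal sketch. The busyWait and measureTimerDelay helpers are hypothetical names invented for this illustration and are not part of Node.js. The idea is that a timer scheduled for 0 ms cannot fire until the synchronous loop has finished.

```javascript
// Hypothetical helper: keeps the thread busy for roughly `ms` milliseconds,
// standing in for a heavy synchronous calculation.
function busyWait(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin: nothing else can run on this thread in the meantime
  }
}

// Hypothetical helper: schedules a 0 ms timer, then blocks the thread.
// Resolves with how long the timer callback actually had to wait.
function measureTimerDelay(blockMs) {
  return new Promise((resolve) => {
    const scheduledAt = Date.now();
    setTimeout(() => resolve(Date.now() - scheduledAt), 0);
    busyWait(blockMs);
  });
}

measureTimerDelay(200).then((delay) => {
  console.log(`0 ms timer fired after about ${delay} ms`);
});
```

The reported delay should be at least as long as the blocking work, which is the same wait an incoming HTTP request would experience.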

Working with Worker Threads

We will use worker threads to calculate the Fibonacci of numbers, and we will also use Atomics and shared buffers to help us handle race conditions between our threads.

Check out this wonderful article about Shared buffers and Atomics here

We can easily use the worker thread module by importing it into our file.

const { Worker } = require('worker_threads');

Main Process

// main.js
const { Worker } = require("worker_threads");

const runFibonnaci = (nums) => {
    // Get the length of the input array
    let length = nums.length;

    // Number of bytes needed to hold one 32-bit integer per input value
    let size = Int32Array.BYTES_PER_ELEMENT * length;

    // Create a shared buffer sized for the input array
    let sharedBuffer = new SharedArrayBuffer(size);
    let sharedArray = new Int32Array(sharedBuffer);

    for (let i = 0; i < length; i++) {
        // Store each value in the sharedArray
        Atomics.store(sharedArray, i, nums[i]);

        // Spin up a new worker thread
        let worker = new Worker('./worker.js');

        // Once the calculation is done, print out the result
        worker.once('message', (message) => {
            console.log('Result received --- ', message);
        });

        // Send the array data and index to the worker thread
        worker.postMessage({data: sharedArray, index: i});
    }
};

runFibonnaci([50, 20, 21, 24, 4]);

The runFibonnaci function accepts an array of numbers to be calculated in the worker threads. The size variable holds the number of bytes needed for the input array, and the sharedBuffer variable is created from it using the SharedArrayBuffer class.

    // Get the length of the input array
    let length = nums.length;

    // Number of bytes needed to hold one 32-bit integer per input value
    let size = Int32Array.BYTES_PER_ELEMENT * length;

    // Create a shared buffer sized for the input array
    let sharedBuffer = new SharedArrayBuffer(size);
    let sharedArray = new Int32Array(sharedBuffer);

The sharedArray variable is created using the Int32Array class, which gives us an array of 32-bit signed integers backed by the shared buffer. Because the buffer is a SharedArrayBuffer, each worker thread can access sharedArray from a single memory instance, and we use Atomics to read and write its values safely. Atomics only works with typed arrays backed by a SharedArrayBuffer or an ArrayBuffer.

According to the MDN docs: "When memory is shared, multiple threads can read and write the same data in memory. Atomic operations make sure that predictable values are written and read, that operations are finished before the next operation starts and that operations are not interrupted."
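As a small illustration of what sharing memory means here, the sketch below creates two views over one SharedArrayBuffer in a single thread. The names viewA and viewB are made up for this example and simply stand in for the views two different threads would hold.

```javascript
// One block of shared memory, large enough for two 32-bit integers.
const buffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 2);

// Two separate views over the same memory (stand-ins for two threads' views).
const viewA = new Int32Array(buffer);
const viewB = new Int32Array(buffer);

// A value stored through one view is visible through the other.
Atomics.store(viewA, 0, 50);
const seenByB = Atomics.load(viewB, 0);

// Atomics.add performs the read-modify-write as one uninterrupted step
// and returns the value that was there before the addition.
const previous = Atomics.add(viewB, 1, 5);
const seenByA = Atomics.load(viewA, 1);

console.log(seenByB, previous, seenByA); // 50 0 5
```

Nothing is copied between the two views: both read and write the same bytes, which is why the reads and writes go through Atomics.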

We proceed to loop through the nums array passed into the runFibonnaci function, storing each value using the Atomics.store static function.

    for (let i = 0; i < length; i++) {
        // Store each value in the sharedArray
        Atomics.store(sharedArray, i, nums[i]);

        // Spin up a new worker thread
        let worker = new Worker('./worker.js');

        // Once the calculation is done, print out the result
        worker.once('message', (message) => {
            console.log('Result received --- ', message);
        });

        // Send the array data and index to the worker thread
        worker.postMessage({data: sharedArray, index: i});
    }

We then spin up a new worker thread and send the sharedArray and the index to it. The callback passed to worker.once('message') is called once the worker thread has finished executing its task and posts back a value, as we will see in the worker file below.
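The spawn-and-listen step can also be wrapped in a Promise, which makes it easier to handle worker errors and to collect results in input order. This is only a sketch and not the article's code: runInWorker is a hypothetical helper, and the worker source is inlined with the eval option (and merely doubles the value) so the snippet runs as a single file.

```javascript
const { Worker } = require('worker_threads');

// Inline stand-in for worker.js (assumption: kept trivial on purpose).
// It reads its input from the shared array and posts back twice the value.
const workerSource = `
  const { parentPort } = require('worker_threads');
  parentPort.once('message', ({ data, index }) => {
    parentPort.postMessage(Atomics.load(data, index) * 2);
  });
`;

// Hypothetical helper: runs one worker and resolves with its first message.
function runInWorker(sharedArray, index) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true });
    worker.once('message', resolve);
    worker.once('error', reject);
    worker.once('exit', (code) => {
      // Rejecting after a successful resolve has no effect.
      if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
    });
    worker.postMessage({ data: sharedArray, index });
  });
}

const shared = new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * 2));
Atomics.store(shared, 0, 21);
Atomics.store(shared, 1, 4);

Promise.all([runInWorker(shared, 0), runInWorker(shared, 1)])
  .then((results) => console.log('Results in input order --- ', results))
  .catch((err) => console.error('Worker failed --- ', err));
```

Promise.all keeps the results in the order of the inputs, whichever worker happens to finish first.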

Worker Process

// worker.js
const { parentPort } = require('worker_threads');

// Listen for a message from the main thread
parentPort.once('message', (event) => {
    const sharedArray = event.data;
    const index = event.index;

    // Read this worker's input value from the shared array
    const arrValue = Atomics.load(sharedArray, index);
    const fibonacciValue = calculateFibonacci(arrValue);

    // Send the result back to the main thread
    parentPort.postMessage(fibonacciValue);
});

// Compute the Fibonacci value for num iteratively
const calculateFibonacci = (num) => {
    let a = 1, b = 0, temp;

    while (num >= 0) {
        temp = a;
        a = a + b;
        b = temp;
        num--;
    }

    return b;
};

The callback passed to parentPort.once is called once the worker is initialized and data is passed into it. It reads the sharedArray and index from the message and stores them in variables. arrValue fetches the value from the sharedArray using the Atomics.load function, and the worker then calculates the Fibonacci of that value by calling the calculateFibonacci function. Finally, it returns the result to the main thread to be printed on the console.

You can run the code by running this command in the console: node main.js
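The helper can also be checked on its own, outside a worker. The snippet below copies calculateFibonacci so it is self-contained and prints a few values. Two things are worth noticing: the helper counts the sequence as 1, 1, 2, 3, 5 starting from input 0, and the value for 50 is larger than a 32-bit signed integer can hold, so it would not fit back into the Int32Array and travels through postMessage as a regular number.

```javascript
// Copy of the helper from worker.js, so this snippet runs on its own.
const calculateFibonacci = (num) => {
  let a = 1, b = 0, temp;

  while (num >= 0) {
    temp = a;
    a = a + b;
    b = temp;
    num--;
  }

  return b;
};

const small = [0, 1, 2, 3, 4].map((n) => calculateFibonacci(n));
console.log(small); // [ 1, 1, 2, 3, 5 ]

const large = calculateFibonacci(50);
console.log(large); // 20365011074

// 2 ** 31 - 1 is the largest value an Int32Array element can store.
console.log(large > 2 ** 31 - 1); // true
```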

// console
Result received ---  20365011074
Result received ---  17711
Result received ---  75025
Result received ---  10946
Result received ---  5

Conclusion

Using worker threads can help your Node.js application by executing CPU-intensive tasks in separate threads. Worker threads don't magically make your application faster, but they can help in situations where a particular set of instructions is blocking the single thread and making other tasks fail.

Source code can be found here
