
A Small Guide On NestJS Queues

Queues help a NestJS application deal with scaling and performance challenges.

When To Use Queues?:

An API request that involves a time-consuming operation, such as a CPU-bound task, will block the thread if handled synchronously. To avoid this, the appropriate approach is to run the CPU-bound operation as a separate background job.

In NestJS, one of the best solutions for these kinds of tasks is to implement queues.
The most recommended library for a queueing mechanism in a NestJS application is '@nestjs/bull' (Bull is a Node.js queue library). Bull depends on Redis to store job data. In this queueing technique we create services like a 'Producer' and a 'Consumer'. The producer pushes jobs into the Redis store. The consumer reads those jobs (e.g., CPU-bound operations) and processes them.

By using this queue technique, user requests are processed very quickly because the actual logic (for example, time-consuming CPU-bound operations) is carried out later by the consumers as a background job.

Setup Redis Docker Image Container:

For this sample we will run a Redis instance locally using Docker. If you don't have any prior knowledge of Docker, that's not a problem; just follow the steps below.
Note:
Skip this section if you already have a direct Redis instance, or one hosted on Azure or any other cloud provider.
Step1:
Download Docker onto your local system from "https://docs.docker.com/desktop/". Docker is available for all desktop operating systems.
Step2:
After downloading the Docker installer, install it. To run any Docker container (e.g., Redis, MongoDB, PostgreSQL, etc.), the Docker instance we just installed must be active (running).
Step3:
Now we need to pull the Redis image from Docker Hub: "https://hub.docker.com/_/redis".
Command To Pull Redis Image:
docker pull redis
Step4:
The final step is to run the Redis image as a container, mapping a port on our local system. By default, the Redis instance runs on port '6379' inside the Docker container, so to access Redis we need to map that port when starting the container.
Command To Start Redis Container:
docker run --name your_containerName -p your_PortNumber:6379 -d redis
The '--name your_containerName' flag specifies the Redis container name. The '-p your_PortNumber:6379' flag maps the Redis port '6379' to a port on our local machine; our application will use the local machine port to communicate with Redis. The '-d' flag runs the container in detached mode, which means it runs in the background. The 'redis' at the end of the command specifies the image to run in the container.
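As a concrete example, the command could look like the following (the container name 'local-redis' is an arbitrary choice, and host port 5003 matches the port used in the application configuration later in this article):

```shell
# Start Redis in the background; host port 5003 forwards to the
# container's default Redis port 6379.
docker run --name local-redis -p 5003:6379 -d redis

# Verify the container is running
docker ps --filter "name=local-redis"
```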
Step5:
After the container is created, it is stored on our local machine, so to start the container again at any time run the following command:
docker start your_container_name

Step6: (Optional Step)
Let's test our Redis instance.

Command To Use Redis CLI
docker exec -it your_docker_container_name redis-cli
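Once inside the Redis CLI, a quick sanity check might look like this (an interactive session transcript; 'greeting' is just an example key):

```shell
127.0.0.1:6379> PING
PONG
127.0.0.1:6379> SET greeting "hello"
OK
127.0.0.1:6379> GET greeting
"hello"
```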

Create A Sample NestJS Application:

Let's begin our journey by creating a sample NestJS application.
Command To Install CLI:
npm i -g @nestjs/cli
Command To Create NestJS App:
nest new your_project_name

Install Bull Library Package:

npm install --save @nestjs/bull bull

npm install --save-dev @types/bull

Register BullModule And Setup Redis Connection:

Configure the BullModule in 'app.module.ts'. The Bull library depends on the Redis store for data communication, so BullModule provides configuration options to register the Redis connection.
src/app.module.ts:
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 5003,
      },
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
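Hard-coding the host and port is fine for a sample, but '@nestjs/bull' also supports asynchronous configuration via 'forRootAsync'. A minimal sketch using '@nestjs/config' (the environment variable names 'REDIS_HOST' and 'REDIS_PORT' and the fallback values are assumptions for illustration):

```typescript
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot(),
    BullModule.forRootAsync({
      imports: [ConfigModule],
      // Resolve the Redis connection from environment variables at startup
      useFactory: (config: ConfigService) => ({
        redis: {
          host: config.get<string>('REDIS_HOST', 'localhost'),
          port: config.get<number>('REDIS_PORT', 5003),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}
```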

Register A Queue:

Use the 'registerQueue' method of 'BullModule' to configure a queue. A queue is identified by its name, so we have to name our queue when configuring it. Multiple queues can be registered as well.
src/app.module.ts:
import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 5003,
      },
    }),
    BullModule.registerQueue({
      name:'message-queue'
    })
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
  • Here we configure a new queue named 'message-queue'.

Create A Producer To Push Jobs Into Queue:

Job producers add jobs to the queues. Producers are typically services. Now we will implement a producer to push messages into the queue.

Note: Pushing plain messages into a queue may not be a realistic task, but for beginners it is an ideal way to understand NestJS queues.

So let's create a new producer, 'message.producer.service.ts'.
src/message.producer.service.ts:
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class MessageProducerService {
  constructor(@InjectQueue('message-queue') private queue: Queue) {}

  async sendMessage(message:string){
    await this.queue.add('message-job',{
        text: message
    });
  }
}
  • To access the queue we use the '@InjectQueue' decorator along with the name of the queue, 'message-queue'.
  • To push a job into the queue, we use the 'add' method of the 'Queue' instance. Here we can also specify a job name; this job name will be used by the consumer to read the messages.
  • The second argument to the 'add' method is the data of our job.
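Bull's 'add' method also accepts an optional third argument of job options such as 'delay', 'attempts', and 'removeOnComplete'. A hedged sketch of a hypothetical producer variant (the option values are arbitrary examples):

```typescript
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';

@Injectable()
export class DelayedMessageProducerService {
  constructor(@InjectQueue('message-queue') private queue: Queue) {}

  async sendDelayedMessage(message: string) {
    await this.queue.add(
      'message-job',
      { text: message },
      {
        delay: 5000,            // wait 5 seconds before the job becomes processable
        attempts: 3,            // retry up to 3 times if the handler throws
        removeOnComplete: true, // remove the job from Redis once it succeeds
      },
    );
  }
}
```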
Register the 'MessageProducerService' into the provider array of our AppModule.
src/app.module.ts:
import { MessageProducerService } from './message.producer.service';
// code hidden for display purpose
@Module({
  providers: [MessageProducerService],
})
export class AppModule {}
Let's create a new endpoint to invoke our newly created producer service.
src/app.controller.ts:
import { Controller, Get, Query } from '@nestjs/common';
import { MessageProducerService } from './message.producer.service';

@Controller()
export class AppController {
  constructor(
    private readonly messageProducerService:MessageProducerService) {}
	
  @Get('invoke-msg')
  getInvokeMsg(@Query('msg') msg:string){
    this.messageProducerService.sendMessage(msg);
    return msg;
  }
}

Create A Consumer To Read Jobs:

A consumer is a class that defines methods to process jobs added to the queue. To make a class a consumer, it should be decorated with '@Processor()' along with the queue name. A consumer class must contain a handler method to process the jobs; the handler method is registered with '@Process()'.
src/message.consumer.ts:
import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";

@Processor('message-queue')
export class MessageConsumer {

    @Process('message-job')
    readOperationJob(job:Job<unknown>){
        console.log(job.data);
    }
}
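'@nestjs/bull' also exposes queue event-listener decorators, which are useful for observing a job's lifecycle. A minimal logging-only sketch (a hypothetical extension of the consumer above):

```typescript
import {
  OnQueueActive,
  OnQueueCompleted,
  OnQueueFailed,
  Process,
  Processor,
} from '@nestjs/bull';
import { Job } from 'bull';

@Processor('message-queue')
export class MessageConsumerWithEvents {
  @Process('message-job')
  readOperationJob(job: Job<unknown>) {
    console.log(job.data);
  }

  @OnQueueActive()
  onActive(job: Job) {
    console.log(`Processing job ${job.id} of type ${job.name}...`);
  }

  @OnQueueCompleted()
  onCompleted(job: Job) {
    console.log(`Job ${job.id} completed`);
  }

  @OnQueueFailed()
  onFailed(job: Job, err: Error) {
    console.error(`Job ${job.id} failed: ${err.message}`);
  }
}
```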
Now register our 'MessageConsumer' class in the providers array of AppModule. On registration, the NestJS application automatically activates the consumer to read and process jobs.
src/app.module.ts:
import { MessageConsumer } from './message.consumer';
// code hidden for display purpose
@Module({
  providers: [MessageProducerService, MessageConsumer],
})
export class AppModule {}
Now to test our queue, run the application then access the endpoint and check the console log.

Create A New Queue For CPU Bound Operation:

Our previous queue handles a simple message-reading job, which is good for understanding the NestJS queue concept. Now we will create a new queue that involves CPU-bound operations.

Case Study:
Let's assume we need to create an endpoint that deletes files both from a physical folder path and from the database. Deleting files is a CPU-dependent operation, so doing it synchronously within the user's request might increase the time the endpoint takes to respond and may block the thread as well.

Queue Solution:
For the above case study, the best solution is to implement queues. With queues, the logic works like this: on the endpoint request we delete the file name from the database, save the physical path of the file in Redis as job data, and then immediately return the response to the user. At this point the file has not yet been deleted from its physical location. The consumer then runs at regular intervals to finish the jobs in the Redis store; on reading our file-deletion job and its physical path, it executes the job handler method to delete the file as a background job.

Let's register a new queue in AppModule.
src/app.module.ts:
import { BullModule } from '@nestjs/bull';
// code hidden for display purpose

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 5003,
      },
    }),
    BullModule.registerQueue(
      {
        name: 'message-queue',
      },
      {
        name: 'file-operation-queue',
      },
    ),
  ]
})
export class AppModule {}
  • Our new queue's name is 'file-operation-queue'.
Let's create a new producer, 'file.producer.service.ts'.
src/file.producer.service.ts:
import { InjectQueue } from "@nestjs/bull";
import { Injectable } from "@nestjs/common";
import { Queue } from "bull";

@Injectable()
export class FileProducerService{
    constructor(@InjectQueue('file-operation-queue') private queue:Queue){}

    async deleteFile(fileName:string){
        const filePath = `C:/Users/hchowdary/Desktop/testimages/${fileName}`;
        // implement logic to delete the file record from the database.
        await this.queue.add('delete-file',{
            filePath: filePath
        });
    }
}
  • Here we push the physical location of the file into the Redis store.
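Since deleting a file can fail transiently (for example, if the file is momentarily locked by another process), it may be worth adding Bull retry options when queuing the job. A hedged variant of the 'add' call in 'FileProducerService' (the retry values are arbitrary examples):

```typescript
// Variant of the queue.add call with retry options; a failed deletion
// is retried up to 3 times with exponential backoff (1s, 2s, 4s).
await this.queue.add(
  'delete-file',
  { filePath: filePath },
  {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  },
);
```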
Register our 'FileProducerService' in AppModule.
src/app.module.ts:
import { FileProducerService } from './file.producer.service';
// code hidden for display purpose
@Module({
  providers: [
    FileProducerService,
  ],
})
export class AppModule {}
Now create a new endpoint as below.
src/app.controller.ts:
import { FileProducerService } from './file.producer.service';
// code hidden for display purpose
@Controller()
export class AppController {
  constructor(
    private readonly fileProducerService:FileProducerService) {}

  @Get('remove-file')
  async deleteFile(@Query('file') file:string){
    await this.fileProducerService.deleteFile(file);
    return 'file deleted';
  }
}
Let's create a Consumer for our new queue.
src/file.consumer.ts:
import { Process, Processor } from "@nestjs/bull";
import { Job } from "bull";
import * as fs from 'fs';

@Processor('file-operation-queue')
export class FileConsumer{

    @Process('delete-file')
    async filedeletionJob(job:Job<unknown>){
        let jobData:any = job.data;
        fs.unlinkSync(jobData.filePath);
    }
}
  • Here the consumer deletes the file from the specified location as a background job.
Now register our consumer in the providers array of AppModule.
src/app.module.ts:
import { FileConsumer } from './file.consumer';
// code hidden for display purpose
@Module({
  providers: [
    FileConsumer,
  ],
})
export class AppModule {}
That's all about the usage of queues in a NestJS application.


Wrapping Up:

Hopefully, this article delivered some useful information on queues in NestJS. I would love to have your feedback, suggestions, and better techniques in the comments section below.
