Skip to main content

Part-4 Asynchronous Data Communication Between Microservices Using RabbitMQ Message Broker With MassTransit[.NET6 Microservice Series]

In this article, we are going to establish asynchronous data communication between microservice using the RabbitMQ message broker along with MassTransit.

Now let's take look at the 'Orders' GET endpoint response.
Here we can understand we don't have 'Product' information. So we have to make a way to save the required 'Product' information into the 'SalesBusiness.API' microservice application from the 'Manufacture.API' microservice application.

Steps we are going to accomplish this demo are:
  • Configure RabbitMQ message broker with MassTransit for asynchronous data communication between the microservices.
  • In the 'Manufacture' database we have a 'Products' table(like Master table), now we will create a new 'Products' table(like child table) in 'SalesDatabase', but here we will add only required columns. So that while fetching the 'Orders' endpoint it is to fetch required products information.
  • So with help of RabbitMQ, we always maintain consistent or exact data between both parent and child tables.

Part-3 Create A Microservice For The Orders Endpoint[.NET6 Microservice Series]

RabbitMQ:

RabbitMQ is a message broker that helps to share data asynchronously between the publishers and receivers.
RabbitMQ exchange flow:
  • RabbitMQ supports different exchange types like 'Direct', 'Topic', 'Headers', 'Fanout'. By default RabitMQ uses 'Fanout' exchange, we also use this fanout exchange for our demo.
  • In Fanout exchange when a producer sends a message to RabbitMQ then the message will be sent to the 'FanoutExchange' and then the from the fanout exchange messages will be delivered to all queues then from the queued messages will be received by the respective receivers.

MassTransit:

MassTransit is free open-source software that can wrap around message brokers like 'RabbitMQ', 'Azure Service Bus', 'SQS', 'ActiveMQ Service Bus' etc.

MassTransit provides an easy way to configure the message brokers in our .NET applications.

RabbitMQ Docker-Compose:

step1:
Install Docker Desktop into your local machine.(https://www.docker.com/products/docker-desktop/)

step2:
Now create a folder of any name then in that folder add the 'docker-compose.yaml' file. Then add the following docker-compose configurations.
docker-compose.yaml:
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - 15672:15672
      - 4001:5672
    container_name: myrabbitcontainer
  • (Line: 1) The 'version' is the docker-compose document version.
  • (Line: 3) The 'services' represents software or service we are going to run on docker.
  • (Line: 4) The 'rabbitmq' custom name of our service, is recommended giving a meaningful name that represents the service.
  • (Line: 5) Docker image of 'RabbitMQ'.
  • (Line: 7) The right-hand side port '15672' runs the RabbitMQ admin tool.
  • (Line: 8) The right-hand side port '5672' runs the RabbitMQ service.
  • (Line: 9) Name of the container to run our 'RabbitMQ' image.
Step3:
Command to run the docker container.
docker-compose up -d

Create A Shared Library Contains DTOs For RabbitMQ Messages:

Our two microservices in which one will act as publisher and another will act as a consumer using the RabbitMQ message broker. So the 'Type' of the message will be a model(POCO class) that will be created in 'Shared Library' that will be consumed by both our Microservices applications.

Now create a .NET Library.
CLI command
dotnet new classlib -o Your_Project_Name

Note: Now add the newly created class library reference into both microservice applications.

Now in the shared library let's add a new folder like 'RabbitMqModels', next add a model(POCO class) like 'ProductCreated'(this type of our message shared by RabbitMq).
{Shared_Project}/RabbitMqModels/ProductCreated.cs:
namespace Shared.Models.RabbitMqModel;

public class ProductCreated
{
    public int Id { get; set; }
    public string Name { get; set; }
}

Install NuGet Packges:

To configure RabbitMQ and MassTransit we have to install the following NuGet Package into our both microservice applications.

Install 'MassTransit' NuGet Package.
CLI command
dotnet add package MassTransit --version 8.0.0

Package Manager
Install-Package MassTransit -Version 8.0.0

Install 'MassTransit.AspNetCore' NuGet Package.
CLI command
dotnet add package MassTransit.AspNetCore --version 7.3.1

Package Manager
Install-Package MassTransit.AspNetCore -Version 7.3.1

Install 'MassTransit.RabbitMQ'
CLI command
dotnet add package MassTransit.RabbitMQ --version 8.0.0

Package Manager
Install-Package MassTransit.RabbitMQ -Version 8.0.0

Publisher:

Our 'Manufacturer.API' microservice application will be the publisher.

Configure 'MassTransit' and 'RabbitMQ' services in to the 'Program.cs'.
Manufacturer.API/Program.cs:
builder.Services.AddMassTransit(options => {
    options.UsingRabbitMq((context,cfg) => {
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {
            h.Username("guest");
            h.Password("guest");
        });
        
    });
});

  • (Line: 1) Registered the 'MassTransit' service.
  • (Line: 2) The 'RabbitMQ' service is configured inside of the 'MassTransit' service.
  • (Line: 3) Defined our 'RabbitMQ' host. Here port '4001' is my custom port exposed from the docker container.
  • (Line: 4&5) The default username and password for 'RabbitMQ' are 'guest'.
Inside of the 'ProductController' constructor let's inject the 'MassTransit.IPublishEndpoint'
Manufactures.API/Controllers/ProductController.cs:
using MassTransit;

// existing code hidden for display purpose

[ApiController]
[Route("[controller]")]
public class ProductsController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;
    public ProductsController(
    IPublishEndpoint publishEndpoint)
    {
        
        _publishEndpoint = publishEndpoint;
    }
}
Now let's implement publish logic in our 'PostAsync' action method.
Manufactures.API/Controllers/ProductController.cs:
using Shared.Models.RabbitMqModel;

// existing code hidden for display purpose

[HttpPost]
public async Task<IActionResult> PostAsync(Products newProduct)
{
	_manufactureContext.Products.Add(newProduct);
	await _manufactureContext.SaveChangesAsync();
	await _publishEndpoint.Publish<ProductCreated>(new ProductCreated
	{
		Id = newProduct.Id,
		Name = newProduct.Name
	});

	return CreatedAtAction("Get", new { id = newProduct.Id }, newProduct);
}
  • (Line: 10-14) The 'IPublishEndpoint.Publish<T>()' method pushes the message into the 'Fanout Exchange' of RabbitMQ. The message of type is 'Shared.Models.RabbitMqMode.ProductCreated'.

Create A Child Products Table In SalesBusiness Database:

Let's create a child 'Products' table in the 'SalesBusiness' database.
CREATE TABLE [dbo].[Products](
	[Id] [int] NOT NULL,
	[Name] [varchar](500) NULL,
 CONSTRAINT [PK_Products] PRIMARY KEY CLUSTERED 
(
	[Id] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

Configure Products Table Entity In SalesBusiness.API Project:

Let's add the 'Product' entity in our 'SalesBusiness.API' project
SalesBusiness.API/Data/Entities/Products.cs:
namespace SalesBusiness.Api.Data.Entities;

public class Products
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Now register the 'Products' inside of the 'SalesBusinessContext'.
SalesBusiness.API/Data/SalesBusinesContext.cs:
using Microsoft.EntityFrameworkCore;
using SalesBusiness.Api.Data.Entities;

namespace SalesBusiness.Api.Data;
public class SalesBusinessContext: DbContext
{
    public SalesBusinessContext(DbContextOptions<SalesBusinessContext> options):base(options)
    {
        
    }
    public DbSet<Orders> Orders{get;set;}
    public DbSet<Products> Products{get;set;}
}

Consumer:

Our 'SalesBusiness.API' microservice application will be the consumer.

Using 'MassTransit.IConsumer<T>' we will make a RabbitMQ queue consumer. So let's create a folder like 'Consumer' and then add a class inside of it like 'ProductCreatedConsumer.cs'.
SalesBusiness.API/Consumer/ProductCreatedConsumer.cs:
using MassTransit;
using SalesBusiness.Api.Data;
using SalesBusiness.Api.Data.Entities;
using Shared.Models.RabbitMqModel;

namespace SalesBusiness.Api.Consumer;

public class ProductCreatedConsumer : IConsumer<ProductCreated>
{
    private readonly SalesBusinessContext _salesBusinessContext;
    public ProductCreatedConsumer(SalesBusinessContext salesBusinessContext)
    {
        _salesBusinessContext = salesBusinessContext;
    }
    public async Task Consume(ConsumeContext<ProductCreated> context)
    {
        var newProduct = new Products{
            Id = context.Message.Id,
            Name = context.Message.Name
        };
        _salesBusinessContext.Add(newProduct);
        await _salesBusinessContext.SaveChangesAsync();
    }
}
  • To make 'ProductCreatedConsumer' entity as a RabbitMQ queue consumer it must inherit the 'MassTransit.IConsumer'.
  • (Line: 15-23) Here implemented 'Consume' asynchronous method that's gets executed on every new message received by the queue. Inside of this method, our logic to store the message data has been implemented.
In 'Program.cs' let's register 'MassTransit' and 'RabbitMQ' services.
SalesBusiness.API/Programc.s:
builder.Services.AddMassTransit(x => {
    x.AddConsumer<ProductCreatedConsumer>();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri("rabbitmq://localhost:4001"), h => {
            h.Username("guest");
            h.Password("guest");
        });
        cfg.ReceiveEndpoint("event-listener", e =>
        {
            e.ConfigureConsumer<ProductCreatedConsumer>(context);
        });
    });
});


  • (Line: 9-12) Here defined our channel or queue name like 'event-listener'. So 'RabbitMQ' Fanout exchange pushes messages to all queues. Here register our channel entity that is 'ProductCreatedConsumer' which listens for every new message from the RabbitMQ queue.

Test Microservices Communication With RabbitMQ Message Broker:

Step1:
Make sure our microservice application running under different port numbers. To change port number
goto 'Properties/launchsettings.json'.

Step2:
Make sure to run the Docker desktop application on our local machine and then run our docker-compose for 'RabbitMQ' as explained above.

Step3:
Let's try to create a new product from our Product Endpoint(Manufacture.API microservice application). In the action method we are publishing our new product record as a message to the RabbitMQ queue channel, so our 'SalesBusiness.API' project act as a consumer and reads the message and saves the message into our child 'Products' table in the 'SalesBusiness' database. This step represents asynchronous communication between our microservices.

Now we can observe data between '[Manufacture].[dbo].[Products]' and '[SalesBusiness].[dbo].[Products]' these 2 tables are synced because of RabbitMQ asynchronous communication between the microservices.

Update Orders GET Endpoint To Fetch Products Information:

First, let's create DTO for our 'Orders' like 'Orders.Dto' that contains 'Prodcut.Dto' as one of its properties.
SalesBusiness.API/DTOs/Orders.Dto.cs:
namespace SalesBusiness.Api.DTOs;

public class OrdersDto
{
    public int Id { get; set; }
    public string UserId { get; set; }
    public DateTime? OrderDate { get; set; }
    public ProductDto ProductInfo { get; set; }
}

public class ProductDto
{
    public int Id { get; set; }
    public string Name { get; set; }
}
Now let's update our Orders GET endpoint to fetch the 'Product' information along with the 'Orders'.
SalesBusiness.API/Controllers/OrdersController.cs:
[HttpGet]
public async Task<IActionResult> GetAsync()
{
	var orders = await _salesBusinessContext.Orders
	.Join(
		_salesBusinessContext.Products,
		order => order.ProductId,
		product => product.Id,
		(order, product) => new { Order = order, Product = product }
	)
	.Select(_ => new OrdersDto
	{
		Id = _.Order.Id,
		OrderDate = _.Order.OrderDate,
		UserId = _.Order.UserId,
		ProductInfo = new ProductDto
		{
			Id = _.Product.Id,
			Name = _.Product.Name
		}
	}).ToListAsync();
	return Ok(orders);
}
  • Here 'Orders' table is joined with the 'Product' table while fetching the collection of 'Orders'.
In the next article, we are going to implement the 'API Gateway'.

Support Me!
Buy Me A Coffee PayPal Me

Video Session:

Wrapping Up:

Hopefully, I think this article delivered some useful information Microservice flow in .NET6 applications. using I love to have your feedback, suggestions, and better techniques in the comment section below.

Refer:


Comments

Post a Comment

Popular posts from this blog

Angular 14 Reactive Forms Example

In this article, we will explore the Angular(14) reactive forms with an example. Reactive Forms: Angular reactive forms support model-driven techniques to handle the form's input values. The reactive forms state is immutable, any form filed change creates a new state for the form. Reactive forms are built around observable streams, where form inputs and values are provided as streams of input values, which can be accessed synchronously. Some key notations that involve in reactive forms are like: FormControl - each input element in the form is 'FormControl'. The 'FormControl' tracks the value and validation status of form fields. FormGroup - Track the value and validate the state of the group of 'FormControl'. FormBuilder - Angular service which can be used to create the 'FormGroup' or FormControl instance quickly. Form Array - That can hold infinite form control, this helps to create dynamic forms. Create An Angular(14) Application: Let'

.NET 7 Web API CRUD Using Entity Framework Core

In this article, we are going to implement a sample .NET 7 Web API CRUD using the Entity Framework Core. Web API: Web API is a framework for building HTTP services that can be accessed from any client like browser, mobile devices, and desktop apps. In simple terminology API(Application Programming Interface) means an interface module that contains programming functions that can be requested via HTTP calls either to fetch or update data for their respective clients. Some of the Key Characteristics of API: Supports HTTP verbs like 'GET', 'POST', 'PUT', 'DELETE', etc. Supports default responses like 'XML' and 'JSON'. Also can define custom responses. Supports self-hosting or individual hosting, so that all different kinds of apps can consume it. Authentication and Authorization are easy to implement. The ideal platform to build the REST full services. Install The SQL Server And SQL Management Studio: Let's install the SQL server on our l

ReactJS(v18) JWT Authentication Using HTTP Only Cookie

In this article, we will implement the ReactJS application authentication using the HTTP-only cookie. HTTP Only Cookie: In a SPA(Single Page Application) Authentication JWT token either can be stored in browser 'LocalStorage' or in 'Cookie'. Storing the JWT token inside of the cookie then the cookie should be HTTP Only. The HTTP-ONly cookie nature is that it will be only accessible by the server application. Client apps like javascript-based apps can't access the HTTP-Only cookie. So if we use the authentication with HTTP-only JWT cookie then we no need to implement the custom logic like adding authorization header or storing token data, etc at our client application. Because once the user authenticated cookie will be automatically sent to the server by the browser on every API call. Authentication API: To authenticate our client application with JWT HTTP-only cookie, I developed a NetJS(which is a node) Mock API. Check the GitHub link and read the document on G

.NET6 Web API CRUD Operation With Entity Framework Core

In this article, we are going to do a small demo on AspNetCore 6 Web API CRUD operations. What Is Web API: Web API is a framework for building HTTP services that can be accessed from any client like browser, mobile devices, desktop apps. In simple terminology API(Application Programming Interface) means an interface module that contains a programming function that can be requested via HTTP calls to save or fetch the data for their respective clients. Some of the key characteristics of API: Supports HTTP verbs like 'GET', 'POST', 'PUT', 'DELETE', etc. Supports default responses like 'XML' and 'JSON'. Also can define custom responses. Supports self-hosting or individual hosting, so that all different kinds of apps can consume it. Authentication and Authorization are easy to implement. The ideal platform to build REST full services. Create A .NET6 Web API Application: Let's create a .Net6 Web API sample application to accomplish our

Angular 14 State Management CRUD Example With NgRx(14)

In this article, we are going to implement the Angular(14) state management CRUD example with NgRx(14) NgRx Store For State Management: In an angular application to share consistent data between multiple components, we use NgRx state management. Using NgRx state helps to avoid unwanted API calls, easy to maintain consistent data, etc. The main building blocks for the NgRx store are: Actions - NgRx actions represents event to trigger the reducers to save the data into the stores. Reducer - Reducer's pure function, which is used to create a new state on data change. Store - The store is the model or entity that holds the data. Selector - Selector to fetch the slices of data from the store to angular components. Effects - Effects deals with external network calls like API. The effect gets executed based the action performed Ngrx State Management flow: The angular component needs data for binding.  So angular component calls an action that is responsible for invoking the API call.  Aft

Angular 14 Crud Example

In this article, we will implement CRUD operation in the Angular 14 application. Angular: Angular is a framework that can be used to build a single-page application. Angular applications are built with components that make our code simple and clean. Angular components compose of 3 files like TypeScript File(*.ts), Html File(*.html), CSS File(*.cs) Components typescript file and HTML file support 2-way binding which means data flow is bi-directional Component typescript file listens for all HTML events from the HTML file. Create Angular(14) Application: Let's create an Angular(14) application to begin our sample. Make sure to install the Angular CLI tool into our local machine because it provides easy CLI commands to play with the angular application. Command To Install Angular CLI npm install -g @angular/cli Run the below command to create the angular application. Command To Create Angular Application ng new name_of_your_app Note: While creating the app, you will see a noti

Unit Testing Asp.NetCore Web API Using xUnit[.NET6]

In this article, we are going to write test cases to an Asp.NetCore Web API(.NET6) application using the xUnit. xUnit For .NET: The xUnit for .Net is a free, open-source, community-focused unit testing tool for .NET applications. By default .Net also provides a xUnit project template to implement test cases. Unit test cases build upon the 'AAA' formula that means 'Arrange', 'Act' and 'Assert' Arrange - Declaring variables, objects, instantiating mocks, etc. Act - Calling or invoking the method that needs to be tested. Assert - The assert ensures that code behaves as expected means yielding expected output. Create An API And Unit Test Projects: Let's create a .Net6 Web API and xUnit sample applications to accomplish our demo. We can use either Visual Studio 2022 or Visual Studio Code(using .NET CLI commands) to create any.Net6 application. For this demo, I'm using the 'Visual Studio Code'(using the .NET CLI command) editor. Create a fo

Part-1 Angular JWT Authentication Using HTTP Only Cookie[Angular V13]

In this article, we are going to implement a sample angular application authentication using HTTP only cookie that contains a JWT token. HTTP Only JWT Cookie: In a SPA(Single Page Application) Authentication JWT token either can be stored in browser 'LocalStorage' or in 'Cookie'. Storing JWT token inside of the cookie then the cookie should be HTTP Only. The HTTP-Only cookie nature is that it will be only accessible by the server application. Client apps like javascript-based apps can't access the HTTP-Only cookie. So if we use authentication with HTTP only JWT cookie then we no need to implement custom logic like adding authorization header or storing token data, etc at our client application. Because once the user authenticated cookie will be automatically sent to the server by the browser on every API call. Authentication API: To implement JWT cookie authentication we need to set up an API. For that, I had created a mock authentication API(Using the NestJS Se

ReactJS(v18) Authentication With JWT AccessToken And Refresh Token

In this article, we are going to do ReactJS(v18) application authentication using the JWT Access Token and Refresh Token. JSON Web Token(JWT): JSON Web Token is a digitally signed and secured token for user validation. The JWT is constructed with 3 important parts: Header Payload Signature Create ReactJS Application: Let's create a ReactJS application to accomplish our demo. npx create-react-app name-of-your-app Configure React Bootstrap Library: Let's install the React Bootstrap library npm install react-bootstrap bootstrap Now add the bootstrap CSS reference in 'index.js'. src/index.js: import 'bootstrap/dist/css/bootstrap.min.css' Create A React Component 'Layout': Let's add a React component like 'Layout' in 'components/shared' folders(new folders). src/components/shared/Layout.js: import Navbar from "react-bootstrap/Navbar"; import { Container } from "react-bootstrap"; import Nav from "react-boot

A Small Guide On NestJS Queues

NestJS Application Queues helps to deal with application scaling and performance challenges. When To Use Queues?: API request that mostly involves in time taking operations like CPU bound operation, doing them synchronously which will result in thread blocking. So to avoid these issues, it is an appropriate way to make the CPU-bound operation separate background job.  In nestjs one of the best solutions for these kinds of tasks is to implement the Queues. For queueing mechanism in the nestjs application most recommended library is '@nestjs/bull'(Bull is nodejs queue library). The 'Bull' depends on Redis cache for data storage like a job. So in this queueing technique, we will create services like 'Producer' and 'Consumer'. The 'Producer' is used to push our jobs into the Redis stores. The consumer will read those jobs(eg: CPU Bound Operations) and process them. So by using this queues technique user requests processed very fastly because actually